Source code for airflow.contrib.hooks.pinot_hook

# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import six

from pinotdb import connect

from airflow.hooks.dbapi_hook import DbApiHook

[docs]class PinotDbApiHook(DbApiHook): """ Connect to pinot db( to issue pql """
[docs] conn_name_attr = 'pinot_broker_conn_id'
[docs] default_conn_name = 'pinot_broker_default'
[docs] supports_autocommit = False
def __init__(self, *args, **kwargs): super(PinotDbApiHook, self).__init__(*args, **kwargs)
[docs] def get_conn(self): """ Establish a connection to pinot broker through pinot dbqpi. """ conn = self.get_connection(self.pinot_broker_conn_id) pinot_broker_conn = connect(, port=conn.port, path=conn.extra_dejson.get('endpoint', '/pql'), scheme=conn.extra_dejson.get('schema', 'http') )'Get the connection to pinot ' 'broker on {host}'.format( return pinot_broker_conn
[docs] def get_uri(self): """ Get the connection uri for pinot broker. e.g: http://localhost:9000/pql """ conn = self.get_connection(getattr(self, self.conn_name_attr)) host = if conn.port is not None: host += ':{port}'.format(port=conn.port) conn_type = 'http' if not conn.conn_type else conn.conn_type endpoint = conn.extra_dejson.get('endpoint', 'pql') return '{conn_type}://{host}/{endpoint}'.format( conn_type=conn_type, host=host, endpoint=endpoint)
[docs] def get_records(self, sql): """ Executes the sql and returns a set of records. :param sql: the sql statement to be executed (str) or a list of sql statements to execute :type sql: str """ if six.PY2: sql = sql.encode('utf-8') with self.get_conn() as cur: cur.execute(sql) return cur.fetchall()
[docs] def get_first(self, sql): """ Executes the sql and returns the first resulting row. :param sql: the sql statement to be executed (str) or a list of sql statements to execute :type sql: str or list """ if six.PY2: sql = sql.encode('utf-8') with self.get_conn() as cur: cur.execute(sql) return cur.fetchone()
[docs] def set_autocommit(self, conn, autocommit): raise NotImplementedError()
[docs] def insert_rows(self, table, rows, target_fields=None, commit_every=1000): raise NotImplementedError()
Copy to clipboard

Was this entry helpful?