Source code for airflow.providers.apache.drill.hooks.drill
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportIterablefromtypingimportTYPE_CHECKING,Anyfromsqlalchemyimportcreate_enginefromairflow.providers.common.sql.hooks.sqlimportDbApiHookifTYPE_CHECKING:fromsqlalchemy.engineimportConnection
class DrillHook(DbApiHook):
    """
    Interact with Apache Drill via sqlalchemy-drill.

    You can specify the SQLAlchemy dialect and driver that sqlalchemy-drill
    will employ to communicate with Drill in the extras field of your
    connection, e.g. ``{"dialect_driver": "drill+sadrill"}`` for communication
    over Drill's REST API. See the sqlalchemy-drill documentation for
    descriptions of the supported dialects and drivers.

    You can specify the default storage_plugin for the sqlalchemy-drill
    connection using the extras field e.g. ``{"storage_plugin": "dfs"}``.
    """

    def get_conn(self) -> Connection:
        """
        Establish a connection to Drillbit.

        The SQLAlchemy URL is assembled from the Airflow connection as
        ``<dialect_driver>://[login[:password]@]host[:port]/<storage_plugin>``.
        ``dialect_driver`` and ``storage_plugin`` are read from the connection
        extras and default to ``drill+sadrill`` and ``dfs`` respectively.

        :return: the raw connection obtained from the SQLAlchemy engine.
        :raises ValueError: if the assembled URL contains a ``?``.
        """
        conn_md = self.get_connection(self.get_conn_id())
        extras = conn_md.extra_dejson

        # Credentials are only emitted when a login is configured. A missing
        # password is left out entirely instead of being rendered as the
        # literal text "None".
        creds = ""
        if conn_md.login:
            creds = f"{conn_md.login}"
            if conn_md.password is not None:
                creds += f":{conn_md.password}"
            creds += "@"

        # Mirror get_uri(): append the port only when one is configured, so an
        # unset port does not end up in the URL as the invalid value ":None".
        host = f"{conn_md.host}"
        if conn_md.port is not None:
            host += f":{conn_md.port}"

        database_url = (
            f"{extras.get('dialect_driver', 'drill+sadrill')}://{creds}{host}/"
            f"{extras.get('storage_plugin', 'dfs')}"
        )
        # Reject any "?" so that no connection field can introduce a query
        # string into the URL handed to SQLAlchemy.
        if "?" in database_url:
            raise ValueError("Drill database_url should not contain a '?'")
        engine = create_engine(database_url)

        self.log.info(
            "Connected to the Drillbit at %s:%s as user %s",
            conn_md.host,
            conn_md.port,
            conn_md.login,
        )
        return engine.raw_connection()

    def get_uri(self) -> str:
        """
        Return the connection URI.

        The port is included only when the connection defines one, and the
        dialect/driver is carried as a query parameter,
        e.g: ``drill://localhost:8047/dfs?dialect_driver=drill+sadrill``
        """
        conn_md = self.get_connection(self.get_conn_id())
        host = conn_md.host
        if conn_md.port is not None:
            host += f":{conn_md.port}"
        conn_type = conn_md.conn_type or "drill"
        dialect_driver = conn_md.extra_dejson.get("dialect_driver", "drill+sadrill")
        storage_plugin = conn_md.extra_dejson.get("storage_plugin", "dfs")
        return f"{conn_type}://{host}/{storage_plugin}?dialect_driver={dialect_driver}"

    # The superclass DbApiHook's method implementation has a return type `None`, and mypy
    # reports the return type `NotImplementedError` as incompatible with it. Hence, we
    # ignore the mypy error here.
    def set_autocommit(  # type: ignore[override]
        self, conn: Connection, autocommit: bool
    ) -> NotImplementedError:
        """
        Always raise, because Drill has no transactions.

        :raises NotImplementedError: unconditionally.
        """
        raise NotImplementedError("There are no transactions in Drill.")

    # The superclass DbApiHook's method implementation has a return type `None`, and mypy
    # reports the return type `NotImplementedError` as incompatible with it. Hence, we
    # ignore the mypy error here.
    def insert_rows(  # type: ignore[override]
        self,
        table: str,
        rows: Iterable[tuple[str]],
        target_fields: Iterable[str] | None = None,
        commit_every: int = 1000,
        replace: bool = False,
        **kwargs: Any,
    ) -> NotImplementedError:
        """
        Always raise, because Drill has no INSERT statement.

        All parameters are accepted but unused.

        :raises NotImplementedError: unconditionally.
        """
        raise NotImplementedError("There is no INSERT statement in Drill.")