# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportMapping,SequencefromtypingimportTYPE_CHECKING,Anyfromsqlalchemy.engineimportURLfromydb_dbapiimportConnectionasDbApiConnectionimportydbfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.common.sql.hooks.sqlimportDbApiHookfromairflow.providers.ydb.utils.credentialsimportget_credentials_from_connectionfromairflow.providers.ydb.utils.defaultsimportCONN_NAME_ATTR,CONN_TYPE,DEFAULT_CONN_NAME
def execute(self, sql: str, parameters: Mapping[str, Any] | None = None):
    """
    Run a single statement through the wrapped ``delegatee`` cursor.

    :param sql: statement text handed to the delegatee unchanged.
    :param parameters: must be ``None``; any other value is rejected.
    :return: whatever the chosen delegatee method returns.
    :raises AirflowException: if ``parameters`` is not ``None``.
    """
    if parameters is not None:
        raise AirflowException("parameters is not supported yet")

    # ``is_ddl`` selects the delegatee's scheme entry point over the regular one.
    runner = self.delegatee.execute_scheme if self.is_ddl else self.delegatee.execute
    return runner(sql, parameters)
conn:Connection=self.connectionhost:str|None=conn.hostifnothost:raiseValueError("YDB host must be specified")port:int=conn.portorDEFAULT_YDB_GRPCS_PORTconnection_extra:dict[str,Any]=conn.extra_dejsondatabase:str|None=connection_extra.get("database")ifnotdatabase:raiseValueError("YDB database must be specified")
endpoint=f"{host}:{port}"credentials=get_credentials_from_connection(endpoint=endpoint,database=database,connection=conn,connection_extra=connection_extra)driver_config=ydb.DriverConfig(endpoint=endpoint,database=database,query_client_settings=YDBHook._get_query_client_settings(),credentials=credentials,)driver=ydb.Driver(driver_config)# wait until driver become initializeddriver.wait(fail_fast=True,timeout=10)
def get_connection_form_widgets(cls) -> dict[str, Any]:
    """
    Build the extra form fields shown for a YDB connection.

    The UI libraries are imported inside the function body rather than at
    module level.

    :return: mapping of field name to a wtforms field, in display order:
        ``database``, ``service_account_json``, ``service_account_json_path``,
        ``token``, ``use_vm_metadata``.
    """
    from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import BooleanField, PasswordField, StringField

    # Shape of the service-account key, shared by both JSON-related descriptions.
    key_shape = '{"id": "...", "service_account_id": "...", "private_key": "..."}. '

    widgets: dict[str, Any] = {}
    widgets["database"] = StringField(
        lazy_gettext("Database name"),
        widget=BS3TextFieldWidget(),
        description="Required. YDB database name",
    )
    widgets["service_account_json"] = PasswordField(
        lazy_gettext("Service account auth JSON"),
        widget=BS3PasswordFieldWidget(),
        description=(
            "Service account auth JSON. Looks like "
            + key_shape
            + "Will be used instead of IAM token and SA JSON file path field if specified."
        ),
    )
    widgets["service_account_json_path"] = StringField(
        lazy_gettext("Service account auth JSON file path"),
        widget=BS3TextFieldWidget(),
        description="Service account auth JSON file path. File content looks like " + key_shape,
    )
    widgets["token"] = PasswordField(
        lazy_gettext("IAM token"),
        widget=BS3PasswordFieldWidget(),
        description="User account IAM token.",
    )
    widgets["use_vm_metadata"] = BooleanField(
        lazy_gettext("Use VM metadata"),
        default=False,
        description="Optional. Whether to use VM metadata to retrieve IAM token",
    )
    return widgets
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Describe how the connection form fields are presented for YDB.

    :return: dict with three keys: ``hidden_fields`` (``schema`` and ``extra``),
        ``relabeling`` (empty), and ``placeholders`` (example values per field).
    """
    examples = {
        "host": "eg. grpcs://my_host or ydb.serverless.yandexcloud.net or lb.etn9txxxx.ydb.mdb.yandexcloud.net",
        "login": "root",
        "password": "my_password",
        "database": "e.g. /local or /ru-central1/b1gtl2kg13him37quoo6/etndqstq7ne4v68n6c9b",
        "service_account_json": 'e.g. {"id": "...", "service_account_id": "...", "private_key": "..."}',
        "token": "t1.9....AAQ",
    }
    behaviour: dict[str, Any] = {
        "hidden_fields": ["schema", "extra"],
        "relabeling": {},
        "placeholders": examples,
    }
    return behaviour
def get_conn(self) -> YDBConnection:
    """
    Create a new ``YDBConnection`` for this hook.

    The connection is built from the hook's ``database``, its
    ``ydb_session_pool`` and its ``is_ddl`` flag.

    :return: a freshly constructed ``YDBConnection``.
    """
    connection = YDBConnection(
        self.database,
        self.ydb_session_pool,
        is_ddl=self.is_ddl,
    )
    return connection
def bulk_upsert(self, table_name: str, rows: Sequence, column_types: ydb.BulkUpsertColumns):
    """
    Write ``rows`` into ``table_name`` using YDB's BulkUpsert.

    Obtains a connection via ``get_conn()`` and forwards all three arguments
    to that connection's ``bulk_upsert``. Nothing is returned.

    .. seealso::
        https://ydb.tech/docs/en/recipes/ydb-sdk/bulk-upsert

    :param table_name: target table, passed through unchanged.
    :param rows: rows to upsert, passed through unchanged.
    :param column_types: column type description, passed through unchanged.
    """
    connection = self.get_conn()
    connection.bulk_upsert(table_name, rows, column_types)