Source code for airflow.providers.databricks.sensors.databricks_partition
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""This module contains Databricks sensors."""

from __future__ import annotations

from collections.abc import Sequence
from datetime import datetime
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable

from databricks.sql.utils import ParamEscaper

from airflow.exceptions import AirflowException
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    try:
        from airflow.sdk.definitions.context import Context
    except ImportError:
        # TODO: Remove once provider drops support for Airflow 2
        from airflow.utils.context import Context
class DatabricksPartitionSensor(BaseSensorOperator):
    """
    Sensor to detect the presence of table partitions in Databricks.

    :param databricks_conn_id: Reference to :ref:`Databricks
        connection id<howto/connection:databricks>` (templated), defaults to
        DatabricksSqlHook.default_conn_name.
    :param sql_warehouse_name: Optional name of Databricks SQL warehouse. If not specified, ``http_path``
        must be provided as described below, defaults to None
    :param http_path: Optional string specifying HTTP path of Databricks SQL warehouse or All Purpose
        cluster. If not specified, it should be either specified in the Databricks connection's
        extra parameters, or ``sql_warehouse_name`` must be specified.
    :param session_configuration: An optional dictionary of Spark session parameters. If not specified,
        it could be specified in the Databricks connection's extra parameters, defaults to None
    :param http_headers: An optional list of (k, v) pairs
        that will be set as HTTP headers on every request. (templated).
    :param catalog: An optional initial catalog to use.
        Requires Databricks Runtime version 9.0+ (templated), defaults to ""
    :param schema: An optional initial schema to use.
        Requires Databricks Runtime version 9.0+ (templated), defaults to "default"
    :param table_name: Name of the table to check partitions.
    :param partitions: Name of the partitions to check.
        Example: {"date": "2023-01-03", "name": ["abc", "def"]}
    :param partition_operator: Optional comparison operator for partitions, such as >=.
    :param handler: Handler for DbApiHook.run() to return results, defaults to fetch_all_handler
    :param client_parameters: Additional parameters internal to Databricks SQL connector parameters.
    """
    template_fields: Sequence[str] = (
        "databricks_conn_id",
        "http_headers",
        "catalog",
        "schema",
    )

    def __init__(
        self,
        *,
        databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
        http_path: str | None = None,
        sql_warehouse_name: str | None = None,
        session_configuration=None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str = "",
        schema: str = "default",
        table_name: str,
        partitions: dict,
        partition_operator: str = "=",
        handler: Callable[[Any], Any] = fetch_all_handler,
        client_parameters: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        self.databricks_conn_id = databricks_conn_id
        self._http_path = http_path
        self._sql_warehouse_name = sql_warehouse_name
        self.session_config = session_configuration
        self.http_headers = http_headers
        self.catalog = catalog
        self.schema = schema
        self.caller = "DatabricksPartitionSensor"
        self.table_name = table_name
        self.partitions = partitions
        self.partition_operator = partition_operator
        self.handler = handler
        self.client_parameters = client_parameters or {}
        self.hook_params = kwargs.pop("hook_params", {})
        self.escaper = ParamEscaper()
        super().__init__(**kwargs)

    def _sql_sensor(self, sql):
        """Execute the supplied SQL statement using the hook object."""
        hook = self._get_hook
        sql_result = hook.run(
            sql,
            handler=self.handler if self.do_xcom_push else None,
        )
        self.log.debug("SQL result: %s", sql_result)
        return sql_result

    @cached_property
    def _get_hook(self) -> DatabricksSqlHook:
        """Create and return a DatabricksSqlHook object."""
        return DatabricksSqlHook(
            self.databricks_conn_id,
            self._http_path,
            self._sql_warehouse_name,
            self.session_config,
            self.http_headers,
            self.catalog,
            self.schema,
            self.caller,
            **self.client_parameters,
            **self.hook_params,
        )

    def _check_table_partitions(self) -> list:
        """Generate the fully qualified table name, generate partition, and call the _sql_sensor method."""
        if self.table_name.split(".")[0] == "delta":
            _fully_qualified_table_name = self.table_name
        else:
            _fully_qualified_table_name = f"{self.catalog}.{self.schema}.{self.table_name}"
        self.log.debug("Table name generated from arguments: %s", _fully_qualified_table_name)
        _joiner_val = " AND "
        _prefix = f"SELECT 1 FROM {_fully_qualified_table_name} WHERE"
        _suffix = " LIMIT 1"

        partition_sql = self._generate_partition_query(
            prefix=_prefix,
            suffix=_suffix,
            joiner_val=_joiner_val,
            opts=self.partitions,
            table_name=_fully_qualified_table_name,
            escape_key=False,
        )
        return self._sql_sensor(partition_sql)

    def _generate_partition_query(
        self,
        prefix: str,
        suffix: str,
        joiner_val: str,
        table_name: str,
        opts: dict[str, str] | None = None,
        escape_key: bool = False,
    ) -> str:
        """
        Query the table for available partitions.

        Generates the SQL query based on the partition data types.
            * For a list, it prepares the SQL in the format:
                column_name in (value1, value2,...)
            * For a numeric type, it prepares the format:
                column_name =(or other provided operator such as >=) value
            * For a date type, it prepares the format:
                column_name =(or other provided operator such as >=) value

        Once the filter predicates have been generated like above, the query is prepared to be executed
        using the prefix and suffix supplied, which are:
        "SELECT 1 FROM {_fully_qualified_table_name} WHERE" and "LIMIT 1".
        """
        partition_columns = self._sql_sensor(f"DESCRIBE DETAIL {table_name}")[0][7]
        self.log.debug("Partition columns: %s", partition_columns)
        if len(partition_columns) < 1:
            message = f"Table {table_name} does not have partitions"
            raise AirflowException(message)

        formatted_opts = ""
        if opts:
            output_list = []
            for partition_col, partition_value in opts.items():
                if escape_key:
                    partition_col = self.escaper.escape_item(partition_col)
                if partition_col in partition_columns:
                    if isinstance(partition_value, list):
                        output_list.append(f"""{partition_col} in {tuple(partition_value)}""")
                        self.log.debug("List formatting for partitions: %s", output_list)
                    if isinstance(partition_value, (int, float, complex)):
                        output_list.append(
                            f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
                        )
                    if isinstance(partition_value, (str, datetime)):
                        output_list.append(
                            f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
                        )
                else:
                    message = f"Column {partition_col} not part of table partitions: {partition_columns}"
                    raise AirflowException(message)
        else:
            # Raises an exception if no partitions were supplied for the sensor to check.
            message = "No partitions specified to check with the sensor."
            raise AirflowException(message)
        formatted_opts = f"{prefix} {joiner_val.join(output_list)} {suffix}"
        self.log.debug("Formatted options: %s", formatted_opts)

        return formatted_opts.strip()
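    # Illustrative note (not part of the upstream source): with
    # partitions={"date": "2023-01-03", "name": ["abc", "def"]}, the default
    # partition_operator "=" and a hypothetical table my_catalog.default.my_table,
    # _generate_partition_query assembles a probe query along the lines of
    #   SELECT 1 FROM my_catalog.default.my_table
    #   WHERE date='2023-01-03' AND name in ('abc', 'def') LIMIT 1
    # so the sensor succeeds as soon as at least one matching row exists.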
    def poke(self, context: Context) -> bool:
        """Check the table partitions and return the results."""
        partition_result = self._check_table_partitions()
        self.log.debug("Partition sensor result: %s", partition_result)
        if partition_result:
            return True
        else:
            message = f"Specified partition(s): {self.partitions} were not found."
            raise AirflowException(message)
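A minimal usage sketch, assuming a Databricks connection named ``databricks_default`` and a SQL warehouse called ``my_warehouse`` (the DAG id, catalog, schema, table and partition values below are placeholders, not part of this module):

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

with DAG(
    dag_id="example_databricks_partition_sensor",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_partition = DatabricksPartitionSensor(
        task_id="wait_for_partition",
        databricks_conn_id="databricks_default",  # assumed connection id
        sql_warehouse_name="my_warehouse",  # or pass http_path instead
        catalog="my_catalog",
        schema="default",
        table_name="my_table",
        partitions={"date": "2023-01-03"},  # partition column/value to probe
        partition_operator="=",
        poke_interval=60,
        timeout=600,
    )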