Source code for airflow.providers.databricks.operators.databricks_sql
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Databricks operators."""

from __future__ import annotations

import csv
import json
from typing import TYPE_CHECKING, Any, Sequence

from databricks.sql.types import Row
from databricks.sql.utils import ParamEscaper

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

if TYPE_CHECKING:
    from airflow.utils.context import Context
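

# ``make_serializable`` and ``COPY_INTO_APPROVED_FORMATS`` are referenced by the
# operators below but are not shown in this listing. Minimal definitions are
# sketched here as assumptions so the code reads end to end; the released
# provider may define them differently.


def make_serializable(val: Row):
    # Turn a databricks-sql ``Row`` into a plain tuple so query results can be
    # JSON-serialized, e.g. when ``_process_output`` pushes them to XCom.
    return tuple(val)


# File formats accepted by COPY INTO, as listed in the
# ``DatabricksCopyIntoOperator`` docstring.
COPY_INTO_APPROVED_FORMATS = ["CSV", "JSON", "AVRO", "ORC", "PARQUET", "TEXT", "BINARYFILE"]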


class DatabricksSqlOperator(SQLExecuteQueryOperator):
    """
    Executes SQL code in a Databricks SQL endpoint or a Databricks cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DatabricksSqlOperator`

    :param databricks_conn_id: Reference to
        :ref:`Databricks connection id<howto/connection:databricks>` (templated)
    :param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
        If not specified, it should be either specified in the Databricks connection's extra parameters,
        or ``sql_endpoint_name`` must be specified.
    :param sql_endpoint_name: Optional name of Databricks SQL Endpoint. If not specified, ``http_path``
        must be provided as described above.
    :param sql: the SQL code to be executed as a single string, or a list of str (SQL statements),
        or a reference to a template file. (templated)
        Template references are recognized by str ending in '.sql'
    :param parameters: (optional) the parameters to render the SQL query with.
    :param session_configuration: An optional dictionary of Spark session parameters. Defaults to None.
        If not specified, it could be specified in the Databricks connection's extra parameters.
    :param client_parameters: Additional parameters internal to the Databricks SQL Connector.
    :param http_headers: An optional list of (k, v) pairs that will be set as HTTP headers
        on every request. (templated)
    :param catalog: An optional initial catalog to use. Requires DBR version 9.0+ (templated)
    :param schema: An optional initial schema to use. Requires DBR version 9.0+ (templated)
    :param output_path: optional string specifying the file to which the selected data
        will be written. (templated)
    :param output_format: format of output data if ``output_path`` is specified.
        Possible values are ``csv``, ``json``, ``jsonl``. Default is ``csv``.
    :param csv_params: parameters that will be passed to the ``csv.DictWriter`` class used to write CSV data.
    """

    def _should_run_output_processing(self) -> bool:
        return self.do_xcom_push or bool(self._output_path)

    def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]:
        if not self._output_path:
            return list(zip(descriptions, [[make_serializable(row) for row in res] for res in results]))
        if not self._output_format:
            raise AirflowException("Output format should be specified!")
        # Write to the output file only the result of the last query
        last_description = descriptions[-1]
        last_results = results[-1]
        if last_description is None:
            raise AirflowException("Description is missing for the output file.")
        field_names = [field[0] for field in last_description]
        if self._output_format.lower() == "csv":
            with open(self._output_path, "w", newline="") as file:
                # Copy the user-supplied parameters so the operator's ``csv_params`` is not mutated
                csv_params = dict(self._csv_params) if self._csv_params else {}
                write_header = csv_params.pop("header", True)
                writer = csv.DictWriter(file, fieldnames=field_names, **csv_params)
                if write_header:
                    writer.writeheader()
                for row in last_results:
                    writer.writerow(row.asDict())
        elif self._output_format.lower() == "json":
            with open(self._output_path, "w") as file:
                file.write(json.dumps([row.asDict() for row in last_results]))
        elif self._output_format.lower() == "jsonl":
            with open(self._output_path, "w") as file:
                for row in last_results:
                    file.write(json.dumps(row.asDict()))
                    file.write("\n")
        else:
            raise AirflowException(f"Unsupported output format: '{self._output_format}'")
        return list(zip(descriptions, [[make_serializable(row) for row in res] for res in results]))
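

# --- Illustrative usage (not part of this module) --------------------------------
# A minimal sketch of how ``DatabricksSqlOperator`` might be used in a DAG, based
# on the parameters documented above. The connection id, endpoint name, table and
# output path are placeholders.
#
#     select_into_file = DatabricksSqlOperator(
#         task_id="select_data_into_csv",
#         databricks_conn_id="databricks_default",
#         sql_endpoint_name="my_sql_endpoint",
#         sql="SELECT * FROM default.my_table",
#         output_path="/tmp/my_table.csv",
#         output_format="csv",
#     )
# ----------------------------------------------------------------------------------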


class DatabricksCopyIntoOperator(BaseOperator):
    """
    Executes a COPY INTO command in a Databricks SQL endpoint or a Databricks cluster.

    The COPY INTO command is constructed from individual pieces that are described in the
    `documentation <https://docs.databricks.com/sql/language-manual/delta-copy-into.html>`_.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DatabricksSqlCopyIntoOperator`

    :param table_name: Required name of the table. (templated)
    :param file_location: Required location of files to import. (templated)
    :param file_format: Required file format. Supported formats are ``CSV``, ``JSON``, ``AVRO``,
        ``ORC``, ``PARQUET``, ``TEXT``, ``BINARYFILE``.
    :param databricks_conn_id: Reference to
        :ref:`Databricks connection id<howto/connection:databricks>` (templated)
    :param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
        If not specified, it should be either specified in the Databricks connection's extra parameters,
        or ``sql_endpoint_name`` must be specified.
    :param sql_endpoint_name: Optional name of Databricks SQL Endpoint. If not specified, ``http_path``
        must be provided as described above.
    :param session_configuration: An optional dictionary of Spark session parameters. Defaults to None.
        If not specified, it could be specified in the Databricks connection's extra parameters.
    :param http_headers: An optional list of (k, v) pairs that will be set as HTTP headers on every request
    :param catalog: An optional initial catalog to use. Requires DBR version 9.0+
    :param schema: An optional initial schema to use. Requires DBR version 9.0+
    :param client_parameters: Additional parameters internal to the Databricks SQL Connector.
    :param files: optional list of files to import. Can't be specified together with ``pattern``. (templated)
    :param pattern: optional regex string to match file names to import.
        Can't be specified together with ``files``.
    :param expression_list: optional string that will be used in the ``SELECT`` expression.
    :param credential: optional credential configuration for authentication against a source location.
    :param storage_credential: optional Unity Catalog storage credential for the destination.
    :param encryption: optional encryption configuration for a specified location.
    :param format_options: optional dictionary with options specific to a given file format.
    :param force_copy: optional bool to control forcing of data import
        (could also be specified in ``copy_options``).
    :param validate: optional configuration for schema & data validation. ``True`` forces validation
        of all rows; an integer N validates only the first N rows.
    :param copy_options: optional dictionary of copy options. Currently only the ``force`` option
        is supported.
    """

    def __init__(
        self,
        *,
        table_name: str,
        file_location: str,
        file_format: str,
        databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration=None,
        http_headers: list[tuple[str, str]] | None = None,
        client_parameters: dict[str, Any] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        files: list[str] | None = None,
        pattern: str | None = None,
        expression_list: str | None = None,
        credential: dict[str, str] | None = None,
        storage_credential: str | None = None,
        encryption: dict[str, str] | None = None,
        format_options: dict[str, str] | None = None,
        force_copy: bool | None = None,
        copy_options: dict[str, str] | None = None,
        validate: bool | int | None = None,
        **kwargs,
    ) -> None:
        """Creates a new ``DatabricksCopyIntoOperator``."""
        super().__init__(**kwargs)
        if files is not None and pattern is not None:
            raise AirflowException("Only one of 'pattern' or 'files' should be specified")
        if table_name == "":
            raise AirflowException("table_name shouldn't be empty")
        if file_location == "":
            raise AirflowException("file_location shouldn't be empty")
        if file_format not in COPY_INTO_APPROVED_FORMATS:
            raise AirflowException(f"file_format '{file_format}' isn't supported")
        self._files = files
        self._pattern = pattern
        self._file_format = file_format
        self.databricks_conn_id = databricks_conn_id
        self._http_path = http_path
        self._sql_endpoint_name = sql_endpoint_name
        self.session_config = session_configuration
        self._table_name = table_name
        self._catalog = catalog
        self._schema = schema
        self._file_location = file_location
        self._expression_list = expression_list
        self._credential = credential
        self._storage_credential = storage_credential
        self._encryption = encryption
        self._format_options = format_options
        self._copy_options = copy_options or {}
        self._validate = validate
        self._http_headers = http_headers
        self._client_parameters = client_parameters or {}
        if force_copy is not None:
            self._copy_options["force"] = "true" if force_copy else "false"

    def _get_hook(self) -> DatabricksSqlHook:
        return DatabricksSqlHook(
            self.databricks_conn_id,
            http_path=self._http_path,
            session_configuration=self.session_config,
            sql_endpoint_name=self._sql_endpoint_name,
            http_headers=self._http_headers,
            catalog=self._catalog,
            schema=self._schema,
            caller="DatabricksCopyIntoOperator",
            **self._client_parameters,
        )

    @staticmethod
    def _generate_options(
        name: str,
        escaper: ParamEscaper,
        opts: dict[str, str] | None = None,
        escape_key: bool = True,
    ) -> str:
        formatted_opts = ""
        if opts is not None and len(opts) > 0:
            pairs = [
                f"{escaper.escape_item(k) if escape_key else k} = {escaper.escape_item(v)}"
                for k, v in opts.items()
            ]
            formatted_opts = f"{name} ({', '.join(pairs)})"
        return formatted_opts

    def _create_sql_query(self) -> str:
        escaper = ParamEscaper()
        maybe_with = ""
        if self._encryption is not None or self._credential is not None:
            maybe_encryption = ""
            if self._encryption is not None:
                maybe_encryption = self._generate_options("ENCRYPTION", escaper, self._encryption, False)
            maybe_credential = ""
            if self._credential is not None:
                maybe_credential = self._generate_options("CREDENTIAL", escaper, self._credential, False)
            maybe_with = f" WITH ({maybe_credential}{maybe_encryption})"
        location = escaper.escape_item(self._file_location) + maybe_with
        if self._expression_list is not None:
            location = f"(SELECT {self._expression_list} FROM {location})"
        files_or_pattern = ""
        if self._pattern is not None:
            files_or_pattern = f"PATTERN = {escaper.escape_item(self._pattern)}\n"
        elif self._files is not None:
            files_or_pattern = f"FILES = {escaper.escape_item(self._files)}\n"
        format_options = self._generate_options("FORMAT_OPTIONS", escaper, self._format_options) + "\n"
        copy_options = self._generate_options("COPY_OPTIONS", escaper, self._copy_options) + "\n"
        storage_cred = ""
        if self._storage_credential:
            storage_cred = f" WITH (CREDENTIAL {self._storage_credential})"
        validation = ""
        if self._validate is not None:
            if isinstance(self._validate, bool):
                if self._validate:
                    validation = "VALIDATE ALL\n"
            elif isinstance(self._validate, int):
                if self._validate < 0:
                    raise AirflowException(
                        "Number of rows for validation should be positive, got: " + str(self._validate)
                    )
                validation = f"VALIDATE {self._validate} ROWS\n"
            else:
                raise AirflowException(
                    "Incorrect data type for validate parameter: " + str(type(self._validate))
                )
        # TODO: think on how to make sure that table_name and expression_list aren't used for SQL injection
        sql = f"""COPY INTO {self._table_name}{storage_cred}
FROM {location}
FILEFORMAT = {self._file_format}
{validation}{files_or_pattern}{format_options}{copy_options}
"""
        return sql.strip()
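

# --- Illustrative usage (not part of this module) --------------------------------
# A minimal sketch of how ``DatabricksCopyIntoOperator`` might be configured,
# together with the statement ``_create_sql_query`` would assemble from it. The
# connection id, endpoint name, table and S3 location are placeholders.
#
#     import_csv = DatabricksCopyIntoOperator(
#         task_id="import_csv_files",
#         databricks_conn_id="databricks_default",
#         sql_endpoint_name="my_sql_endpoint",
#         table_name="default.my_table",
#         file_location="s3://my-bucket/incoming/",
#         file_format="CSV",
#         format_options={"header": "true"},
#         force_copy=True,
#     )
#
# The generated statement would look roughly like:
#
#     COPY INTO default.my_table
#     FROM 's3://my-bucket/incoming/'
#     FILEFORMAT = CSV
#     FORMAT_OPTIONS ('header' = 'true')
#     COPY_OPTIONS ('force' = 'true')
# ----------------------------------------------------------------------------------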