Source code for airflow.providers.apache.hive.transfers.s3_to_hive
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains an operator to move data from an S3 bucket to Hive."""

from __future__ import annotations

import bz2
import gzip
import os
import shutil
import tempfile
from collections.abc import Sequence
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, Any

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.apache.hive.hooks.hive import HiveCliHook

if TYPE_CHECKING:
    from airflow.utils.context import Context
[docs]
class S3ToHiveOperator(BaseOperator):
    """
    Moves data from S3 to Hive.

    The operator downloads a file from S3 and stores it locally
    before loading it into a Hive table.

    If the ``create`` or ``recreate`` arguments are set to ``True``,
    ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
    Hive data types are inferred from the cursor's metadata.

    Note that the table generated in Hive uses ``STORED AS textfile``,
    which isn't the most efficient serialization format. If a
    large amount of data is loaded and/or if the table gets
    queried considerably, you may want to use this operator only to
    stage the data into a temporary table before loading it into its
    final destination using a ``HiveOperator``.

    :param s3_key: The key to be retrieved from S3. (templated)
    :param field_dict: A dictionary with the field names in the file as keys
        and their Hive types as values
    :param hive_table: target Hive table, use dot notation to target a
        specific database. (templated)
    :param delimiter: field delimiter in the file
    :param create: whether to create the table if it doesn't exist
    :param recreate: whether to drop and recreate the table at every execution
    :param partition: target partition as a dict of partition columns
        and values. (templated)
    :param headers: whether the file contains column names on the first line
    :param check_headers: whether the column names on the first line should be
        checked against the keys of field_dict
    :param wildcard_match: whether the s3_key should be interpreted as a Unix
        wildcard pattern
    :param aws_conn_id: source s3 connection
    :param verify: Whether to verify SSL certificates for the S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
          (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
          You can specify this argument if you want to use a different
          CA cert bundle than the one used by botocore.
    :param hive_cli_conn_id: Reference to the
        :ref:`Hive CLI connection id <howto/connection:hive_cli>`.
    :param input_compressed: Boolean to determine if file decompression is
        required to process headers
    :param tblproperties: TBLPROPERTIES of the Hive table being created
    :param select_expression: S3 Select expression
    """
        # (in __init__) header checking requires both field_dict and headers
        if self.check_headers and not (self.field_dict is not None and self.headers):
            raise AirflowException("To check_headers provide field_dict and headers")
[docs]
    def execute(self, context: Context):
        # Downloading file from S3
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
        hive_hook = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id, auth=self.hive_auth)
        self.log.info("Downloading S3 file")

        if self.wildcard_match:
            if not s3_hook.check_for_wildcard_key(self.s3_key):
                raise AirflowException(f"No key matches {self.s3_key}")
            s3_key_object = s3_hook.get_wildcard_key(self.s3_key)
        elif s3_hook.check_for_key(self.s3_key):
            s3_key_object = s3_hook.get_key(self.s3_key)
        else:
            raise AirflowException(f"The key {self.s3_key} does not exist")

        if TYPE_CHECKING:
            assert s3_key_object

        _, file_ext = os.path.splitext(s3_key_object.key)
        if self.select_expression and self.input_compressed and file_ext.lower() != ".gz":
            raise AirflowException("GZIP is the only compression format Amazon S3 Select supports")

        with (
            TemporaryDirectory(prefix="tmps32hive_") as tmp_dir,
            NamedTemporaryFile(mode="wb", dir=tmp_dir, suffix=file_ext) as f,
        ):
            self.log.info("Dumping S3 key %s contents to local file %s", s3_key_object.key, f.name)
            if self.select_expression:
                option = {}
                if self.headers:
                    option["FileHeaderInfo"] = "USE"
                if self.delimiter:
                    option["FieldDelimiter"] = self.delimiter

                input_serialization: dict[str, Any] = {"CSV": option}
                if self.input_compressed:
                    input_serialization["CompressionType"] = "GZIP"

                content = s3_hook.select_key(
                    bucket_name=s3_key_object.bucket_name,
                    key=s3_key_object.key,
                    expression=self.select_expression,
                    input_serialization=input_serialization,
                )
                f.write(content.encode("utf-8"))
            else:
                s3_key_object.download_fileobj(f)
            f.flush()

            if self.select_expression or not self.headers:
                self.log.info("Loading file %s into Hive", f.name)
                hive_hook.load_file(
                    f.name,
                    self.hive_table,
                    field_dict=self.field_dict,
                    create=self.create,
                    partition=self.partition,
                    delimiter=self.delimiter,
                    recreate=self.recreate,
                    tblproperties=self.tblproperties,
                )
            else:
                # Decompressing file
                if self.input_compressed:
                    self.log.info("Uncompressing file %s", f.name)
                    fn_uncompressed = uncompress_file(f.name, file_ext, tmp_dir)
                    self.log.info("Uncompressed to %s", fn_uncompressed)
                    # uncompressed file available now so deleting
                    # compressed file to save disk space
                    f.close()
                else:
                    fn_uncompressed = f.name

                # Testing if header matches field_dict
                if self.check_headers:
                    self.log.info("Matching file header against field_dict")
                    header_list = self._get_top_row_as_list(fn_uncompressed)
                    if not self._match_headers(header_list):
                        raise AirflowException("Header check failed")

                # Deleting top header row
                self.log.info("Removing header from file %s", fn_uncompressed)
                headless_file = self._delete_top_row_and_compress(fn_uncompressed, file_ext, tmp_dir)
                self.log.info("Headless file %s", headless_file)
                self.log.info("Loading file %s into Hive", headless_file)
                hive_hook.load_file(
                    headless_file,
                    self.hive_table,
                    field_dict=self.field_dict,
                    create=self.create,
                    partition=self.partition,
                    delimiter=self.delimiter,
                    recreate=self.recreate,
                    tblproperties=self.tblproperties,
                )
    def _get_top_row_as_list(self, file_name):
        with open(file_name) as file:
            header_line = file.readline().strip()
            return header_line.split(self.delimiter)

    def _match_headers(self, header_list):
        if not header_list:
            raise AirflowException("Unable to retrieve header row from file")
        field_names = self.field_dict.keys()
        if len(field_names) != len(header_list):
            self.log.warning(
                "Headers count mismatch File headers:\n%s\nField names: \n%s\n", header_list, field_names
            )
            return False
        test_field_match = all(h1.lower() == h2.lower() for h1, h2 in zip(header_list, field_names))
        if test_field_match:
            return True
        else:
            self.log.warning(
                "Headers do not match field names File headers:\n%s\nField names: \n%s\n",
                header_list,
                field_names,
            )
            return False

    @staticmethod
    def _delete_top_row_and_compress(input_file_name, output_file_ext, dest_dir):
        # When output_file_ext is not defined, file is not compressed
        open_fn = open
        if output_file_ext.lower() == ".gz":
            open_fn = gzip.GzipFile
        elif output_file_ext.lower() == ".bz2":
            open_fn = bz2.BZ2File

        _, fn_output = tempfile.mkstemp(suffix=output_file_ext, dir=dest_dir)
        with open(input_file_name, "rb") as f_in, open_fn(fn_output, "wb") as f_out:
            f_in.seek(0)
            next(f_in)
            for line in f_in:
                f_out.write(line)
        return fn_output
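For reference, a minimal usage sketch, not part of the module source above: it assumes the default ``aws_default`` and ``hive_cli_default`` connections exist and uses the Airflow 2.4+ ``schedule`` argument; the bucket, key, field mapping, and table name are illustrative placeholders.

# Hypothetical DAG staging a gzipped CSV from S3 into a Hive table.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator

with DAG(dag_id="example_s3_to_hive", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    stage_orders = S3ToHiveOperator(
        task_id="stage_orders",
        s3_key="s3://my-bucket/raw/orders.csv.gz",              # placeholder key
        field_dict={"order_id": "BIGINT", "amount": "DOUBLE"},  # placeholder schema
        hive_table="staging.orders",                            # placeholder table
        delimiter=",",
        headers=True,
        check_headers=True,
        input_compressed=True,
        recreate=True,
        aws_conn_id="aws_default",
        hive_cli_conn_id="hive_cli_default",
    )

With ``headers=True`` and ``input_compressed=True``, ``execute`` decompresses the download, validates the header row against ``field_dict`` (because ``check_headers=True``), strips the header, re-compresses the file, and then calls ``HiveCliHook.load_file``.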
[docs]
def uncompress_file(input_file_name, file_extension, dest_dir):
    """Uncompress gz and bz2 files."""
    if file_extension.lower() not in (".gz", ".bz2"):
        raise NotImplementedError(
            f"Received {file_extension} format. Only gz and bz2 files can currently be uncompressed."
        )
    if file_extension.lower() == ".gz":
        fmodule = gzip.GzipFile
    elif file_extension.lower() == ".bz2":
        fmodule = bz2.BZ2File
    with (
        fmodule(input_file_name, mode="rb") as f_compressed,
        NamedTemporaryFile(dir=dest_dir, mode="wb", delete=False) as f_uncompressed,
    ):
        shutil.copyfileobj(f_compressed, f_uncompressed)
    return f_uncompressed.name
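``uncompress_file`` can also be called on its own. A brief sketch, assuming a gzipped file already present on local disk (the path below is a placeholder):

from tempfile import TemporaryDirectory

from airflow.providers.apache.hive.transfers.s3_to_hive import uncompress_file

with TemporaryDirectory(prefix="tmps32hive_") as tmp_dir:
    # Copies the decompressed bytes into a NamedTemporaryFile kept inside tmp_dir
    plain_path = uncompress_file("/tmp/orders.csv.gz", ".gz", tmp_dir)
    with open(plain_path) as fh:
        print(fh.readline())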