airflow.hooks.hive_hooks
Module Contents
- 
airflow.hooks.hive_hooks.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'][source]
- 
airflow.hooks.hive_hooks.get_context_from_env_var()[source]
- Extract context from environment variables (e.g. dag_id, task_id and execution_date) so that they can be used inside BashOperator and PythonOperator.
- Returns
- The context of interest. 
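A minimal usage sketch, assuming the standard AIRFLOW_CTX_* environment variables that Airflow exports before running a task (the values below are hypothetical):

import os
from airflow.hooks.hive_hooks import get_context_from_env_var

# Hypothetical values; inside a real task Airflow sets these for you.
os.environ["AIRFLOW_CTX_DAG_ID"] = "example_dag"
os.environ["AIRFLOW_CTX_TASK_ID"] = "example_task"
os.environ["AIRFLOW_CTX_EXECUTION_DATE"] = "2015-01-01T00:00:00+00:00"

context = get_context_from_env_var()
# context now carries the dag_id, task_id and execution_date set above,
# ready to be merged into a BashOperator env or a PythonOperator callable.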
 
- 
class airflow.hooks.hive_hooks.HiveCliHook(hive_cli_conn_id='hive_cli_default', run_as=None, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None)[source]
- Bases: airflow.hooks.base_hook.BaseHook
- Simple wrapper around the hive CLI.
- It also supports beeline, a lighter CLI that runs JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline param in the extra field of your connection as in { "use_beeline": true }
- Note that you can also set default hive CLI parameters using hive_cli_params in your connection as in {"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}. Parameters passed here can be overridden by run_cli's hive_conf param.
- The extra connection parameter auth gets passed as-is in the jdbc connection string.
- Parameters
- mapred_queue (str) – queue used by the Hadoop Scheduler (Capacity or Fair) 
- mapred_queue_priority (str) – priority within the job queue; possible settings include VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW 
- mapred_job_name (str) – this name will appear in the jobtracker, which can make monitoring easier 
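A hedged configuration sketch based on the notes above; the connection id, queue and job name are hypothetical, and the extra field simply carries the JSON documented here:

from airflow.hooks.hive_hooks import HiveCliHook

# Hypothetical connection "my_hive_cli" whose extra field is set in the UI to:
#   {"use_beeline": true,
#    "hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}
hh = HiveCliHook(
    hive_cli_conn_id="my_hive_cli",         # hypothetical connection id
    mapred_queue="default",                 # hypothetical scheduler queue
    mapred_queue_priority="HIGH",           # one of HIVE_QUEUE_PRIORITIES
    mapred_job_name="airflow_example_job",  # hypothetical job name
)
hh.run_cli("SHOW DATABASES;")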
 - 
_get_proxy_user(self)[source]
- This function sets the proper proxy_user value in case the user overrides the default. 
 - 
static _prepare_hiveconf(d)[source]
- This function prepares a list of hiveconf params from a dictionary of key/value pairs.
- Parameters
- d (dict) – dictionary of key/value pairs 

>>> hh = HiveCliHook()
>>> hive_conf = {"hive.exec.dynamic.partition": "true",
...              "hive.exec.dynamic.partition.mode": "nonstrict"}
>>> hh._prepare_hiveconf(hive_conf)
["-hiveconf", "hive.exec.dynamic.partition=true",
 "-hiveconf", "hive.exec.dynamic.partition.mode=nonstrict"]
 - 
run_cli(self, hql, schema=None, verbose=True, hive_conf=None)[source]
- Run an hql statement using the hive cli. If hive_conf is specified it should be a dict, and the entries will be set as key/value pairs in HiveConf.
- Parameters
- hive_conf (dict) – if specified, these key/value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database. 

>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
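A further sketch showing the hive_conf parameter described above (the statement and settings are illustrative only); each entry is forwarded as -hiveconf "key"="value" and overrides the connection's hive_cli_params:

from airflow.hooks.hive_hooks import HiveCliHook

hh = HiveCliHook()
hh.run_cli(
    "SELECT COUNT(*) FROM static_babynames;",           # illustrative statement
    schema="airflow",
    hive_conf={"hive.exec.dynamic.partition": "true",   # passed as -hiveconf pairs
               "mapreduce.job.queuename": "default"},   # illustrative settings
)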
 - 
load_df(self, df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)[source]
- Loads a pandas DataFrame into hive. Hive data types will be inferred if not passed, but column names will not be sanitized. A usage sketch follows the parameter list.
- Parameters
- df (pandas.DataFrame) – DataFrame to load into a Hive table 
- table (str) – target Hive table, use dot notation to target a specific database 
- field_dict (collections.OrderedDict) – mapping from column name to hive data type. Note that it must be OrderedDict so as to keep columns’ order. 
- delimiter (str) – field delimiter in the file 
- encoding (str) – str encoding to use when writing DataFrame to file 
- pandas_kwargs (dict) – passed to DataFrame.to_csv 
- kwargs – passed to self.load_file 
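A hedged usage sketch (the target table name is hypothetical); field_dict is an OrderedDict so the Hive column order matches the DataFrame:

from collections import OrderedDict
import pandas as pd
from airflow.hooks.hive_hooks import HiveCliHook

df = pd.DataFrame({"state": ["CA", "NY"], "num": [100, 200]})

hh = HiveCliHook()
hh.load_df(
    df,
    table="airflow.example_stats",  # hypothetical target table
    field_dict=OrderedDict([("state", "STRING"), ("num", "INT")]),
    delimiter=",",
    encoding="utf8",
)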
 
 
 - 
load_file(self, filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)[source]
- Loads a local file into Hive.
- Note that the table generated in Hive uses STORED AS textfile, which isn't the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator. A usage sketch follows the parameter list.
- Parameters
- filepath (str) – local filepath of the file to load 
- table (str) – target Hive table, use dot notation to target a specific database 
- delimiter (str) – field delimiter in the file 
- field_dict (collections.OrderedDict) – A dictionary of the fields name in the file as keys and their Hive types as values. Note that it must be OrderedDict so as to keep columns’ order. 
- create (bool) – whether to create the table if it doesn’t exist 
- overwrite (bool) – whether to overwrite the data in table or partition 
- partition (dict) – target partition as a dict of partition columns and values 
- recreate (bool) – whether to drop and recreate the table at every execution 
- tblproperties (dict) – TBLPROPERTIES of the hive table being created 
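A hedged sketch of staging a local delimited file into a partitioned table; the file path is hypothetical and the columns mirror the static_babynames example used elsewhere on this page:

from collections import OrderedDict
from airflow.hooks.hive_hooks import HiveCliHook

hh = HiveCliHook()
hh.load_file(
    filepath="/tmp/babynames_2015-01-01.csv",  # hypothetical local file
    table="airflow.static_babynames_partitioned",
    delimiter=",",
    field_dict=OrderedDict([
        ("state", "STRING"),
        ("year", "INT"),
        ("name", "STRING"),
        ("gender", "STRING"),
        ("num", "INT"),
    ]),
    create=True,
    overwrite=True,
    partition={"ds": "2015-01-01"},  # target partition column/value
)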
 
 
 
- 
class airflow.hooks.hive_hooks.HiveMetastoreHook(metastore_conn_id='metastore_default')[source]
- Bases: airflow.hooks.base_hook.BaseHook
- Wrapper to interact with the Hive Metastore
- 
check_for_partition(self, schema, table, partition)[source]
- Checks whether a partition exists.
- Parameters
- schema (str) – Name of hive schema (database) the table belongs to 
- table – Name of hive table the partition belongs to 
- partition – Expression that matches the partitions to check for (e.g. a = 'b' AND c = 'd') 
- Return type
- bool 

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
True
 - 
check_for_named_partition(self, schema, table, partition_name)[source]
- Checks whether a partition with a given name exists.
- Parameters
- schema (str) – Name of hive schema (database) the table belongs to 
- table – Name of hive table the partition belongs to 
- partition_name – Name of the partition to check for (e.g. a=b/c=d) 
- Return type
- bool 

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition('airflow', t, "ds=xxx")
False
 - 
get_table(self, table_name, db='default')[source]
- Get a metastore table object

>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
 - 
get_partitions(self, schema, table_name, filter=None)[source]
- Returns a list of all partitions in a table. Works only for tables with fewer than 32767 partitions (the Java short max value). For subpartitioned tables, the number might easily exceed this.

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
 - 
static _get_max_partition_from_part_specs(part_specs, partition_key, filter_map)[source]
- Helper method to get the max partition among partitions with partition_key from part specs. key:value pairs in filter_map will be used to filter out partitions. An illustrative sketch follows the parameter list.
- Parameters
- part_specs (list) – list of partition specs. 
- partition_key (str) – partition key name. 
- filter_map (map) – partition_key:partition_value map used for partition filtering, e.g. {‘key1’: ‘value1’, ‘key2’: ‘value2’}. Only partitions matching all partition_key:partition_value pairs will be considered as candidates of max partition. 
 
- Returns
- Max partition or None if part_specs is empty. 
- Return type
- basestring 
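An illustrative sketch of the expected inputs, assuming part_specs is a list of partition-spec dicts like those returned by get_partitions (all values are hypothetical):

from airflow.hooks.hive_hooks import HiveMetastoreHook

part_specs = [{"ds": "2015-01-01", "hour": "00"},
              {"ds": "2015-01-02", "hour": "00"},
              {"ds": "2015-01-02", "hour": "01"}]

# Keep only partitions where hour == "00", then take the max 'ds' value.
HiveMetastoreHook._get_max_partition_from_part_specs(
    part_specs, partition_key="ds", filter_map={"hour": "00"})
# expected result under these assumptions: '2015-01-02'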
 
 - 
max_partition(self, schema, table_name, field=None, filter_map=None)[source]
- Returns the maximum value for all partitions with a given field in a table. If only one partition key exists in the table, the key will be used as field. filter_map should be a partition_key:partition_value map and will be used to filter out partitions.
- Parameters
- schema (str) – schema name 
- table_name (str) – table name 
- field (str) – partition key to get max partition from 
- filter_map (map) – partition_key:partition_value map used for partition filtering 

>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',
...                  table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
 
- 
- 
class airflow.hooks.hive_hooks.HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')[source]
- Bases: airflow.hooks.base_hook.BaseHook
- Wrapper around the pyhive library
- Notes:
- the default authMechanism is PLAIN; to override it you can specify it in the extra of your connection in the UI 
- the default for run_set_variable_statements is true; if you are using impala you may need to set it to false in the extra of your connection in the UI 
- 
get_results(self, hql, schema='default', fetch_size=None, hive_conf=None)[source]
- Get results of the provided hql in target schema.
- Parameters
- hql (str or list) – hql to be executed 
- schema (str) – target schema, default to 'default' 
- fetch_size (int) – max size of result to fetch 
- hive_conf (dict) – hive_conf to execute along with the hql 
- Returns
- results of hql execution, dict with data (list of results) and header 
- Return type
- dict 
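A hedged usage sketch reusing the static_babynames example table from this page; as documented above, the returned dict carries the rows under 'data' and the column metadata under 'header':

from airflow.hooks.hive_hooks import HiveServer2Hook

hh = HiveServer2Hook()
results = hh.get_results(
    "SELECT * FROM airflow.static_babynames LIMIT 10",
    schema="airflow",
)
rows = results["data"]      # list of result rows
header = results["header"]  # metadata describing the result columns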
 
 - 
to_csv(self, hql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)[source]
- Execute hql in target schema and write results to a csv file. A usage sketch follows the parameter list.
- Parameters
- hql (str or list) – hql to be executed. 
- csv_filepath (str) – filepath of csv to write results into. 
- schema (str) – target schema, default to ‘default’. 
- delimiter (str) – delimiter of the csv file, default to ‘,’. 
- lineterminator (str) – lineterminator of the csv file. 
- output_header (bool) – whether to write a header row to the csv file, default to True. 
- fetch_size (int) – number of result rows to write into the csv file, default to 1000. 
- hive_conf (dict) – hive_conf to execute along with the hql. 
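A hedged sketch of dumping query results to a local csv file (the output path is hypothetical):

from airflow.hooks.hive_hooks import HiveServer2Hook

hh = HiveServer2Hook()
hh.to_csv(
    hql="SELECT * FROM airflow.static_babynames LIMIT 100",
    csv_filepath="/tmp/static_babynames.csv",  # hypothetical output path
    schema="airflow",
    delimiter=",",
    output_header=True,  # write the column names as the first row
    fetch_size=1000,
)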
 
 
 - 
get_records(self, hql, schema='default')[source]
- Get a set of records from a Hive query.
- Parameters
- hql (str or list) – hql to be executed 
- schema (str) – target schema, default to 'default' 
- Returns
- result of hive execution 
- Return type
- list 

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
 - 
get_pandas_df(self, hql, schema='default', **kwargs)[source]
- Get a pandas dataframe from a Hive query.
- Parameters
- hql (str or list) – hql to be executed 
- schema (str) – target schema, default to 'default' 
- Returns
- result of hql execution 
- Return type
- pandas.DataFrame 

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100
 
 