airflow.hooks.hive_hooks
Module Contents
-
airflow.hooks.hive_hooks.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']
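These values are the expected settings for the mapred_queue_priority argument of HiveCliHook below. A minimal sketch, assuming a placeholder queue name and the default connection id:
from airflow.hooks.hive_hooks import HIVE_QUEUE_PRIORITIES, HiveCliHook

# 'etl' is a placeholder Hadoop queue name.
hook = HiveCliHook(
    hive_cli_conn_id='hive_cli_default',
    mapred_queue='etl',
    mapred_queue_priority='HIGH',  # expected to be one of HIVE_QUEUE_PRIORITIES
)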
-
airflow.hooks.hive_hooks.get_context_from_env_var()
Extract context from env variables, e.g. dag_id, task_id and execution_date, so that they can be used inside BashOperator and PythonOperator.
- Returns
The context of interest.
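A minimal sketch of calling this helper from a PythonOperator callable (the callable name is a placeholder); the exact keys of the returned dict depend on which context variables Airflow exports to the environment:
from airflow.hooks.hive_hooks import get_context_from_env_var

def print_task_context():
    # Collects dag_id, task_id, execution_date, etc. from the environment
    # variables Airflow sets for the running task instance.
    context = get_context_from_env_var()
    for key, value in context.items():
        print(key, value)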
-
class airflow.hooks.hive_hooks.HiveCliHook(hive_cli_conn_id='hive_cli_default', run_as=None, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None)
Bases: airflow.hooks.base_hook.BaseHook
Simple wrapper around the hive CLI.
It also supports beeline, a lighter CLI that runs JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline param in the extra field of your connection as in { "use_beeline": true }.
Note that you can also set default hive CLI parameters using hive_cli_params in your connection as in {"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}. Parameters passed here can be overridden by run_cli’s hive_conf param.
The extra connection parameter auth gets passed as-is in the jdbc connection string.
- Parameters
mapred_queue (str) – queue used by the Hadoop scheduler (Capacity or Fair)
mapred_queue_priority (str) – priority within the job queue; possible settings are listed in HIVE_QUEUE_PRIORITIES
mapred_job_name (str) – this name will appear in the jobtracker, which can make monitoring easier
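A hedged sketch of a connection carrying the extras described above; the jobtracker address is a placeholder and the JSON keys follow the examples in this docstring:
import json
from airflow.models import Connection

# Stored in the extra field of the hive_cli connection; HiveCliHook
# reads these settings at runtime.
conn = Connection(
    conn_id='hive_cli_default',
    conn_type='hive_cli',
    extra=json.dumps({
        "use_beeline": True,
        "hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444",
    }),
)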
-
static _prepare_hiveconf(d)
This function prepares a list of hiveconf params from a dictionary of key value pairs.
- Parameters
d (dict) – dictionary of key value pairs to turn into -hiveconf arguments
>>> hh = HiveCliHook()
>>> hive_conf = {"hive.exec.dynamic.partition": "true",
...              "hive.exec.dynamic.partition.mode": "nonstrict"}
>>> hh._prepare_hiveconf(hive_conf)
["-hiveconf", "hive.exec.dynamic.partition=true",
 "-hiveconf", "hive.exec.dynamic.partition.mode=nonstrict"]
-
run_cli(self, hql, schema=None, verbose=True, hive_conf=None)
Run an hql statement using the hive cli. If hive_conf is specified it should be a dict and the entries will be set as key/value pairs in HiveConf.
- Parameters
hive_conf (dict) – if specified, these key value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database.
>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
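A hedged sketch of overriding a Hive setting for a single call via hive_conf; the schema and property value are placeholders:
from airflow.hooks.hive_hooks import HiveCliHook

hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')

# Entries in hive_conf are appended after hive_cli_params, so they take
# precedence over defaults stored on the connection.
hook.run_cli(
    hql="SELECT COUNT(*) FROM static_babynames;",
    schema='airflow',
    hive_conf={"hive.exec.dynamic.partition": "true"},
)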
-
load_df(self, df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)
Loads a pandas DataFrame into hive.
Hive data types will be inferred if not passed but column names will not be sanitized. See the sketch after this parameter list.
- Parameters
df (pandas.DataFrame) – DataFrame to load into a Hive table
table (str) – target Hive table, use dot notation to target a specific database
field_dict (collections.OrderedDict) – mapping from column name to hive data type. Note that it must be an OrderedDict so as to keep columns’ order.
delimiter (str) – field delimiter in the file
encoding (str) – str encoding to use when writing DataFrame to file
pandas_kwargs (dict) – passed to DataFrame.to_csv
kwargs – passed to self.load_file
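A hedged sketch of loading a small DataFrame; the table name is a placeholder, and extra keyword arguments are forwarded to load_file as noted above:
from collections import OrderedDict

import pandas as pd
from airflow.hooks.hive_hooks import HiveCliHook

df = pd.DataFrame({"state": ["CA", "NY"], "num": [100, 200]})

hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
hook.load_df(
    df=df,
    table='airflow.babynames_staging',  # placeholder table name
    field_dict=OrderedDict([("state", "STRING"), ("num", "INT")]),
    recreate=True,  # forwarded to load_file
)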
-
load_file(self, filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)
Loads a local file into Hive.
Note that the table generated in Hive uses STORED AS textfile, which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator. See the sketch after this parameter list.
- Parameters
filepath (str) – local filepath of the file to load
table (str) – target Hive table, use dot notation to target a specific database
delimiter (str) – field delimiter in the file
field_dict (collections.OrderedDict) – A dictionary of the field names in the file as keys and their Hive types as values. Note that it must be an OrderedDict so as to keep columns’ order.
create (bool) – whether to create the table if it doesn’t exist
overwrite (bool) – whether to overwrite the data in table or partition
partition (dict) – target partition as a dict of partition columns and values
recreate (bool) – whether to drop and recreate the table at every execution
tblproperties (dict) – TBLPROPERTIES of the hive table being created
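A hedged sketch of staging a local CSV file into a partitioned table; the file path, table name and partition value are placeholders:
from collections import OrderedDict

from airflow.hooks.hive_hooks import HiveCliHook

hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
hook.load_file(
    filepath='/tmp/babynames.csv',
    table='airflow.static_babynames_partitioned',
    delimiter=',',
    field_dict=OrderedDict([
        ("state", "STRING"),
        ("name", "STRING"),
        ("num", "INT"),
    ]),
    partition={"ds": "2015-01-01"},
    create=True,
    overwrite=True,
)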
-
class airflow.hooks.hive_hooks.HiveMetastoreHook(metastore_conn_id='metastore_default')
Bases: airflow.hooks.base_hook.BaseHook
Wrapper to interact with the Hive Metastore.
-
check_for_partition(self, schema, table, partition)
Checks whether a partition exists.
- Parameters
schema (str) – Name of hive schema (database) @table belongs to
table – Name of hive table @partition belongs to
partition – Expression that matches the partitions to check for (e.g. a = 'b' AND c = 'd')
- Return type
bool
>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
True
-
check_for_named_partition(self, schema, table, partition_name)
Checks whether a partition with a given name exists.
- Parameters
schema (str) – Name of hive schema (database) @table belongs to
table – Name of hive table @partition belongs to
partition_name – Name of the partition to check for (e.g. a=b/c=d)
- Return type
bool
>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition('airflow', t, "ds=xxx")
False
-
get_table(self, table_name, db='default')
Get a metastore table object.
>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
-
get_partitions(self, schema, table_name, filter=None)
Returns a list of all partitions in a table. Works only for tables with fewer than 32767 partitions (the Java short max value). For subpartitioned tables, the number might easily exceed this.
>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
-
static _get_max_partition_from_part_specs(part_specs, partition_key, filter_map)
Helper method to get the max partition of partitions with partition_key from part specs. key:value pairs in filter_map will be used to filter out partitions.
- Parameters
part_specs (list) – list of partition specs.
partition_key (str) – partition key name.
filter_map (map) – partition_key:partition_value map used for partition filtering, e.g. {'key1': 'value1', 'key2': 'value2'}. Only partitions matching all partition_key:partition_value pairs will be considered as candidates of max partition.
- Returns
Max partition or None if part_specs is empty.
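A hedged sketch of this helper’s filtering behavior, assuming each entry in part_specs is a dict of partition key/value pairs:
from airflow.hooks.hive_hooks import HiveMetastoreHook

part_specs = [
    {"ds": "2015-01-01", "country": "US"},
    {"ds": "2015-01-02", "country": "US"},
    {"ds": "2015-01-03", "country": "FR"},
]

# Only the specs matching filter_map are candidates; the max 'ds' among
# them would be '2015-01-02' under these assumptions.
max_ds = HiveMetastoreHook._get_max_partition_from_part_specs(
    part_specs, partition_key='ds', filter_map={"country": "US"},
)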
-
max_partition(self, schema, table_name, field=None, filter_map=None)
Returns the maximum value for all partitions with given field in a table. If only one partition key exists in the table, the key will be used as field. filter_map should be a partition_key:partition_value map and will be used to filter out partitions.
- Parameters
schema (str) – schema name.
table_name (str) – table name.
field (str) – partition key to get max partition from.
filter_map (map) – partition_key:partition_value map used for partition filtering.
>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',
...                  table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
-
-
class airflow.hooks.hive_hooks.HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
Bases: airflow.hooks.base_hook.BaseHook
Wrapper around the pyhive library.
Note that the default authMechanism is PLAIN; to override it, specify it in the extra field of your connection in the UI.
-
get_results(self, hql, schema='default', fetch_size=None, hive_conf=None)
Get results of the provided hql in target schema.
- Parameters
hql (str or list) – hql to be executed.
schema (str) – target schema, default to 'default'.
fetch_size (int) – max size of result to fetch.
hive_conf (dict) – hive_conf to execute along with the hql.
- Returns
results of hql execution, dict with data (list of results) and header
- Return type
dict
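A hedged sketch of consuming the returned dict; the 'data' and 'header' keys follow the return description above, but their exact contents are an assumption:
from airflow.hooks.hive_hooks import HiveServer2Hook

hook = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
results = hook.get_results(
    hql="SELECT * FROM static_babynames LIMIT 10",
    schema='airflow',
)

# 'data' holds the result rows; 'header' describes the columns.
for row in results['data']:
    print(row)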
-
to_csv(self, hql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)
Execute hql in target schema and write results to a csv file.
- Parameters
hql (str or list) – hql to be executed.
csv_filepath (str) – filepath of csv to write results into.
schema (str) – target schema, default to 'default'.
delimiter (str) – delimiter of the csv file, default to ','.
lineterminator (str) – lineterminator of the csv file.
output_header (bool) – header of the csv file, default to True.
fetch_size (int) – number of result rows to write into the csv file, default to 1000.
hive_conf (dict) – hive_conf to execute along with the hql.
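A hedged sketch of dumping query results to a local CSV file; the output path is a placeholder:
from airflow.hooks.hive_hooks import HiveServer2Hook

hook = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')
hook.to_csv(
    hql="SELECT * FROM static_babynames LIMIT 1000",
    csv_filepath='/tmp/babynames.csv',  # placeholder output path
    schema='airflow',
    delimiter=',',
    output_header=True,
    fetch_size=1000,
)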
-
get_records(self, hql, schema='default')
Get a set of records from a Hive query.
- Parameters
hql (str or list) – hql to be executed.
schema (str) – target schema, default to 'default'.
- Returns
result of hive execution
- Return type
list
>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
-
get_pandas_df(self, hql, schema='default')
Get a pandas dataframe from a Hive query.
- Parameters
hql (str or list) – hql to be executed.
schema (str) – target schema, default to 'default'.
- Returns
result of hql execution
- Return type
pandas.DataFrame
>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100