airflow.hooks.hive_hooks

Module Contents

airflow.hooks.hive_hooks.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'][source]
airflow.hooks.hive_hooks.get_context_from_env_var()[source]

Extract context from environment variables (e.g. dag_id, task_id and execution_date) so that they can be used inside BashOperator and PythonOperator.

Returns

The context of interest.
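
A minimal sketch of calling it from a task's Python callable (it assumes the AIRFLOW_CTX_* environment variables that Airflow exports for each task are present):

>>> from airflow.hooks.hive_hooks import get_context_from_env_var
>>> context = get_context_from_env_var()  # dict populated from the AIRFLOW_CTX_* environment variables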

class airflow.hooks.hive_hooks.HiveCliHook(hive_cli_conn_id='hive_cli_default', run_as=None, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None)[source]

Bases: airflow.hooks.base_hook.BaseHook

Simple wrapper around the hive CLI.

It also supports beeline, a lighter CLI that runs JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline param in the extra field of your connection as in { "use_beeline": true }

Note that you can also set default hive CLI parameters by adding hive_cli_params to the extra field of your connection, as in {"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}. Parameters passed here can be overridden by run_cli’s hive_conf param.

The extra connection parameter auth is passed into the JDBC connection string as-is.

Parameters
  • mapred_queue (str) – queue used by the Hadoop Scheduler (Capacity or Fair)

  • mapred_queue_priority (str) – priority within the job queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

  • mapred_job_name (str) – This name will appear in the jobtracker. This can make monitoring easier.
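
A brief usage sketch (the queue name and job name are illustrative, and the extra shown in the comment simply combines the options described above):

>>> # Assumes the referenced connection's extra field looks something like
>>> # {"use_beeline": true, "hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}
>>> hh = HiveCliHook(hive_cli_conn_id='hive_cli_default',
...                  mapred_queue='default', mapred_queue_priority='HIGH',
...                  mapred_job_name='airflow_hive_example')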

_prepare_cli_cmd(self)[source]

This function creates the command list from available information

static _prepare_hiveconf(d)[source]

This function prepares a list of hiveconf params from a dictionary of key value pairs.

Parameters

d (dict) –

>>> hh = HiveCliHook()
>>> hive_conf = {"hive.exec.dynamic.partition": "true",
... "hive.exec.dynamic.partition.mode": "nonstrict"}
>>> hh._prepare_hiveconf(hive_conf)
["-hiveconf", "hive.exec.dynamic.partition=true", "-hiveconf", "hive.exec.dynamic.partition.mode=nonstrict"]
run_cli(self, hql, schema=None, verbose=True, hive_conf=None)[source]

Run an hql statement using the hive cli. If hive_conf is specified, it should be a dict, and its entries will be set as key/value pairs in HiveConf.

Parameters

hive_conf (dict) – if specified, these key/value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database.

>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
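
A sketch of passing per-call settings via hive_conf (the Hive property and table are illustrative):

>>> hh = HiveCliHook()
>>> out = hh.run_cli("SELECT COUNT(*) FROM static_babynames;",
...                  schema='airflow',
...                  hive_conf={'hive.exec.parallel': 'true'})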
test_hql(self, hql)[source]

Test an hql statement using the hive cli and EXPLAIN
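
For example (a sketch; the table name is illustrative):

>>> hh = HiveCliHook()
>>> hh.test_hql("SELECT * FROM airflow.static_babynames LIMIT 10;")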

load_df(self, df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)[source]

Loads a pandas DataFrame into hive.

Hive data types will be inferred if not passed, but column names will not be sanitized.

Parameters
  • df (pandas.DataFrame) – DataFrame to load into a Hive table

  • table (str) – target Hive table, use dot notation to target a specific database

  • field_dict (collections.OrderedDict) – mapping from column name to hive data type. Note that it must be OrderedDict so as to keep columns’ order.

  • delimiter (str) – field delimiter in the file

  • encoding (str) – str encoding to use when writing DataFrame to file

  • pandas_kwargs (dict) – passed to DataFrame.to_csv

  • kwargs – passed to self.load_file
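
A sketch of loading a small DataFrame (the table name is an illustrative assumption; field_dict is omitted so Hive types are inferred):

>>> import pandas as pd
>>> hh = HiveCliHook()
>>> df = pd.DataFrame({'state': ['TX', 'CA'], 'num': [100, 200]})
>>> hh.load_df(df, table='airflow.babynames_staging', recreate=True)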

load_file(self, filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)[source]

Loads a local file into Hive

Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters
  • filepath (str) – local filepath of the file to load

  • table (str) – target Hive table, use dot notation to target a specific database

  • delimiter (str) – field delimiter in the file

  • field_dict (collections.OrderedDict) – A dictionary of the fields name in the file as keys and their Hive types as values. Note that it must be OrderedDict so as to keep columns’ order.

  • create (bool) – whether to create the table if it doesn’t exist

  • overwrite (bool) – whether to overwrite the data in table or partition

  • partition (dict) – target partition as a dict of partition columns and values

  • recreate (bool) – whether to drop and recreate the table at every execution

  • tblproperties (dict) – TBLPROPERTIES of the hive table being created
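
A sketch (the file path, table name and Hive column types are illustrative assumptions):

>>> from collections import OrderedDict
>>> hh = HiveCliHook()
>>> hh.load_file(filepath='/tmp/babynames.csv',
...              table='airflow.static_babynames_partitioned',
...              field_dict=OrderedDict([('state', 'STRING'), ('num', 'INT')]),
...              partition={'ds': '2015-01-01'})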

kill(self)[source]
class airflow.hooks.hive_hooks.HiveMetastoreHook(metastore_conn_id='metastore_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Wrapper to interact with the Hive Metastore

MAX_PART_COUNT = 32767[source]
__getstate__(self)[source]
__setstate__(self, d)[source]
get_metastore_client(self)[source]

Returns a Hive thrift client.

get_conn(self)[source]
check_for_partition(self, schema, table, partition)[source]

Checks whether a partition exists

Parameters
  • schema (str) – Name of hive schema (database) @table belongs to

  • table – Name of hive table @partition belongs to

  • partition – Expression that matches the partitions to check for (e.g. a = 'b' AND c = 'd')

Return type

bool

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
True
check_for_named_partition(self, schema, table, partition_name)[source]

Checks whether a partition with a given name exists

Parameters
  • schema (str) – Name of hive schema (database) @table belongs to

  • table – Name of hive table @partition belongs to

  • partition_name – Name of the partitions to check for (e.g. a=b/c=d)

Return type

bool

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition('airflow', t, "ds=xxx")
False
get_table(self, table_name, db='default')[source]

Get a metastore table object

>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
get_tables(self, db, pattern='*')[source]

Get a list of metastore table objects matching the pattern

get_databases(self, pattern='*')[source]

Get a list of metastore databases matching the pattern
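
A sketch of both lookups (the database and patterns are illustrative):

>>> hh = HiveMetastoreHook()
>>> tables = hh.get_tables(db='airflow', pattern='static_*')
>>> databases = hh.get_databases(pattern='air*')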

get_partitions(self, schema, table_name, filter=None)[source]

Returns a list of all partitions in a table. Works only for tables with fewer than 32767 partitions (the Java short max value). For subpartitioned tables, the number can easily exceed this.

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
static _get_max_partition_from_part_specs(part_specs, partition_key, filter_map)[source]

Helper method to get the max partition, for the given partition_key, from a list of partition specs. key:value pairs in filter_map are used to filter out partitions.

Parameters
  • part_specs (list) – list of partition specs.

  • partition_key (str) – partition key name.

  • filter_map (map) – partition_key:partition_value map used for partition filtering, e.g. {'key1': 'value1', 'key2': 'value2'}. Only partitions matching all partition_key:partition_value pairs will be considered as candidates of max partition.

Returns

Max partition or None if part_specs is empty.
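
A sketch based on the description above (the partition specs are illustrative):

>>> specs = [{'ds': '2015-01-01'}, {'ds': '2015-01-02'}]
>>> latest = HiveMetastoreHook._get_max_partition_from_part_specs(
...     specs, partition_key='ds', filter_map=None)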

max_partition(self, schema, table_name, field=None, filter_map=None)[source]

Returns the maximum value for all partitions with the given field in a table. If only one partition key exists in the table, that key will be used as the field. filter_map should be a partition_key:partition_value map and will be used to filter out partitions.

Parameters
  • schema (str) – schema name.

  • table_name (str) – table name.

  • field (str) – partition key to get max partition from.

  • filter_map (map) – partition_key:partition_value map used for partition filtering.

>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',
...                  table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
table_exists(self, table_name, db='default')[source]

Check if table exists

>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db='airflow', table_name='static_babynames')
True
>>> hh.table_exists(db='airflow', table_name='does_not_exist')
False
class airflow.hooks.hive_hooks.HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Wrapper around the pyhive library

Note that the default authMechanism is PLAIN; to override it, you can specify it in the extra field of your connection in the UI.
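
For example, the connection extra might look like {"authMechanism": "KERBEROS"} (an illustrative value; the mechanisms accepted depend on your HiveServer2 and SASL configuration).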

get_conn(self, schema=None)[source]

Returns a Hive connection object.

_get_results(self, hql, schema='default', fetch_size=None, hive_conf=None)[source]
get_results(self, hql, schema='default', fetch_size=None, hive_conf=None)[source]

Get results of the provided hql in target schema.

Parameters
  • hql (str or list) – hql to be executed.

  • schema (str) – target schema, default to ‘default’.

  • fetch_size (int) – max size of result to fetch.

  • hive_conf (dict) – hive_conf to execute along with the hql.

Returns

results of hql execution, dict with data (list of results) and header

Return type

dict
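
A sketch (the table is illustrative; per the return description above, the rows are under the data key and the column metadata under header):

>>> hh = HiveServer2Hook()
>>> res = hh.get_results("SELECT * FROM airflow.static_babynames LIMIT 5",
...                      schema='airflow')
>>> rows, header = res['data'], res['header']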

to_csv(self, hql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)[source]

Execute hql in target schema and write results to a csv file.

Parameters
  • hql (str or list) – hql to be executed.

  • csv_filepath (str) – filepath of csv to write results into.

  • schema (str) – target schema, default to ‘default’.

  • delimiter (str) – delimiter of the csv file, default to ‘,’.

  • lineterminator (str) – lineterminator of the csv file.

  • output_header (bool) – header of the csv file, default to True.

  • fetch_size (int) – number of result rows to write into the csv file, default to 1000.

  • hive_conf (dict) – hive_conf to execute along with the hql.
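
A sketch (the output path is an illustrative assumption):

>>> hh = HiveServer2Hook()
>>> hh.to_csv("SELECT * FROM airflow.static_babynames LIMIT 100",
...           csv_filepath='/tmp/babynames.csv',
...           schema='airflow', delimiter=',', fetch_size=1000)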

get_records(self, hql, schema='default')[source]

Get a set of records from a Hive query.

Parameters
  • hql (str or list) – hql to be executed.

  • schema (str) – target schema, default to ‘default’.

  • hive_conf (dict) – hive_conf to execute along with the hql.

Returns

result of hive execution

Return type

list

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
get_pandas_df(self, hql, schema='default')[source]

Get a pandas dataframe from a Hive query

Parameters
  • hql (str or list) – hql to be executed.

  • schema (str) – target schema, default to ‘default’.

Returns

result of hql execution

Return type

DataFrame

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100