airflow.providers.apache.hive.hooks.hive

Module Contents

airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'][source]
airflow.providers.apache.hive.hooks.hive.get_context_from_env_var() → Dict[Any, Any][source]
Extract context from environment variables, e.g. dag_id, task_id and execution_date, so that they can be used inside BashOperator and PythonOperator.
Returns

The context of interest.
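
A minimal usage sketch; the exact keys of the returned dict depend on which AIRFLOW_CTX_* environment variables Airflow has exported for the running task, so the loop below simply inspects whatever is available:

>>> from airflow.providers.apache.hive.hooks.hive import get_context_from_env_var
>>> context = get_context_from_env_var()
>>> for key, value in context.items():
...     print(key, value)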

class airflow.providers.apache.hive.hooks.hive.HiveCliHook(hive_cli_conn_id: str = default_conn_name, run_as: Optional[str] = None, mapred_queue: Optional[str] = None, mapred_queue_priority: Optional[str] = None, mapred_job_name: Optional[str] = None)[source]

Bases: airflow.hooks.base.BaseHook

Simple wrapper around the hive CLI.

It also supports beeline, a lighter CLI that uses JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline param in the extra field of your connection as in { "use_beeline": true }.

Note that you can also set default hive CLI parameters by passing hive_cli_params in the extra field of your connection, as in {"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}. Parameters passed here can be overridden by run_cli's hive_conf param.

The extra connection parameter auth gets passed in the jdbc connection string as is.

Parameters
  • hive_cli_conn_id (str) -- Reference to the Hive CLI connection id.

  • mapred_queue (str) -- queue used by the Hadoop Scheduler (Capacity or Fair)

  • mapred_queue_priority (str) -- priority within the job queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

  • mapred_job_name (str) -- This name will appear in the jobtracker. This can make monitoring easier.
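
A hedged construction sketch, assuming a hive_cli_default connection whose extra field is set as described above (e.g. {"use_beeline": true, "hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}); the queue and job name values are illustrative:

>>> hook = HiveCliHook(
...     hive_cli_conn_id='hive_cli_default',
...     mapred_queue='default',
...     mapred_queue_priority='HIGH',
...     mapred_job_name='airflow_hive_example',
... )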

conn_name_attr = hive_cli_conn_id[source]
default_conn_name = hive_cli_default[source]
conn_type = hive_cli[source]
hook_name = Hive Client Wrapper[source]
_get_proxy_user(self)[source]

This function sets the proper proxy_user value in case the user overwrites the default.

_prepare_cli_cmd(self)[source]

This function creates the command list from available information

static _prepare_hiveconf(d: Dict[Any, Any])[source]

This function prepares a list of hiveconf params from a dictionary of key value pairs.

Parameters

d (dict) --

>>> hh = HiveCliHook()
>>> hive_conf = {"hive.exec.dynamic.partition": "true",
... "hive.exec.dynamic.partition.mode": "nonstrict"}
>>> hh._prepare_hiveconf(hive_conf)
["-hiveconf", "hive.exec.dynamic.partition=true",        "-hiveconf", "hive.exec.dynamic.partition.mode=nonstrict"]
run_cli(self, hql: Union[str, str], schema: Optional[str] = None, verbose: bool = True, hive_conf: Optional[Dict[Any, Any]] = None)[source]

Run an hql statement using the hive cli. If hive_conf is specified, it should be a dict and the entries will be set as key/value pairs in HiveConf.

Parameters

hive_conf (dict) -- if specified, these key/value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database.

>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
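
A hedged sketch of passing hive_conf at run time, as described above; the property value is illustrative:

>>> hh = HiveCliHook()
>>> hh.run_cli(
...     "SELECT COUNT(*) FROM static_babynames;",
...     schema='airflow',
...     hive_conf={"hive.exec.dynamic.partition": "true"},
... )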
test_hql(self, hql: Union[str, str])[source]

Test an hql statement using the hive cli and EXPLAIN
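
A short hedged usage sketch, reusing the airflow.static_babynames example table used elsewhere on this page:

>>> hh = HiveCliHook()
>>> hh.test_hql("SELECT state, num FROM airflow.static_babynames LIMIT 10;")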

load_df(self, df: pandas.DataFrame, table: str, field_dict: Optional[Dict[Any, Any]] = None, delimiter: str = ',', encoding: str = 'utf8', pandas_kwargs: Any = None, **kwargs)[source]

Loads a pandas DataFrame into hive.

Hive data types will be inferred if not passed but column names will not be sanitized.

Parameters
  • df (pandas.DataFrame) -- DataFrame to load into a Hive table

  • table (str) -- target Hive table, use dot notation to target a specific database

  • field_dict (collections.OrderedDict) -- mapping from column name to hive data type. Note that it must be OrderedDict so as to keep columns' order.

  • delimiter (str) -- field delimiter in the file

  • encoding (str) -- str encoding to use when writing DataFrame to file

  • pandas_kwargs (dict) -- passed to DataFrame.to_csv

  • kwargs -- passed to self.load_file
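
A hedged sketch of loading a small DataFrame, assuming a writable target table (airflow.static_states is hypothetical); field_dict is an OrderedDict so the column order is preserved:

>>> from collections import OrderedDict
>>> import pandas as pd
>>> hh = HiveCliHook()
>>> df = pd.DataFrame({"state": ["CA", "NY"], "year": [2015, 2015]})
>>> hh.load_df(
...     df,
...     table='airflow.static_states',
...     field_dict=OrderedDict([("state", "STRING"), ("year", "INT")]),
...     delimiter=',',
... )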

load_file(self, filepath: str, table: str, delimiter: str = ',', field_dict: Optional[Dict[Any, Any]] = None, create: bool = True, overwrite: bool = True, partition: Optional[Dict[str, Any]] = None, recreate: bool = False, tblproperties: Optional[Dict[str, Any]] = None)[source]

Loads a local file into Hive

Note that the table generated in Hive uses STORED AS textfile which isn't the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters
  • filepath (str) -- local filepath of the file to load

  • table (str) -- target Hive table, use dot notation to target a specific database

  • delimiter (str) -- field delimiter in the file

  • field_dict (collections.OrderedDict) -- A dictionary of the field names in the file as keys and their Hive types as values. Note that it must be an OrderedDict so as to keep the columns' order.

  • create (bool) -- whether to create the table if it doesn't exist

  • overwrite (bool) -- whether to overwrite the data in table or partition

  • partition (dict) -- target partition as a dict of partition columns and values

  • recreate (bool) -- whether to drop and recreate the table at every execution

  • tblproperties (dict) -- TBLPROPERTIES of the hive table being created
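
A hedged sketch, assuming a local CSV at /tmp/babynames.csv (hypothetical path) whose columns match field_dict, loaded into a date-partitioned table:

>>> from collections import OrderedDict
>>> hh = HiveCliHook()
>>> hh.load_file(
...     filepath='/tmp/babynames.csv',
...     table='airflow.static_babynames_partitioned',
...     field_dict=OrderedDict([("state", "STRING"), ("year", "INT"),
...                             ("name", "STRING"), ("gender", "STRING"), ("num", "INT")]),
...     partition={'ds': '2015-01-01'},
...     create=True,
...     overwrite=True,
... )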

kill(self)[source]

Kill Hive cli command

class airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook(metastore_conn_id: str = default_conn_name)[source]

Bases: airflow.hooks.base.BaseHook

Wrapper to interact with the Hive Metastore

Parameters

metastore_conn_id (str) -- reference to the metastore thrift service connection id (see howto/connection:hive_metastore).

MAX_PART_COUNT = 32767[source]
conn_name_attr = metastore_conn_id[source]
default_conn_name = metastore_default[source]
conn_type = hive_metastore[source]
hook_name = Hive Metastore Thrift[source]
__getstate__(self)[source]
__setstate__(self, d: Dict[str, Any])[source]
get_metastore_client(self)[source]

Returns a Hive thrift client.

_find_valid_server(self)[source]
get_conn(self)[source]
check_for_partition(self, schema: str, table: str, partition: str)[source]

Checks whether a partition exists

Parameters
  • schema (str) -- Name of hive schema (database) @table belongs to

  • table (str) -- Name of hive table @partition belongs to

  • partition (str) -- Expression that matches the partitions to check for (e.g. a = 'b' AND c = 'd')

Return type

bool

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
True
check_for_named_partition(self, schema: str, table: str, partition_name: str)[source]

Checks whether a partition with a given name exists

Parameters
  • schema (str) -- Name of hive schema (database) @table belongs to

  • table (str) -- Name of hive table @partition belongs to

  • partition_name (str) -- Name of the partitions to check for (e.g. a=b/c=d)

Return type

bool

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition('airflow', t, "ds=xxx")
False
get_table(self, table_name: str, db: str = 'default')[source]

Get a metastore table object

>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
get_tables(self, db: str, pattern: str = '*')[source]

Get metastore table objects that match the given pattern.

get_databases(self, pattern: str = '*')[source]

Get metastore databases that match the given pattern.
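
A hedged sketch of both lookups; the patterns are illustrative and the names come from the examples above:

>>> hh = HiveMetastoreHook()
>>> tables = hh.get_tables(db='airflow', pattern='static_*')
>>> databases = hh.get_databases(pattern='air*')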

get_partitions(self, schema: str, table_name: str, partition_filter: Optional[str] = None)[source]

Returns a list of all partitions in a table. Works only for tables with fewer than 32767 partitions (the Java short max value). For subpartitioned tables, the number might easily exceed this.

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
static _get_max_partition_from_part_specs(part_specs: List[Any], partition_key: Optional[str], filter_map: Optional[Dict[str, Any]])[source]

Helper method to get the max partition with partition_key from the given part specs. key:value pairs in filter_map will be used to filter out partitions.

Parameters
  • part_specs (list) -- list of partition specs.

  • partition_key (str) -- partition key name.

  • filter_map (map) -- partition_key:partition_value map used for partition filtering, e.g. {'key1': 'value1', 'key2': 'value2'}. Only partitions matching all partition_key:partition_value pairs will be considered as candidates of max partition.

Returns

Max partition or None if part_specs is empty.

Return type

str
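
A hedged sketch of the filtering behavior described above; the part specs are illustrative:

>>> part_specs = [{'ds': '2015-01-01', 'hour': '00'},
...               {'ds': '2015-01-02', 'hour': '00'},
...               {'ds': '2015-01-03', 'hour': '01'}]
>>> HiveMetastoreHook._get_max_partition_from_part_specs(
...     part_specs, partition_key='ds', filter_map={'hour': '00'})
'2015-01-02'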

max_partition(self, schema: str, table_name: str, field: Optional[str] = None, filter_map: Optional[Dict[Any, Any]] = None)[source]

Returns the maximum value for all partitions with given field in a table. If only one partition key exists in the table, the key will be used as field. filter_map should be a partition_key:partition_value map and will be used to filter out partitions.

Parameters
  • schema (str) -- schema name.

  • table_name (str) -- table name.

  • field (str) -- partition key to get max partition from.

  • filter_map (map) -- partition_key:partition_value map used for partition filtering.

>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',
... table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
table_exists(self, table_name: str, db: str = 'default')[source]

Check if table exists

>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db='airflow', table_name='static_babynames')
True
>>> hh.table_exists(db='airflow', table_name='does_not_exist')
False
drop_partitions(self, table_name, part_vals, delete_data=False, db='default')[source]

Drop partitions from the given table matching the part_vals input

Parameters
  • table_name (str) -- table name.

  • part_vals (list) -- list of partition specs.

  • delete_data (bool) -- Setting to control whether the underlying data has to be deleted in addition to dropping the partitions.

  • db (str) -- Name of hive schema (database) @table belongs to

>>> hh = HiveMetastoreHook()
>>> hh.drop_partitions(db='airflow', table_name='static_babynames',
... part_vals=['2020-05-01'])
True
class airflow.providers.apache.hive.hooks.hive.HiveServer2Hook[source]

Bases: airflow.hooks.dbapi.DbApiHook

Wrapper around the pyhive library

Notes:

  • the default authMechanism is PLAIN, to override it you can specify it in the extra of your connection in the UI

  • the default for run_set_variable_statements is true, if you are using impala you may need to set it to false in the extra of your connection in the UI

Parameters
  • hiveserver2_conn_id (str) -- Reference to the Hive Server2 thrift service connection id (see howto/connection:hiveserver2).

  • schema (Optional[str]) -- Hive database name.
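
As a hedged illustration of the notes above, a connection extra overriding both settings might look like {"authMechanism": "NOSASL", "run_set_variable_statements": false} (values are illustrative); the hook itself is then constructed as usual:

>>> hh = HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default', schema='airflow')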

conn_name_attr = hiveserver2_conn_id[source]
default_conn_name = hiveserver2_default[source]
conn_type = hiveserver2[source]
hook_name = Hive Server 2 Thrift[source]
supports_autocommit = False[source]
get_conn(self, schema: Optional[str] = None)[source]

Returns a Hive connection object.

_get_results(self, hql: Union[str, str, List[str]], schema: str = 'default', fetch_size: Optional[int] = None, hive_conf: Optional[Dict[Any, Any]] = None)[source]
get_results(self, hql: Union[str, str], schema: str = 'default', fetch_size: Optional[int] = None, hive_conf: Optional[Dict[Any, Any]] = None)[source]

Get results of the provided hql in target schema.

Parameters
  • hql (str or list) -- hql to be executed.

  • schema (str) -- target schema, default to 'default'.

  • fetch_size (int) -- max size of result to fetch.

  • hive_conf (dict) -- hive_conf to execute along with the hql.

Returns

results of hql execution, dict with data (list of results) and header

Return type

dict
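
A hedged sketch, reusing the example table from this page; the returned dict exposes data and header as described above:

>>> hh = HiveServer2Hook()
>>> results = hh.get_results("SELECT * FROM airflow.static_babynames LIMIT 5", schema='airflow')
>>> len(results['data'])
5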

to_csv(self, hql: Union[str, str], csv_filepath: str, schema: str = 'default', delimiter: str = ',', lineterminator: str = '\r\n', output_header: bool = True, fetch_size: int = 1000, hive_conf: Optional[Dict[Any, Any]] = None)[source]

Execute hql in target schema and write results to a csv file.

Parameters
  • hql (str or list) -- hql to be executed.

  • csv_filepath (str) -- filepath of csv to write results into.

  • schema (str) -- target schema, default to 'default'.

  • delimiter (str) -- delimiter of the csv file, default to ','.

  • lineterminator (str) -- lineterminator of the csv file.

  • output_header (bool) -- whether to write a header row to the csv file, default to True.

  • fetch_size (int) -- number of result rows to write into the csv file, default to 1000.

  • hive_conf (dict) -- hive_conf to execute along with the hql.
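
A hedged sketch writing query results to a local CSV (the /tmp path is hypothetical):

>>> hh = HiveServer2Hook()
>>> hh.to_csv(
...     "SELECT * FROM airflow.static_babynames LIMIT 100",
...     csv_filepath='/tmp/babynames.csv',
...     schema='airflow',
...     delimiter=',',
...     output_header=True,
...     fetch_size=1000,
... )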

get_records(self, hql: Union[str, str], schema: str = 'default', hive_conf: Optional[Dict[Any, Any]] = None)[source]

Get a set of records from a Hive query.

Parameters
  • hql (str or list) -- hql to be executed.

  • schema (str) -- target schema, default to 'default'.

  • hive_conf (dict) -- hive_conf to execute along with the hql.

Returns

result of hive execution

Return type

list

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
get_pandas_df(self, hql: Union[str, str], schema: str = 'default', hive_conf: Optional[Dict[Any, Any]] = None, **kwargs)[source]

Get a pandas dataframe from a Hive query

Parameters
  • hql (str or list) -- hql to be executed.

  • schema (str) -- target schema, default to 'default'.

  • hive_conf (dict) -- hive_conf to execute along with the hql.

  • kwargs (dict) -- (optional) passed into pandas.DataFrame constructor

Returns

result of hive execution

Return type

DataFrame

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100
Returns

pandas.DataFrame
