airflow.providers.apache.hive.hooks.hive
Module Contents
Classes
HiveCliHook | Simple wrapper around the hive CLI.
HiveMetastoreHook | Wrapper to interact with the Hive Metastore.
HiveServer2Hook | Wrapper around the pyhive library.
Functions
get_context_from_env_var() | Extract context from env variables (dag_id, task_id, etc.) for use in BashOperator and PythonOperator.
Attributes
HIVE_QUEUE_PRIORITIES
- airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']
- airflow.providers.apache.hive.hooks.hive.get_context_from_env_var()
Extract context from env variables (dag_id, task_id, etc.) for use in BashOperator and PythonOperator.
- Returns
The context of interest.
- Return type
dict[Any, Any]
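As a rough illustration of what extracting context from environment variables can look like, the sketch below reads a few AIRFLOW_CTX_* variables into a dict. The mapping CONTEXT_ENV_VARS, the helper context_from_env, the dotted key names, and the empty-string default are assumptions made for this example, not the provider's actual code.

```python
import os
from typing import Any, Mapping, Optional

# Hypothetical mapping for illustration: context key -> environment variable.
CONTEXT_ENV_VARS = {
    "airflow.ctx.dag_id": "AIRFLOW_CTX_DAG_ID",
    "airflow.ctx.task_id": "AIRFLOW_CTX_TASK_ID",
    "airflow.ctx.dag_run_id": "AIRFLOW_CTX_DAG_RUN_ID",
}


def context_from_env(environ: Optional[Mapping[str, str]] = None) -> dict[Any, Any]:
    """Collect the context values found in the environment (assumed default: "")."""
    env = os.environ if environ is None else environ
    return {key: env.get(var, "") for key, var in CONTEXT_ENV_VARS.items()}


# Use an explicit mapping so the example does not depend on the real environment.
example_env = {"AIRFLOW_CTX_DAG_ID": "daily_load", "AIRFLOW_CTX_TASK_ID": "load_hive"}
context = context_from_env(example_env)
print(context)
```

Variables that are not set simply come back as empty strings in this sketch, so callers can pass the whole dict along without checking each key.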
- class airflow.providers.apache.hive.hooks.hive.HiveCliHook(hive_cli_conn_id=default_conn_name, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, hive_cli_params='', auth=None, proxy_user=None)
Bases:
airflow.hooks.base.BaseHook
Simple wrapper around the hive CLI.
It also supports beeline, a lighter CLI that runs JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline param in the extra field of your connection, as in { "use_beeline": true }.
Note that you can also set default hive CLI parameters by passing hive_cli_params, a space separated list of parameters to add to the hive command.
The extra connection parameter auth gets passed as is in the jdbc connection string.
- Parameters
hive_cli_conn_id (str) – Reference to the Hive CLI connection id.
mapred_queue (str | None) – queue used by the Hadoop Scheduler (Capacity or Fair)
mapred_queue_priority (str | None) – priority within the job queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
mapred_job_name (str | None) – This name will appear in the jobtracker. This can make monitoring easier.
hive_cli_params (str) – Space separated list of hive command parameters to add to the hive command.
proxy_user (str | None) – Run HQL code as this user.
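The settings above could be assembled as in the following sketch. It builds the connection extra that enables beeline and checks a priority against the documented HIVE_QUEUE_PRIORITIES values. The helper build_hive_cli_kwargs, its upper-casing of the priority, and the sample queue and job names are hypothetical and only serve the illustration.

```python
import json

# Values copied from the HIVE_QUEUE_PRIORITIES attribute documented above.
HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]


def build_hive_cli_kwargs(queue, priority, job_name):
    """Hypothetical helper: validate and collect HiveCliHook keyword arguments."""
    if priority is not None:
        priority = priority.upper()
        if priority not in HIVE_QUEUE_PRIORITIES:
            raise ValueError(
                f"Invalid priority {priority!r}; expected one of {HIVE_QUEUE_PRIORITIES}"
            )
    return {
        "mapred_queue": queue,
        "mapred_queue_priority": priority,
        "mapred_job_name": job_name,
    }


# JSON for the connection's extra field, enabling beeline.
connection_extra = json.dumps({"use_beeline": True})

hook_kwargs = build_hive_cli_kwargs("etl", "high", "nightly_babynames")
print(connection_extra)
print(hook_kwargs)
# With the provider installed, the kwargs would be passed as HiveCliHook(**hook_kwargs).
```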
- classmethod get_connection_form_widgets()
Return connection widgets to add to Hive Client Wrapper connection form.
- classmethod get_ui_field_behaviour()
Return custom UI field behaviour for Hive Client Wrapper connection.
- run_cli(hql, schema=None, verbose=True, hive_conf=None)
Run an hql statement using the hive cli.
If hive_conf is specified it should be a dict and the entries will be set as key/value pairs in HiveConf.
- Parameters
hql (str) – an hql (hive query language) statement to run with hive cli
schema (str | None) – Name of hive schema (database) to use
verbose (bool) – Provides additional logging. Defaults to True.
hive_conf (dict[Any, Any] | None) – if specified, these key value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database.
>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
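To make the ordering described for hive_conf concrete, here is a minimal sketch of how a dict could be flattened into -hiveconf arguments and appended after the default CLI parameters. The helper hiveconf_args and the sample configuration keys are assumptions for illustration; the hook's internal implementation may differ.

```python
def hiveconf_args(hive_conf):
    """Hypothetical helper: flatten a dict into -hiveconf key=value arguments."""
    args = []
    for key, value in (hive_conf or {}).items():
        args.extend(["-hiveconf", f"{key}={value}"])
    return args


# Default parameters, as they might be given through hive_cli_params.
hive_cli_params = "-hiveconf mapred.job.queue.name=default".split()

# Per-call overrides, as they might be given through hive_conf.
overrides = {"mapred.job.queue.name": "etl", "hive.exec.parallel": "true"}

# The overrides come last, so they win over the defaults for the same key.
command = ["hive", *hive_cli_params, *hiveconf_args(overrides), "-e", "USE airflow;"]
print(command)
```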
- load_df(df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)
Load a pandas DataFrame into hive.
Hive data types will be inferred if not passed but column names will not be sanitized.
- Parameters
df (pandas.DataFrame) – DataFrame to load into a Hive table
table (str) – target Hive table, use dot notation to target a specific database
field_dict (dict[Any, Any] | None) – mapping from column name to hive data type. Note that Python dict is ordered so it keeps columns’ order.
delimiter (str) – field delimiter in the file
encoding (str) – str encoding to use when writing DataFrame to file
pandas_kwargs (Any) – passed to DataFrame.to_csv
kwargs (Any) – passed to self.load_file
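Because column names are not sanitized, a caller may want to clean them before loading. The sketch below shows one possible approach that also builds an ordered field_dict. The helper sanitize_column_name and its rules (lower-casing, replacing non-word characters, prefixing names that start with a digit) are assumptions for this example and are not Hive's exact identifier rules.

```python
import re


def sanitize_column_name(name: str) -> str:
    """Hypothetical helper: turn an arbitrary label into a conservative column name."""
    cleaned = re.sub(r"\W+", "_", name.strip().lower()).strip("_")
    if not cleaned or cleaned[0].isdigit():
        cleaned = f"col_{cleaned}"
    return cleaned


raw_columns = ["State", "Birth Year", "first-name", "2015 count"]
hive_types = ["STRING", "INT", "STRING", "BIGINT"]

# dict preserves insertion order, so the column order is kept.
field_dict = {
    sanitize_column_name(column): hive_type
    for column, hive_type in zip(raw_columns, hive_types)
}
print(field_dict)
```

The DataFrame's columns would need to be renamed with the same helper so that they match the keys of field_dict.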
- load_file(filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)
Load a local file into Hive.
Note that the table generated in Hive uses STORED AS textfile, which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.
- Parameters
filepath (str) – local filepath of the file to load
table (str) – target Hive table, use dot notation to target a specific database
delimiter (str) – field delimiter in the file
field_dict (dict[Any, Any] | None) – A dictionary with the field names in the file as keys and their Hive types as values. Note that Python dict is ordered so it keeps columns’ order.
create (bool) – whether to create the table if it doesn’t exist
overwrite (bool) – whether to overwrite the data in table or partition
partition (dict[str, Any] | None) – target partition as a dict of partition columns and values
recreate (bool) – whether to drop and recreate the table at every execution
tblproperties (dict[str, Any] | None) – TBLPROPERTIES of the hive table being created
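The interplay of create, recreate, overwrite and partition can be pictured with the sketch below, which builds the kind of HQL such a load might issue. The function sketch_load_file_hql is hypothetical, omits tblproperties, and only approximates the statements; the exact HQL generated by the hook may differ.

```python
def sketch_load_file_hql(filepath, table, field_dict=None, delimiter=",",
                         create=True, overwrite=True, partition=None, recreate=False):
    """Hypothetical helper: approximate the HQL behind a file load."""
    statements = []
    if recreate:
        statements.append(f"DROP TABLE IF EXISTS {table};")
    if create or recreate:
        if field_dict is None:
            raise ValueError("field_dict is required to create the table")
        fields = ", ".join(f"`{name}` {hive_type}" for name, hive_type in field_dict.items())
        ddl = f"CREATE TABLE IF NOT EXISTS {table} ({fields})"
        if partition:
            # Partition columns are typed as STRING in this sketch.
            partition_fields = ", ".join(f"{column} STRING" for column in partition)
            ddl += f" PARTITIONED BY ({partition_fields})"
        ddl += f" ROW FORMAT DELIMITED FIELDS TERMINATED BY '{delimiter}' STORED AS textfile;"
        statements.append(ddl)
    load = f"LOAD DATA LOCAL INPATH '{filepath}' "
    if overwrite:
        load += "OVERWRITE "
    load += f"INTO TABLE {table}"
    if partition:
        partition_values = ", ".join(f"{key}='{value}'" for key, value in partition.items())
        load += f" PARTITION ({partition_values})"
    statements.append(load + ";")
    return statements


hql = sketch_load_file_hql(
    "/tmp/babynames.csv",
    "airflow.static_babynames_partitioned",
    field_dict={"state": "STRING", "year": "INT"},
    partition={"ds": "2015-01-01"},
)
print("\n".join(hql))
```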
- class airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook(metastore_conn_id=default_conn_name)
Bases:
airflow.hooks.base.BaseHook
Wrapper to interact with the Hive Metastore.
- Parameters
metastore_conn_id (str) – reference to the metastore thrift service connection id (howto/connection:hive_metastore).
- check_for_partition(schema, table, partition)
Check whether a partition exists.
- Parameters
>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> hh.check_for_partition("airflow", t, "ds='2015-01-01'")
True
- check_for_named_partition(schema, table, partition_name)
Check whether a partition with a given name exists.
- Parameters
>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> hh.check_for_named_partition("airflow", t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition("airflow", t, "ds=xxx")
False
- get_table(table_name, db='default')
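The two checks above take different argument shapes: check_for_partition receives a filter expression with quoted values, while check_for_named_partition receives a partition name without quotes. The sketch below derives both strings from one partition spec. The helpers partition_filter and partition_name are hypothetical and do not handle the escaping of special characters in partition values.

```python
def partition_filter(spec):
    """Hypothetical helper: build a filter expression such as ds='2015-01-01'."""
    return " AND ".join(f"{key}='{value}'" for key, value in spec.items())


def partition_name(spec):
    """Hypothetical helper: build a partition name such as ds=2015-01-01."""
    return "/".join(f"{key}={value}" for key, value in spec.items())


spec = {"ds": "2015-01-01"}
print(partition_filter(spec))  # suited to check_for_partition
print(partition_name(spec))    # suited to check_for_named_partition
```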
Get a metastore table object.
>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db="airflow", table_name="static_babynames")
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
- get_partitions(schema, table_name, partition_filter=None)
Return a list of all partitions in a table.
Works only for tables with fewer than 32767 partitions (java short max val). For subpartitioned tables, the number might easily exceed this.
>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> parts = hh.get_partitions(schema="airflow", table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
- max_partition(schema, table_name, field=None, filter_map=None)
Return the maximum value for all partitions with given field in a table.
If only one partition key exists in the table, that key will be used as field. filter_map should be a partition_key:partition_value map and will be used to filter out partitions.
- Parameters
>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',
... table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
- table_exists(table_name, db='default')
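The selection logic described for max_partition can be sketched on plain partition dicts of the shape returned by get_partitions. The helper max_partition_value is hypothetical; it compares values as strings and returns None when nothing matches, both of which are assumptions of this example.

```python
def max_partition_value(part_specs, field, filter_map=None):
    """Hypothetical helper: maximum value of `field` among partitions matching filter_map."""
    filter_map = filter_map or {}
    candidates = [
        spec[field]
        for spec in part_specs
        if field in spec and all(spec.get(key) == value for key, value in filter_map.items())
    ]
    return max(candidates) if candidates else None


parts = [
    {"ds": "2015-01-01", "country": "us"},
    {"ds": "2015-01-02", "country": "fr"},
    {"ds": "2015-01-03", "country": "us"},
]

print(max_partition_value(parts, "ds"))
print(max_partition_value(parts, "ds", filter_map={"country": "fr"}))
```

ISO-formatted dates sort correctly as strings, which is why a plain max() is enough here.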
Check if table exists.
>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db="airflow", table_name="static_babynames")
True
>>> hh.table_exists(db="airflow", table_name="does_not_exist")
False
- drop_partitions(table_name, part_vals, delete_data=False, db='default')
Drop partitions from the given table matching the part_vals input.
- Parameters
table_name – table name.
part_vals – list of partition specs.
delete_data – Setting to control whether the underlying data has to be deleted in addition to dropping partitions.
db – Name of the hive schema (database) the table belongs to
>>> hh = HiveMetastoreHook()
>>> hh.drop_partitions(db='airflow', table_name='static_babynames', part_vals=['2020-05-01'])
True
- class airflow.providers.apache.hive.hooks.hive.HiveServer2Hook(*args, schema=None, log_sql=True, **kwargs)
Bases:
airflow.providers.common.sql.hooks.sql.DbApiHook
Wrapper around the pyhive library.
Notes:
* the default auth_mechanism is PLAIN, to override it you can specify it in the extra of your connection in the UI
* the default for run_set_variable_statements is true, if you are using impala you may need to set it to false in the extra of your connection in the UI
- Parameters
hiveserver2_conn_id – Reference to the Hive Server2 thrift service connection id (howto/connection:hiveserver2).
schema (str | None) – Hive database name.
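The two notes above could translate into a connection extra such as the one in this sketch, which also shows the documented defaults being applied when a key is absent. The helper resolve_hs2_settings is hypothetical, and the LDAP value is only an illustrative override.

```python
import json


def resolve_hs2_settings(extra_json):
    """Hypothetical helper: read the two settings from a connection extra, with the documented defaults."""
    extra = json.loads(extra_json) if extra_json else {}
    return {
        "auth_mechanism": extra.get("auth_mechanism", "PLAIN"),
        "run_set_variable_statements": extra.get("run_set_variable_statements", True),
    }


# Extra for an Impala-style setup: override the auth mechanism and skip SET statements.
impala_extra = json.dumps({"auth_mechanism": "LDAP", "run_set_variable_statements": False})

print(resolve_hs2_settings(impala_extra))
print(resolve_hs2_settings(""))  # falls back to the defaults
```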
- get_results(sql, schema='default', fetch_size=None, hive_conf=None)
Get results of the provided hql in target schema.
- Parameters
- Returns
results of hql execution, dict with data (list of results) and header
- Return type
- to_csv(sql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)
Execute hql in target schema and write results to a csv file.
- Parameters
sql (str) – hql to be executed.
csv_filepath (str) – filepath of csv to write results into.
schema (str) – target schema, defaults to ‘default’.
delimiter (str) – delimiter of the csv file, defaults to ‘,’.
lineterminator (str) – line terminator of the csv file.
output_header (bool) – whether to write the header row to the csv file, defaults to True.
fetch_size (int) – number of result rows to fetch and write into the csv file per batch, defaults to 1000.
hive_conf (dict[Any, Any] | None) – hive_conf to execute along with the hql.
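How the csv-related parameters fit together can be sketched with the standard library alone. The helper write_csv_in_batches is hypothetical and works on an in-memory row iterable rather than a Hive cursor; treating fetch_size as a batch size is an assumption of this sketch.

```python
import csv
import io
from itertools import islice


def write_csv_in_batches(rows, header, out, delimiter=",", lineterminator="\r\n",
                         output_header=True, fetch_size=1000):
    """Hypothetical helper: write rows to `out` in batches of fetch_size."""
    writer = csv.writer(out, delimiter=delimiter, lineterminator=lineterminator)
    if output_header:
        writer.writerow(header)
    iterator = iter(rows)
    written = 0
    while True:
        batch = list(islice(iterator, fetch_size))
        if not batch:
            break
        writer.writerows(batch)
        written += len(batch)
    return written


buffer = io.StringIO()
rows = [("CA", 2015, "Emma"), ("NY", 2015, "Liam")]
count = write_csv_in_batches(rows, ["state", "year", "name"], buffer, fetch_size=1)
print(count)
print(repr(buffer.getvalue()))
```

Writing in batches keeps memory use bounded for large result sets, since only fetch_size rows are held at a time.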
- get_records(sql, parameters=None, **kwargs)
Get a set of records from a Hive query; optionally pass a ‘schema’ kwarg to specify target schema.
- Parameters
- Returns
result of hive execution
- Return type
Any
>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
- get_pandas_df(sql, schema='default', hive_conf=None, **kwargs)
Get a pandas dataframe from a Hive query.
- Parameters
- Returns
result of hive execution
- Return type
pandas.DataFrame
>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100