:mod:`airflow.hooks.hive_hooks`
===============================

.. py:module:: airflow.hooks.hive_hooks







Module Contents
---------------






.. data:: HIVE_QUEUE_PRIORITIES
   :annotation: = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'] 

   









.. function:: get_context_from_env_var()

   
   Extract context from environment variables (e.g. dag_id, task_id and
   execution_date) so that they can be used inside BashOperator and
   PythonOperator.

   :return: The context of interest.
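
   The sketch below is a hypothetical usage inside a callable invoked by a
   PythonOperator; it assumes Airflow has exported the ``AIRFLOW_CTX_*``
   environment variables for the running task.

   .. code-block:: python

      from airflow.hooks.hive_hooks import get_context_from_env_var

      def my_callable():
          # Values are read from the AIRFLOW_CTX_* environment variables that
          # Airflow sets for the task process.
          context = get_context_from_env_var()
          print(context)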

   









.. py:class:: HiveCliHook(hive_cli_conn_id='hive_cli_default', run_as=None, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None)

   Bases: :class:`airflow.hooks.base_hook.BaseHook`

   

   Simple wrapper around the hive CLI.

   It also supports ``beeline``, a lighter CLI that runs JDBC and is
   replacing the heavier traditional CLI. To enable ``beeline``, set the
   ``use_beeline`` param in the extra field of your connection as in
   ``{ "use_beeline": true }``.

   Note that you can also set default hive CLI parameters using
   ``hive_cli_params`` in the extra field of your connection as in
   ``{"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}``.
   Parameters passed here can be overridden by run_cli's hive_conf param.

   The extra connection parameter ``auth`` gets passed as in the ``jdbc``
   connection string as is.

   :param mapred_queue: queue used by the Hadoop Scheduler (Capacity or Fair)
   :type  mapred_queue: str
   :param mapred_queue_priority: priority within the job queue.
       Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
   :type  mapred_queue_priority: str
   :param mapred_job_name: This name will appear in the jobtracker.
       This can make monitoring easier.
   :type  mapred_job_name: str
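
   A minimal construction sketch, assuming a configured ``hive_cli_default``
   connection; the queue and job name values are hypothetical.

   .. code-block:: python

      from airflow.hooks.hive_hooks import HiveCliHook

      hook = HiveCliHook(
          hive_cli_conn_id='hive_cli_default',
          mapred_queue='default',           # hypothetical scheduler queue
          mapred_queue_priority='NORMAL',   # one of HIVE_QUEUE_PRIORITIES
          mapred_job_name='airflow_example_job',
      )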


   

   

   

   .. method:: _prepare_cli_cmd(self)

      
      This function creates the command list from available information

      



   

   .. staticmethod:: _prepare_hiveconf(d)

      
      This function prepares a list of hiveconf params
      from a dictionary of key value pairs.

      :param d:
      :type d: dict

      >>> hh = HiveCliHook()
      >>> hive_conf = {"hive.exec.dynamic.partition": "true",
      ... "hive.exec.dynamic.partition.mode": "nonstrict"}
      >>> hh._prepare_hiveconf(hive_conf)
      ["-hiveconf", "hive.exec.dynamic.partition=true", "-hiveconf", "hive.exec.dynamic.partition.mode=nonstrict"]

      



   

   .. method:: run_cli(self, hql, schema=None, verbose=True, hive_conf=None)

      
      Run an hql statement using the hive cli. If hive_conf is specified,
      it should be a dict and the entries will be set as key/value pairs
      in HiveConf.


      :param hive_conf: if specified these key value pairs will be passed
          to hive as ``-hiveconf "key"="value"``. Note that they will be
          passed after the ``hive_cli_params`` and thus will override
          whatever values are specified in the database.
      :type hive_conf: dict

      >>> hh = HiveCliHook()
      >>> result = hh.run_cli("USE airflow;")
      >>> ("OK" in result)
      True
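
      A sketch of passing ``hive_conf``, assuming a working Hive CLI (or
      beeline) connection; the query and settings are illustrative only.

      .. code-block:: python

         from airflow.hooks.hive_hooks import HiveCliHook

         hook = HiveCliHook()
         output = hook.run_cli(
             hql="SELECT 1;",
             schema='default',
             # each entry is passed to hive as -hiveconf "key"="value"
             hive_conf={'hive.exec.dynamic.partition': 'true'},
         )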

      



   

   .. method:: test_hql(self, hql)

      
      Test an hql statement using the hive cli and EXPLAIN

      



   

   .. method:: load_df(self, df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)

      
      Loads a pandas DataFrame into hive.

      Hive data types will be inferred if not passed but column names will
      not be sanitized.

      :param df: DataFrame to load into a Hive table
      :type df: pandas.DataFrame
      :param table: target Hive table, use dot notation to target a
          specific database
      :type table: str
      :param field_dict: mapping from column name to hive data type.
          Note that it must be OrderedDict so as to keep columns' order.
      :type field_dict: collections.OrderedDict
      :param delimiter: field delimiter in the file
      :type delimiter: str
      :param encoding: str encoding to use when writing DataFrame to file
      :type encoding: str
      :param pandas_kwargs: passed to DataFrame.to_csv
      :type pandas_kwargs: dict
      :param kwargs: passed to self.load_file
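
      A sketch of loading a small DataFrame, assuming a working Hive CLI
      connection; the target table below is hypothetical.

      .. code-block:: python

         from collections import OrderedDict

         import pandas as pd

         from airflow.hooks.hive_hooks import HiveCliHook

         df = pd.DataFrame({'state': ['NY', 'CA'], 'num': [10, 20]})
         hook = HiveCliHook()
         hook.load_df(
             df=df,
             table='airflow.example_stage',  # hypothetical target table
             field_dict=OrderedDict([('state', 'STRING'), ('num', 'INT')]),
             # extra keyword arguments are forwarded to load_file
             recreate=True,
         )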

      



   

   .. method:: load_file(self, filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)

      
      Loads a local file into Hive

      Note that the table generated in Hive uses ``STORED AS textfile``
      which isn't the most efficient serialization format. If a
      large amount of data is loaded and/or if the table gets
      queried considerably, you may want to use this operator only to
      stage the data into a temporary table before loading it into its
      final destination using a ``HiveOperator``.

      :param filepath: local filepath of the file to load
      :type filepath: str
      :param table: target Hive table, use dot notation to target a
          specific database
      :type table: str
      :param delimiter: field delimiter in the file
      :type delimiter: str
      :param field_dict: A dictionary of the field names in the file
          as keys and their Hive types as values.
          Note that it must be OrderedDict so as to keep columns' order.
      :type field_dict: collections.OrderedDict
      :param create: whether to create the table if it doesn't exist
      :type create: bool
      :param overwrite: whether to overwrite the data in table or partition
      :type overwrite: bool
      :param partition: target partition as a dict of partition columns
          and values
      :type partition: dict
      :param recreate: whether to drop and recreate the table at every
          execution
      :type recreate: bool
      :param tblproperties: TBLPROPERTIES of the hive table being created
      :type tblproperties: dict
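
      A sketch of staging a local CSV into a partitioned table; the path,
      table and partition values below are hypothetical.

      .. code-block:: python

         from collections import OrderedDict

         from airflow.hooks.hive_hooks import HiveCliHook

         hook = HiveCliHook()
         hook.load_file(
             filepath='/tmp/babynames.csv',
             table='airflow.static_babynames_partitioned',
             delimiter=',',
             field_dict=OrderedDict([('state', 'STRING'),
                                     ('name', 'STRING'),
                                     ('num', 'INT')]),
             create=True,
             overwrite=True,
             partition={'ds': '2015-01-01'},
         )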

      



   

   .. method:: kill(self)

      











.. py:class:: HiveMetastoreHook(metastore_conn_id='metastore_default')

   Bases: :class:`airflow.hooks.base_hook.BaseHook`

   

   Wrapper to interact with the Hive Metastore
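
   A minimal sketch, assuming a configured ``metastore_default`` connection
   reachable over thrift.

   .. code-block:: python

      from airflow.hooks.hive_hooks import HiveMetastoreHook

      hook = HiveMetastoreHook(metastore_conn_id='metastore_default')
      client = hook.get_metastore_client()  # Hive thrift client, see get_metastore_client below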


   

   

   .. attribute:: MAX_PART_COUNT
      :annotation: = 32767 

      



   

   

   

   .. method:: __getstate__(self)

      



   

   .. method:: __setstate__(self, d)

      



   

   .. method:: get_metastore_client(self)

      
      Returns a Hive thrift client.

      



   

   .. method:: get_conn(self)

      



   

   .. method:: check_for_partition(self, schema, table, partition)

      
      Checks whether a partition exists

      :param schema: Name of hive schema (database) @table belongs to
      :type schema: str
      :param table: Name of hive table @partition belongs to
      :type table: str
      :param partition: Expression that matches the partitions to check for
          (eg `a = 'b' AND c = 'd'`)
      :type partition: str
      :rtype: bool

      >>> hh = HiveMetastoreHook()
      >>> t = 'static_babynames_partitioned'
      >>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
      True

      



   

   .. method:: check_for_named_partition(self, schema, table, partition_name)

      
      Checks whether a partition with a given name exists

      :param schema: Name of hive schema (database) @table belongs to
      :type schema: str
      :param table: Name of hive table @partition belongs to
      :type table: str
      :param partition_name: Name of the partitions to check for (eg `a=b/c=d`)
      :type partition_name: str
      :rtype: bool

      >>> hh = HiveMetastoreHook()
      >>> t = 'static_babynames_partitioned'
      >>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
      True
      >>> hh.check_for_named_partition('airflow', t, "ds=xxx")
      False

      



   

   .. method:: get_table(self, table_name, db='default')

      
      Get a metastore table object

      >>> hh = HiveMetastoreHook()
      >>> t = hh.get_table(db='airflow', table_name='static_babynames')
      >>> t.tableName
      'static_babynames'
      >>> [col.name for col in t.sd.cols]
      ['state', 'year', 'name', 'gender', 'num']

      



   

   .. method:: get_tables(self, db, pattern='*')

      
      Get metastore table objects for the tables matching ``pattern`` in database ``db``

      



   

   .. method:: get_databases(self, pattern='*')

      
      Get the metastore databases that match ``pattern``

      



   

   .. method:: get_partitions(self, schema, table_name, filter=None)

      
      Returns a list of all partitions in a table. Works only
      for tables with fewer than 32767 partitions (the Java short max val).
      For subpartitioned tables, the number might easily exceed this.

      >>> hh = HiveMetastoreHook()
      >>> t = 'static_babynames_partitioned'
      >>> parts = hh.get_partitions(schema='airflow', table_name=t)
      >>> len(parts)
      1
      >>> parts
      [{'ds': '2015-01-01'}]

      



   

   .. staticmethod:: _get_max_partition_from_part_specs(part_specs, partition_key, filter_map)

      
      Helper method to get the max partition for partition_key from a list
      of part specs. key:value pairs in filter_map will be used to
      filter out partitions.

      :param part_specs: list of partition specs.
      :type part_specs: list
      :param partition_key: partition key name.
      :type partition_key: str
      :param filter_map: partition_key:partition_value map used for partition filtering,
                         e.g. {'key1': 'value1', 'key2': 'value2'}.
                         Only partitions matching all partition_key:partition_value
                         pairs will be considered as candidates of max partition.
      :type filter_map: map
      :return: Max partition or None if part_specs is empty.
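
      A sketch with in-memory part specs; no metastore connection is needed
      for this static helper, and the expected value is an assumption.

      .. code-block:: python

         from airflow.hooks.hive_hooks import HiveMetastoreHook

         part_specs = [{'ds': '2015-01-01', 'hr': '00'},
                       {'ds': '2015-01-02', 'hr': '00'},
                       {'ds': '2015-01-03', 'hr': '01'}]
         max_ds = HiveMetastoreHook._get_max_partition_from_part_specs(
             part_specs, partition_key='ds', filter_map={'hr': '00'})
         # expected to be '2015-01-02': only partitions with hr='00' are candidates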

      



   

   .. method:: max_partition(self, schema, table_name, field=None, filter_map=None)

      
      Returns the maximum value for all partitions with given field in a table.
      If only one partition key exists in the table, that key will be used as the field.
      filter_map should be a partition_key:partition_value map and will be used to
      filter out partitions.

      :param schema: schema name.
      :type schema: str
      :param table_name: table name.
      :type table_name: str
      :param field: partition key to get max partition from.
      :type field: str
      :param filter_map: partition_key:partition_value map used for partition filtering.
      :type filter_map: map

      >>> hh = HiveMetastoreHook()
      >>> filter_map = {'ds': '2015-01-01'}
      >>> t = 'static_babynames_partitioned'
      >>> hh.max_partition(schema='airflow',
      ... table_name=t, field='ds', filter_map=filter_map)
      '2015-01-01'

      



   

   .. method:: table_exists(self, table_name, db='default')

      
      Check if table exists

      >>> hh = HiveMetastoreHook()
      >>> hh.table_exists(db='airflow', table_name='static_babynames')
      True
      >>> hh.table_exists(db='airflow', table_name='does_not_exist')
      False

      











.. py:class:: HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')

   Bases: :class:`airflow.hooks.base_hook.BaseHook`

   

   Wrapper around the pyhive library.

   Note that the default authMechanism is PLAIN; to override it you
   can specify it in the ``extra`` field of your connection in the UI.

   

   

   

   .. method:: get_conn(self, schema=None)

      
      Returns a Hive connection object.

      



   

   .. method:: _get_results(self, hql, schema='default', fetch_size=None, hive_conf=None)

      



   

   .. method:: get_results(self, hql, schema='default', fetch_size=None, hive_conf=None)

      
      Get results of the provided hql in target schema.

      :param hql: hql to be executed.
      :type hql: str or list
      :param schema: target schema, default to 'default'.
      :type schema: str
      :param fetch_size: max size of result to fetch.
      :type fetch_size: int
      :param hive_conf: hive_conf to execute along with the hql.
      :type hive_conf: dict
      :return: results of hql execution, dict with data (list of results) and header
      :rtype: dict
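
      A sketch, assuming a reachable HiveServer2 instance behind the default
      connection id; the query is illustrative only.

      .. code-block:: python

         from airflow.hooks.hive_hooks import HiveServer2Hook

         hook = HiveServer2Hook()
         results = hook.get_results("SELECT 1", schema='default')
         rows = results['data']      # list of result rows
         header = results['header']  # column metadata returned by the cursor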

      



   

   .. method:: to_csv(self, hql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)

      
      Execute hql in target schema and write results to a csv file.

      :param hql: hql to be executed.
      :type hql: str or list
      :param csv_filepath: filepath of csv to write results into.
      :type csv_filepath: str
      :param schema: target schema, default to 'default'.
      :type schema: str
      :param delimiter: delimiter of the csv file, default to ','.
      :type delimiter: str
      :param lineterminator: lineterminator of the csv file.
      :type lineterminator: str
      :param output_header: whether to write the header row to the csv file, default to True.
      :type output_header: bool
      :param fetch_size: number of result rows to write into the csv file, default to 1000.
      :type fetch_size: int
      :param hive_conf: hive_conf to execute along with the hql.
      :type hive_conf: dict
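
      A sketch of dumping query results to a local CSV file; the path and
      query below are hypothetical.

      .. code-block:: python

         from airflow.hooks.hive_hooks import HiveServer2Hook

         hook = HiveServer2Hook()
         hook.to_csv(
             hql="SELECT * FROM airflow.static_babynames LIMIT 100",
             csv_filepath='/tmp/static_babynames.csv',
             schema='default',
             delimiter=',',
             output_header=True,
             fetch_size=1000,
         )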

      



   

   .. method:: get_records(self, hql, schema='default')

      
      Get a set of records from a Hive query.

      :param hql: hql to be executed.
      :type hql: str or list
      :param schema: target schema, default to 'default'.
      :type schema: str
      :return: result of hive execution
      :rtype: list

      >>> hh = HiveServer2Hook()
      >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
      >>> len(hh.get_records(sql))
      100

      



   

   .. method:: get_pandas_df(self, hql, schema='default')

      
      Get a pandas dataframe from a Hive query

      :param hql: hql to be executed.
      :type hql: str or list
      :param schema: target schema, default to 'default'.
      :type schema: str
      :return: result of hql execution
      :rtype: pandas.DataFrame

      >>> hh = HiveServer2Hook()
      >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
      >>> df = hh.get_pandas_df(sql)
      >>> len(df.index)
      100
