Source code for airflow.macros.hive

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime





def _closest_date(target_dt, date_list, before_target=None):
    """
    This function finds the date in a list closest to the target date.
    An optional parameter can be given to get the closest before or after.

    :param target_dt: The target date
    :type target_dt: datetime.date
    :param date_list: The list of dates to search
    :type date_list: list[datetime.date]
    :param before_target: closest before or after the target
    :type before_target: bool or None
    :returns: The closest date
    :rtype: datetime.date or None
    """
    fb = lambda d: target_dt - d if d <= target_dt else datetime.timedelta.max
    fa = lambda d: d - target_dt if d >= target_dt else datetime.timedelta.max
    fnone = lambda d: target_dt - d if d < target_dt else d - target_dt
    if before_target is None:
        return min(date_list, key=fnone).date()
    if before_target:
        return min(date_list, key=fb).date()
    else:
        return min(date_list, key=fa).date()


def closest_ds_partition(
        table, ds, before=True, schema="default",
        metastore_conn_id='metastore_default'):
    """
    Find the partition of a Hive table whose datestamp is closest to ``ds``.

    The partitions are fetched through ``HiveMetastoreHook.get_partitions``
    and the first value of each returned partition is taken as its
    datestamp. If ``ds`` itself is one of those values it is returned
    unchanged; otherwise every value is parsed as ``%Y-%m-%d`` and the
    closest one is selected with ``_closest_date``.

    :param table: A hive table name; a ``schema.table`` value overrides
        the ``schema`` argument
    :type table: str
    :param ds: A datestamp ``%Y-%m-%d`` e.g. ``yyyy-mm-dd``
    :type ds: str
    :param before: closest before (True), after (False) or either side of ds
    :type before: bool or None
    :param schema: table schema
    :type schema: str
    :param metastore_conn_id: which metastore connection to use
    :type metastore_conn_id: str
    :returns: The closest datestamp, or ``None`` when the table has no
        partitions
    :rtype: str or None

    >>> tbl = 'airflow.static_babynames_partitioned'
    >>> closest_ds_partition(tbl, '2015-01-02')
    '2015-01-01'
    """
    # Imported lazily, inside the function, exactly as the original does.
    from airflow.hooks.hive_hooks import HiveMetastoreHook

    if '.' in table:
        schema, table = table.split('.')

    metastore = HiveMetastoreHook(metastore_conn_id=metastore_conn_id)
    partitions = metastore.get_partitions(schema=schema, table_name=table)
    if not partitions:
        return None

    # Only the first value of each partition is considered.
    ds_values = [list(partition.values())[0] for partition in partitions]
    if ds in ds_values:
        # Exact match: no date parsing needed.
        return ds

    date_format = '%Y-%m-%d'
    candidates = [
        datetime.datetime.strptime(value, date_format) for value in ds_values
    ]
    target_dt = datetime.datetime.strptime(ds, date_format)
    nearest = _closest_date(target_dt, candidates, before_target=before)
    return nearest.isoformat()

Was this entry helpful?