Source code for airflow.operators.hive_to_druid

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook
from airflow.hooks.druid_hook import DruidHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

[docs]LOAD_CHECK_INTERVAL = 5
[docs]DEFAULT_TARGET_PARTITION_SIZE = 5000000
[docs]class HiveToDruidTransfer(BaseOperator): """ Moves data from Hive to Druid, [del]note that for now the data is loaded into memory before being pushed to Druid, so this operator should be used for smallish amount of data.[/del] :param sql: SQL query to execute against the Druid database. (templated) :type sql: str :param druid_datasource: the datasource you want to ingest into in druid :type druid_datasource: str :param ts_dim: the timestamp dimension :type ts_dim: str :param metric_spec: the metrics you want to define for your data :type metric_spec: list :param hive_cli_conn_id: the hive connection id :type hive_cli_conn_id: str :param druid_ingest_conn_id: the druid ingest connection id :type druid_ingest_conn_id: str :param metastore_conn_id: the metastore connection id :type metastore_conn_id: str :param hadoop_dependency_coordinates: list of coordinates to squeeze int the ingest json :type hadoop_dependency_coordinates: list[str] :param intervals: list of time intervals that defines segments, this is passed as is to the json object. (templated) :type intervals: list :param hive_tblproperties: additional properties for tblproperties in hive for the staging table :type hive_tblproperties: dict :param job_properties: additional properties for job :type job_properties: dict """
[docs] template_fields = ('sql', 'intervals')
[docs] template_ext = ('.sql',)
@apply_defaults def __init__( self, sql, druid_datasource, ts_dim, metric_spec=None, hive_cli_conn_id='hive_cli_default', druid_ingest_conn_id='druid_ingest_default', metastore_conn_id='metastore_default', hadoop_dependency_coordinates=None, intervals=None, num_shards=-1, target_partition_size=-1, query_granularity="NONE", segment_granularity="DAY", hive_tblproperties=None, job_properties=None, *args, **kwargs): super(HiveToDruidTransfer, self).__init__(*args, **kwargs) self.sql = sql self.druid_datasource = druid_datasource self.ts_dim = ts_dim self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}'] self.num_shards = num_shards self.target_partition_size = target_partition_size self.query_granularity = query_granularity self.segment_granularity = segment_granularity self.metric_spec = metric_spec or [{ "name": "count", "type": "count"}] self.hive_cli_conn_id = hive_cli_conn_id self.hadoop_dependency_coordinates = hadoop_dependency_coordinates self.druid_ingest_conn_id = druid_ingest_conn_id self.metastore_conn_id = metastore_conn_id self.hive_tblproperties = hive_tblproperties self.job_properties = job_properties
[docs] def execute(self, context): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) self.log.info("Extracting data from Hive") hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_') sql = self.sql.strip().strip(';') tblproperties = ''.join([", '{}' = '{}'" .format(k, v) for k, v in self.hive_tblproperties.items()]) hql = """\ SET mapred.output.compress=false; SET hive.exec.compress.output=false; DROP TABLE IF EXISTS {hive_table}; CREATE TABLE {hive_table} ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE TBLPROPERTIES ('serialization.null.format' = ''{tblproperties}) AS {sql} """.format(**locals()) self.log.info("Running command:\n %s", hql) hive.run_cli(hql) m = HiveMetastoreHook(self.metastore_conn_id) # Get the Hive table and extract the columns t = m.get_table(hive_table) columns = [col.name for col in t.sd.cols] # Get the path on hdfs hdfs_uri = m.get_table(hive_table).sd.location pos = hdfs_uri.find('/user') static_path = hdfs_uri[pos:] schema, table = hive_table.split('.') druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id) try: index_spec = self.construct_ingest_query( static_path=static_path, columns=columns, ) self.log.info("Inserting rows into Druid, hdfs path: %s", static_path) druid.submit_indexing_job(index_spec) self.log.info("Load seems to have succeeded!") finally: self.log.info( "Cleaning up by dropping the temp Hive table %s", hive_table ) hql = "DROP TABLE IF EXISTS {}".format(hive_table) hive.run_cli(hql)
[docs] def construct_ingest_query(self, static_path, columns): """ Builds an ingest query for an HDFS TSV load. :param static_path: The path on hdfs where the data is :type static_path: str :param columns: List of all the columns that are available :type columns: list """ # backward compatibility for num_shards, # but target_partition_size is the default setting # and overwrites the num_shards num_shards = self.num_shards target_partition_size = self.target_partition_size if self.target_partition_size == -1: if self.num_shards == -1: target_partition_size = DEFAULT_TARGET_PARTITION_SIZE else: num_shards = -1 metric_names = [m['fieldName'] for m in self.metric_spec if m['type'] != 'count'] # Take all the columns, which are not the time dimension # or a metric, as the dimension columns dimensions = [c for c in columns if c not in metric_names and c != self.ts_dim] ingest_query_dict = { "type": "index_hadoop", "spec": { "dataSchema": { "metricsSpec": self.metric_spec, "granularitySpec": { "queryGranularity": self.query_granularity, "intervals": self.intervals, "type": "uniform", "segmentGranularity": self.segment_granularity, }, "parser": { "type": "string", "parseSpec": { "columns": columns, "dimensionsSpec": { "dimensionExclusions": [], "dimensions": dimensions, # list of names "spatialDimensions": [] }, "timestampSpec": { "column": self.ts_dim, "format": "auto" }, "format": "tsv" } }, "dataSource": self.druid_datasource }, "tuningConfig": { "type": "hadoop", "jobProperties": { "mapreduce.job.user.classpath.first": "false", "mapreduce.map.output.compress": "false", "mapreduce.output.fileoutputformat.compress": "false", }, "partitionsSpec": { "type": "hashed", "targetPartitionSize": target_partition_size, "numShards": num_shards, }, }, "ioConfig": { "inputSpec": { "paths": static_path, "type": "static" }, "type": "hadoop" } } } if self.job_properties: ingest_query_dict['spec']['tuningConfig']['jobProperties'] \ .update(self.job_properties) if self.hadoop_dependency_coordinates: ingest_query_dict['hadoopDependencyCoordinates'] \ = self.hadoop_dependency_coordinates return ingest_query_dict