# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook
from airflow.hooks.druid_hook import DruidHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


DEFAULT_TARGET_PARTITION_SIZE = 5000000


class HiveToDruidTransfer(BaseOperator):
    """
    Moves data from Hive to Druid. The data is first staged in a temporary
    Hive table and then indexed into Druid from that table's HDFS location.

    :param sql: SQL query to execute against the Hive database. (templated)
    :type sql: str
    :param druid_datasource: the Druid datasource you want to ingest into
    :type druid_datasource: str
    :param ts_dim: the timestamp dimension
    :type ts_dim: str
    :param metric_spec: the metrics you want to define for your data
    :type metric_spec: list
    :param hive_cli_conn_id: the hive connection id
    :type hive_cli_conn_id: str
    :param druid_ingest_conn_id: the druid ingest connection id
    :type druid_ingest_conn_id: str
    :param metastore_conn_id: the metastore connection id
    :type metastore_conn_id: str
    :param hadoop_dependency_coordinates: list of coordinates to squeeze
        into the ingest json
    :type hadoop_dependency_coordinates: list[str]
    :param intervals: list of time intervals that defines segments,
        this is passed as is to the json object. (templated)
    :type intervals: list
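    :param num_shards: directly specify the number of shards to create;
        ignored when target_partition_size is set
    :type num_shards: int
    :param target_partition_size: target number of rows to include in a
        partition; when both this and num_shards are left at -1,
        DEFAULT_TARGET_PARTITION_SIZE is used
    :type target_partition_size: int
    :param query_granularity: the query granularity passed to Druid's
        granularitySpec
    :type query_granularity: str
    :param segment_granularity: the segment granularity passed to Druid's
        granularitySpec
    :type segment_granularity: str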
    :param hive_tblproperties: additional properties set in TBLPROPERTIES on
        the Hive staging table
    :type hive_tblproperties: dict
    :param job_properties: additional properties for the indexing job,
        merged into the tuningConfig's jobProperties
    :type job_properties: dict
    """

    template_fields = ('sql', 'intervals')
    template_ext = ('.sql',)

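    # A hedged usage sketch (not part of this module): the DAG object,
    # connection ids, table, and column names below are hypothetical.
    #
    #   load_revenue = HiveToDruidTransfer(
    #       task_id='hive_to_druid_revenue',
    #       sql="SELECT event_ts, country, revenue FROM staging.revenue",
    #       druid_datasource='revenue',
    #       ts_dim='event_ts',
    #       metric_spec=[
    #           {"name": "count", "type": "count"},
    #           {"name": "revenue_sum", "type": "doubleSum",
    #            "fieldName": "revenue"},
    #       ],
    #       hive_cli_conn_id='hive_cli_default',
    #       druid_ingest_conn_id='druid_ingest_default',
    #       intervals=['{{ ds }}/{{ tomorrow_ds }}'],
    #       dag=dag,
    #   )
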
    @apply_defaults
    def __init__(
            self,
            sql,
            druid_datasource,
            ts_dim,
            metric_spec=None,
            hive_cli_conn_id='hive_cli_default',
            druid_ingest_conn_id='druid_ingest_default',
            metastore_conn_id='metastore_default',
            hadoop_dependency_coordinates=None,
            intervals=None,
            num_shards=-1,
            target_partition_size=-1,
            query_granularity="NONE",
            segment_granularity="DAY",
            hive_tblproperties=None,
            job_properties=None,
            *args, **kwargs):
        super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
        self.sql = sql
        self.druid_datasource = druid_datasource
        self.ts_dim = ts_dim
        self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
        self.num_shards = num_shards
        self.target_partition_size = target_partition_size
        self.query_granularity = query_granularity
        self.segment_granularity = segment_granularity
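        # Default to a single row-count metric when no metric_spec is given.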
        self.metric_spec = metric_spec or [{
            "name": "count",
            "type": "count"}]
        self.hive_cli_conn_id = hive_cli_conn_id
        self.hadoop_dependency_coordinates = hadoop_dependency_coordinates
        self.druid_ingest_conn_id = druid_ingest_conn_id
        self.metastore_conn_id = metastore_conn_id
        self.hive_tblproperties = hive_tblproperties or {}
        self.job_properties = job_properties

    def execute(self, context):
        hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
        self.log.info("Extracting data from Hive")
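        # Stage the query results in a temporary table under the 'druid'
        # database; task_instance_key_str (roughly dag_id__task_id__ds_nodash)
        # keeps the name unique per task run, and dots are replaced with
        # underscores to form a valid table name.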
        hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
        sql = self.sql.strip().strip(';')
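        # Render extra table properties as ", 'key' = 'value'" fragments that
        # get appended to the TBLPROPERTIES clause below; e.g. a hypothetical
        # {'druid.datasource': 'my_datasource'} renders as
        # ", 'druid.datasource' = 'my_datasource'".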
        tblproperties = ''.join([", '{}' = '{}'"
                                .format(k, v)
                                 for k, v in self.hive_tblproperties.items()])
        hql = """\
        SET mapred.output.compress=false;
        SET hive.exec.compress.output=false;
        DROP TABLE IF EXISTS {hive_table};
        CREATE TABLE {hive_table}
        ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
        STORED AS TEXTFILE
        TBLPROPERTIES ('serialization.null.format' = ''{tblproperties})
        AS
        {sql}
        """.format(hive_table=hive_table, tblproperties=tblproperties, sql=sql)
        self.log.info("Running command:\n %s", hql)
        hive.run_cli(hql)
        m = HiveMetastoreHook(self.metastore_conn_id)
        # Get the Hive table and extract the columns
        t = m.get_table(hive_table)
        columns = [col.name for col in t.sd.cols]
        # Get the path on hdfs
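        # sd.location is a full URI, e.g. (hypothetical)
        # hdfs://namenode:8020/user/hive/warehouse/druid.db/..., so slice from
        # '/user' to keep only the path component; this assumes the warehouse
        # location lives under /user.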
        hdfs_uri = m.get_table(hive_table).sd.location
        pos = hdfs_uri.find('/user')
        static_path = hdfs_uri[pos:]
        schema, table = hive_table.split('.')
        druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id)
        try:
            index_spec = self.construct_ingest_query(
                static_path=static_path,
                columns=columns,
            )
            self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)
            druid.submit_indexing_job(index_spec)
            self.log.info("Load seems to have succeeded!")
        finally:
            self.log.info(
                "Cleaning up by dropping the temp Hive table %s",
                hive_table
            )
            hql = "DROP TABLE IF EXISTS {}".format(hive_table)
            hive.run_cli(hql)

    def construct_ingest_query(self, static_path, columns):
        """
        Builds an ingest query for an HDFS TSV load.

        :param static_path: The path on hdfs where the data is
        :type static_path: str
        :param columns: List of all the columns that are available
        :type columns: list
        """
        # Backwards compatibility for num_shards: target_partition_size is
        # the preferred setting and, when set, overrides num_shards; if
        # neither is set, fall back to DEFAULT_TARGET_PARTITION_SIZE.
        num_shards = self.num_shards
        target_partition_size = self.target_partition_size
        if self.target_partition_size == -1:
            if self.num_shards == -1:
                target_partition_size = DEFAULT_TARGET_PARTITION_SIZE
        else:
            num_shards = -1
        metric_names = [m['fieldName'] for m in self.metric_spec if m['type'] != 'count']
        # Take all the columns, which are not the time dimension
        # or a metric, as the dimension columns
        dimensions = [c for c in columns if c not in metric_names and c != self.ts_dim]
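        # For example (hypothetical names): with columns ['event_ts',
        # 'country', 'revenue'], ts_dim='event_ts' and a metric over
        # fieldName 'revenue', only 'country' is kept as a dimension.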
        ingest_query_dict = {
            "type": "index_hadoop",
            "spec": {
                "dataSchema": {
                    "metricsSpec": self.metric_spec,
                    "granularitySpec": {
                        "queryGranularity": self.query_granularity,
                        "intervals": self.intervals,
                        "type": "uniform",
                        "segmentGranularity": self.segment_granularity,
                    },
                    "parser": {
                        "type": "string",
                        "parseSpec": {
                            "columns": columns,
                            "dimensionsSpec": {
                                "dimensionExclusions": [],
                                "dimensions": dimensions,  # list of names
                                "spatialDimensions": []
                            },
                            "timestampSpec": {
                                "column": self.ts_dim,
                                "format": "auto"
                            },
                            "format": "tsv"
                        }
                    },
                    "dataSource": self.druid_datasource
                },
                "tuningConfig": {
                    "type": "hadoop",
                    "jobProperties": {
                        "mapreduce.job.user.classpath.first": "false",
                        "mapreduce.map.output.compress": "false",
                        "mapreduce.output.fileoutputformat.compress": "false",
                    },
                    "partitionsSpec": {
                        "type": "hashed",
                        "targetPartitionSize": target_partition_size,
                        "numShards": num_shards,
                    },
                },
                "ioConfig": {
                    "inputSpec": {
                        "paths": static_path,
                        "type": "static"
                    },
                    "type": "hadoop"
                }
            }
        }

        if self.job_properties:
            ingest_query_dict['spec']['tuningConfig']['jobProperties'] \
                .update(self.job_properties)

        if self.hadoop_dependency_coordinates:
            ingest_query_dict['hadoopDependencyCoordinates'] \
                = self.hadoop_dependency_coordinates

        return ingest_query_dict