Qubole

Qubole is an open, simple, and secure data lake platform for machine learning, streaming, and ad hoc analytics. Qubole delivers a self-service platform for big data analytics built on Amazon Web Services, Microsoft Azure, and Google Cloud.

Airflow provides operators to execute tasks (commands) on QDS and to perform checks against Qubole commands. It also provides sensors that wait for a file, folder, or partition to be present in cloud storage, checking for its presence via the QDS APIs.

Prerequisite Tasks

Execute tasks

Use QuboleOperator to run the following commands.

Run Hive command

To run a query that shows all tables, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

hive_show_table = QuboleOperator(
    task_id='hive_show_table',
    command_type='hivecmd',
    query='show tables',
    cluster_label='{{ params.cluster_label }}',
    fetch_logs=True,
    # If `fetch_logs` is True, the Qubole command logs are fetched and
    # appended to the corresponding Airflow task logs
    tags='airflow_example_run',
    # Tags to attach to the Qubole command; three tags (dag_id, task_id, run_id)
    # are attached automatically
    params={
        'cluster_label': 'default',
    },
)

You can also run a script stored in a bucket by passing the path to the query file:

airflow/providers/qubole/example_dags/example_qubole.py[source]

hive_s3_location = QuboleOperator(
    task_id='hive_s3_location',
    command_type="hivecmd",
    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
    notify=True,
    tags=['tag1', 'tag2'],
    # If the script at the S3 location has any Qubole-specific macros to be replaced
    # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
)
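The commented-out `macros` argument above is a JSON-encoded string. As a minimal sketch, such a string can be generated with the standard `json` module instead of being written by hand. The helper name `build_macros` is hypothetical and not part of the provider.

```python
import json


def build_macros(macros):
    """Serialize a list of single-key dicts into a JSON string.

    `build_macros` is a hypothetical helper used for illustration only.
    """
    return json.dumps(macros)


# Jinja placeholders such as {{ ds }} are plain text at this point; they are
# only substituted later, if Airflow renders the field as a template.
macros_json = build_macros([{"date": "{{ ds }}"}, {"name": "abc"}])
print(macros_json)
```

Generating the string this way avoids quoting mistakes when the macro list grows.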

Run Hadoop command

To run a JAR file on your Hadoop cluster, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

hadoop_jar_cmd = QuboleOperator(
    task_id='hadoop_jar_cmd',
    command_type='hadoopcmd',
    sub_command=(
        'jar s3://paid-qubole/HadoopAPIExamples/'
        'jars/hadoop-0.20.1-dev-streaming.jar '
        '-mapper wc '
        '-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/'
        'data/3.tsv -output '
        's3://paid-qubole/HadoopAPITests/data/3_wc'
    ),
    cluster_label='{{ params.cluster_label }}',
    fetch_logs=True,
    params={
        'cluster_label': 'default',
    },
)
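The `sub_command` value above relies on Python's implicit concatenation of adjacent string literals, so the trailing space inside each fragment matters. The following sketch uses only the literals from the example and shows the single string that results:

```python
# Adjacent string literals are joined by Python with no separator added,
# so each fragment must carry its own trailing space where one is needed.
sub_command = (
    'jar s3://paid-qubole/HadoopAPIExamples/'
    'jars/hadoop-0.20.1-dev-streaming.jar '
    '-mapper wc '
    '-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/'
    'data/3.tsv -output '
    's3://paid-qubole/HadoopAPITests/data/3_wc'
)

# Splitting on whitespace recovers the individual command tokens.
tokens = sub_command.split()
print(tokens)
```

Printing the tokens is a quick way to spot a missing space between two fragments before the command is submitted.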

Run Pig command

To run a Pig Latin script on your Hadoop cluster, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

pig_cmd = QuboleOperator(
    task_id='pig_cmd',
    command_type="pigcmd",
    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
    parameters="key1=value1 key2=value2",
)
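The `parameters` argument above is a single space-separated string of `key=value` pairs. A minimal sketch, assuming the pairs start out in a Python dict, could build that string as follows. The helper `format_parameters` is hypothetical and not part of the provider, and it does not handle values that contain spaces.

```python
def format_parameters(params):
    """Join a dict into a space-separated string of key=value pairs.

    `format_parameters` is a hypothetical helper used for illustration only.
    """
    return " ".join(f"{key}={value}" for key, value in params.items())


parameters = format_parameters({"key1": "value1", "key2": "value2"})
print(parameters)
```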

Run Shell command

To run a shell script, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

shell_cmd = QuboleOperator(
    task_id='shell_cmd',
    command_type="shellcmd",
    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
    parameters="param1 param2",
)

Run Presto command

To run a query using Presto, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

presto_cmd = QuboleOperator(task_id='presto_cmd', command_type='prestocmd', query='show tables')

Run DB commands

To run a query against a DbTap, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

db_query = QuboleOperator(
    task_id='db_query', command_type='dbtapquerycmd', query='show tables', db_tap_id=2064
)

To run a DB export command, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

db_export = QuboleOperator(
    task_id='db_export',
    command_type='dbexportcmd',
    mode=1,
    hive_table='default_qubole_airline_origin_destination',
    db_table='exported_airline_origin_destination',
    partition_spec='dt=20110104-02',
    dbtap_id=2064,
)

To run a DB import command, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

db_import = QuboleOperator(
    task_id='db_import',
    command_type='dbimportcmd',
    mode=1,
    hive_table='default_qubole_airline_origin_destination',
    db_table='exported_airline_origin_destination',
    where_clause='id < 10',
    parallelism=2,
    dbtap_id=2064,
)

Run Spark commands

To run a Scala script as a Spark job, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

prog = '''
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
'''

spark_cmd = QuboleOperator(
    task_id='spark_cmd',
    command_type="sparkcmd",
    program=prog,
    language='scala',
    arguments='--class SparkPi',
    tags='airflow_example_run',
)
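For readers less familiar with Scala, the estimate that `SparkPi` computes can be sketched in plain Python. This is a single-process illustration of the same Monte Carlo idea, not a Spark job. The function name `estimate_pi` and the fixed seed are assumptions made for reproducibility.

```python
import random


def estimate_pi(n, seed=0):
    """Estimate pi by sampling n points in the square [-1, 1] x [-1, 1]."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        # Count the points that fall inside the unit circle.
        if x * x + y * y < 1:
            inside += 1
    # The circle covers pi / 4 of the square, so scale the ratio by 4.
    return 4.0 * inside / n


print("Pi is roughly", estimate_pi(200_000))
```

In the Spark version the sampling loop is distributed across `slices` partitions and the per-point results are summed with `reduce`.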

File sensor

Usage examples of QuboleFileSensor.

File or directory existence

To wait for a file or directory to exist in the cluster, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

check_s3_file = QuboleFileSensor(
    task_id='check_s3_file',
    poke_interval=60,
    timeout=600,
    data={
        "files": [
            "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
            "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
        ]  # checks that every file in the list is available
    },
)
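The second path above contains a Jinja expression. Assuming `ds` is the run's date as a `YYYY-MM-DD` string (the sample date below is made up), plain Python shows what the expression evaluates to:

```python
ds = "2021-12-13"  # sample value; Airflow supplies the real one at run time

# Same expression as inside the template: the third dash-separated field,
# which is the zero-padded day of the month.
day = ds.split("-")[2]
path = f"s3://paid-qubole/HadoopAPITests/data/{day}.tsv"
print(path)
```

Because the day stays zero-padded, a run dated the 3rd of a month would look for `03.tsv`, not `3.tsv`.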

Partition sensor

Usage examples of QubolePartitionSensor.

Partition existence

To wait for a table partition to exist in the cluster, use:

airflow/providers/qubole/example_dags/example_qubole.py[source]

check_hive_partition = QubolePartitionSensor(
    task_id='check_hive_partition',
    poke_interval=10,
    timeout=60,
    data={
        "schema": "default",
        "table": "my_partitioned_table",
        "columns": [
            {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
            {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
        ],  # checks for partitions such as [month=12/day=12, month=12/day=13]
    },
)
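The `columns` list above names the values to look for in each partition column. One way to read it, consistent with the inline comment, is as the Cartesian product of the per-column values. The sketch below enumerates those combinations in plain Python. The dates are sample values, and the helper `expand_partitions` is hypothetical; it is not the sensor's actual implementation.

```python
from itertools import product


def expand_partitions(columns):
    """Return specs such as 'month=12/day=13' for every combination of values.

    `expand_partitions` is a hypothetical helper used for illustration only.
    """
    names = [col["column"] for col in columns]
    value_lists = [col["values"] for col in columns]
    return [
        "/".join(f"{name}={value}" for name, value in zip(names, combo))
        for combo in product(*value_lists)
    ]


# Rendered values, assuming ds = "2021-12-13" and yesterday_ds = "2021-12-12".
columns = [
    {"column": "month", "values": ["12"]},
    {"column": "day", "values": ["13", "12"]},
]
partitions = expand_partitions(columns)
print(partitions)
```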

Reference

For further information, look at:
