Source code for tests.system.providers.qubole.example_qubole
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import filecmp
import os
import random
import textwrap
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

try:
    from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
    from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore

from airflow.operators.python import BranchPythonOperator
from airflow.providers.qubole.operators.qubole import QuboleOperator
from airflow.utils.trigger_rule import TriggerRule
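# EmptyOperator was introduced in Airflow 2.3 as the rename of DummyOperator; the
# try/except fallback above keeps this example importable on older Airflow versions.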
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_qubole_operator"

with DAG(
    dag_id=DAG_ID,
    schedule='@once',
    start_date=datetime(2021, 1, 1),
    tags=['example'],
) as dag:
    dag.doc_md = textwrap.dedent(
        """
        This is only an example DAG to highlight usage of QuboleOperator in various
        scenarios; some of these tasks may or may not work based on your Qubole
        account setup.

        Run a shell command from Qubole Analyze against your Airflow cluster with the
        following to trigger it manually: `airflow dags trigger example_qubole_operator`.

        *Note: Make sure that connection `qubole_default` is properly set before
        running this example. Also be aware that it might spin up clusters to run
        these examples.*
        """
    )
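    # The note above assumes the `qubole_default` connection already exists. A minimal
    # sketch of one way to supply it is Airflow's AIRFLOW_CONN_{CONN_ID} environment
    # variable mechanism (the endpoint and token below are placeholders, and the exact
    # URI fields are an assumption; check the Qubole provider docs for your setup):
    #
    #   export AIRFLOW_CONN_QUBOLE_DEFAULT='qubole://:<YOUR_API_TOKEN>@api.qubole.com'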

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def compare_result(hive_show_table, hive_s3_location, ti=None):
        """
        Compares the results of two QuboleOperator tasks.

        :param hive_show_table: The "hive_show_table" task.
        :param hive_s3_location: The "hive_s3_location" task.
        :param ti: The TaskInstance object.
        :return: True if the files are the same, False otherwise.
        :rtype: bool
        """
        qubole_result_1 = hive_show_table.get_results(ti)
        qubole_result_2 = hive_s3_location.get_results(ti)
        return filecmp.cmp(qubole_result_1, qubole_result_2)

    # [START howto_operator_qubole_run_hive_query]
    hive_show_table = QuboleOperator(
        task_id='hive_show_table',
        command_type='hivecmd',
        query='show tables',
        cluster_label='{{ params.cluster_label }}',
        fetch_logs=True,
        # If fetch_logs=True, the operator fetches the Qubole command logs and
        # concatenates them into the corresponding Airflow task logs
        tags='airflow_example_run',
        # Attaches tags to the Qubole command; three tags are auto-attached: dag_id, task_id, run_id
        params={
            'cluster_label': 'default',
        },
    )
    # [END howto_operator_qubole_run_hive_query]

    # [START howto_operator_qubole_run_hive_script]
    hive_s3_location = QuboleOperator(
        task_id='hive_s3_location',
        command_type="hivecmd",
        script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
        notify=True,
        tags=['tag1', 'tag2'],
        # If the script at the S3 location has any Qubole-specific macros to be replaced:
        # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
    )
    # [END howto_operator_qubole_run_hive_script]

    options = ['hadoop_jar_cmd', 'presto_cmd', 'db_query', 'spark_cmd']

    branching = BranchPythonOperator(task_id='branching', python_callable=lambda: random.choice(options))

    [hive_show_table, hive_s3_location] >> compare_result(hive_s3_location, hive_show_table) >> branching

    join = EmptyOperator(task_id='join', trigger_rule=TriggerRule.ONE_SUCCESS)

    # [START howto_operator_qubole_run_hadoop_jar]
    hadoop_jar_cmd = QuboleOperator(
        task_id='hadoop_jar_cmd',
        command_type='hadoopcmd',
        sub_command='jar s3://paid-qubole/HadoopAPIExamples/'
        'jars/hadoop-0.20.1-dev-streaming.jar '
        '-mapper wc '
        '-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/'
        'data/3.tsv -output '
        's3://paid-qubole/HadoopAPITests/data/3_wc',
        cluster_label='{{ params.cluster_label }}',
        fetch_logs=True,
        params={
            'cluster_label': 'default',
        },
    )
    # [END howto_operator_qubole_run_hadoop_jar]

    # [START howto_operator_qubole_run_pig_script]
    pig_cmd = QuboleOperator(
        task_id='pig_cmd',
        command_type="pigcmd",
        script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
        parameters="key1=value1 key2=value2",
    )
    # [END howto_operator_qubole_run_pig_script]

    branching >> hadoop_jar_cmd >> pig_cmd >> join

    # [START howto_operator_qubole_run_presto_query]
    presto_cmd = QuboleOperator(task_id='presto_cmd', command_type='prestocmd', query='show tables')
    # [END howto_operator_qubole_run_presto_query]

    # [START howto_operator_qubole_run_shell_script]
    shell_cmd = QuboleOperator(
        task_id='shell_cmd',
        command_type="shellcmd",
        script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
        parameters="param1 param2",
    )
    # [END howto_operator_qubole_run_shell_script]

    branching >> presto_cmd >> shell_cmd >> join

    # [START howto_operator_qubole_run_db_tap_query]
    db_query = QuboleOperator(task_id='db_query', command_type='dbtapquerycmd', query='show tables', db_tap_id=2064)
    # [END howto_operator_qubole_run_db_tap_query]

    # [START howto_operator_qubole_run_db_export]
    db_export = QuboleOperator(
        task_id='db_export',
        command_type='dbexportcmd',
        mode=1,
        hive_table='default_qubole_airline_origin_destination',
        db_table='exported_airline_origin_destination',
        partition_spec='dt=20110104-02',
        dbtap_id=2064,
    )
    # [END howto_operator_qubole_run_db_export]
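    # `branching` returns one task_id chosen at random from `options`, so exactly one
    # of the four command chains hanging off it runs per DAG run while the rest are
    # skipped; that is why `join` uses TriggerRule.ONE_SUCCESS instead of the default
    # ALL_SUCCESS.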
    branching >> db_query >> db_export >> join

    # [START howto_operator_qubole_run_db_import]
    db_import = QuboleOperator(
        task_id='db_import',
        command_type='dbimportcmd',
        mode=1,
        hive_table='default_qubole_airline_origin_destination',
        db_table='exported_airline_origin_destination',
        where_clause='id < 10',
        parallelism=2,
        dbtap_id=2064,
    )
    # [END howto_operator_qubole_run_db_import]

    # [START howto_operator_qubole_run_spark_scala]
    prog = '''
    import scala.math.random

    import org.apache.spark._

    /** Computes an approximation to pi */
    object SparkPi {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Spark Pi")
        val spark = new SparkContext(conf)
        val slices = if (args.length > 0) args(0).toInt else 2
        val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
        val count = spark.parallelize(1 until n, slices).map { i =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * count / n)
        spark.stop()
      }
    }
    '''

    spark_cmd = QuboleOperator(
        task_id='spark_cmd',
        command_type="sparkcmd",
        program=prog,
        language='scala',
        arguments='--class SparkPi',
        tags='airflow_example_run',
    )
    # [END howto_operator_qubole_run_spark_scala]

    branching >> db_import >> spark_cmd >> join

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
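# A hypothetical invocation for running this module as a system test with pytest, per
# tests/system/README.md#run_via_pytest (assumes an apache/airflow source checkout with
# the Qubole connection configured):
#
#   pytest tests/system/providers/qubole/example_qubole.py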