Source code for airflow.providers.google.cloud.example_dags.example_dataproc
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use various Dataproc
operators to manage a cluster and submit jobs.
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocCreateWorkflowTemplateOperator,
    DataprocDeleteClusterOperator,
    DataprocInstantiateWorkflowTemplateOperator,
    DataprocSubmitJobOperator,
    DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
REGION = os.environ.get("GCP_LOCATION", "europe-west1")
ZONE = os.environ.get("GCP_REGION", "europe-west1-b")
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
OUTPUT_FOLDER = "wordcount"
OUTPUT_PATH = f"gs://{BUCKET}/{OUTPUT_FOLDER}/"
PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
PYSPARK_URI = f"gs://{BUCKET}/{PYSPARK_MAIN}"
SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
SPARKR_URI = f"gs://{BUCKET}/{SPARKR_MAIN}"

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
}
# [END how_to_cloud_dataproc_create_cluster]

# Update options
# [START how_to_cloud_dataproc_updatemask_cluster_operator]
CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
# [END how_to_cloud_dataproc_updatemask_cluster_operator]

TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
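# UPDATE_MASK follows protobuf FieldMask semantics: only the paths listed in
# it are read from CLUSTER_UPDATE, and every other field of the cluster
# resource is left untouched. TIMEOUT is passed below as the graceful
# decommission timeout, giving workers up to one day to finish in-flight
# work before they are removed.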
# Jobs definitions
# [START how_to_cloud_dataproc_pig_config]
PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
# [END how_to_cloud_dataproc_pig_config]

# [START how_to_cloud_dataproc_sparksql_config]
SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
# [END how_to_cloud_dataproc_sparksql_config]

# [START how_to_cloud_dataproc_spark_config]
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
# [END how_to_cloud_dataproc_spark_config]

# [START how_to_cloud_dataproc_pyspark_config]
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
# [END how_to_cloud_dataproc_pyspark_config]

# [START how_to_cloud_dataproc_sparkr_config]
SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": SPARKR_URI},
}
# [END how_to_cloud_dataproc_sparkr_config]

# [START how_to_cloud_dataproc_hive_config]
HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
# [END how_to_cloud_dataproc_hive_config]

# [START how_to_cloud_dataproc_hadoop_config]
HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}
# [END how_to_cloud_dataproc_hadoop_config]

WORKFLOW_NAME = "airflow-dataproc-test"
WORKFLOW_TEMPLATE = {
    "id": WORKFLOW_NAME,
    "placement": {
        "managed_cluster": {
            "cluster_name": CLUSTER_NAME,
            "config": CLUSTER_CONFIG,
        }
    },
    "jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
}

with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interval=None) as dag:
    # [START how_to_cloud_dataproc_create_cluster_operator]
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )
    # [END how_to_cloud_dataproc_create_cluster_operator]

    # [START how_to_cloud_dataproc_update_cluster_operator]
    scale_cluster = DataprocUpdateClusterOperator(
        task_id="scale_cluster",
        cluster_name=CLUSTER_NAME,
        cluster=CLUSTER_UPDATE,
        update_mask=UPDATE_MASK,
        graceful_decommission_timeout=TIMEOUT,
        project_id=PROJECT_ID,
        location=REGION,
    )
    # [END how_to_cloud_dataproc_update_cluster_operator]

    # [START how_to_cloud_dataproc_create_workflow_template]
    create_workflow_template = DataprocCreateWorkflowTemplateOperator(
        task_id="create_workflow_template",
        template=WORKFLOW_TEMPLATE,
        project_id=PROJECT_ID,
        location=REGION,
    )
    # [END how_to_cloud_dataproc_create_workflow_template]

    # [START how_to_cloud_dataproc_trigger_workflow_template]
    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
    )
    # [END how_to_cloud_dataproc_trigger_workflow_template]

    pig_task = DataprocSubmitJobOperator(
        task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
    )
    spark_sql_task = DataprocSubmitJobOperator(
        task_id="spark_sql_task", job=SPARK_SQL_JOB, location=REGION, project_id=PROJECT_ID
    )
    spark_task = DataprocSubmitJobOperator(
        task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
    )

    # [START cloud_dataproc_async_submit_sensor]
    spark_task_async = DataprocSubmitJobOperator(
        task_id="spark_task_async", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID, asynchronous=True
    )
    spark_task_async_sensor = DataprocJobSensor(
        task_id='spark_task_async_sensor_task',
        location=REGION,
        project_id=PROJECT_ID,
        dataproc_job_id=spark_task_async.output,
        poke_interval=10,
    )
    # [END cloud_dataproc_async_submit_sensor]
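    # Because `asynchronous=True`, spark_task_async only submits the job and
    # pushes the Dataproc job id to XCom instead of waiting for completion;
    # the sensor reads that id via `spark_task_async.output` (an XComArg) and
    # polls every `poke_interval` seconds until the job reaches a terminal state.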
    # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]

    sparkr_task = DataprocSubmitJobOperator(
        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
    )
    hive_task = DataprocSubmitJobOperator(
        task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
    )
    hadoop_task = DataprocSubmitJobOperator(
        task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
    )

    # [START how_to_cloud_dataproc_delete_cluster_operator]
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster", project_id=PROJECT_ID, cluster_name=CLUSTER_NAME, region=REGION
    )
    # [END how_to_cloud_dataproc_delete_cluster_operator]

    create_cluster >> scale_cluster
    scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster
    scale_cluster >> hive_task >> delete_cluster
    scale_cluster >> pig_task >> delete_cluster
    scale_cluster >> spark_sql_task >> delete_cluster
    scale_cluster >> spark_task >> delete_cluster
    scale_cluster >> spark_task_async
    spark_task_async_sensor >> delete_cluster
    scale_cluster >> pyspark_task >> delete_cluster
    scale_cluster >> sparkr_task >> delete_cluster
    scale_cluster >> hadoop_task >> delete_cluster

    # Task dependency created via `XComArgs`:
    #   spark_task_async >> spark_task_async_sensor
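    # Execution shape: every job-submitting task fans out from scale_cluster
    # and fans back in to delete_cluster, so the cluster is deleted only once
    # all jobs (and the workflow-template run) have completed.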