Amazon EMR Operators

Prerequisite Tasks

To use these operators, you must do a few things:

Overview

Airflow's integration with Amazon EMR provides several operators to create and interact with the EMR service.

Two example DAGs are provided to showcase these operators in action:

  • example_emr_job_flow_automatic_steps.py

  • example_emr_job_flow_manual_steps.py

Note

To run the two examples successfully, you need to create the IAM service roles (EMR_EC2_DefaultRole and EMR_DefaultRole) for Amazon EMR.

You can create these roles using the AWS CLI: aws emr create-default-roles
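Before running the examples, it can help to confirm that both roles exist. The sketch below assumes a Boto3 IAM client (for example, one obtained from boto3.client('iam')) whose credentials are allowed to call get_role. The helper name missing_emr_default_roles is hypothetical and is not part of Airflow or Boto3.

```python
from typing import Any, List

# The two default role names mentioned in the note above.
EMR_DEFAULT_ROLES = ("EMR_EC2_DefaultRole", "EMR_DefaultRole")


def missing_emr_default_roles(iam_client: Any) -> List[str]:
    """Return the EMR default roles that the IAM client cannot find.

    `iam_client` is assumed to behave like a Boto3 IAM client: it exposes
    `get_role(RoleName=...)` and `exceptions.NoSuchEntityException`.
    """
    missing = []
    for role_name in EMR_DEFAULT_ROLES:
        try:
            iam_client.get_role(RoleName=role_name)
        except iam_client.exceptions.NoSuchEntityException:
            # The role has not been created in this account yet.
            missing.append(role_name)
    return missing
```

With Boto3 installed and credentials configured, missing_emr_default_roles(boto3.client('iam')) would return the names of any roles that still need to be created.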

Create EMR Job Flow with automatic steps

Purpose

This example DAG, example_emr_job_flow_automatic_steps.py, uses EmrCreateJobFlowOperator to create a new EMR job flow that calculates the mathematical constant Pi, and monitors its progress with EmrJobFlowSensor. The cluster terminates automatically after the steps finish.

JobFlow configuration

To create a job flow on EMR, you need to specify the configuration for the EMR cluster:

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py

SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'ReleaseLabel': 'emr-5.29.0',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm1.medium',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

Here we create a single-node EMR cluster named PiCalc. It has a single step, calculate_pi, which calculates the value of Pi using Spark. The setting 'KeepJobFlowAliveWhenNoSteps': False tells the cluster to shut down after the step finishes. For more configuration options, refer to the Boto3 EMR client documentation.
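If you want to define more steps without repeating the dictionary structure, a small helper can build step definitions in the same shape as SPARK_STEPS. This is a minimal sketch; make_command_runner_step is a hypothetical helper name and is not provided by Airflow or Boto3.

```python
from typing import Any, Dict, List


def make_command_runner_step(
    name: str, args: List[str], action_on_failure: str = "CONTINUE"
) -> Dict[str, Any]:
    """Build one EMR step dict that runs a command through command-runner.jar."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": list(args),  # copy so the caller's list is not shared
        },
    }


# Reproduces the single step used in this example.
SPARK_STEPS = [
    make_command_runner_step(
        "calculate_pi", ["/usr/lib/spark/bin/run-example", "SparkPi", "10"]
    )
]
```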

Defining tasks

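The following code creates a new job flow and monitors its progress. Because the step is already part of the job flow configuration and the cluster terminates on its own, no separate tasks are needed to add steps or to terminate the cluster.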

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py

    job_flow_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    job_sensor = EmrJobFlowSensor(
        task_id='check_job_flow',
        job_flow_id=job_flow_creator.output,
        aws_conn_id='aws_default',
    )
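Conceptually, a sensor such as EmrJobFlowSensor polls the cluster state until it reaches a terminal state. The following is a simplified illustration of that polling pattern, not the sensor's actual implementation. The names wait_for_job_flow and get_state are hypothetical. The state strings are EMR cluster states, and treating TERMINATED as success and TERMINATED_WITH_ERRORS as failure is an assumption made for this sketch.

```python
import time
from typing import Callable

# Assumed success and failure states for this sketch.
TARGET_STATES = {"TERMINATED"}
FAILED_STATES = {"TERMINATED_WITH_ERRORS"}


def wait_for_job_flow(
    get_state: Callable[[], str],
    poke_interval: float = 60.0,
    max_pokes: int = 100,
) -> str:
    """Poll `get_state` until the job flow succeeds, fails, or times out."""
    for _ in range(max_pokes):
        state = get_state()
        if state in FAILED_STATES:
            raise RuntimeError(f"EMR job flow failed with state {state}")
        if state in TARGET_STATES:
            return state
        # Any other state (for example STARTING or RUNNING): wait and retry.
        time.sleep(poke_interval)
    raise TimeoutError("EMR job flow did not reach a terminal state in time")
```

In a real deployment, get_state would wrap a call that describes the cluster; here it is left as a plain callable so the pattern can be tried without AWS access.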

Create EMR Job Flow with manual steps

Purpose

This example DAG, example_emr_job_flow_manual_steps.py, is similar to the previous one, except that instead of adding the job flow step during cluster creation, we add the step after the cluster is created, and we terminate the cluster manually at the end. This allows further customization of how you run your jobs.

JobFlow configuration

The configuration is similar to the previous example, except that we set 'KeepJobFlowAliveWhenNoSteps': True because we will terminate the cluster manually. Also, we do not specify Steps in the configuration when creating the cluster.
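The two differences described above can be expressed as a small transformation of the earlier configuration. This is a sketch only: to_manual_steps_config is a hypothetical helper, and the dictionary below is a trimmed copy of the configuration from the automatic-steps example, kept short for illustration.

```python
import copy
from typing import Any, Dict

# Trimmed copy of the configuration from the automatic-steps example.
AUTOMATIC_JOB_FLOW_OVERRIDES: Dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-5.29.0",
    "Instances": {
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "Steps": [{"Name": "calculate_pi"}],
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}


def to_manual_steps_config(overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy suited to adding steps and terminating manually."""
    manual = copy.deepcopy(overrides)  # leave the original untouched
    manual.pop("Steps", None)  # steps are added later by a separate task
    # Keep the cluster alive until it is terminated explicitly.
    manual.setdefault("Instances", {})["KeepJobFlowAliveWhenNoSteps"] = True
    return manual


JOB_FLOW_OVERRIDES = to_manual_steps_config(AUTOMATIC_JOB_FLOW_OVERRIDES)
```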

Defining tasks

Here are the task definitions for our DAG.

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py

    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    step_adder >> step_checker >> cluster_remover
