Amazon EMR Operators

Amazon EMR (previously called Amazon Elastic MapReduce) is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. Using these frameworks and related open-source projects, you can process data for analytics purposes and business intelligence workloads. Amazon EMR also lets you transform and move large amounts of data into and out of other AWS data stores and databases, such as Amazon Simple Storage Service (Amazon S3) and Amazon DynamoDB.

Prerequisite Tasks

To use these operators, you must do a few things:

Note

To run the two examples successfully, you need to create the IAM service roles (EMR_EC2_DefaultRole and EMR_DefaultRole) for Amazon EMR. You can create these roles using the AWS CLI: aws emr create-default-roles.

Create EMR Job Flow

You can use EmrCreateJobFlowOperator to create a new EMR job flow. With the configuration shown below, the cluster is terminated automatically once its steps have finished.

JobFlow configuration

To create a job flow on EMR, you need to specify the configuration for the EMR cluster:

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py

SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'ReleaseLabel': 'emr-5.29.0',
    'Applications': [{'Name': 'Spark'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Primary node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': JOB_FLOW_ROLE,
    'ServiceRole': SERVICE_ROLE,
}

Here we create a single-node EMR cluster named PiCalc. It has a single step, calculate_pi, which calculates the value of Pi using Spark. The config 'KeepJobFlowAliveWhenNoSteps': False tells the cluster to shut down after the step is finished. Alternatively, a config without a Steps value can be used and Steps can be added later using EmrAddStepsOperator. See details below.
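As a rough illustration of the alternative just described, the sketch below derives a step-less variant of the configuration. The helper name without_steps is hypothetical, the role names are the defaults created by aws emr create-default-roles, and setting 'KeepJobFlowAliveWhenNoSteps' to True is an assumption: it reflects the common case where the cluster should stay up until steps are added and is then terminated explicitly.

```python
import copy

# Trimmed copy of the configuration shown above. The role names are assumed to
# be the defaults created by `aws emr create-default-roles`.
JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'ReleaseLabel': 'emr-5.29.0',
    'Applications': [{'Name': 'Spark'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Primary node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': [],  # placeholder for SPARK_STEPS
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}


def without_steps(overrides):
    """Return a copy of the config with no Steps that stays alive while idle.

    Hypothetical helper: the original dict is left untouched.
    """
    config = copy.deepcopy(overrides)
    config.pop('Steps', None)
    # Assumption: keep the cluster running so steps can be added afterwards.
    config['Instances']['KeepJobFlowAliveWhenNoSteps'] = True
    return config


MANUAL_OVERRIDES = without_steps(JOB_FLOW_OVERRIDES)
```

A cluster created from a config like MANUAL_OVERRIDES would not shut itself down, so it would need to be terminated explicitly, for example with EmrTerminateJobFlowOperator.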

Note

EMR clusters launched with the EMR API like this one are not visible to all users by default, so you may not see the cluster in the EMR Management Console - you can change this by adding 'VisibleToAllUsers': True at the end of the JOB_FLOW_OVERRIDES dict.
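One possible way to apply this setting without editing the original dict is to build a derived config. This is only a sketch: the trimmed JOB_FLOW_OVERRIDES below is a stand-in for the full configuration, and the name VISIBLE_OVERRIDES is hypothetical.

```python
# Stand-in for the full JOB_FLOW_OVERRIDES dict (trimmed for brevity).
JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'ReleaseLabel': 'emr-5.29.0',
}

# Build a new dict instead of mutating the shared module-level config.
VISIBLE_OVERRIDES = {**JOB_FLOW_OVERRIDES, 'VisibleToAllUsers': True}
```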

For more config information, please refer to Boto3 EMR client.

Create the Job Flow

The following code creates a new job flow using the configuration explained above.

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py

job_flow_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

Add Steps to an EMR Job Flow

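To add Steps to an existing EMR Job Flow you can use EmrAddStepsOperator. In the example below, cluster_creator refers to the EmrCreateJobFlowOperator task defined earlier in the same example DAG; its output supplies the ID of the job flow that receives the steps.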

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=cluster_creator.output,
    steps=SPARK_STEPS,
)
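When several similar steps are needed, the steps list can be generated rather than written out by hand. The following is a minimal sketch in plain Python: the helper spark_example_step and the second step, calculate_pi_precise, are hypothetical, and the dict layout simply mirrors the SPARK_STEPS entry shown earlier.

```python
def spark_example_step(name, example_class, *args, on_failure='CONTINUE'):
    """Build one step dict in the same shape as the SPARK_STEPS entries.

    Hypothetical helper: runs a bundled Spark example via command-runner.jar.
    """
    return {
        'Name': name,
        'ActionOnFailure': on_failure,
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # Step arguments are passed as strings, so convert them here.
            'Args': ['/usr/lib/spark/bin/run-example', example_class, *map(str, args)],
        },
    }


SPARK_STEPS = [
    spark_example_step('calculate_pi', 'SparkPi', 10),
    spark_example_step('calculate_pi_precise', 'SparkPi', 100),
]
```

The resulting list can be passed as the steps argument of EmrAddStepsOperator in the same way as the hand-written SPARK_STEPS.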

Terminate an EMR Job Flow

To terminate an EMR Job Flow you can use EmrTerminateJobFlowOperator.

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id=cluster_creator.output,
)

Modify Amazon EMR Container

To modify an existing EMR Container you can use EmrModifyClusterOperator.

Amazon EMR Container Sensor

To monitor the state of an EMR Container you can use EmrContainerSensor.

Amazon EMR Job Flow Sensor

To monitor the state of an EMR Job Flow you can use EmrJobFlowSensor.

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py

job_sensor = EmrJobFlowSensor(
    task_id='check_job_flow',
    job_flow_id=job_flow_creator.output,
)
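Conceptually, a sensor of this kind polls the cluster state until it reaches a target state or a failed state. The sketch below illustrates that loop in plain Python; it is not the sensor's actual implementation. The names wait_for_cluster and get_state are hypothetical, and the choice of target and failed states is an assumption based on the EMR cluster states TERMINATED and TERMINATED_WITH_ERRORS.

```python
import time

# Assumed state sets for this sketch.
TARGET_STATES = {'TERMINATED'}
FAILED_STATES = {'TERMINATED_WITH_ERRORS'}


def wait_for_cluster(get_state, poke_interval=0.0, max_pokes=10):
    """Poll get_state() until a target or failed state is seen.

    get_state is any callable returning the current cluster state string.
    """
    for _ in range(max_pokes):
        state = get_state()
        if state in FAILED_STATES:
            raise RuntimeError(f'EMR job flow failed with state {state}')
        if state in TARGET_STATES:
            return state
        time.sleep(poke_interval)
    raise TimeoutError('EMR job flow did not reach a target state in time')


# Simulated state sequence standing in for real calls to the EMR API.
states = iter(['STARTING', 'RUNNING', 'TERMINATING', 'TERMINATED'])
final_state = wait_for_cluster(lambda: next(states))
```

Injecting get_state as a callable keeps the loop independent of any AWS client, which makes the waiting logic easy to exercise with a simulated sequence like the one above.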

Amazon EMR Step Sensor

To monitor the state of a Step running on an existing EMR Job Flow you can use EmrStepSensor.

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id=cluster_creator.output,
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
)
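The [0] in the step_id template above selects the first entry of the list of step IDs that the add_steps task pushes to XCom. If more than one step is added, one template per index can be generated, as in this sketch. The helper step_id_template and the step names are hypothetical; only the template format is taken from the example above.

```python
def step_id_template(index, add_steps_task_id='add_steps'):
    """Return a Jinja template that pulls the step ID at the given index."""
    return (
        "{{ task_instance.xcom_pull("
        f"task_ids='{add_steps_task_id}', key='return_value')[{index}]"
        " }}"
    )


# Hypothetical step names, in the order they were passed to the add_steps task.
STEP_NAMES = ['calculate_pi', 'calculate_pi_precise']
STEP_ID_TEMPLATES = {
    name: step_id_template(index) for index, name in enumerate(STEP_NAMES)
}
```

Each value in STEP_ID_TEMPLATES could then be passed as the step_id of a separate EmrStepSensor, assuming the step IDs come back in the same order as the submitted steps.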
