Amazon EMR Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in the Installation guide.
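After installing, one quick sanity check is to confirm that the provider package can be located. The snippet below is a minimal sketch: the helper name `module_available` is hypothetical, and it assumes the Amazon provider is importable as `airflow.providers.amazon`.

```python
import importlib.util


def module_available(dotted_name: str) -> bool:
    """Return True if the named module can be located.

    Parent packages are imported during the lookup; the module itself is not.
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ImportError:
        # Raised when a parent package (for example 'airflow') is missing.
        return False


if __name__ == "__main__":
    print("Amazon provider found:", module_available("airflow.providers.amazon"))
```

If this reports that the provider is not found, re-run the pip command above in the same environment that runs your Airflow scheduler and workers.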
Overview¶
The Airflow integration with Amazon EMR provides several operators to create and interact with the EMR service.
Two example DAGs are provided that showcase these operators in action.
example_emr_job_flow_automatic_steps.py
example_emr_job_flow_manual_steps.py
Note
To run the two examples successfully, you need to create the IAM service roles (EMR_EC2_DefaultRole and EMR_DefaultRole) for Amazon EMR.
You can create these roles using the AWS CLI: aws emr create-default-roles
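Before triggering the example DAGs, it can help to verify that both roles exist. The following is a sketch rather than part of the examples: the function name `missing_emr_roles` is hypothetical, and it assumes an IAM client object shaped like the one returned by `boto3.client("iam")`, which exposes `get_role` and `exceptions.NoSuchEntityException`.

```python
DEFAULT_EMR_ROLES = ("EMR_EC2_DefaultRole", "EMR_DefaultRole")


def missing_emr_roles(iam_client, role_names=DEFAULT_EMR_ROLES):
    """Return the role names that the given IAM client cannot find."""
    missing = []
    for name in role_names:
        try:
            # get_role raises NoSuchEntityException when the role is absent.
            iam_client.get_role(RoleName=name)
        except iam_client.exceptions.NoSuchEntityException:
            missing.append(name)
    return missing
```

With boto3 installed and credentials configured, `missing_emr_roles(boto3.client("iam"))` should return an empty list once the default roles have been created.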
Create EMR Job Flow with automatic steps¶
Purpose¶
This example DAG, example_emr_job_flow_automatic_steps.py, uses EmrCreateJobFlowOperator to create a new EMR job flow that calculates the mathematical constant Pi, and monitors its progress with EmrJobFlowSensor. The cluster is terminated automatically after the steps finish.
JobFlow configuration¶
To create a job flow at EMR, you need to specify the configuration for the EMR cluster:
SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'ReleaseLabel': 'emr-5.29.0',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm1.medium',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}
Here we create a single-node EMR cluster named PiCalc. It has a single step, calculate_pi, which calculates the value of Pi using Spark.
The setting 'KeepJobFlowAliveWhenNoSteps': False tells the cluster to shut down after the step is finished.
For more configuration options, refer to the Boto3 EMR client documentation.
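If a DAG needs several steps of the same shape, the step dictionary can be produced by a small builder instead of being written out by hand. This is only a sketch: `spark_example_step` is a hypothetical helper, not part of Airflow or Boto3, and it assumes every step runs a bundled Spark example through command-runner.jar as in the configuration above.

```python
def spark_example_step(name, example_class, *example_args, action_on_failure="CONTINUE"):
    """Build one EMR step that runs a bundled Spark example via command-runner.jar."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            # EMR expects every argument to be a string.
            "Args": ["/usr/lib/spark/bin/run-example", example_class, *map(str, example_args)],
        },
    }


# Produces the same single step as the hand-written SPARK_STEPS list.
SPARK_STEPS = [spark_example_step("calculate_pi", "SparkPi", 10)]
```

Keeping the builder separate from the cluster configuration also makes it easier to reuse the same steps when they are added after cluster creation.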
Defining tasks¶
The following code creates a new job flow, whose steps are already defined in the configuration, and monitors it until it completes. The cluster terminates on its own once the steps are finished.
job_flow_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
)

job_sensor = EmrJobFlowSensor(
    task_id='check_job_flow',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
)

job_flow_creator >> job_sensor
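Conceptually, the sensor task repeatedly checks the cluster state until the job flow has either finished or failed. The sketch below illustrates that polling pattern in plain Python; it is not the sensor's actual implementation. The function `wait_for_job_flow`, the injected `get_state` callable, and the choice of TERMINATED and TERMINATED_WITH_ERRORS as the success and failure states are assumptions made for illustration.

```python
import time


def wait_for_job_flow(
    get_state,
    target_states=("TERMINATED",),
    failed_states=("TERMINATED_WITH_ERRORS",),
    poke_interval=60.0,
    sleep=time.sleep,
):
    """Poll get_state() until the cluster reaches a target or failed state."""
    while True:
        state = get_state()
        if state in failed_states:
            raise RuntimeError(f"EMR job flow failed with state {state}")
        if state in target_states:
            return state
        # Not finished yet (for example STARTING or RUNNING): wait and retry.
        sleep(poke_interval)
```

In a real deployment `get_state` would query the EMR API for the cluster status; here it is left as a parameter so the loop can be exercised without AWS access.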
Create EMR Job Flow with manual steps¶
Purpose¶
This example DAG, example_emr_job_flow_manual_steps.py, is similar to the previous one, except that instead of adding the job flow step during cluster creation, we add the step after the cluster is created, and we terminate the cluster manually at the end.
This allows further customization of how you run your jobs.
JobFlow configuration¶
The configuration is similar to the previous example, except that we set 'KeepJobFlowAliveWhenNoSteps': True because we will terminate the cluster manually.
We also do not specify Steps in the config when creating the cluster.
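The two differences described here can be expressed as a small transformation of the automatic-steps configuration. This is a sketch under assumptions: `to_manual_steps_config` is a hypothetical helper, and `AUTOMATIC_CONFIG` is an abbreviated stand-in for the full configuration shown earlier.

```python
import copy

# Abbreviated stand-in for the automatic-steps configuration.
AUTOMATIC_CONFIG = {
    "Name": "PiCalc",
    "Instances": {
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "Steps": [{"Name": "calculate_pi"}],
}


def to_manual_steps_config(job_flow_overrides):
    """Return a copy of the config suited to adding steps after creation."""
    config = copy.deepcopy(job_flow_overrides)  # leave the input untouched
    config.pop("Steps", None)  # steps are added later by a separate task
    # Keep the cluster alive so it can accept steps until it is terminated.
    config.setdefault("Instances", {})["KeepJobFlowAliveWhenNoSteps"] = True
    return config


MANUAL_CONFIG = to_manual_steps_config(AUTOMATIC_CONFIG)
```

The deep copy matters because 'Instances' is a nested dictionary; a shallow copy would change the original configuration as well.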
Defining tasks¶
Here are the task definitions for our DAG.
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
)

cluster_creator >> step_adder >> step_checker >> cluster_remover
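The templated fields in these tasks pass identifiers between tasks through XCom. The sketch below mimics how those templates resolve, using a hypothetical `FakeTaskInstance` class in place of Airflow's real task instance. It assumes the create task returns a cluster ID string and the add-steps task returns a list of step IDs, which is why the sensor's template indexes with `[0]`. The ID values are placeholders.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for illustration only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms[(task_ids, key)]


# Assumed return values: a cluster ID string and a list holding one step ID.
task_instance = FakeTaskInstance(
    {
        ("create_job_flow", "return_value"): "j-EXAMPLECLUSTER",
        ("add_steps", "return_value"): ["s-EXAMPLESTEP"],
    }
)

# Equivalent of the job_flow_id template used by the downstream tasks.
job_flow_id = task_instance.xcom_pull(task_ids="create_job_flow", key="return_value")

# Equivalent of the step_id template: take the first (and only) step ID.
step_id = task_instance.xcom_pull(task_ids="add_steps", key="return_value")[0]
```

If more than one step were added, each step ID in the list would need its own index, and likely its own sensor task.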