Amazon EMR¶
Amazon EMR (previously called Amazon Elastic MapReduce) is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. Using these frameworks and related open-source projects, you can process data for analytics purposes and business intelligence workloads. Amazon EMR also lets you transform and move large amounts of data into and out of other AWS data stores and databases, such as Amazon Simple Storage Service (Amazon S3) and Amazon DynamoDB.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'Detailed information is available Installation of Airflow®
Generic Parameters¶
- aws_conn_id
- Reference to Amazon Web Services Connection ID. If this parameter is set to - Nonethen the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default:- aws_default
- region_name
- AWS Region Name. If this parameter is set to - Noneor omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default:- None
- verify
- Whether or not to verify SSL certificates. - False- Do not validate SSL certificates.
- path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore. 
 - If this parameter is set to - Noneor is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default:- None
- botocore_config
- The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to configure Avoid Throttling exceptions, timeouts, etc. Example, for more detail about parameters please have a look botocore.config.Config¶- { "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, } - If this parameter is set to - Noneor omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default:- None- Note - Specifying an empty dictionary, - {}, will overwrite the connection configuration for botocore.config.Config
Operators¶
Note
In order to run the examples successfully, you need to create the IAM Service
Roles(EMR_EC2_DefaultRole and EMR_DefaultRole) for Amazon EMR.  You can
create these roles using the AWS CLI: aws emr create-default-roles.
Create an EMR job flow¶
You can use EmrCreateJobFlowOperator to
create a new EMR job flow.  The cluster will be terminated automatically after finishing the steps.
The default behaviour is to mark the DAG Task node as success as soon as the cluster is launched
(wait_policy=None).
It is possible to modify this behaviour by using a different wait_policy. Available options are:
- WaitPolicy.WAIT_FOR_COMPLETION- DAG Task node waits for the cluster to be running
- WaitPolicy.WAIT_FOR_STEPS_COMPLETION- DAG Task node waits for the cluster to terminate
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
Using deferrable mode will release worker slots and leads to efficient utilization of
resources within Airflow cluster.However this mode will need the Airflow triggerer to be
available in your deployment.
JobFlow configuration¶
To create a job flow on EMR, you need to specify the configuration for the EMR cluster:
SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]
JOB_FLOW_OVERRIDES: dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-7.1.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        # If the EMR steps complete too quickly the cluster will be torn down before the other system test
        # tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
        # Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}
Here we create an EMR single-node Cluster PiCalc. It only has a single step calculate_pi which
calculates the value of Pi using Spark.  The config 'KeepJobFlowAliveWhenNoSteps': False
tells the cluster to shut down after the step is finished.  Alternatively, a config without a Steps
value can be used and Steps can be added at a later date using
EmrAddStepsOperator.  See details below.
Note
EMR clusters launched with the EMR API like this one are not visible to all users by default, so
you may not see the cluster in the EMR Management Console - you can change this by adding
'VisibleToAllUsers': True at the end of the JOB_FLOW_OVERRIDES dict.
For more config information, please refer to Boto3 EMR client.
Create the Job Flow¶
In the following code we are creating a new job flow using the configuration as explained above.
create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)
Add Steps to an EMR job flow¶
To add steps to an existing EMR Job flow you can use
EmrAddStepsOperator.
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
Using deferrable mode will release worker slots and leads to efficient utilization of
resources within Airflow cluster.However this mode will need the Airflow triggerer to be
available in your deployment.
add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id=create_job_flow.output,
    steps=SPARK_STEPS,
    execution_role_arn=execution_role_arn,
)
Terminate an EMR job flow¶
To terminate an EMR Job Flow you can use
EmrTerminateJobFlowOperator.
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
Using deferrable mode will release worker slots and leads to efficient utilization of
resources within Airflow cluster.However this mode will need the Airflow triggerer to be
available in your deployment.
remove_cluster = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id=create_job_flow.output,
)
Modify Amazon EMR container¶
To modify an existing EMR container you can use
EmrContainerSensor.
modify_cluster = EmrModifyClusterOperator(
    task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)
Start an EMR notebook execution¶
You can use EmrStartNotebookExecutionOperator to
start a notebook execution on an existing notebook attached to a running cluster.
start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id=editor_id,
    cluster_id=cluster_id,
    relative_path="EMR-System-Test.ipynb",
    service_role="EMR_Notebooks_DefaultRole",
)
Stop an EMR notebook execution¶
You can use EmrStopNotebookExecutionOperator to
stop a running notebook execution.
stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=notebook_execution_id_1,
)
Sensors¶
Wait on an EMR notebook execution state¶
To monitor the state of an EMR notebook execution you can use
EmrNotebookExecutionSensor.
wait_for_execution_start = EmrNotebookExecutionSensor(
    task_id="wait_for_execution_start",
    notebook_execution_id=notebook_execution_id_1,
    target_states={"RUNNING"},
    poke_interval=5,
)
Wait on an Amazon EMR job flow state¶
To monitor the state of an EMR job flow you can use
EmrJobFlowSensor.
check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)
Wait on an Amazon EMR step state¶
To monitor the state of an EMR job step you can use
EmrStepSensor.
wait_for_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id=create_job_flow.output,
    step_id=get_step_id(add_steps.output),
)
Throttling¶
Amazon EMR has relatively low service quotas, which can be viewed in detail here. As a consequence, you might experience throttling issues when using any of the operators and sensors listed in this page. To circumvent this limitation, consider customizing the AWS connection configuration to modify the default Boto3 retry strategy. See AWS connection configuration documentation.