Amazon EMR Serverless Operators
Amazon EMR Serverless is a serverless option in Amazon EMR that makes it easy for data analysts and engineers to run open-source big data analytics frameworks without having to configure, manage, or scale clusters or servers. You get all the features and benefits of Amazon EMR without needing experts to plan and manage clusters.
Prerequisite Tasks
To use these operators, you must do a few things:
Create the necessary resources using the AWS Console or AWS CLI.
Install the API libraries via pip:
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation.
Operators
Create an EMR Serverless Application
You can use EmrServerlessCreateApplicationOperator to create a new EMR Serverless Application.
from airflow.providers.amazon.aws.operators.emr import EmrServerlessCreateApplicationOperator

emr_serverless_app = EmrServerlessCreateApplicationOperator(
    task_id="create_emr_serverless_task",
    release_label="emr-6.6.0",
    job_type="SPARK",
    config={"name": "new_application"},
)
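In the example above, config only sets a name. As far as we understand the provider, the contents of config are passed through to the EMR Serverless CreateApplication API alongside release_label and job_type, so other fields from that API can be added there. The sketch below builds such a dictionary using the CreateApplication field names maximumCapacity and autoStopConfiguration; the helper function build_application_config and the specific limits are hypothetical and chosen only for illustration.

```python
def build_application_config(
    name: str,
    max_cpu_vcpu: int,
    max_memory_gb: int,
    idle_timeout_minutes: int,
) -> dict:
    """Hypothetical helper: build a `config` dict for EmrServerlessCreateApplicationOperator.

    Field names follow the AWS EMR Serverless CreateApplication API;
    the values are illustrative only.
    """
    if max_cpu_vcpu <= 0 or max_memory_gb <= 0:
        raise ValueError("capacity limits must be positive")
    if idle_timeout_minutes <= 0:
        raise ValueError("idle_timeout_minutes must be positive")
    return {
        "name": name,
        # Upper bound on the total resources the application may use.
        "maximumCapacity": {
            "cpu": f"{max_cpu_vcpu}vCPU",
            "memory": f"{max_memory_gb}GB",
        },
        # Stop the application automatically after it has been idle for a while.
        "autoStopConfiguration": {
            "enabled": True,
            "idleTimeoutMinutes": idle_timeout_minutes,
        },
    }


config = build_application_config("new_application", 16, 64, 15)
print(config["maximumCapacity"])
```

The resulting dictionary could then be passed as config=config in place of the literal dictionary in the operator call.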
Start an EMR Serverless Job
You can use EmrServerlessStartJobOperator to start an EMR Serverless Job.
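from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

# emr_serverless_app_id, role_arn, SPARK_JOB_DRIVER and SPARK_CONFIGURATION_OVERRIDES
# are defined elsewhere in the DAG.
start_job = EmrServerlessStartJobOperator(
    task_id="start_emr_serverless_job",
    application_id=emr_serverless_app_id,
    execution_role_arn=role_arn,
    job_driver=SPARK_JOB_DRIVER,
    configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
)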
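The start-job example refers to SPARK_JOB_DRIVER and SPARK_CONFIGURATION_OVERRIDES without showing their contents. They correspond to the jobDriver and configurationOverrides parameters of the EMR Serverless StartJobRun API. The sketch below shows one plausible shape, using the sparkSubmit and monitoringConfiguration structures from that API; the bucket name, script path, and Spark settings are hypothetical placeholders.

```python
# Hypothetical S3 locations: replace with your own bucket and script.
BUCKET = "my-example-bucket"
ENTRY_POINT = f"s3://{BUCKET}/scripts/wordcount.py"

# Passed as job_driver=...; follows the StartJobRun "jobDriver" structure.
SPARK_JOB_DRIVER = {
    "sparkSubmit": {
        "entryPoint": ENTRY_POINT,
        # Arguments handed to the Spark script itself.
        "entryPointArguments": [f"s3://{BUCKET}/output"],
        # Extra spark-submit options, given as a single string.
        "sparkSubmitParameters": (
            "--conf spark.executor.cores=1 "
            "--conf spark.executor.memory=4g "
            "--conf spark.driver.cores=1 "
            "--conf spark.driver.memory=4g"
        ),
    }
}

# Passed as configuration_overrides=...; sends job logs to S3.
SPARK_CONFIGURATION_OVERRIDES = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{BUCKET}/logs/"}
    }
}
```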
Stop an EMR Serverless Application
You can use EmrServerlessStopApplicationOperator to stop an EMR Serverless Application.
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStopApplicationOperator

stop_app = EmrServerlessStopApplicationOperator(
    task_id="stop_application",
    application_id=emr_serverless_app_id,
    force_stop=True,
)
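The stop example sets force_stop=True. According to the provider's docstring, this cancels any job run of the application that has not reached a terminal state before stopping it; without it, stopping an application that still has running jobs returns an error. The sketch below illustrates only the selection step in plain Python. The set of active job-run states is an assumption based on the EMR Serverless job-run states, and the record format and the jobs_to_cancel helper are hypothetical, not the provider's implementation.

```python
from collections.abc import Iterable

# Assumption: job runs in these states are still active and would need cancelling.
ACTIVE_JOB_STATES = {"SUBMITTED", "PENDING", "SCHEDULED", "RUNNING"}


def jobs_to_cancel(job_runs: Iterable[dict]) -> list[str]:
    """Hypothetical helper: return the IDs of job runs that are still active.

    Each record is assumed to look like {"id": "...", "state": "..."}.
    """
    return [run["id"] for run in job_runs if run["state"] in ACTIVE_JOB_STATES]


runs = [
    {"id": "run-1", "state": "SUCCESS"},
    {"id": "run-2", "state": "RUNNING"},
    {"id": "run-3", "state": "PENDING"},
    {"id": "run-4", "state": "CANCELLED"},
]
print(jobs_to_cancel(runs))  # ['run-2', 'run-3']
```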
Delete an EMR Serverless Application
You can use EmrServerlessDeleteApplicationOperator to delete an EMR Serverless Application.
from airflow.providers.amazon.aws.operators.emr import EmrServerlessDeleteApplicationOperator

delete_app = EmrServerlessDeleteApplicationOperator(
    task_id="delete_application",
    application_id=emr_serverless_app_id,
)
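The EMR Serverless API only allows an application to be deleted while it is in the CREATED or STOPPED state, which is why a stop task usually precedes the delete task in a DAG. The sketch below encodes that rule as a small hypothetical planning helper, teardown_plan, over the EMR Serverless application states (CREATING, CREATED, STARTING, STARTED, STOPPING, STOPPED, TERMINATED). It does not call AWS and is not part of the Airflow provider; treating TERMINATED as already deleted is an assumption.

```python
# Application states from the EMR Serverless API.
DELETABLE_STATES = {"CREATED", "STOPPED"}
TRANSITIONAL_STATES = {"CREATING", "STARTING", "STOPPING"}


def teardown_plan(state: str) -> list[str]:
    """Hypothetical helper: actions needed to delete an application in `state`."""
    if state in DELETABLE_STATES:
        return ["delete"]
    if state == "STARTED":
        # A started application must be stopped before it can be deleted.
        return ["stop", "delete"]
    if state == "TERMINATED":
        # Assumption: a terminated application is already gone; nothing to do.
        return []
    if state in TRANSITIONAL_STATES:
        raise RuntimeError(f"application is {state}; wait for a stable state first")
    raise ValueError(f"unknown application state: {state}")


print(teardown_plan("STARTED"))  # ['stop', 'delete']
```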
Sensors
Wait on an EMR Serverless Job state
To monitor the state of an EMR Serverless Job, you can use EmrServerlessJobSensor.
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor

wait_for_job = EmrServerlessJobSensor(
    task_id="wait_for_job",
    application_id=emr_serverless_app_id,
    job_run_id=start_job.output,
    # By default the sensor waits for the job to complete; here it only waits until the job is running.
    target_states={"RUNNING"},
)
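A sensor repeatedly checks a state until it reaches one of its target states. The sketch below is a simplified, dependency-free illustration of that poke loop, not the EmrServerlessJobSensor implementation: the hypothetical get_state callable stands in for a call that returns the job run's current state, and treating FAILED and CANCELLED as failure states is an assumption. In Airflow itself, polling frequency and timeout are controlled by the standard sensor arguments such as poke_interval and timeout.

```python
import time
from collections.abc import Callable, Collection


def wait_for_state(
    get_state: Callable[[], str],
    target_states: Collection[str],
    failure_states: Collection[str] = ("FAILED", "CANCELLED"),
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_state() until it returns a target state, then return that state."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in target_states:
            return state
        if state in failure_states:
            raise RuntimeError(f"reached failure state {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state} after {timeout} seconds")
        time.sleep(poke_interval)


# Usage with a fake state source that advances on each poke.
states = iter(["SUBMITTED", "PENDING", "SCHEDULED", "RUNNING"])
print(wait_for_state(lambda: next(states), {"RUNNING"}, poke_interval=0))
```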
Wait on an EMR Serverless Application state
To monitor the state of an EMR Serverless Application, you can use EmrServerlessApplicationSensor.
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor

wait_for_app_creation = EmrServerlessApplicationSensor(
    task_id="wait_for_app_creation",
    application_id=emr_serverless_app_id,
)