airflow.providers.amazon.aws.operators.emr

Module Contents

Classes

EmrAddStepsOperator

An operator that adds steps to an existing EMR job_flow.

EmrContainerOperator

An operator that submits jobs to EMR on EKS virtual clusters.

EmrClusterLink

Operator link for EmrCreateJobFlowOperator. It allows users to access the EMR Cluster console from the Airflow UI.

EmrCreateJobFlowOperator

Creates an EMR JobFlow, reading the config from the EMR connection.

EmrModifyClusterOperator

An operator that modifies an existing EMR cluster.

EmrTerminateJobFlowOperator

Operator to terminate EMR JobFlows.

class airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator(*, job_flow_id: Optional[str] = None, job_flow_name: Optional[str] = None, cluster_states: Optional[List[str]] = None, aws_conn_id: str = 'aws_default', steps: Optional[Union[List[dict], str]] = None, **kwargs)[source]

Bases: airflow.models.BaseOperator

An operator that adds steps to an existing EMR job_flow.

Parameters
  • job_flow_id (Optional[str]) -- id of the JobFlow to add steps to. (templated)

  • job_flow_name (Optional[str]) -- name of the JobFlow to add steps to. Use as an alternative to passing job_flow_id; the operator will search for the id of a JobFlow whose name matches and whose state is one of those in cluster_states. Exactly one such cluster must exist, otherwise the task fails. (templated)

  • cluster_states (list) -- Acceptable cluster states when searching for JobFlow id by job_flow_name. (templated)

  • aws_conn_id (str) -- aws connection to use

  • steps (list|str) -- boto3-style steps, or a reference to a steps file (must end in '.json'), to be added to the JobFlow. (templated)

  • do_xcom_push (bool) -- if True, job_flow_id is pushed to XCom with key job_flow_id.

template_fields :Sequence[str] = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps'][source]
template_ext :Sequence[str] = ['.json'][source]
template_fields_renderers[source]
ui_color = #f9c915[source]
execute(self, context: airflow.utils.context.Context) List[str][source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
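The steps parameter takes boto3-style step definitions, in the shape of EMR's AddJobFlowSteps API. A minimal sketch; the step name and S3 script path below are hypothetical placeholders, not real resources:

```python
# boto3-style step definitions for EmrAddStepsOperator's `steps` parameter.
# The step name and S3 path are illustrative placeholders.
SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "s3://my-bucket/scripts/calculate_pi.py",
            ],
        },
    }
]
```

In a DAG, this list would be passed as steps=SPARK_STEPS; because '.json' is in template_ext, steps can instead reference a JSON file that is rendered at runtime.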

class airflow.providers.amazon.aws.operators.emr.EmrContainerOperator(*, name: str, virtual_cluster_id: str, execution_role_arn: str, release_label: str, job_driver: dict, configuration_overrides: Optional[dict] = None, client_request_token: Optional[str] = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, max_tries: Optional[int] = None, **kwargs: Any)[source]

Bases: airflow.models.BaseOperator

An operator that submits jobs to EMR on EKS virtual clusters.

Parameters
  • name (str) -- The name of the job run.

  • virtual_cluster_id (str) -- The EMR on EKS virtual cluster ID

  • execution_role_arn (str) -- The IAM role ARN associated with the job run.

  • release_label (str) -- The Amazon EMR release version to use for the job run.

  • job_driver (dict) -- Job configuration details, e.g. the Spark job parameters.

  • configuration_overrides (dict) -- The configuration overrides for the job run, specifically either application configuration or monitoring configuration.

  • client_request_token (str) -- The client idempotency token of the job run request. Use this to provide a unique ID that prevents the same job from being started twice. If no token is provided, a UUIDv4 token will be generated for you.

  • aws_conn_id (str) -- The Airflow connection used for AWS credentials.

  • poll_interval (int) -- Time (in seconds) to wait between two consecutive calls to check query status on EMR

  • max_tries (int) -- Maximum number of times to wait for the job run to finish. Defaults to None, which will poll until the job is not in a pending, submitted, or running state.

template_fields :Sequence[str] = ['name', 'virtual_cluster_id', 'execution_role_arn', 'release_label', 'job_driver'][source]
ui_color = #f9c915[source]
hook(self) airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]

Create and return an EmrContainerHook.

execute(self, context: airflow.utils.context.Context) Optional[str][source]

Run job on EMR Containers

on_kill(self) None[source]

Cancel the submitted job run
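The job_driver and configuration_overrides parameters follow the shape of the EMR on EKS StartJobRun API. A sketch, assuming a Spark job; the entry point, Spark settings, and log group names are illustrative placeholders:

```python
# Job configuration in the shape of EMR on EKS's StartJobRun API.
# Entry point, Spark settings, and log group are placeholders.
JOB_DRIVER = {
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://my-bucket/scripts/pi.py",
        "sparkSubmitParameters": (
            "--conf spark.executor.instances=2 "
            "--conf spark.executor.memory=2G"
        ),
    }
}

CONFIGURATION_OVERRIDES = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/aws/emr-eks/jobs",
            "logStreamNamePrefix": "pi",
        }
    }
}
```

These dicts would be passed as job_driver=JOB_DRIVER and configuration_overrides=CONFIGURATION_OVERRIDES when instantiating EmrContainerOperator.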

class airflow.providers.amazon.aws.operators.emr.EmrClusterLink[source]

Bases: airflow.models.BaseOperatorLink

Operator link for EmrCreateJobFlowOperator. It allows users to access the EMR Cluster console from the Airflow UI.

name = EMR Cluster[source]

get_link(self, operator, dttm)[source]

Get link to the EMR cluster.

Parameters
  • operator -- operator

  • dttm -- datetime

Returns

url link

class airflow.providers.amazon.aws.operators.emr.EmrCreateJobFlowOperator(*, aws_conn_id: str = 'aws_default', emr_conn_id: str = 'emr_default', job_flow_overrides: Optional[Union[str, Dict[str, Any]]] = None, region_name: Optional[str] = None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Creates an EMR JobFlow, reading the config from the EMR connection. A dictionary of JobFlow overrides can be passed that override the config from the connection.

Parameters
  • aws_conn_id (str) -- aws connection to use

  • emr_conn_id (str) -- emr connection to use

  • job_flow_overrides (dict|str) -- boto3-style arguments, or a reference to an arguments file (must end in '.json'), overriding the config stored in the EMR connection's extra field. (templated)

  • region_name (Optional[str]) -- Region name passed to EmrHook

template_fields :Sequence[str] = ['job_flow_overrides'][source]
template_ext :Sequence[str] = ['.json'][source]
template_fields_renderers[source]
ui_color = #f9c915[source]
execute(self, context: airflow.utils.context.Context) str[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
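The job_flow_overrides parameter takes boto3-style RunJobFlow arguments that override the config read from the EMR connection. A minimal sketch; the cluster name, release label, instance types, and role names are illustrative placeholders:

```python
# boto3-style RunJobFlow arguments for `job_flow_overrides`.
# Name, release label, instance types, and roles are placeholders.
JOB_FLOW_OVERRIDES = {
    "Name": "my-cluster",
    "ReleaseLabel": "emr-6.4.0",
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            }
        ],
        # Keep the cluster alive so steps can be added later.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}
```

This dict would be passed as job_flow_overrides=JOB_FLOW_OVERRIDES; since '.json' is in template_ext, a path to a JSON file can be passed instead and rendered at runtime.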

class airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator(*, cluster_id: str, step_concurrency_level: int, aws_conn_id: str = 'aws_default', **kwargs)[source]

Bases: airflow.models.BaseOperator

An operator that modifies an existing EMR cluster.

Parameters
  • cluster_id (str) -- cluster identifier

  • step_concurrency_level (int) -- Concurrency level of the cluster

  • aws_conn_id (str) -- aws connection to use

  • do_xcom_push (bool) -- if True, cluster_id is pushed to XCom with key cluster_id.

template_fields :Sequence[str] = ['cluster_id', 'step_concurrency_level'][source]
template_ext :Sequence[str] = [][source]
ui_color = #f9c915[source]
execute(self, context: airflow.utils.context.Context) int[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

class airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator(*, job_flow_id: str, aws_conn_id: str = 'aws_default', **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator to terminate EMR JobFlows.

Parameters
  • job_flow_id (str) -- id of the JobFlow to terminate. (templated)

  • aws_conn_id (str) -- aws connection to use

template_fields :Sequence[str] = ['job_flow_id'][source]
template_ext :Sequence[str] = [][source]
ui_color = #f9c915[source]
execute(self, context: airflow.utils.context.Context) None[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
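A common pattern chains these operators: EmrCreateJobFlowOperator pushes the new job flow id to XCom, and downstream tasks pull it through a templated job_flow_id. A sketch with hypothetical task ids; the wiring in the comments assumes a DAG context and a steps definition:

```python
# Jinja template rendered by Airflow at runtime; it pulls the job flow id
# returned by the (hypothetical) task with task_id 'create_job_flow'.
JOB_FLOW_ID = (
    "{{ task_instance.xcom_pull(task_ids='create_job_flow', "
    "key='return_value') }}"
)

# Inside a DAG, the operators would be wired roughly as follows (sketch):
#
# create_job_flow = EmrCreateJobFlowOperator(task_id="create_job_flow")
# add_steps = EmrAddStepsOperator(
#     task_id="add_steps", job_flow_id=JOB_FLOW_ID, steps=SPARK_STEPS)
# terminate = EmrTerminateJobFlowOperator(
#     task_id="terminate", job_flow_id=JOB_FLOW_ID)
# create_job_flow >> add_steps >> terminate
```

Because job_flow_id is listed in template_fields for both EmrAddStepsOperator and EmrTerminateJobFlowOperator, the template string is rendered before execution.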
