Google Dataprep Operators

Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning. Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing. Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group. For more information about the service visit Google Dataprep API documentation

Before you begin

Before using Dataprep within Airflow you need to authenticate your account with TOKEN. To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.

TOKEN should be added to the Connection in Airflow in JSON format. You can check Managing Connections

The DataprepRunJobGroupOperator will run specified job. Operator required a recipe id. To identify the recipe id please use API documentation for runJobGroup E.g. if the URL is /flows/10?recipe=7, the recipe id is 7. The recipe cannot be created via this operator. It can be created only via UI which is available here. Some of parameters can be override by DAG’s body request. How to do it is shown in example dag.

See following example: Set values for these fields: .. code-block:

Connection Id: "your_conn_id"
Extra: {"token": "TOKEN", "base_url": "https://api.clouddataprep.com"}

Prerequisite Tasks

To use these operators, you must do a few things:

Run Job Group

Operator task is to create a job group, which launches the specified job as the authenticated user. This performs the same action as clicking on the Run Job button in the application.

To get information about jobs within a Cloud Dataprep job use: DataprepRunJobGroupOperator

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

run_job_group_task = DataprepRunJobGroupOperator(
    task_id="run_job_group",
    project_id=GCP_PROJECT_ID,
    body_request={
        "wrangledDataset": {"id": RECIPE_ID},
        "overrides": WRITE_SETTINGS,
    },
)

Get Jobs For Job Group

Operator task is to get information about the batch jobs within a Cloud Dataprep job.

To get information about jobs within a Cloud Dataprep job use: DataprepGetJobsForJobGroupOperator

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

get_jobs_for_job_group_task = DataprepGetJobsForJobGroupOperator(
    task_id="get_jobs_for_job_group",
    job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)

Get Job Group

Operator task is to get the specified job group. A job group is a job that is executed from a specific node in a flow.

To get information about jobs within a Cloud Dataprep job use: DataprepGetJobGroupOperator

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

get_job_group_task = DataprepGetJobGroupOperator(
    task_id="get_job_group",
    project_id=GCP_PROJECT_ID,
    job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
    embed="",
    include_deleted=False,
)

Copy Flow

Operator task is to copy the flow.

To get information about jobs within a Cloud Dataprep job use: DataprepCopyFlowOperator

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

copy_task = DataprepCopyFlowOperator(
    task_id="copy_flow",
    project_id=GCP_PROJECT_ID,
    flow_id=FLOW_ID,
    name=f"dataprep_example_flow_{DAG_ID}_{ENV_ID}",
)

Run Flow

Operator task is to run the flow. A flow is a container for wrangling logic which contains imported datasets, recipe, output objects, and References.

To get information about jobs within a Cloud Dataprep job use: DataprepRunFlowOperator

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

run_flow_task = DataprepRunFlowOperator(
    task_id="run_flow",
    project_id=GCP_PROJECT_ID,
    flow_id="{{ task_instance.xcom_pull('copy_flow')['id'] }}",
    body_request={
        "overrides": {
            RECIPE_NAME: WRITE_SETTINGS,
        },
    },
)

Delete flow

Operator task is to delete the flow. A flow is a container for wrangling logic which contains imported datasets, recipe, output objects, and References.

To get information about jobs within a Cloud Dataprep job use: DataprepDeleteFlowOperator

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

delete_flow_task = DataprepDeleteFlowOperator(
    task_id="delete_flow",
    flow_id="{{ task_instance.xcom_pull('copy_flow')['id'] }}",
)

Check if Job Group is finished

Sensor task is to tell the system when started job group is finished no matter successfully or not. A job group is a job that is executed from a specific node in a flow.

To get information about jobs within a Cloud Dataprep job use: DataprepJobGroupIsFinishedSensor

Example usage:

tests/system/providers/google/cloud/dataprep/example_dataprep.py[source]

check_flow_status_sensor = DataprepJobGroupIsFinishedSensor(
    task_id="check_flow_status",
    job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)

Was this entry helpful?