Google DataFusion Operators

Cloud Data Fusion is a fully managed, cloud-native data integration service that helps users efficiently build and manage ETL/ELT data pipelines. With a graphical interface and a broad open source library of preconfigured connectors and transformations, Cloud Data Fusion shifts an organization’s focus away from code and integration to insights and action.

Prerequisite Tasks

Restart DataFusion Instance

To restart a Data Fusion instance, use: CloudDataFusionRestartInstanceOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

You can use Jinja templating with the instance_name and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.
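Templated parameters are rendered with Jinja before the operator executes. As a rough sketch of what that rendering does, using a plain string replacement in place of Airflow's real template engine and an illustrative context value:

```python
# Naive stand-in for Airflow's Jinja rendering of a templated field.
# ds_nodash (the run's logical date without dashes) is one of the
# variables Airflow supplies; the value below is illustrative.
context = {"ds_nodash": "20240101"}

instance_name = "datafusion-{{ ds_nodash }}"
for key, value in context.items():
    instance_name = instance_name.replace("{{ " + key + " }}", value)

print(instance_name)  # datafusion-20240101
```

In a real DAG you would simply pass the templated string (e.g. `instance_name="datafusion-{{ ds_nodash }}"`) to the operator and let Airflow render it.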

Delete DataFusion Instance

To delete a Data Fusion instance, use: CloudDataFusionDeleteInstanceOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
)

You can use Jinja templating with the instance_name and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.

Create DataFusion Instance

To create a Data Fusion instance, use: CloudDataFusionCreateInstanceOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

You can use Jinja templating with the instance_name, instance, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.
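The instance parameter is a dict representing the instance body sent to the Data Fusion API. A minimal sketch, assuming a basic-edition instance; the field names follow the REST Instance resource, but the values here are illustrative and most optional fields are omitted:

```python
# Illustrative instance body for the `instance` parameter of
# CloudDataFusionCreateInstanceOperator (trimmed to the essentials).
INSTANCE = {
    "type": "BASIC",              # edition: BASIC, ENTERPRISE, or DEVELOPER
    "displayName": "test-instance",
}
```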

Update DataFusion Instance

To update a Data Fusion instance, use: CloudDataFusionUpdateInstanceOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

You can use Jinja templating with the instance_name, instance, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.
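The update_mask argument is a comma-separated list of the instance fields the update should modify. A pure-Python sketch of building such a mask from the fields you changed; the field names here are illustrative, not an exhaustive list of what the API accepts:

```python
# Build a comma-separated update mask from the changed fields.
# Field names are illustrative examples, not the API's full field list.
changed_fields = {
    "enableStackdriverLogging": True,
    "labels": {"env": "dev"},
}
update_mask = ",".join(sorted(changed_fields))

print(update_mask)  # enableStackdriverLogging,labels
```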

Get DataFusion Instance

To retrieve a Data Fusion instance, use: CloudDataFusionGetInstanceOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

You can use Jinja templating with the instance_name and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.

Create a DataFusion pipeline

To create a Data Fusion pipeline, use: CloudDataFusionCreatePipelineOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

You can use Jinja templating with the instance_name, pipeline_name, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.
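The pipeline parameter is the pipeline definition itself, in the JSON format you get when exporting a pipeline from the Data Fusion (CDAP) Studio. A heavily trimmed sketch of its shape; the names and values are illustrative, not a complete, deployable pipeline:

```python
# Trimmed, illustrative shape of an exported CDAP pipeline definition
# for the `pipeline` parameter of CloudDataFusionCreatePipelineOperator.
PIPELINE = {
    "name": "test-pipeline",
    "artifact": {
        "name": "cdap-data-pipeline",  # batch pipeline artifact
        "version": "6.1.1",            # illustrative version
        "scope": "SYSTEM",
    },
    "config": {
        "stages": [],       # source/transform/sink plugin configs go here
        "connections": [],  # edges between the stages
    },
}
```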

Start a DataFusion pipeline

To start a Data Fusion pipeline in synchronous mode, use: CloudDataFusionStartPipelineOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline",
)

To start a Data Fusion pipeline in asynchronous mode, use: CloudDataFusionStartPipelineOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

You can use Jinja templating with the instance_name, pipeline_name, runtime_args, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.
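The runtime_args parameter passes key/value runtime arguments to the pipeline run, and since the field is templated, the values can themselves contain Jinja. A sketch with hypothetical argument names; the keys are whatever macros or preferences your pipeline reads, not fixed names:

```python
# Hypothetical runtime arguments for a pipeline run; both keys and
# values are illustrative and depend on the pipeline's own macros.
runtime_args = {
    "input.path": "gs://my-bucket/input/2024-01-01/",
    "output.table": "my_dataset.my_table",
}
```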

Stop a DataFusion pipeline

To stop a Data Fusion pipeline, use: CloudDataFusionStopPipelineOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

You can use Jinja templating with the instance_name, pipeline_name, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.

Delete a DataFusion pipeline

To delete a Data Fusion pipeline, use: CloudDataFusionDeletePipelineOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
)

You can use Jinja templating with the instance_name, version_id, pipeline_name, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.

List DataFusion pipelines

To list Data Fusion pipelines, use: CloudDataFusionListPipelinesOperator.

airflow/providers/google/cloud/example_dags/example_datafusion.py

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

You can use Jinja templating with the instance_name, artifact_name, artifact_version, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators.
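The artifact_name and artifact_version parameters narrow the listing to pipelines built from a given artifact. Conceptually the filter behaves like this pure-Python sketch; the pipeline records are illustrative stand-ins for what the service returns:

```python
# Illustrative records shaped like pipeline listings; filtering them by
# the artifact they were built from mirrors what artifact_name does.
pipelines = [
    {"name": "batch-pipeline",
     "artifact": {"name": "cdap-data-pipeline", "version": "6.1.1"}},
    {"name": "stream-pipeline",
     "artifact": {"name": "cdap-data-streams", "version": "6.1.1"}},
]

artifact_name = "cdap-data-pipeline"
matches = [p for p in pipelines if p["artifact"]["name"] == artifact_name]

print([p["name"] for p in matches])  # ['batch-pipeline']
```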

Sensors

When a pipeline is started asynchronously, a sensor may be used to run checks and verify that the pipeline is in the correct state.

Use CloudDataFusionPipelineStateSensor.

airflow/providers/google/cloud/example_dags/example_datafusion.py

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
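Under the hood, the sensor repeatedly polls the run's state and compares it against expected_statuses and failure_statuses. A minimal sketch of that poke logic, with a hypothetical get_pipeline_state callable standing in for the Data Fusion hook call:

```python
# Minimal sketch of a pipeline-state sensor's poke step.
# get_pipeline_state is a hypothetical stand-in for the hook call
# that fetches the run's current status from Data Fusion.
def poke(get_pipeline_state, expected=("COMPLETED",), failure=("FAILED",)):
    state = get_pipeline_state()
    if state in failure:
        # A real sensor would raise an Airflow exception here.
        raise RuntimeError(f"Pipeline reached failure state: {state}")
    return state in expected  # True stops the sensor; False means poke again


print(poke(lambda: "RUNNING"))    # False
print(poke(lambda: "COMPLETED"))  # True
```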
