Google DataFusion Operators
Cloud Data Fusion is a fully managed, cloud-native data integration service that helps users efficiently build and manage ETL/ELT data pipelines. With a graphical interface and a broad open source library of preconfigured connectors and transformations, Cloud Data Fusion shifts an organization’s focus away from code and integration to insights and action.
Prerequisite Tasks
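Restart DataFusion Instance
To restart a Data Fusion instance, use CloudDataFusionRestartInstanceOperator.
# All operators on this page are imported from the same module.
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionRestartInstanceOperator

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)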
You can use Jinja templating with the instance_name and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Delete DataFusion Instance
To delete a Data Fusion instance, use CloudDataFusionDeleteInstanceOperator.
delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
)
You can use Jinja templating with the instance_name and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Create DataFusion Instance
To create a Data Fusion instance, use CloudDataFusionCreateInstanceOperator.
create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)
You can use Jinja templating with the instance_name, instance, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
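The INSTANCE value passed to the operator is a dictionary describing the instance resource. The following is a minimal sketch, assuming the REST field names type and displayName of the Data Fusion Instance resource; the instance name and the helper make_instance are hypothetical placeholders, not part of the provider.

```python
# Hypothetical placeholder value; replace with your own instance name.
INSTANCE_NAME = "example-instance"


def make_instance(display_name: str, edition: str = "BASIC") -> dict:
    """Build an instance body for CloudDataFusionCreateInstanceOperator.

    Assumes the REST field names ``type`` and ``displayName``.
    """
    return {"type": edition, "displayName": display_name}


INSTANCE = make_instance(INSTANCE_NAME)
print(INSTANCE)
```

The resulting dictionary can then be passed as instance=INSTANCE. Other fields of the instance resource can be added to the dictionary in the same way.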
Update DataFusion Instance
To update a Data Fusion instance, use CloudDataFusionUpdateInstanceOperator.
update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)
You can use Jinja templating with the instance_name, instance, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
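In the Data Fusion REST API, the update mask is a field mask, that is, a comma-separated list of the instance fields to overwrite. The sketch below assumes that format and derives the mask from the fields being changed. The helper build_update_mask and the example labels and options values are hypothetical; check the API reference for which fields can actually be updated.

```python
def build_update_mask(changed_fields: dict) -> str:
    """Return a comma-separated field mask naming the top-level keys to overwrite."""
    return ",".join(sorted(changed_fields))


# Hypothetical changes to apply to an existing instance.
changes = {"labels": {"team": "data"}, "options": {"key": "value"}}
update_mask = build_update_mask(changes)
print(update_mask)
```

The changes dictionary would be passed as instance and the computed string as update_mask.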
Get DataFusion Instance
To retrieve a Data Fusion instance, use CloudDataFusionGetInstanceOperator.
get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
You can use Jinja templating with the instance_name and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Create a DataFusion pipeline
To create a Data Fusion pipeline, use CloudDataFusionCreatePipelineOperator.
create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)
You can use Jinja templating with the instance_name, pipeline_name, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Start a DataFusion pipeline
To start a Data Fusion pipeline in synchronous mode, use CloudDataFusionStartPipelineOperator.
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline",
)
To start a Data Fusion pipeline in asynchronous mode, use CloudDataFusionStartPipelineOperator with asynchronous=True.
start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)
You can use Jinja templating with the instance_name, pipeline_name, runtime_args, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
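Because runtime_args is templated, its values can contain Jinja expressions that Airflow renders when the task runs. The following is a minimal sketch, assuming a pipeline that reads runtime arguments named input.date and output.path; both keys and the bucket name are hypothetical. {{ ds }} and {{ ds_nodash }} are Airflow's built-in template variables for the logical date.

```python
# Hypothetical runtime arguments; the keys depend on the macros your pipeline defines.
RUNTIME_ARGS = {
    "input.date": "{{ ds }}",
    "output.path": "gs://example-bucket/output/{{ ds_nodash }}",
}
print(RUNTIME_ARGS)
```

The dictionary would be passed as runtime_args=RUNTIME_ARGS to CloudDataFusionStartPipelineOperator.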
Stop a DataFusion pipeline
To stop a Data Fusion pipeline, use CloudDataFusionStopPipelineOperator.
stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)
You can use Jinja templating with the instance_name, pipeline_name, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Delete a DataFusion pipeline
To delete a Data Fusion pipeline, use CloudDataFusionDeletePipelineOperator.
delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
)
You can use Jinja templating with the instance_name, version_id, pipeline_name, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
List DataFusion pipelines
To list Data Fusion pipelines, use CloudDataFusionListPipelinesOperator.
list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
You can use Jinja templating with the instance_name, artifact_name, artifact_version, and impersonation_chain parameters, which lets you determine their values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Sensors
When a pipeline is started asynchronously, a sensor can be used to check that the pipeline has reached the expected state. To do so, use CloudDataFusionPipelineStateSensor.
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
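The roles of expected_statuses and failure_statuses can be illustrated with a small stand-alone model. This is a simplified sketch and not the sensor's actual implementation: the function check_pipeline_state and the exception PipelineFailed are hypothetical names, and the status strings are fed in by hand rather than fetched from Data Fusion.

```python
from typing import Iterable, Optional


class PipelineFailed(Exception):
    """Raised when the pipeline reaches one of the failure statuses."""


def check_pipeline_state(
    status: Optional[str],
    expected_statuses: Iterable[str],
    failure_statuses: Iterable[str] = (),
) -> bool:
    """Return True once the pipeline is in an expected status.

    Returns False to signal "keep waiting" and raises PipelineFailed
    when the status is one of the failure statuses.
    """
    if status is not None and status in set(failure_statuses):
        raise PipelineFailed(f"Pipeline entered failure status {status!r}")
    return status is not None and status in set(expected_statuses)


# Simulated statuses reported across two successive checks.
results = [
    check_pipeline_state(s, ["COMPLETED"], ["FAILED"])
    for s in ("RUNNING", "COMPLETED")
]
print(results)  # [False, True]
```

In this model a sensor keeps polling while the check returns False, succeeds when it returns True, and fails the task when a failure status is seen.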