Google DataFusion Operators
Cloud Data Fusion is a fully managed, cloud-native data integration service that helps users efficiently build and manage ETL/ELT data pipelines. With a graphical interface and a broad open source library of preconfigured connectors and transformations, Cloud Data Fusion shifts an organization’s focus away from code and integration to insights and action.
Prerequisite Tasks
Restart DataFusion Instance
To restart a Data Fusion instance, use CloudDataFusionRestartInstanceOperator:
restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)
You can use Jinja templating with the instance_name and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Delete DataFusion Instance
To delete a Data Fusion instance, use CloudDataFusionDeleteInstanceOperator:
delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
)
You can use Jinja templating with the instance_name and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Create DataFusion Instance
To create a Data Fusion instance, use CloudDataFusionCreateInstanceOperator:
create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)
You can use Jinja templating with the instance_name, instance and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
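The LOCATION, INSTANCE_NAME and INSTANCE names used in the example are not defined in this section. As a minimal sketch, they could be plain module-level values like the ones below. The concrete region and instance name are placeholders chosen for illustration, and the two keys in INSTANCE assume the type and displayName fields of the Data Fusion Instance resource.

```python
# Placeholder values for illustration only; replace them with your own.
LOCATION = "europe-north1"  # assumed region
INSTANCE_NAME = "example-datafusion-instance"  # hypothetical instance name

# Assumed shape of the instance body: an edition type and a display name.
INSTANCE = {
    "type": "BASIC",
    "displayName": INSTANCE_NAME,
}

print(INSTANCE)
```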
Update DataFusion Instance
To update a Data Fusion instance, use CloudDataFusionUpdateInstanceOperator:
update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)
You can use Jinja templating with the instance_name, instance and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Get DataFusion Instance
To retrieve a Data Fusion instance, use CloudDataFusionGetInstanceOperator:
get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
You can use Jinja templating with the instance_name and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Create a DataFusion pipeline
To create a Data Fusion pipeline, use CloudDataFusionCreatePipelineOperator:
create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)
You can use Jinja templating with the instance_name, pipeline_name and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Start a DataFusion pipeline
To start a Data Fusion pipeline in synchronous mode, use CloudDataFusionStartPipelineOperator:
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline",
)
To start a Data Fusion pipeline in asynchronous mode, set asynchronous=True on CloudDataFusionStartPipelineOperator:
start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)
A Data Fusion pipeline can also be started in deferrable mode. Whereas a blocking wait for the pipeline to reach a terminal state relies on the synchronous sleep() method, deferrable mode checks the state using asynchronous calls.
The asynchronous and deferrable parameters cannot be used at the same time.
The following example uses CloudDataFusionStartPipelineOperator in deferrable mode:
start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    deferrable=True,
    task_id="start_pipeline_def",
)
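The difference between waiting with a synchronous sleep() and checking the state with asynchronous calls can be illustrated with a small standalone sketch. It does not use Airflow or Data Fusion: wait_blocking, wait_async and fake_pipeline are hypothetical helpers, and the set of terminal states is an assumption made for the example.

```python
import asyncio
import time

# Assumed terminal states, for illustration only.
TERMINAL_STATES = {"COMPLETED", "FAILED", "KILLED"}


def wait_blocking(get_state, poke_interval=0.01, max_checks=100):
    """Poll with time.sleep(); the calling thread is occupied for the whole wait."""
    for _ in range(max_checks):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poke_interval)
    raise TimeoutError("pipeline did not reach a terminal state")


async def wait_async(get_state, poke_interval=0.01, max_checks=100):
    """Poll with asyncio.sleep(); control returns to the event loop between checks."""
    for _ in range(max_checks):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(poke_interval)
    raise TimeoutError("pipeline did not reach a terminal state")


def fake_pipeline(states):
    """Return a callable that reports the given states one per call (a stand-in)."""
    iterator = iter(states)
    return lambda: next(iterator)


blocking_result = wait_blocking(fake_pipeline(["STARTING", "RUNNING", "COMPLETED"]))
async_result = asyncio.run(wait_async(fake_pipeline(["STARTING", "RUNNING", "FAILED"])))
print(blocking_result, async_result)
```

Both helpers perform the same checks. The practical difference is that the blocking version holds its thread while it sleeps, whereas the asynchronous version yields between checks so that other work can run.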
You can use Jinja templating with the instance_name, pipeline_name, runtime_args and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
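As a sketch of how the templated parameters and the mode flags fit together, the hypothetical helper below assembles keyword arguments for CloudDataFusionStartPipelineOperator and rejects the combination of asynchronous and deferrable. The helper name, the input.date runtime argument and the params.instance_name lookup are assumptions for illustration. The template strings are left unrendered here, because Airflow renders templated fields when the task runs.

```python
def build_start_pipeline_kwargs(
    location,
    instance_name,
    pipeline_name,
    runtime_args=None,
    asynchronous=False,
    deferrable=False,
):
    """Collect operator keyword arguments (hypothetical helper, not part of Airflow)."""
    if asynchronous and deferrable:
        # Mirrors the rule above: the two modes cannot be combined.
        raise ValueError("asynchronous and deferrable cannot both be True")
    return {
        "location": location,
        "instance_name": instance_name,
        "pipeline_name": pipeline_name,
        "runtime_args": dict(runtime_args or {}),
        "asynchronous": asynchronous,
        "deferrable": deferrable,
    }


kwargs = build_start_pipeline_kwargs(
    location="europe-north1",  # placeholder region
    instance_name="{{ params.instance_name }}",  # Jinja template, rendered by Airflow
    pipeline_name="example-pipeline",  # placeholder pipeline name
    runtime_args={"input.date": "{{ ds }}"},  # hypothetical runtime argument
    deferrable=True,
)
print(kwargs)

# Inside a DAG the dictionary could then be unpacked into the operator:
# CloudDataFusionStartPipelineOperator(task_id="start_pipeline", **kwargs)
```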
Stop a DataFusion pipeline
To stop a Data Fusion pipeline, use CloudDataFusionStopPipelineOperator:
stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)
You can use Jinja templating with the instance_name, pipeline_name and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Delete a DataFusion pipeline
To delete a Data Fusion pipeline, use CloudDataFusionDeletePipelineOperator:
delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
)
You can use Jinja templating with the instance_name, version_id, pipeline_name and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
List DataFusion pipelines
To list Data Fusion pipelines, use CloudDataFusionListPipelinesOperator:
list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
You can use Jinja templating with the instance_name, artifact_name, artifact_version and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
Sensors
When a pipeline is started asynchronously, a sensor can be used to check that the pipeline has reached the expected state. Use CloudDataFusionPipelineStateSensor:
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
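How expected_statuses and failure_statuses could be interpreted on each check can be sketched in plain Python. This is an illustration under assumptions, not the sensor's actual implementation: check_status is a hypothetical function, and raising RuntimeError stands in for whatever error the sensor raises when a failure status is seen.

```python
def check_status(current_status, expected_statuses, failure_statuses):
    """Return True when done, False to keep waiting, and raise on a failure status."""
    if current_status in failure_statuses:
        raise RuntimeError(f"pipeline entered failure status {current_status!r}")
    return current_status in expected_statuses


expected = {"COMPLETED"}
failure = {"FAILED"}

still_running = check_status("RUNNING", expected, failure)  # keep waiting
finished = check_status("COMPLETED", expected, failure)  # sensor would succeed
print(still_running, finished)
```

Any status that appears in neither collection simply means "check again later", which is why only the outcomes you care about need to be listed.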