Google Cloud Data Pipelines Operators¶
Data Pipelines is a Dataflow feature that allows customers to create and schedule recurring jobs, view aggregated job metrics, and define and manage job SLOs. A pipeline consists of a collection of jobs including ways to manage them. A pipeline may be associated with a Dataflow Template (classic/flex) and include all jobs launched with the associated template.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'Detailed information is available for Installation.
Creating a Data Pipeline¶
To create a new Data Pipelines instance using a request body and parent name, use CreateDataPipelineOperator.
The operator accesses Google Cloud’s Data Pipelines API and calls upon the
create method
to run the given pipeline.
- CreateDataPipelineOperatoraccepts four parameters:
- body: instance of the Pipeline, project_id: id of the GCP project that owns the job, location: destination for the Pipeline, gcp_conn_id: id to connect to Google Cloud. 
The request body and project id need to be passed each time, while the GCP connection id and location have default values. The project id and location will be used to build the parent name needed to create the operator.
Here is an example of how you can create a Data Pipelines instance by running the above parameters with CreateDataPipelineOperator:
create_data_pipeline = CreateDataPipelineOperator(
    task_id="create_data_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": DATAPIPELINES_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)
Running a Data Pipeline¶
To run a Data Pipelines instance, use RunDataPipelineOperator.
The operator accesses Google Cloud’s Data Pipelines API and calls upon the
run method
to run the given pipeline.
RunDataPipelineOperator can take in four parameters:
- data_pipeline_name: the name of the Data Pipelines instance
- project_id: the ID of the GCP project that owns the job
- location: the location of the Data Pipelines instance
- gcp_conn_id: the connection ID to connect to the Google Cloud Platform
Only the Data Pipeline name and Project ID are required parameters, as the Location and GCP Connection ID have default values. The Project ID and Location will be used to build the parent name, which is where the given Data Pipeline should be located.
You can run a Data Pipelines instance by running the above parameters with RunDataPipelineOperator:
run_data_pipeline = RunDataPipelineOperator(
    task_id="run_data_pipeline",
    data_pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)
Once called, the RunDataPipelineOperator will return the Google Cloud Dataflow Job created by running the given pipeline.
For further information regarding the API usage, see Data Pipelines API REST Resource in the Google Cloud documentation.
