airflow.providers.google.cloud.hooks.dataflow

This module contains a Google Dataflow Hook.

Module Contents

Classes

DataflowJobStatus

Helper class with Dataflow job statuses.

DataflowJobType

Helper class with Dataflow job types.

DataflowHook

Hook for Google Dataflow.

AsyncDataflowHook

Async hook class for dataflow service.

Functions

process_line_and_extract_dataflow_job_id_callback(...)

Build callback that triggers the specified function.

Attributes

DEFAULT_DATAFLOW_LOCATION

JOB_ID_PATTERN

T

airflow.providers.google.cloud.hooks.dataflow.DEFAULT_DATAFLOW_LOCATION = 'us-central1'[source]
airflow.providers.google.cloud.hooks.dataflow.JOB_ID_PATTERN[source]
airflow.providers.google.cloud.hooks.dataflow.T[source]
airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[source]

Build callback that triggers the specified function.

The returned callback is intended to be used as process_line_callback in BeamCommandRunner.

Parameters

on_new_job_id_callback (Callable[[str], None] | None) – Callback called when the job ID is known

class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[source]

Helper class with Dataflow job statuses.

Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

JOB_STATE_DONE = 'JOB_STATE_DONE'[source]
JOB_STATE_UNKNOWN = 'JOB_STATE_UNKNOWN'[source]
JOB_STATE_STOPPED = 'JOB_STATE_STOPPED'[source]
JOB_STATE_RUNNING = 'JOB_STATE_RUNNING'[source]
JOB_STATE_FAILED = 'JOB_STATE_FAILED'[source]
JOB_STATE_CANCELLED = 'JOB_STATE_CANCELLED'[source]
JOB_STATE_UPDATED = 'JOB_STATE_UPDATED'[source]
JOB_STATE_DRAINING = 'JOB_STATE_DRAINING'[source]
JOB_STATE_DRAINED = 'JOB_STATE_DRAINED'[source]
JOB_STATE_PENDING = 'JOB_STATE_PENDING'[source]
JOB_STATE_CANCELLING = 'JOB_STATE_CANCELLING'[source]
JOB_STATE_QUEUED = 'JOB_STATE_QUEUED'[source]
FAILED_END_STATES[source]
SUCCEEDED_END_STATES[source]
TERMINAL_STATES[source]
AWAITING_STATES[source]
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobType[source]

Helper class with Dataflow job types.

JOB_TYPE_UNKNOWN = 'JOB_TYPE_UNKNOWN'[source]
JOB_TYPE_BATCH = 'JOB_TYPE_BATCH'[source]
JOB_TYPE_STREAMING = 'JOB_TYPE_STREAMING'[source]
class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[source]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Hook for Google Dataflow.

All the methods in the hook where project_id is used must be called with keyword arguments rather than positional.

get_conn()[source]

Return a Google Cloud Dataflow service object.

start_java_dataflow(job_name, variables, jar, project_id, job_class=None, append_job_name=True, multiple_jobs=False, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

Start Dataflow java job.

Parameters
  • job_name (str) – The name of the job.

  • variables (dict) – Variables passed to the job.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • jar (str) – Name of the jar for the job

  • job_class (str | None) – Name of the java class for the job.

  • append_job_name (bool) – True if unique suffix has to be appended to job name.

  • multiple_jobs (bool) – True if to check for multiple job in dataflow

  • on_new_job_id_callback (Callable[[str], None] | None) – Callback called when the job ID is known.

  • location (str) – Job location.

start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[source]

Launch a Dataflow job with a Classic Template and wait for its completion.

Parameters
  • job_name (str) – The name of the job.

  • variables (dict) –

    Map of job runtime environment options. It will update environment argument if passed.

    See also

    For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – Parameters for the template

  • dataflow_template (str) – GCS path to the template.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • append_job_name (bool) – True if unique suffix has to be appended to job name.

  • on_new_job_id_callback (Callable[[str], None] | None) – (Deprecated) Callback called when the Job is known.

  • on_new_job_callback (Callable[[dict], None] | None) – Callback called when the Job is known.

  • location (str) –

    Job location.

    See also

    For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params

launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[source]

Launch a Dataflow job with a Classic Template and exit without waiting for its completion.

Parameters
  • job_name (str) – The name of the job.

  • variables (dict) –

    Map of job runtime environment options. It will update environment argument if passed.

    See also

    For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – Parameters for the template

  • dataflow_template (str) – GCS path to the template.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • append_job_name (bool) – True if unique suffix has to be appended to job name.

  • location (str) –

    Job location.

    See also

    For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Returns

the Dataflow job response

Return type

dict[str, str]

send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[source]
start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[source]

Launch a Dataflow job with a Flex Template and wait for its completion.

Parameters
Returns

the Job

Return type

dict[str, str]

launch_job_with_flex_template(body, location, project_id)[source]

Launch a Dataflow Job with a Flex Template and exit without waiting for the job completion.

Parameters
Returns

a Dataflow job response

Return type

dict[str, str]

static extract_job_id(job)[source]
start_python_dataflow(job_name, variables, dataflow, py_options, project_id, py_interpreter='python3', py_requirements=None, py_system_site_packages=False, append_job_name=True, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

Start Dataflow job.

Parameters
  • job_name (str) – The name of the job.

  • variables (dict) – Variables passed to the job.

  • dataflow (str) – Name of the Dataflow process.

  • py_options (list[str]) – Additional options.

  • project_id (str) – The ID of the GCP project that owns the job. If set to None or missing, the default project_id from the GCP connection is used.

  • py_interpreter (str) – Python version of the beam pipeline. If None, this defaults to the python3. To track python versions supported by beam and related issues check: https://issues.apache.org/jira/browse/BEAM-1251

  • py_requirements (list[str] | None) –

    Additional python package(s) to install. If a value is passed to this parameter, a new virtual environment has been created with additional packages installed.

    You could also install the apache-beam package if it is not installed on your system or you want to use a different version.

  • py_system_site_packages (bool) –

    Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.

    This option is only relevant if the py_requirements parameter is not None.

  • append_job_name (bool) – True if unique suffix has to be appended to job name.

  • project_id – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • on_new_job_id_callback (Callable[[str], None] | None) – Callback called when the job ID is known.

  • location (str) – Job location.

static build_dataflow_job_name(job_name, append_job_name=True)[source]

Build Dataflow job name.

is_job_dataflow_running(name, project_id, location=DEFAULT_DATAFLOW_LOCATION, variables=None)[source]

Check if jos is still running in dataflow.

Parameters
  • name (str) – The name of the job.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) – Job location.

Returns

True if job is running.

Return type

bool

cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

Cancel the job with the specified name prefix or Job ID.

Parameter name and job_id are mutually exclusive.

Parameters
  • job_name (str | None) – Name prefix specifying which jobs are to be canceled.

  • job_id (str | None) – Job ID specifying which jobs are to be canceled.

  • location (str) – Job location.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[source]

Start Dataflow SQL query.

Parameters
  • job_name (str) – The unique name to assign to the Cloud Dataflow job.

  • query (str) – The SQL query to execute.

  • options (dict[str, Any]) – Job parameters to be executed. For more information, look at: https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query command reference

  • location (str) – The location of the Dataflow job (for example europe-west1)

  • project_id (str) – The ID of the GCP project that owns the job. If set to None or missing, the default project_id from the GCP connection is used.

  • on_new_job_id_callback (Callable[[str], None] | None) – (Deprecated) Callback called when the job ID is known.

  • on_new_job_callback (Callable[[dict], None] | None) – Callback called when the job is known.

Returns

the new job object

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[source]

Get the job with the specified Job ID.

Parameters
Returns

the Job

Return type

dict

fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

Get the job metrics with the specified Job ID.

Parameters
Returns

the JobMetrics. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

Return type

dict

fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

Get the job messages with the specified Job ID.

Parameters
  • job_id (str) – Job ID to get.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) – Job location.

Returns

the list of JobMessages. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

Return type

list[dict]

fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

Get the job autoscaling events with the specified Job ID.

Parameters
  • job_id (str) – Job ID to get.

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) – Job location.

Returns

the list of AutoscalingEvents. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

Return type

list[dict]

wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[source]

Wait for Dataflow job.

Parameters
  • job_name (str) – The ‘jobName’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' in options will be overwritten.

  • location (str) – location the job is running

  • project_id (str) – Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • job_id (str | None) – a Dataflow job ID

  • multiple_jobs (bool) – If pipeline creates multiple jobs then monitor all jobs

is_job_done(location, project_id, job_id)[source]

Check that Dataflow job is started(for streaming job) or finished(for batch job).

Parameters
  • location (str) – location the job is running

  • project_id (str) – Google Cloud project ID in which to start a job

  • job_id (str) – Dataflow job ID

class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

Async hook class for dataflow service.

sync_hook_class[source]
async initialize_client(client_class)[source]

Initialize object of the given class.

Method is used to initialize asynchronous client. Because of the big amount of the classes which are used for Dataflow service it was decided to initialize them the same way with credentials which are received from the method of the GoogleBaseHook class. :param client_class: Class of the Google cloud SDK

async get_project_id()[source]
async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

Get the job with the specified Job ID.

Parameters
  • job_id (str) – Job ID to get.

  • project_id (str) – the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • job_view (int) – Optional. JobView object which determines representation of the returned data

  • location (str) – Optional. The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

Get the job status with the specified Job ID.

Parameters
  • job_id (str) – Job ID to get.

  • project_id (str) – the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • job_view (int) – Optional. JobView object which determines representation of the returned data

  • location (str) – Optional. The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]

List jobs.

For detail see: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.ListJobsRequest

Parameters
  • jobs_filter (int | None) – Optional. This field filters out and returns jobs in the specified job state.

  • project_id (str | None) – Optional. The Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str | None) – Optional. The location of the Dataflow job (for example europe-west1).

  • page_size (int | None) – Optional. If there are many jobs, limit response to at most this many.

  • page_token (str | None) – Optional. Set this to the ‘next_page_token’ field of a previous response to request additional results in a long list.

async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

Return ListJobMessagesAsyncPager object from MessagesV1Beta3AsyncClient.

This method wraps around a similar method of MessagesV1Beta3AsyncClient. ListJobMessagesAsyncPager can be iterated over to extract messages associated with a specific Job ID.

For more details see the MessagesV1Beta3AsyncClient method description at: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient

Parameters
  • job_id (str) – ID of the Dataflow job to get messages about.

  • project_id (str | None) – Optional. The Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • minimum_importance (int) – Optional. Filter to only get messages with importance >= level. For more details see the description at: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance

  • page_size (int | None) – Optional. If specified, determines the maximum number of messages to return. If unspecified, the service may choose an appropriate default, or may return an arbitrarily large number of results.

  • page_token (str | None) – Optional. If supplied, this should be the value of next_page_token returned by an earlier call. This will cause the next page of results to be returned.

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – Optional. If specified, return only messages with timestamps >= start_time. The default is the job creation time (i.e. beginning of messages).

  • end_time (google.protobuf.timestamp_pb2.Timestamp | None) – Optional. If specified, return only messages with timestamps < end_time. The default is the current time.

  • location (str | None) – Optional. The [regional endpoint] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints) that contains the job specified by job_id.

async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

Return JobMetrics object from MetricsV1Beta3AsyncClient.

This method wraps around a similar method of MetricsV1Beta3AsyncClient.

For more details see the MetricsV1Beta3AsyncClient method description at: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient

Parameters
  • job_id (str) – ID of the Dataflow job to get metrics for.

  • project_id (str | None) – Optional. The Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – Optional. Return only metric data that has changed since this time. Default is to return all information about all metrics for the job.

  • location (str | None) – Optional. The [regional endpoint] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints) that contains the job specified by job_id.

Was this entry helpful?