airflow.providers.google.cloud.hooks.dataflow

This module contains a Google Dataflow Hook.

Module Contents

airflow.providers.google.cloud.hooks.dataflow.DEFAULT_DATAFLOW_LOCATION = us-central1[source]
airflow.providers.google.cloud.hooks.dataflow.JOB_ID_PATTERN[source]
airflow.providers.google.cloud.hooks.dataflow.T[source]
airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback: Optional[Callable[[str], None]]) → Callable[[str], None][source]
Returns callback which triggers function passed as `on_new_job_id_callback` when Dataflow job_id is found.
To be used for `process_line_callback` in
:py:class:`~airflow.providers.apache.beam.hooks.beam.BeamCommandRunner`
Parameters

on_new_job_id_callback (callback) -- Callback called when the job ID is known

class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[source]

Helper class with Dataflow job statuses. Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

JOB_STATE_DONE = JOB_STATE_DONE[source]
JOB_STATE_UNKNOWN = JOB_STATE_UNKNOWN[source]
JOB_STATE_STOPPED = JOB_STATE_STOPPED[source]
JOB_STATE_RUNNING = JOB_STATE_RUNNING[source]
JOB_STATE_FAILED = JOB_STATE_FAILED[source]
JOB_STATE_CANCELLED = JOB_STATE_CANCELLED[source]
JOB_STATE_UPDATED = JOB_STATE_UPDATED[source]
JOB_STATE_DRAINING = JOB_STATE_DRAINING[source]
JOB_STATE_DRAINED = JOB_STATE_DRAINED[source]
JOB_STATE_PENDING = JOB_STATE_PENDING[source]
JOB_STATE_CANCELLING = JOB_STATE_CANCELLING[source]
JOB_STATE_QUEUED = JOB_STATE_QUEUED[source]
FAILED_END_STATES[source]
SUCCEEDED_END_STATES[source]
TERMINAL_STATES[source]
AWAITING_STATES[source]
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobType[source]

Helper class with Dataflow job types.

JOB_TYPE_UNKNOWN = JOB_TYPE_UNKNOWN[source]
JOB_TYPE_BATCH = JOB_TYPE_BATCH[source]
JOB_TYPE_STREAMING = JOB_TYPE_STREAMING[source]
class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id: str = 'google_cloud_default', delegate_to: Optional[str] = None, poll_sleep: int = 10, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, drain_pipeline: bool = False, cancel_timeout: Optional[int] = 5 * 60, wait_until_finished: Optional[bool] = None)[source]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Hook for Google Dataflow.

All the methods in the hook where project_id is used must be called with keyword arguments rather than positional.

get_conn(self)[source]

Returns a Google Cloud Dataflow service object.

start_java_dataflow(self, job_name: str, variables: dict, jar: str, project_id: str, job_class: Optional[str] = None, append_job_name: bool = True, multiple_jobs: bool = False, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Starts Dataflow java job.

Parameters
  • job_name (str) -- The name of the job.

  • variables (dict) -- Variables passed to the job.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • jar -- Name of the jar for the job

  • job_class (str) -- Name of the java class for the job.

  • append_job_name (bool) -- True if unique suffix has to be appended to job name.

  • multiple_jobs (bool) -- True if to check for multiple job in dataflow

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

  • location (str) -- Job location.

start_template_dataflow(self, job_name: str, variables: dict, parameters: dict, dataflow_template: str, project_id: str, append_job_name: bool = True, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION, environment: Optional[dict] = None)[source]

Starts Dataflow template job.

Parameters
  • job_name (str) -- The name of the job.

  • variables (dict) --

    Map of job runtime environment options. It will update environment argument if passed.

    See also

    For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) -- Parameters fot the template

  • dataflow_template (str) -- GCS path to the template.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • append_job_name (bool) -- True if unique suffix has to be appended to job name.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

  • location (str) -- Job location.

start_flex_template(self, body: dict, location: str, project_id: str, on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]

Starts flex templates with the Dataflow pipeline.

Parameters
Returns

the Job

start_python_dataflow(self, job_name: str, variables: dict, dataflow: str, py_options: List[str], project_id: str, py_interpreter: str = 'python3', py_requirements: Optional[List[str]] = None, py_system_site_packages: bool = False, append_job_name: bool = True, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Starts Dataflow job.

Parameters
  • job_name (str) -- The name of the job.

  • variables (Dict) -- Variables passed to the job.

  • dataflow (str) -- Name of the Dataflow process.

  • py_options (List[str]) -- Additional options.

  • project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to None or missing, the default project_id from the GCP connection is used.

  • py_interpreter (str) -- Python version of the beam pipeline. If None, this defaults to the python3. To track python versions supported by beam and related issues check: https://issues.apache.org/jira/browse/BEAM-1251

  • py_requirements (List[str]) --

    Additional python package(s) to install. If a value is passed to this parameter, a new virtual environment has been created with additional packages installed.

    You could also install the apache-beam package if it is not installed on your system or you want to use a different version.

  • py_system_site_packages --

    Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.

    This option is only relevant if the py_requirements parameter is not None.

  • append_job_name (bool) -- True if unique suffix has to be appended to job name.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

  • location (str) -- Job location.

static build_dataflow_job_name(job_name: str, append_job_name: bool = True)[source]

Builds Dataflow job name.

is_job_dataflow_running(self, name: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION, variables: Optional[dict] = None)[source]

Helper method to check if jos is still running in dataflow

Parameters
  • name (str) -- The name of the job.

  • project_id (str) -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) -- Job location.

Returns

True if job is running.

Return type

bool

cancel_job(self, project_id: str, job_name: Optional[str] = None, job_id: Optional[str] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Cancels the job with the specified name prefix or Job ID.

Parameter name and job_id are mutually exclusive.

Parameters
  • job_name (str) -- Name prefix specifying which jobs are to be canceled.

  • job_id (str) -- Job ID specifying which jobs are to be canceled.

  • location (str) -- Job location.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

start_sql_job(self, job_name: str, query: str, options: Dict[str, Any], project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]

Starts Dataflow SQL query.

Parameters
  • job_name (str) -- The unique name to assign to the Cloud Dataflow job.

  • query (str) -- The SQL query to execute.

  • options -- Job parameters to be executed. For more information, look at: https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query command reference

  • location (str) -- The location of the Dataflow job (for example europe-west1)

  • project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to None or missing, the default project_id from the GCP connection is used.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

Returns

the new job object

get_job(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location -- The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

Returns

the Job

Return type

dict

fetch_job_metrics_by_id(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job metrics with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location -- The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

Returns

the JobMetrics. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

Return type

dict

fetch_job_messages_by_id(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job messages with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) -- Job location.

Returns

the list of JobMessages. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

Return type

List[dict]

fetch_job_autoscaling_events_by_id(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job autoscaling events with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) -- Job location.

Returns

the list of AutoscalingEvents. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

Return type

List[dict]

wait_for_done(self, job_name: str, location: str, project_id: str, job_id: Optional[str] = None, multiple_jobs: bool = False)[source]

Wait for Dataflow job.

Parameters
  • job_name (str) -- The 'jobName' to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' in options will be overwritten.

  • location (str) -- location the job is running

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • job_id (str) -- a Dataflow job ID

  • multiple_jobs (boolean) -- If pipeline creates multiple jobs then monitor all jobs

Was this entry helpful?