airflow.providers.google.cloud.hooks.dataflow

This module contains a Google Dataflow Hook.

Module Contents

airflow.providers.google.cloud.hooks.dataflow.DEFAULT_DATAFLOW_LOCATION = us-central1[source]
airflow.providers.google.cloud.hooks.dataflow.JOB_ID_PATTERN[source]
airflow.providers.google.cloud.hooks.dataflow.T[source]
airflow.providers.google.cloud.hooks.dataflow._fallback_variable_parameter(parameter_name: str, variable_key_name: str) → Callable[[T], T][source]
airflow.providers.google.cloud.hooks.dataflow._fallback_to_location_from_variables[source]
airflow.providers.google.cloud.hooks.dataflow._fallback_to_project_id_from_variables[source]
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[source]

Helper class with Dataflow job statuses. Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

JOB_STATE_DONE = JOB_STATE_DONE[source]
JOB_STATE_UNKNOWN = JOB_STATE_UNKNOWN[source]
JOB_STATE_STOPPED = JOB_STATE_STOPPED[source]
JOB_STATE_RUNNING = JOB_STATE_RUNNING[source]
JOB_STATE_FAILED = JOB_STATE_FAILED[source]
JOB_STATE_CANCELLED = JOB_STATE_CANCELLED[source]
JOB_STATE_UPDATED = JOB_STATE_UPDATED[source]
JOB_STATE_DRAINING = JOB_STATE_DRAINING[source]
JOB_STATE_DRAINED = JOB_STATE_DRAINED[source]
JOB_STATE_PENDING = JOB_STATE_PENDING[source]
JOB_STATE_CANCELLING = JOB_STATE_CANCELLING[source]
JOB_STATE_QUEUED = JOB_STATE_QUEUED[source]
FAILED_END_STATES[source]
SUCCEEDED_END_STATES[source]
TERMINAL_STATES[source]
AWAITING_STATES[source]
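
The grouped constants above are listed without their members. As a rough illustration only, the sketch below shows how such groups could be used to classify a job state. The set membership here is an assumption made for the example, and classify_state is a hypothetical helper, not part of the module.

```python
# Assumed membership, for illustration only; check the module source for the real sets.
FAILED_END_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}
SUCCEEDED_END_STATES = {"JOB_STATE_DONE", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"}
TERMINAL_STATES = FAILED_END_STATES | SUCCEEDED_END_STATES


def classify_state(state: str) -> str:
    """Return 'failed', 'succeeded', or 'in_progress' for a job state string."""
    if state in FAILED_END_STATES:
        return "failed"
    if state in SUCCEEDED_END_STATES:
        return "succeeded"
    # Anything that is not terminal is treated as still in progress.
    return "in_progress"
```

Keeping the groups as sets makes membership checks cheap and lets a terminal set be derived as a union rather than maintained by hand.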
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobType[source]

Helper class with Dataflow job types.

JOB_TYPE_UNKNOWN = JOB_TYPE_UNKNOWN[source]
JOB_TYPE_BATCH = JOB_TYPE_BATCH[source]
JOB_TYPE_STREAMING = JOB_TYPE_STREAMING[source]
class airflow.providers.google.cloud.hooks.dataflow._DataflowJobsController(dataflow: Any, project_number: str, location: str, poll_sleep: int = 10, name: Optional[str] = None, job_id: Optional[str] = None, num_retries: int = 0, multiple_jobs: bool = False, drain_pipeline: bool = False, cancel_timeout: Optional[int] = 5 * 60, wait_until_finished: Optional[bool] = None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Interface for communication with Google API.

It does not use Apache Beam, only the Google Dataflow API.

Parameters
  • dataflow -- Discovery resource

  • project_number -- The Google Cloud Project ID.

  • location -- Job location.

  • poll_sleep -- The status refresh rate for pending operations.

  • name -- The Job ID prefix used when the multiple_jobs option is set to True.

  • job_id -- ID of a single job.

  • num_retries -- Maximum number of retries in case of connection problems.

  • multiple_jobs -- If set to True, jobs are searched by name prefix (the name parameter) rather than by a specific job ID, and actions are performed on all matching jobs.

  • drain_pipeline -- Optional. Set to True to stop a streaming job by draining it instead of canceling it.

  • cancel_timeout -- Wait time, in seconds, for the job to be canceled successfully.

  • wait_until_finished --

    If True, wait for the end of pipeline execution before exiting. If False, only submit the job and check once that it is not in a terminal state.

    The default behavior depends on the type of pipeline:

    • for the streaming pipeline, wait for jobs to start,

    • for the batch pipeline, wait for the jobs to complete.
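
The wait_until_finished rules above can be sketched as a small decision function. This is a simplified illustration under stated assumptions, not the controller's actual logic: is_wait_over is a hypothetical name, and the success and failure sets are assumed.

```python
from typing import Optional

# Assumed state groups, for illustration only.
SUCCESS_STATES = {"JOB_STATE_DONE"}
FAILURE_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def is_wait_over(job_type: str, state: str, wait_until_finished: Optional[bool]) -> bool:
    """Return True when the caller may stop waiting for the job."""
    if state in FAILURE_STATES:
        raise RuntimeError(f"Job ended in state {state}")
    if state in SUCCESS_STATES:
        return True
    if wait_until_finished is False:
        # Submit-only mode: a single check that the job has not failed is enough.
        return True
    if wait_until_finished is True:
        # Keep waiting until a terminal state is reached.
        return False
    # Default (None): streaming jobs only need to start, batch jobs must complete.
    return job_type == "JOB_TYPE_STREAMING" and state == "JOB_STATE_RUNNING"
```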

is_job_running(self)[source]

Helper method to check if the job is still running in Dataflow.

Returns

True if job is running.

Return type

bool

_get_current_jobs(self)[source]

Helper method to get the list of jobs whose names start with the job name or ID.

Returns

list of jobs, including their IDs

Return type

list

fetch_job_by_id(self, job_id: str)[source]

Helper method to fetch the job with the specified Job ID.

Parameters

job_id (str) -- Job ID to get.

Returns

the Job

Return type

dict

fetch_job_metrics_by_id(self, job_id: str)[source]

Helper method to fetch the job metrics with the specified Job ID.

Parameters

job_id (str) -- Job ID to get.

Returns

the JobMetrics. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

Return type

dict

_fetch_list_job_messages_responses(self, job_id: str)[source]

Helper method to fetch ListJobMessagesResponse with the specified Job ID.

Parameters

job_id (str) -- Job ID to get.

Returns

yields the ListJobMessagesResponse. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse

Return type

Generator[dict, None, None]

fetch_job_messages_by_id(self, job_id: str)[source]

Helper method to fetch the job messages with the specified Job ID.

Parameters

job_id (str) -- Job ID to get.

Returns

the list of JobMessages. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

Return type

List[dict]
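
The split between a generator of ListJobMessagesResponse pages and a flattened list of JobMessages might look roughly like the sketch below. It assumes a hypothetical list_page callable that takes a page token and returns one response dict. The jobMessages and nextPageToken keys follow the REST reference linked above. The fake pages stand in for the real API.

```python
from typing import Callable, Dict, Generator, List, Optional


def iter_pages(list_page: Callable[[Optional[str]], Dict]) -> Generator[Dict, None, None]:
    """Yield response pages until no nextPageToken is returned."""
    token = None
    while True:
        response = list_page(token)
        yield response
        token = response.get("nextPageToken")
        if not token:
            break


def collect_job_messages(list_page: Callable[[Optional[str]], Dict]) -> List[Dict]:
    """Flatten the jobMessages field of every page into one list."""
    messages: List[Dict] = []
    for response in iter_pages(list_page):
        messages.extend(response.get("jobMessages", []))
    return messages


# Fake two-page API used only to exercise the sketch.
_FAKE_PAGES = {
    None: {"jobMessages": [{"messageText": "a"}], "nextPageToken": "p2"},
    "p2": {"jobMessages": [{"messageText": "b"}]},
}


def fake_list_page(token: Optional[str]) -> Dict:
    return _FAKE_PAGES[token]
```

The same page iterator could feed a second collector that reads a different key, such as autoscalingEvents, which is one reason to keep paging and flattening separate.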

fetch_job_autoscaling_events_by_id(self, job_id: str)[source]

Helper method to fetch the job autoscaling events with the specified Job ID.

Parameters

job_id (str) -- Job ID to get.

Returns

the list of AutoscalingEvents. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

Return type

List[dict]

_fetch_all_jobs(self)[source]
_fetch_jobs_by_prefix_name(self, prefix_name: str)[source]
_refresh_jobs(self)[source]

Helper method to get all jobs by name

Returns

jobs

Return type

list

_check_dataflow_job_state(self, job)[source]

Helper method to check the state of one Dataflow job for this task. Raises an exception if the job failed.

Returns

True if job is done.

Return type

bool

Raises

Exception

wait_for_done(self)[source]

Helper method to wait for result of submitted job.

get_jobs(self, refresh: bool = False)[source]

Returns Dataflow jobs.

Parameters

refresh (bool) -- Forces the latest data to be fetched.

Returns

list of jobs

Return type

list

_wait_for_states(self, expected_states: Set[str])[source]

Waits for the jobs to reach one of the expected states.
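
A polling loop of this kind, paced by poll_sleep, could be sketched as follows. This is a generic illustration, not the controller's code: wait_for_states, fetch_state, and the injectable sleep argument are assumptions introduced so the loop can be run without a real job.

```python
import time
from typing import Callable, Set


def wait_for_states(
    fetch_state: Callable[[], str],
    expected_states: Set[str],
    poll_sleep: float = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_state until it returns one of expected_states, then return it."""
    while True:
        state = fetch_state()
        if state in expected_states:
            return state
        # Pause between polls so the API is not queried in a tight loop.
        sleep(poll_sleep)
```

Passing sleep as a parameter keeps the loop testable: a test can supply a no-op and a scripted sequence of states. A production version would likely also enforce a timeout.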

cancel(self)[source]

Cancels or drains the current job.

class airflow.providers.google.cloud.hooks.dataflow._DataflowRunner(cmd: List[str], on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

_process_fd(self, fd)[source]

Prints output to logs and looks for the job ID in each line.

Parameters

fd -- File descriptor.

_process_line_and_extract_job_id(self, line: str)[source]

Extracts job_id.

Parameters

line (str) -- URL from which job_id has to be extracted
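
Extracting a job ID from a line of runner output, and then notifying a callback, might look like the sketch below. The regular expression and the console URL shape are assumptions made for this example. They are not the module's JOB_ID_PATTERN, and extract_job_id is a hypothetical helper.

```python
import re
from typing import Callable, Optional

# Assumed URL shape: .../dataflow/jobs/<location>/<job_id>?project=...
ASSUMED_JOB_URL_PATTERN = re.compile(r"/dataflow/jobs/[^/\s]+/(?P<job_id>[^/?\s]+)")


def extract_job_id(line: str, on_new_job_id: Optional[Callable[[str], None]] = None) -> Optional[str]:
    """Return the job ID found in the line, or None if there is no match."""
    match = ASSUMED_JOB_URL_PATTERN.search(line)
    if match is None:
        return None
    job_id = match.group("job_id")
    if on_new_job_id is not None:
        # Let the caller react as soon as the ID is known.
        on_new_job_id(job_id)
    return job_id
```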

wait_for_done(self)[source]

Waits for Dataflow job to complete.

Returns

Job id

Return type

Optional[str]

class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id: str = 'google_cloud_default', delegate_to: Optional[str] = None, poll_sleep: int = 10, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, drain_pipeline: bool = False, cancel_timeout: Optional[int] = 5 * 60, wait_until_finished: Optional[bool] = None)[source]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Hook for Google Dataflow.

All the methods in the hook where project_id is used must be called with keyword arguments rather than positional.
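
One way such a keyword-only rule can be enforced is with a decorator that rejects positional arguments and fills a missing parameter from the variables dict. The sketch below is a simplified illustration, not the module's _fallback_variable_parameter. The names fallback_to_variables and DemoHook, and the exact fallback behavior, are assumptions.

```python
import functools
from typing import Any, Callable, Optional


def fallback_to_variables(parameter_name: str, variable_key_name: str) -> Callable:
    """Fill parameter_name from kwargs['variables'][variable_key_name] when it is missing."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            if args:
                raise TypeError("Use keyword arguments rather than positional ones")
            if kwargs.get(parameter_name) is None:
                fallback = (kwargs.get("variables") or {}).get(variable_key_name)
                if fallback is not None:
                    kwargs[parameter_name] = fallback
            return func(self, **kwargs)

        return wrapper

    return decorator


class DemoHook:
    """Hypothetical stand-in for a hook; it only echoes the resolved project."""

    @fallback_to_variables("project_id", "project")
    def start(self, *, variables: dict, project_id: Optional[str] = None) -> Optional[str]:
        return project_id
```

With this sketch, DemoHook().start(variables={"project": "p"}) resolves project_id from the variables, while an explicit project_id keyword takes precedence.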

get_conn(self)[source]

Returns a Google Cloud Dataflow service object.

_start_dataflow(self, variables: dict, name: str, command_prefix: List[str], project_id: str, multiple_jobs: bool = False, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]
start_java_dataflow(self, job_name: str, variables: dict, jar: str, project_id: str, job_class: Optional[str] = None, append_job_name: bool = True, multiple_jobs: bool = False, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Starts a Dataflow Java job.

Parameters
  • job_name (str) -- The name of the job.

  • variables (dict) -- Variables passed to the job.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • jar -- Name of the jar for the job

  • job_class (str) -- Name of the java class for the job.

  • append_job_name (bool) -- True if unique suffix has to be appended to job name.

  • multiple_jobs (bool) -- True to check for multiple jobs in Dataflow.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

  • location (str) -- Job location.
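
To make the roles of jar, job_class, and variables concrete, the sketch below shows one plausible way to turn them into a command line. It is an assumption-laden illustration, not the hook's implementation: options_to_args and build_java_command are hypothetical helpers, and the flag formatting rules are assumed.

```python
from typing import Any, Dict, List, Optional


def options_to_args(variables: Dict[str, Any]) -> List[str]:
    """Convert a variables dict into --key=value style arguments."""
    args: List[str] = []
    for key, value in variables.items():
        if value is None or value is True:
            # Flags without a value are passed as bare switches.
            args.append(f"--{key}")
        elif isinstance(value, list):
            args.extend(f"--{key}={item}" for item in value)
        else:
            args.append(f"--{key}={value}")
    return args


def build_java_command(jar: str, variables: Dict[str, Any], job_class: Optional[str] = None) -> List[str]:
    """Build a java command; use the classpath form when a main class is given."""
    prefix = ["java", "-cp", jar, job_class] if job_class else ["java", "-jar", jar]
    return prefix + options_to_args(variables)
```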

start_template_dataflow(self, job_name: str, variables: dict, parameters: dict, dataflow_template: str, project_id: str, append_job_name: bool = True, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION, environment: Optional[dict] = None)[source]

Starts Dataflow template job.

Parameters
  • job_name (str) -- The name of the job.

  • variables (dict) --

    Map of job runtime environment options. It will update environment argument if passed.

    See also

    For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) -- Parameters for the template.

  • dataflow_template (str) -- GCS path to the template.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • append_job_name (bool) -- True if unique suffix has to be appended to job name.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

  • location (str) -- Job location.
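
The note that variables update the environment argument can be pictured as a dict merge when the launch request is assembled. The sketch below is a simplified assumption, not the hook's code. build_template_launch_body is a hypothetical helper, and the body keys (jobName, parameters, environment) are taken from the Dataflow REST launch parameters.

```python
from typing import Any, Dict, Optional


def build_template_launch_body(
    job_name: str,
    parameters: Dict[str, Any],
    variables: Dict[str, Any],
    environment: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Merge variables over environment and assemble a launch request body."""
    merged_environment = dict(environment or {})  # copy so the caller's dict is untouched
    merged_environment.update(variables)          # variables win on key conflicts
    return {
        "jobName": job_name,
        "parameters": parameters,
        "environment": merged_environment,
    }
```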

start_flex_template(self, body: dict, location: str, project_id: str, on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]

Starts flex templates with the Dataflow pipeline.

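Parameters
  • body (dict) -- The request body for launching the flex template.

  • location (str) -- Job location.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.
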
Returns

the Job

start_python_dataflow(self, job_name: str, variables: dict, dataflow: str, py_options: List[str], project_id: str, py_interpreter: str = 'python3', py_requirements: Optional[List[str]] = None, py_system_site_packages: bool = False, append_job_name: bool = True, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Starts Dataflow job.

Parameters
  • job_name (str) -- The name of the job.

  • variables (Dict) -- Variables passed to the job.

  • dataflow (str) -- Name of the Dataflow process.

  • py_options (List[str]) -- Additional options.

  • project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to None or missing, the default project_id from the GCP connection is used.

  • py_interpreter (str) -- Python version of the Beam pipeline. If None, this defaults to python3. To track the Python versions supported by Beam and related issues, see: https://issues.apache.org/jira/browse/BEAM-1251

  • py_requirements (List[str]) --

    Additional Python package(s) to install. If a value is passed to this parameter, a new virtual environment is created with the additional packages installed.

    You could also install the apache-beam package if it is not installed on your system or you want to use a different version.

  • py_system_site_packages --

    Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.

    This option is only relevant if the py_requirements parameter is not None.

  • append_job_name (bool) -- True if unique suffix has to be appended to job name.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

  • location (str) -- Job location.
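
The append_job_name option adds a unique suffix to the job name. A minimal sketch of that idea follows. The normalization, the validation pattern, and the eight-character suffix are assumptions made for this example, and build_job_name is a hypothetical helper rather than the hook's _build_dataflow_job_name.

```python
import re
import uuid


def build_job_name(job_name: str, append_job_name: bool = True) -> str:
    """Normalize a job name and optionally append a short random suffix."""
    base = job_name.replace("_", "-")
    # Assumed rule: lowercase letters, digits and hyphens, starting with a letter.
    if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", base):
        raise ValueError(f"Invalid job name: {base!r}")
    if append_job_name:
        return f"{base}-{uuid.uuid4().hex[:8]}"
    return base
```

A random suffix lets the same task be retried or run concurrently without colliding with an earlier job of the same base name.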

static _build_dataflow_job_name(job_name: str, append_job_name: bool = True)[source]
static _options_to_args(variables: dict)[source]
is_job_dataflow_running(self, name: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION, variables: Optional[dict] = None)[source]

Helper method to check if the job is still running in Dataflow.

Parameters
  • name (str) -- The name of the job.

  • project_id (str) -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) -- Job location.

Returns

True if job is running.

Return type

bool

cancel_job(self, project_id: str, job_name: Optional[str] = None, job_id: Optional[str] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Cancels the job with the specified name prefix or Job ID.

The job_name and job_id parameters are mutually exclusive.

Parameters
  • job_name (str) -- Name prefix specifying which jobs are to be canceled.

  • job_id (str) -- Job ID specifying which jobs are to be canceled.

  • location (str) -- Job location.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
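
The choice between a name prefix and a job ID could be sketched as a small selector over job dicts. This is an illustration only: select_jobs is a hypothetical helper, the strict check that exactly one selector is given is an assumption, and the id and name keys follow the Job resource in the REST reference.

```python
from typing import Dict, List, Optional


def select_jobs(jobs: List[Dict], job_name: Optional[str] = None, job_id: Optional[str] = None) -> List[Dict]:
    """Pick jobs by exact ID or by name prefix; exactly one selector must be given."""
    if (job_name is None) == (job_id is None):
        raise ValueError("Provide exactly one of job_name or job_id")
    if job_id is not None:
        return [job for job in jobs if job["id"] == job_id]
    return [job for job in jobs if job["name"].startswith(job_name)]
```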

start_sql_job(self, job_name: str, query: str, options: Dict[str, Any], project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]

Starts Dataflow SQL query.

Parameters
  • job_name (str) -- The unique name to assign to the Cloud Dataflow job.

  • query (str) -- The SQL query to execute.

  • options -- Job parameters to be executed. For more information, see the gcloud command reference: https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query

  • location (str) -- The location of the Dataflow job (for example europe-west1)

  • project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to None or missing, the default project_id from the GCP connection is used.

  • on_new_job_id_callback (callable) -- Callback called when the job ID is known.

Returns

the new job object

get_job(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location -- The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

Returns

the Job

Return type

dict

fetch_job_metrics_by_id(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job metrics with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location -- The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

Returns

the JobMetrics. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

Return type

dict

fetch_job_messages_by_id(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job messages with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) -- Job location.

Returns

the list of JobMessages. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

Return type

List[dict]

fetch_job_autoscaling_events_by_id(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]

Gets the job autoscaling events with the specified Job ID.

Parameters
  • job_id (str) -- Job ID to get.

  • project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • location (str) -- Job location.

Returns

the list of AutoscalingEvents. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

Return type

List[dict]
