airflow.providers.google.cloud.hooks.dataflow
¶
This module contains a Google Dataflow Hook.
Module Contents¶
-
airflow.providers.google.cloud.hooks.dataflow.
process_line_and_extract_dataflow_job_id_callback
(on_new_job_id_callback: Optional[Callable[[str], None]]) → Callable[[str], None][source]¶ -
Returns callback which triggers function passed as `on_new_job_id_callback` when Dataflow job_id is found.
-
To be used for `process_line_callback` in
-
:py:class:`~airflow.providers.apache.beam.hooks.beam.BeamCommandRunner`
- Parameters
on_new_job_id_callback (callback) -- Callback called when the job ID is known
-
airflow.providers.google.cloud.hooks.dataflow.
_fallback_variable_parameter
(parameter_name: str, variable_key_name: str) → Callable[[T], T][source]¶
-
class
airflow.providers.google.cloud.hooks.dataflow.
DataflowJobStatus
[source]¶ Helper class with Dataflow job statuses. Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
-
class
airflow.providers.google.cloud.hooks.dataflow.
DataflowJobType
[source]¶ Helper class with Dataflow job types.
-
class
airflow.providers.google.cloud.hooks.dataflow.
_DataflowJobsController
(dataflow: Any, project_number: str, location: str, poll_sleep: int = 10, name: Optional[str] = None, job_id: Optional[str] = None, num_retries: int = 0, multiple_jobs: bool = False, drain_pipeline: bool = False, cancel_timeout: Optional[int] = 5 * 60, wait_until_finished: Optional[bool] = None)[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
Interface for communication with Google API.
It's not use Apache Beam, but only Google Dataflow API.
- Parameters
dataflow -- Discovery resource
project_number -- The Google Cloud Project ID.
location -- Job location.
poll_sleep -- The status refresh rate for pending operations.
name -- The Job ID prefix used when the multiple_jobs option is passed is set to True.
job_id -- ID of a single job.
num_retries -- Maximum number of retries in case of connection problems.
multiple_jobs -- If set to true this task will be searched by name prefix (
name
parameter), not by specific job ID, then actions will be performed on all matching jobs.drain_pipeline -- Optional, set to True if want to stop streaming job by draining it instead of canceling.
cancel_timeout -- wait time in seconds for successful job canceling
wait_until_finished --
If True, wait for the end of pipeline execution before exiting. If False, it only submits job and check once is job not in terminal state.
The default behavior depends on the type of pipeline:
for the streaming pipeline, wait for jobs to start,
for the batch pipeline, wait for the jobs to complete.
-
is_job_running
(self)[source]¶ Helper method to check if jos is still running in dataflow
- Returns
True if job is running.
- Return type
-
_get_current_jobs
(self)[source]¶ Helper method to get list of jobs that start with job name or id
- Returns
list of jobs including id's
- Return type
-
fetch_job_by_id
(self, job_id: str)[source]¶ Helper method to fetch the job with the specified Job ID.
-
fetch_job_metrics_by_id
(self, job_id: str)[source]¶ Helper method to fetch the job metrics with the specified Job ID.
- Parameters
job_id (str) -- Job ID to get.
- Returns
the JobMetrics. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics
- Return type
-
_fetch_list_job_messages_responses
(self, job_id: str)[source]¶ Helper method to fetch ListJobMessagesResponse with the specified Job ID.
- Parameters
job_id (str) -- Job ID to get.
- Returns
yields the ListJobMessagesResponse. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse
- Return type
-
fetch_job_messages_by_id
(self, job_id: str)[source]¶ Helper method to fetch the job messages with the specified Job ID.
- Parameters
job_id (str) -- Job ID to get.
- Returns
the list of JobMessages. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
- Return type
List[dict]
-
fetch_job_autoscaling_events_by_id
(self, job_id: str)[source]¶ Helper method to fetch the job autoscaling events with the specified Job ID.
- Parameters
job_id (str) -- Job ID to get.
- Returns
the list of AutoscalingEvents. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
- Return type
List[dict]
-
_check_dataflow_job_state
(self, job)[source]¶ Helper method to check the state of one job in dataflow for this task if job failed raise exception
- Returns
True if job is done.
- Return type
- Raise
Exception
-
class
airflow.providers.google.cloud.hooks.dataflow.
DataflowHook
(gcp_conn_id: str = 'google_cloud_default', delegate_to: Optional[str] = None, poll_sleep: int = 10, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, drain_pipeline: bool = False, cancel_timeout: Optional[int] = 5 * 60, wait_until_finished: Optional[bool] = None)[source]¶ Bases:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
Hook for Google Dataflow.
All the methods in the hook where project_id is used must be called with keyword arguments rather than positional.
-
start_java_dataflow
(self, job_name: str, variables: dict, jar: str, project_id: str, job_class: Optional[str] = None, append_job_name: bool = True, multiple_jobs: bool = False, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Starts Dataflow java job.
- Parameters
job_name (str) -- The name of the job.
variables (dict) -- Variables passed to the job.
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
jar -- Name of the jar for the job
job_class (str) -- Name of the java class for the job.
append_job_name (bool) -- True if unique suffix has to be appended to job name.
multiple_jobs (bool) -- True if to check for multiple job in dataflow
on_new_job_id_callback (callable) -- Callback called when the job ID is known.
location (str) -- Job location.
-
start_template_dataflow
(self, job_name: str, variables: dict, parameters: dict, dataflow_template: str, project_id: str, append_job_name: bool = True, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION, environment: Optional[dict] = None)[source]¶ Starts Dataflow template job.
- Parameters
job_name (str) -- The name of the job.
variables (dict) --
Map of job runtime environment options. It will update environment argument if passed.
See also
For more information on possible configurations, look at the API documentation https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) -- Parameters fot the template
dataflow_template (str) -- GCS path to the template.
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
append_job_name (bool) -- True if unique suffix has to be appended to job name.
on_new_job_id_callback (callable) -- Callback called when the job ID is known.
location (str) -- Job location.
-
start_flex_template
(self, body: dict, location: str, project_id: str, on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]¶ Starts flex templates with the Dataflow pipeline.
- Parameters
body -- The request body. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) -- The location of the Dataflow job (for example europe-west1)
project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to
None
or missing, the default project_id from the GCP connection is used.on_new_job_id_callback -- A callback that is called when a Job ID is detected.
- Returns
the Job
-
start_python_dataflow
(self, job_name: str, variables: dict, dataflow: str, py_options: List[str], project_id: str, py_interpreter: str = 'python3', py_requirements: Optional[List[str]] = None, py_system_site_packages: bool = False, append_job_name: bool = True, on_new_job_id_callback: Optional[Callable[[str], None]] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Starts Dataflow job.
- Parameters
job_name (str) -- The name of the job.
variables (Dict) -- Variables passed to the job.
dataflow (str) -- Name of the Dataflow process.
py_options (List[str]) -- Additional options.
project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to
None
or missing, the default project_id from the GCP connection is used.py_interpreter (str) -- Python version of the beam pipeline. If None, this defaults to the python3. To track python versions supported by beam and related issues check: https://issues.apache.org/jira/browse/BEAM-1251
py_requirements (List[str]) --
Additional python package(s) to install. If a value is passed to this parameter, a new virtual environment has been created with additional packages installed.
You could also install the apache-beam package if it is not installed on your system or you want to use a different version.
py_system_site_packages --
Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.
This option is only relevant if the
py_requirements
parameter is not None.append_job_name (bool) -- True if unique suffix has to be appended to job name.
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
on_new_job_id_callback (callable) -- Callback called when the job ID is known.
location (str) -- Job location.
-
static
build_dataflow_job_name
(job_name: str, append_job_name: bool = True)[source]¶ Builds Dataflow job name.
-
is_job_dataflow_running
(self, name: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION, variables: Optional[dict] = None)[source]¶ Helper method to check if jos is still running in dataflow
- Parameters
- Returns
True if job is running.
- Return type
-
cancel_job
(self, project_id: str, job_name: Optional[str] = None, job_id: Optional[str] = None, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Cancels the job with the specified name prefix or Job ID.
Parameter
name
andjob_id
are mutually exclusive.- Parameters
job_name (str) -- Name prefix specifying which jobs are to be canceled.
job_id (str) -- Job ID specifying which jobs are to be canceled.
location (str) -- Job location.
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
-
start_sql_job
(self, job_name: str, query: str, options: Dict[str, Any], project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback: Optional[Callable[[str], None]] = None)[source]¶ Starts Dataflow SQL query.
- Parameters
job_name (str) -- The unique name to assign to the Cloud Dataflow job.
query (str) -- The SQL query to execute.
options -- Job parameters to be executed. For more information, look at: https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query command reference
location (str) -- The location of the Dataflow job (for example europe-west1)
project_id (Optional[str]) -- The ID of the GCP project that owns the job. If set to
None
or missing, the default project_id from the GCP connection is used.on_new_job_id_callback (callable) -- Callback called when the job ID is known.
- Returns
the new job object
-
get_job
(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Gets the job with the specified Job ID.
- Parameters
job_id (str) -- Job ID to get.
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
location -- The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- Returns
the Job
- Return type
-
fetch_job_metrics_by_id
(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Gets the job metrics with the specified Job ID.
- Parameters
job_id (str) -- Job ID to get.
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
location -- The location of the Dataflow job (for example europe-west1). See: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- Returns
the JobMetrics. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics
- Return type
-
fetch_job_messages_by_id
(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Gets the job messages with the specified Job ID.
- Parameters
- Returns
the list of JobMessages. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
- Return type
List[dict]
-
fetch_job_autoscaling_events_by_id
(self, job_id: str, project_id: str, location: str = DEFAULT_DATAFLOW_LOCATION)[source]¶ Gets the job autoscaling events with the specified Job ID.
- Parameters
- Returns
the list of AutoscalingEvents. See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
- Return type
List[dict]
-
wait_for_done
(self, job_name: str, location: str, project_id: str, job_id: Optional[str] = None, multiple_jobs: bool = False)[source]¶ Wait for Dataflow job.
- Parameters
job_name (str) -- The 'jobName' to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key
'jobName'
inoptions
will be overwritten.location (str) -- location the job is running
project_id -- Optional, the Google Cloud project ID in which to start a job. If set to None or missing, the default project_id from the Google Cloud connection is used.
job_id (str) -- a Dataflow job ID
multiple_jobs (boolean) -- If pipeline creates multiple jobs then monitor all jobs
-