#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Apache Beam Hook."""
import json
import select
import shlex
import subprocess
import textwrap
from tempfile import TemporaryDirectory
from typing import Callable, List, Optional
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.python_virtualenv import prepare_virtualenv
class BeamRunnerType:
    """
    Namespace listing the runner names understood by Apache Beam.

    Every attribute is a plain string whose value equals the attribute name.
    For more information about runners see:
    https://beam.apache.org/documentation/
    """

    # Each constant below is the literal runner name, spelled exactly like the attribute.
    DataflowRunner = "DataflowRunner"
    DirectRunner = "DirectRunner"
    SparkRunner = "SparkRunner"
    FlinkRunner = "FlinkRunner"
    SamzaRunner = "SamzaRunner"
    NemoRunner = "NemoRunner"
    JetRunner = "JetRunner"
    Twister2Runner = "Twister2Runner"
def beam_options_to_args(options: dict) -> List[str]:
    """
    Returns a formatted pipeline options from a dictionary of arguments

    The logic of this method should be compatible with Apache Beam:
    https://github.com/apache/beam/blob/b56740f0e8cd80c2873412847d0b336837429fb9/sdks/python/
    apache_beam/options/pipeline_options.py#L230-L251

    Each ``attr``/``value`` pair is converted, in the dictionary's iteration order, as follows:

    * ``True`` or ``None`` -> ``--attr``
    * ``False`` -> nothing (the flag is left out)
    * a list -> one ``--attr=item`` per item
    * anything else -> ``--attr=value``

    :param options: Dictionary with options
    :type options: dict
    :return: List of arguments
    :rtype: List[str]
    """
    if not options:
        return []

    args: List[str] = []
    for attr, value in options.items():
        if isinstance(value, bool):
            # A switch-style flag is disabled by omitting it; rendering it as
            # ``--attr=False`` would pass a value to a flag that takes none.
            # This mirrors the Beam logic linked in the docstring.
            if value:
                args.append(f"--{attr}")
        elif value is None:
            args.append(f"--{attr}")
        elif isinstance(value, list):
            # Repeated options are expressed by repeating the flag once per item.
            args.extend(f"--{attr}={item}" for item in value)
        else:
            args.append(f"--{attr}={value}")
    return args
class BeamCommandRunner(LoggingMixin):
    """
    Class responsible for running pipeline command in subprocess

    The subprocess is started by the constructor with both stdout and stderr piped.
    Call :meth:`wait_for_done` to forward its output to the log and block until it exits.

    :param cmd: Parts of the command to be run in subprocess
    :type cmd: List[str]
    :param process_line_callback: Optional callback which can be used to process
        stdout and stderr to detect job id
    :type process_line_callback: Optional[Callable[[str], None]]
    """

    def __init__(
        self,
        cmd: List[str],
        process_line_callback: Optional[Callable[[str], None]] = None,
    ) -> None:
        super().__init__()
        # Log a shell-quoted rendering so the command can be copy-pasted; the process itself
        # is started from the argument list without a shell.
        self.log.info("Running command: %s", " ".join(shlex.quote(c) for c in cmd))
        self.process_line_callback = process_line_callback
        # Initialised here and not assigned anywhere else in this class.
        self.job_id: Optional[str] = None
        self._proc = subprocess.Popen(
            cmd,
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
        )

    def _process_fd(self, fd):
        """
        Prints output to logs.

        Lines read from stderr are logged at WARNING level and lines read from stdout at INFO
        level. Every line is passed to ``process_line_callback`` (when one was given) before
        it is logged.

        :param fd: File descriptor. Must be the stdout or the stderr pipe of the subprocess.
        :raises Exception: If ``fd`` is neither of the two pipes of the subprocess.
        """
        if fd not in (self._proc.stdout, self._proc.stderr):
            raise Exception("No data in stderr or in stdout.")

        fd_to_log = {self._proc.stderr: self.log.warning, self._proc.stdout: self.log.info}
        func_log = fd_to_log[fd]

        # NOTE(review): readline() only returns an empty value at end-of-stream, so this loop
        # appears to keep reading ``fd`` until the subprocess closes it - confirm that this
        # cannot starve the other pipe for very chatty pipelines.
        while True:
            line = fd.readline().decode()
            if not line:
                return
            if self.process_line_callback:
                self.process_line_callback(line)
            func_log(line.rstrip("\n"))

    def wait_for_done(self) -> None:
        """
        Waits for Apache Beam pipeline to complete.

        :raises AirflowException: If the subprocess exits with a non-zero return code.
        """
        self.log.info("Start waiting for Apache Beam process to complete.")
        reads = [self._proc.stderr, self._proc.stdout]
        while True:
            # Wait for at least one available fd.
            readable_fds, _, _ = select.select(reads, [], [], 5)
            if not readable_fds:
                # select() reports a timeout with empty lists (never None). Log a heartbeat
                # and still fall through to the poll() below, so an exited process is
                # noticed even when nothing became readable.
                self.log.info("Waiting for Apache Beam process to complete.")

            for readable_fd in readable_fds:
                self._process_fd(readable_fd)

            if self._proc.poll() is not None:
                break

        # Corner case: check if more output was created between the last read and the process termination
        for readable_fd in reads:
            self._process_fd(readable_fd)

        self.log.info("Process exited with return code: %s", self._proc.returncode)

        if self._proc.returncode != 0:
            raise AirflowException(f"Apache Beam process failed with return code {self._proc.returncode}")
class BeamHook(BaseHook):
    """
    Hook for Apache Beam.

    Builds the command line of a Python or Java pipeline and runs it in a subprocess
    through :class:`BeamCommandRunner`, blocking until the subprocess finishes.

    :param runner: Runner type
    :type runner: str
    """

    def __init__(
        self,
        runner: str,
    ) -> None:
        self.runner = runner
        super().__init__()

    def _start_pipeline(
        self,
        variables: dict,
        command_prefix: List[str],
        process_line_callback: Optional[Callable[[str], None]] = None,
    ) -> None:
        """
        Runs the pipeline command and waits for it to finish.

        The final command is ``command_prefix`` followed by ``--runner=<runner>`` and the
        arguments produced from ``variables`` by :func:`beam_options_to_args`.

        :param variables: Pipeline options; may be empty.
        :type variables: dict
        :param command_prefix: Leading parts of the command (interpreter/binary and its arguments).
        :type command_prefix: List[str]
        :param process_line_callback: Optional callback invoked with every line of the
            subprocess output.
        :type process_line_callback: Optional[Callable[[str], None]]
        """
        cmd = command_prefix + [
            f"--runner={self.runner}",
        ]
        if variables:
            cmd.extend(beam_options_to_args(variables))
        cmd_runner = BeamCommandRunner(
            cmd=cmd,
            process_line_callback=process_line_callback,
        )
        cmd_runner.wait_for_done()

    def start_python_pipeline(  # pylint: disable=too-many-arguments
        self,
        variables: dict,
        py_file: str,
        py_options: List[str],
        py_interpreter: str = "python3",
        py_requirements: Optional[List[str]] = None,
        py_system_site_packages: bool = False,
        process_line_callback: Optional[Callable[[str], None]] = None,
    ):
        """
        Starts Apache Beam python pipeline.

        :param variables: Variables passed to the pipeline. A ``labels`` entry, if present, must
            be a dictionary; it is passed to the pipeline as a list of ``key=value`` strings.
            The dictionary given by the caller is not modified.
        :type variables: Dict
        :param py_file: The python file with the pipeline; placed after ``py_options`` on the
            command line.
        :type py_file: str
        :param py_options: Additional options.
        :type py_options: List[str]
        :param py_interpreter: Python version of the Apache Beam pipeline.
            Defaults to ``python3``.
            To track python versions supported by beam and related
            issues check: https://issues.apache.org/jira/browse/BEAM-1251
        :type py_interpreter: str
        :param py_requirements: Additional python package(s) to install.
            If a value is passed to this parameter, a virtual environment is prepared in a
            temporary directory with the additional packages, and the pipeline is run with
            the interpreter of that environment.

            You could also install the apache-beam package if it is not installed on your system or you want
            to use a different version.
        :type py_requirements: List[str]
        :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
            See virtualenv documentation for more information.

            This option is only relevant if the ``py_requirements`` parameter is not None.
        :type py_system_site_packages: bool
        :param process_line_callback: Optional callback invoked with every line of the
            subprocess output.
        :type process_line_callback: Optional[Callable[[str], None]]
        :raises AirflowException: If ``py_requirements`` is an empty list while
            ``py_system_site_packages`` is False, or if the pipeline process fails.
        """
        if variables and "labels" in variables:
            # Work on a shallow copy: overwriting the entry in the caller's dictionary would
            # make a second call with the same dictionary fail on ``.items()``.
            variables = {
                **variables,
                "labels": [f"{key}={value}" for key, value in variables["labels"].items()],
            }

        if py_requirements is None:
            # No virtual environment requested: run with the given interpreter as is.
            self._start_pipeline(
                variables=variables,
                command_prefix=[py_interpreter] + py_options + [py_file],
                process_line_callback=process_line_callback,
            )
            return

        if not py_requirements and not py_system_site_packages:
            # An empty, isolated environment could never import apache-beam.
            raise AirflowException(
                "Invalid method invocation. You have disabled inclusion of system packages and empty list\n"
                "required for installation, so it is not possible to create a valid virtual environment.\n"
                "In the virtual environment, apache-beam package must be installed for your job to be "
                "executed. To fix this problem:\n"
                "* install apache-beam on the system, then set parameter py_system_site_packages to True,\n"
                "* add apache-beam to the list of required packages in parameter py_requirements.\n"
            )

        # The pipeline must run inside the ``with`` block: the temporary directory holding the
        # virtual environment is removed as soon as the block is left.
        with TemporaryDirectory(prefix="apache-beam-venv") as tmp_dir:
            venv_interpreter = prepare_virtualenv(
                venv_directory=tmp_dir,
                python_bin=py_interpreter,
                system_site_packages=py_system_site_packages,
                requirements=py_requirements,
            )
            self._start_pipeline(
                variables=variables,
                command_prefix=[venv_interpreter] + py_options + [py_file],
                process_line_callback=process_line_callback,
            )

    def start_java_pipeline(
        self,
        variables: dict,
        jar: str,
        job_class: Optional[str] = None,
        process_line_callback: Optional[Callable[[str], None]] = None,
    ) -> None:
        """
        Starts Apache Beam Java pipeline.

        The pipeline is launched with ``java -cp <jar> <job_class>`` when ``job_class`` is
        given and with ``java -jar <jar>`` otherwise.

        :param variables: Variables passed to the job. A ``labels`` entry, if present, is
            passed to the job as a compact JSON string. The dictionary given by the caller
            is not modified.
        :type variables: dict
        :param jar: Name of the jar for the pipeline
        :type jar: str
        :param job_class: Name of the java class for the pipeline.
        :type job_class: str
        :param process_line_callback: Optional callback invoked with every line of the
            subprocess output.
        :type process_line_callback: Optional[Callable[[str], None]]
        :raises AirflowException: If the pipeline process fails.
        """
        if variables and "labels" in variables:
            # Work on a shallow copy: overwriting the entry in the caller's dictionary would
            # JSON-encode the labels a second time if the same dictionary were reused.
            variables = {
                **variables,
                "labels": json.dumps(variables["labels"], separators=(",", ":")),
            }

        command_prefix = ["java", "-cp", jar, job_class] if job_class else ["java", "-jar", jar]
        self._start_pipeline(
            variables=variables,
            command_prefix=command_prefix,
            process_line_callback=process_line_callback,
        )