Source code for airflow.operators.bash

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from typing import Dict, Optional

try:
    from functools import cached_property
except ImportError:
    from cached_property import cached_property

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.hooks.subprocess import SubprocessHook
from airflow.models import BaseOperator
from airflow.utils.operator_helpers import context_to_airflow_vars


[docs]class BashOperator(BaseOperator): r""" Execute a Bash script, command or set of commands. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:BashOperator` If BaseOperator.do_xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes :param bash_command: The command, set of commands or reference to a bash script (must be '.sh') to be executed. (templated) :type bash_command: str :param env: If env is not None, it must be a dict that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated) :type env: dict :param output_encoding: Output encoding of bash command :type output_encoding: str :param skip_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: 99). If set to ``None``, any non-zero exit code will be treated as a failure. :type skip_exit_code: int Airflow will evaluate the exit code of the bash command. In general, a non-zero exit code will result in task failure and zero will result in task success. Exit code ``99`` (or another set in ``skip_exit_code``) will throw an :class:`airflow.exceptions.AirflowSkipException`, which will leave the task in ``skipped`` state. You can have all non-zero exit codes be treated as a failure by setting ``skip_exit_code=None``. .. list-table:: :widths: 25 25 :header-rows: 1 * - Exit code - Behavior * - 0 - success * - `skip_exit_code` (default: 99) - raise :class:`airflow.exceptions.AirflowSkipException` * - otherwise - raise :class:`airflow.exceptions.AirflowException` .. note:: Airflow will not recognize a non-zero exit code unless the whole shell exit with a non-zero exit code. This can be an issue if the non-zero exit arises from a sub-command. The easiest way of addressing this is to prefix the command with ``set -e;`` Example: .. code-block:: python bash_command = "set -e; python3 script.py '{{ next_execution_date }}'" .. note:: Add a space after the script name when directly calling a ``.sh`` script with the ``bash_command`` argument -- for example ``bash_command="my_script.sh "``. This is because Airflow tries to apply load this file and process it as a Jinja template to it ends with ``.sh``, which will likely not be what most users want. .. warning:: Care should be taken with "user" input or when using Jinja templates in the ``bash_command``, as this bash operator does not perform any escaping or sanitization of the command. This applies mostly to using "dag_run" conf, as that can be submitted via users in the Web UI. Most of the default template variables are not at risk. For example, do **not** do this: .. code-block:: python bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"', ) Instead, you should pass this via the ``env`` kwarg and use double-quotes inside the bash_command, as below: .. code-block:: python bash_task = BashOperator( task_id="bash_task", bash_command='echo "here is the message: \'$message\'"', env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, ) """
[docs] template_fields = ('bash_command', 'env')
[docs] template_fields_renderers = {'bash_command': 'bash', 'env': 'json'}
[docs] template_ext = ( '.sh', '.bash',
)
[docs] ui_color = '#f0ede4'
def __init__( self, *, bash_command: str, env: Optional[Dict[str, str]] = None, output_encoding: str = 'utf-8', skip_exit_code: int = 99, **kwargs, ) -> None: super().__init__(**kwargs) self.bash_command = bash_command self.env = env self.output_encoding = output_encoding self.skip_exit_code = skip_exit_code if kwargs.get('xcom_push') is not None: raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead") @cached_property
[docs] def subprocess_hook(self): """Returns hook for running the bash command""" return SubprocessHook()
[docs] def get_env(self, context): """Builds the set of environment variables to be exposed for the bash command""" env = self.env if env is None: env = os.environ.copy() airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) self.log.debug( 'Exporting the following env vars:\n%s', '\n'.join(f"{k}={v}" for k, v in airflow_context_vars.items()), ) env.update(airflow_context_vars) return env
[docs] def execute(self, context): env = self.get_env(context) result = self.subprocess_hook.run_command( command=['bash', '-c', self.bash_command], env=env, output_encoding=self.output_encoding, ) if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code: raise AirflowSkipException(f"Bash command returned exit code {self.skip_exit_code}. Skipping.") elif result.exit_code != 0: raise AirflowException('Bash command failed. The command returned a non-zero exit code.') return result.output
[docs] def on_kill(self) -> None: self.subprocess_hook.send_sigterm()

Was this entry helpful?