BashOperator

Use the BashOperator to execute commands in a Bash shell. The Bash command or script to execute is determined by:

  1. The bash_command argument when using BashOperator, or

  2. When using the @task.bash TaskFlow decorator, a non-empty string returned from the decorated callable.
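The second rule can be pictured as a validation step applied to whatever the decorated callable returns. The sketch below is a simplified, stdlib-only model, not Airflow's own code. The function name `resolve_bash_command` is hypothetical, and treating a whitespace-only string as empty is an assumption.

```python
def resolve_bash_command(returned: object) -> str:
    """Hypothetical model of how a @task.bash return value becomes the command.

    Assumption: anything other than a string with non-whitespace content is
    rejected before Bash is ever invoked.
    """
    if not isinstance(returned, str) or not returned.strip():
        raise TypeError("The decorated callable must return a non-empty string.")
    return returned


print(resolve_bash_command("echo 1"))  # prints: echo 1

# A stray trailing comma (return "test.sh",) yields a tuple, which is rejected too.
for bad in ("", "   ", None, ("/home/batcher/test.sh",)):
    try:
        resolve_bash_command(bad)
    except TypeError as exc:
        print(f"{bad!r} rejected: {exc}")
```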

Tip

The @task.bash decorator is recommended over the classic BashOperator to execute Bash commands.

airflow/example_dags/example_bash_decorator.py

from airflow.decorators import task

@task.bash
def run_after_loop() -> str:
    return "echo 1"

run_this = run_after_loop()

airflow/example_dags/example_bash_operator.py

from airflow.operators.bash import BashOperator

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="ls -alh --color=always / && echo https://airflow.apache.org/  && echo 'some <code>html</code>'",
)

Templating

You can use Jinja templates to parameterize the Bash command.

airflow/example_dags/example_bash_decorator.py

@task.bash
def also_run_this() -> str:
    return 'echo "ti_key={{ task_instance_key_str }}"'

also_this = also_run_this()

airflow/example_dags/example_bash_operator.py

also_run_this = BashOperator(
    task_id="also_run_this",
    bash_command='echo "ti_key={{ task_instance_key_str }}"',
)

With the @task.bash TaskFlow decorator, you can return a formatted Python string, and execution context variables are directly accessible to the decorated task: declare one as a parameter, as task_instance_key_str is below, and its value is passed in.

airflow/example_dags/example_bash_decorator.py

@task.bash
def also_run_this_again(task_instance_key_str) -> str:
    return f'echo "ti_key={task_instance_key_str}"'

also_this_again = also_run_this_again()

You are encouraged to take advantage of this approach as it fits nicely into the overall TaskFlow paradigm.
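Conceptually, the decorator looks at the callable's signature and supplies the context entries whose names match its parameters. The sketch below models only that name-matching idea with `inspect.signature`. `call_with_context` and the sample context values are hypothetical, and Airflow's real implementation handles more cases (for example `**kwargs`).

```python
import inspect
from typing import Any, Callable, Dict


def call_with_context(func: Callable[..., str], context: Dict[str, Any]) -> str:
    """Hypothetical model: pass only the context entries the callable asks for by name."""
    params = inspect.signature(func).parameters
    kwargs = {name: context[name] for name in params if name in context}
    return func(**kwargs)


def also_run_this_again(task_instance_key_str: str) -> str:
    return f'echo "ti_key={task_instance_key_str}"'


# Made-up context values; a real task run supplies many more keys.
fake_context = {
    "task_instance_key_str": "example_dag__also_run_this_again__20240101",
    "ds": "2024-01-01",
}

# Only task_instance_key_str is passed, because that is the only parameter requested.
command = call_with_context(also_run_this_again, fake_context)
print(command)  # prints: echo "ti_key=example_dag__also_run_this_again__20240101"
```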

Caution

Take care with “user” input when using Jinja templates in the Bash command: Airflow does not escape or sanitize the rendered command before Bash runs it.

This applies mostly to dag_run.conf, because its values can be submitted by users through the Web UI. Most of the default template variables are not at risk.

For example, do not do:

@task.bash
def bash_task() -> str:
    return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'


# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
    message = dag_run.conf["message"] if dag_run.conf else ""
    return f'echo "here is the message: {message}"'


# The same unsafe pattern with the classic BashOperator
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)

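Instead, pass the value through the env kwarg and reference it as an environment variable inside double quotes in the Bash command. The shell then expands it as data instead of parsing it as part of the command.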

@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
    return "echo \"here is the message: '$message'\""


# Or with the classic BashOperator
bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)
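To see why the env approach matters, the sketch below runs both patterns through a real shell with a hostile message. It is a stdlib-only illustration, not Airflow code. `run` is a hypothetical helper, the message value is made up, and the parent environment is merged in for convenience (see the operator's append_env parameter for how Airflow itself treats the inherited environment).

```python
import os
import shutil
import subprocess
from typing import Dict, Optional

# Prefer bash; fall back to POSIX sh, which quotes the same way for this demo.
SHELL = shutil.which("bash") or "sh"


def run(command: str, extra_env: Optional[Dict[str, str]] = None) -> str:
    """Run `command` through the shell and return its standard output."""
    env = {**os.environ, **(extra_env or {})}
    result = subprocess.run(
        [SHELL, "-c", command], env=env, capture_output=True, text=True, check=True
    )
    return result.stdout


# Hypothetical hostile value standing in for dag_run.conf["message"].
message = 'hi"; echo INJECTED; echo "'

# Unsafe: the value becomes part of the command text, so the shell parses it
# and runs the injected `echo INJECTED` as a separate command.
unsafe_out = run(f'echo "here is the message: {message}"')

# Safer: the value arrives as an environment variable and is expanded inside
# double quotes, so it is printed literally instead of being executed.
safe_out = run("echo \"here is the message: '$message'\"", {"message": message})

print(unsafe_out)
print(safe_out)
```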

Skipping

In general, a non-zero exit code raises an AirflowException and thus fails the task. If you would rather have the task end in a skipped state, exit with code 99 (or with another exit code if you pass skip_on_exit_code).

airflow/example_dags/example_bash_decorator.py

@task.bash
def this_will_skip() -> str:
    return 'echo "hello world"; exit 99;'

this_skips = this_will_skip()

airflow/example_dags/example_bash_operator.py

this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
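The skip rule above amounts to a small mapping from exit code to task outcome. The sketch models it in plain Python. `task_state_for` and its returned state strings are hypothetical, and checking the skip codes before treating 0 as success is an assumption about ordering.

```python
import shutil
import subprocess
from typing import Iterable, Optional, Union

SkipCodes = Optional[Union[int, Iterable[int]]]


def task_state_for(exit_code: int, skip_on_exit_code: SkipCodes = 99) -> str:
    """Hypothetical mapping from a Bash exit code to a task outcome."""
    if skip_on_exit_code is None:
        skip_codes = set()
    elif isinstance(skip_on_exit_code, int):
        skip_codes = {skip_on_exit_code}
    else:
        skip_codes = set(skip_on_exit_code)

    if exit_code in skip_codes:
        return "skipped"
    if exit_code == 0:
        return "success"
    return "failed"


# Run the same command as the example above and classify its exit code.
SHELL = shutil.which("bash") or "sh"
code = subprocess.run(
    [SHELL, "-c", 'echo "hello world"; exit 99;'], capture_output=True
).returncode
print(code, task_state_for(code))  # prints: 99 skipped
```

Passing skip_on_exit_code=None in this model turns exit code 99 back into an ordinary failure.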

Executing commands from files

Both the BashOperator and the @task.bash TaskFlow decorator enable you to execute Bash commands stored in files. The files must have a .sh or .bash extension.

Note the space after the script name (more on this in the next section).

@task.bash
def run_command_from_script() -> str:
    return "$AIRFLOW_HOME/scripts/example.sh "


run_script = run_command_from_script()

# Or with the classic BashOperator
run_script = BashOperator(
    task_id="run_command_from_script",
    bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)

Jinja template not found

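If you encounter a “Template not found” exception when trying to execute a Bash script, add a space after the script name. This is because a bash_command that ends in .sh or .bash is treated as the path of a Jinja template file, and rendering fails when Airflow cannot find that file in its template search path. With the trailing space, the string no longer ends in the extension, so it is run as a plain command.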

@task.bash
def bash_example() -> str:
    # This fails with 'Jinja template not found' error
    # return "/home/batcher/test.sh"
    # This works (has a space after)
    return "/home/batcher/test.sh "


# Or with the classic BashOperator
BashOperator(
    task_id="bash_example",
    # This fails with 'Jinja template not found' error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
)
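The trailing-space trick works because the decision is a plain suffix check on the command string. Below is a minimal model of that check, assuming the extensions are the .sh and .bash suffixes mentioned earlier; `is_loaded_as_template_file` is a hypothetical name, not an Airflow function.

```python
# Assumption: mirrors the BashOperator's template file extensions.
TEMPLATE_EXT = (".sh", ".bash")


def is_loaded_as_template_file(bash_command: str) -> bool:
    """Hypothetical check: a command ending in a template extension is read as a file path."""
    return bash_command.endswith(TEMPLATE_EXT)


for cmd in ("/home/batcher/test.sh", "/home/batcher/test.sh ", "scripts/test.sh", "echo hi"):
    print(repr(cmd), is_loaded_as_template_file(cmd))
```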

However, if you want to use templating in your Bash script, do not add the space. Instead, put your Bash script in a location relative to the directory containing the DAG file. For example, if your DAG file is /usr/local/airflow/dags/test_dag.py, you can move your test.sh file anywhere under /usr/local/airflow/dags/ (such as /usr/local/airflow/dags/scripts/test.sh) and pass the relative path to bash_command as shown below:

@task.bash
def bash_example():
    # "scripts" folder is under "/usr/local/airflow/dags"
    return "scripts/test.sh"


# Or with the classic BashOperator
t2 = BashOperator(
    task_id="bash_example",
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
)

Creating a separate folder for Bash scripts may be desirable for many reasons, such as separating your script’s logic from pipeline code, getting proper code highlighting in files written in different languages, and general flexibility in structuring pipelines.

You can also set template_searchpath in the DAG constructor call to point at one or more additional folders. Airflow then searches those folders for template files, as well as the folder containing the DAG file.

@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
    @task.bash
    def bash_example():
        # No trailing space, so "test.sh" is looked up as a template file
        return "test.sh"


# Or with the classic BashOperator
with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
    t2 = BashOperator(
        task_id="bash_example",
        bash_command="test.sh",
    )
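Both the relative-path approach and template_searchpath come down to searching an ordered list of folders for the script. The sketch below models that lookup with the standard library, assuming the DAG file's folder is searched first and the template_searchpath entries after it, in the manner of a Jinja FileSystemLoader. `find_template` and the temporary folders are hypothetical stand-ins.

```python
import os
import tempfile
from typing import List, Optional


def find_template(name: str, dag_folder: str, template_searchpath: Optional[List[str]] = None) -> str:
    """Hypothetical lookup: DAG folder first, then each template_searchpath entry."""
    searchpath = [dag_folder] + list(template_searchpath or [])
    for folder in searchpath:
        candidate = os.path.join(folder, name)
        if os.path.isfile(candidate):
            return candidate
    raise FileNotFoundError(f"Template not found: {name!r} in {searchpath}")


with tempfile.TemporaryDirectory() as dags, tempfile.TemporaryDirectory() as opt_scripts:
    # Stand-ins for /usr/local/airflow/dags/scripts/test.sh and /opt/scripts/test.sh.
    os.makedirs(os.path.join(dags, "scripts"))
    for path in (os.path.join(dags, "scripts", "test.sh"), os.path.join(opt_scripts, "test.sh")):
        with open(path, "w") as fh:
            fh.write("echo hello\n")

    # Relative path resolved against the DAG folder.
    relative_hit = find_template("scripts/test.sh", dags)
    # Bare name found only through the extra search path.
    searchpath_hit = find_template("test.sh", dags, [opt_scripts])

    # Without the extra search path, the bare name is not found.
    missing_raised = False
    try:
        find_template("test.sh", dags)
    except FileNotFoundError:
        missing_raised = True

print(relative_hit)
print(searchpath_hit)
```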

Enriching Bash with Python

The @task.bash TaskFlow decorator lets you combine Bash and Python within a single task.

Using Python conditionals, other function calls, etc. within a @task.bash task can help define, augment, or even build the Bash command(s) to execute.

For example, use conditional logic to determine task behavior:

airflow/example_dags/example_bash_decorator.py

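from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.utils.weekday import WeekDay


@task.bash
def sleep_in(day: str) -> str:
    # `day` is a lowercase name such as "saturday", so compare it with the member names
    if day in (WeekDay.SATURDAY.name.lower(), WeekDay.SUNDAY.name.lower()):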
        return f"sleep {60 * 60}"
    else:
        raise AirflowSkipException("No sleeping in today!")

sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")

Or call a function to help build a Bash command:

airflow/example_dags/example_bash_decorator.py

def _get_files_in_cwd() -> list[str]:
    from pathlib import Path

    dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
    files = [str(elem) for elem in dir_contents if elem.is_file()]

    return files

@task.bash
def get_file_stats() -> str:
    files = _get_files_in_cwd()
    cmd = "stat "
    cmd += " ".join(files)

    return cmd

get_file_stats()
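One caveat when building commands this way: joining file names with plain spaces breaks as soon as a name contains a space or a shell metacharacter. A safer variant quotes each argument with the standard library's `shlex.join` (Python 3.8+). The file names below are made up for illustration.

```python
import shlex

# Hypothetical file names; one contains a space and one a shell metacharacter.
files = ["example_bash_operator.py", "my dag.py", "a;b.py"]

naive_cmd = "stat " + " ".join(files)
quoted_cmd = shlex.join(["stat", *files])

print(naive_cmd)   # prints: stat example_bash_operator.py my dag.py a;b.py
print(quoted_cmd)  # prints: stat example_bash_operator.py 'my dag.py' 'a;b.py'
```

In the naive version Bash would split "my dag.py" into two arguments and treat ";" as a command separator. The quoted version passes each name through intact.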

There are numerous possibilities with this type of pre-execution enrichment.

BashSensor

Use the BashSensor to run an arbitrary command for sensing. The command should exit with 0 when it succeeds and with any other value otherwise; by default the sensor keeps re-running it until it succeeds or the sensor times out.

airflow/example_dags/example_sensors.py

from airflow.sensors.bash import BashSensor

t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")

t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1")
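The poke-until-success behavior can be modeled as a loop around the command's exit code. This is a hypothetical stdlib sketch, not the sensor's implementation. `poke_until_success` is an illustrative name, and `timeout` and `poke_interval` are plain seconds here. In Airflow, a timed-out sensor fails, or ends up skipped when soft_fail=True as in the second example above.

```python
import shutil
import subprocess
import time

SHELL = shutil.which("bash") or "sh"


def poke_until_success(bash_command: str, timeout: float, poke_interval: float) -> bool:
    """Hypothetical model of a Bash sensor: re-run the command until it exits 0.

    Returns True on success, or False once `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        if subprocess.run([SHELL, "-c", bash_command]).returncode == 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


print(poke_until_success("exit 0", timeout=3, poke_interval=0.5))    # prints: True
print(poke_until_success("exit 1", timeout=0.3, poke_interval=0.1))  # prints: False
```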
