Dynamic DAG Generation¶
To have a task repeated based on the output/result of a previous task see Dynamic Task Mapping.
Dynamic DAGs with environment variables¶
If you want to use variables to configure your code, you should always use environment variables in your top-level code rather than Airflow Variables. Using Airflow Variables at top-level code creates a connection to metadata DB of Airflow to fetch the value, which can slow down parsing and place extra load on the DB. See the Airflow Variables on how to make best use of Airflow Variables in your DAGs using Jinja templates .
For example you could set DEPLOYMENT
variable differently for your production and development
environments. The variable DEPLOYMENT
could be set to PROD
in your production environment and to
DEV
in your development environment. Then you could build your dag differently in production and
development environment, depending on the value of the environment variable.
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
Generating Python code with embedded meta-data¶
You can externally generate Python code containing the meta-data as importable constants. Such constant can then be imported directly by your DAG and used to construct the object and build the dependencies. This makes it easy to import such code from multiple DAGs without the need to find, load and parse the meta-data stored in the constant - this is done automatically by Python interpreter when it processes the "import" statement. This sounds strange at first, but it is surprisingly easy to generate such code and make sure this is a valid Python code that you can import from your DAGs.
For example assume you dynamically generate (in your DAG folder), the my_company_utils/common.py
file:
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
Then you can import and use the ALL_TASKS
constant in all your DAGs like that:
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
for task in ALL_TASKS:
# create your operators and relations here
pass
Don't forget that in this case you need to add empty __init__.py
file in the my_company_utils
folder
and you should add the my_company_utils/.*
line to .airflowignore
file (if using the regexp ignore
syntax), so that the whole folder is ignored by the scheduler when it looks for DAGs.
Dynamic DAGs with external configuration from a structured data file¶
If you need to use a more complex meta-data to prepare your DAG structure and you would prefer to keep the data in a structured non-python format, you should export the data to the DAG folder in a file and push it to the DAG folder, rather than try to pull the data by the DAG's top-level code - for the reasons explained in the parent Top level Python Code.
The meta-data should be exported and stored together with the DAGs in a convenient file format (JSON, YAML
formats are good candidates) in DAG folder. Ideally, the meta-data should be published in the same
package/folder as the module of the DAG file you load it from, because then you can find location of
the meta-data file in your DAG easily. The location of the file to read can be found using the
__file__
attribute of the module containing the DAG:
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
Dynamic DAGs with globals()
¶
You can dynamically generate DAGs by working with globals()
.
As long as a DAG
object in globals()
is created, Airflow will load it.
from datetime import datetime
from airflow.decorators import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
globals()[dag_id] = dynamic_generated_dag()
The code below will generate a DAG for each config: dynamic_generated_dag_config1
and dynamic_generated_dag_config2
.
Each of them can run separately with related configuration
Warning
Using this practice, pay attention to "late binding" behaviour in Python loops. See that GitHub discussion for more details