Using Operators

An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.

See the Operators Concepts documentation and the Operators API Reference for more information.

BashOperator

Use the BashOperator to execute commands in a Bash shell.

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)

Templating

You can use Jinja templates to parameterize the bash_command argument.

also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)

Troubleshooting

Jinja template not found

Add a space after the script name when directly calling a Bash script with the bash_command argument. Because the value ends in .sh, Airflow treats it as the path to a Jinja template file and tries to load it, which fails with a Jinja template not found error. The trailing space makes Airflow render the command as an inline string instead.

t2 = BashOperator(
    task_id='bash_example',

    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",

    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)

PythonOperator

Use the PythonOperator to execute Python callables.

from pprint import pprint


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)

Passing in arguments

Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.

import time


def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping from 0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )

    run_this >> task

Templating

When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
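A minimal sketch of how templates_dict can be used (the task and callable names here are hypothetical; it assumes the dag object from the examples above):

def print_templated_value(**kwargs):
    # Values in templates_dict have already been rendered by Jinja
    # before the callable is invoked
    print(kwargs['templates_dict']['run_date'])


templated_task = PythonOperator(
    task_id='print_templated_value',
    provide_context=True,
    python_callable=print_templated_value,
    templates_dict={'run_date': 'Run date is {{ ds }}'},
    dag=dag,
)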

Google Cloud Platform Operators

GceInstanceStartOperator

Starts an existing Google Compute Engine instance.

In this example parameter values are extracted from Airflow variables. Moreover, the default_args dict is used to pass common arguments to all operators in a single DAG.

PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
INSTANCE = models.Variable.get('INSTANCE', '')
SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
SET_MACHINE_TYPE_BODY = {
    'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
}

default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}

Define the GceInstanceStartOperator by passing the required arguments to the constructor.

gce_instance_start = GceInstanceStartOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    task_id='gcp_compute_start_task'
)

GceInstanceStopOperator

Stops an existing Google Compute Engine instance.

For parameter definitions, take a look at GceInstanceStartOperator above.

Define the GceInstanceStopOperator by passing the required arguments to the constructor.

gce_instance_stop = GceInstanceStopOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    task_id='gcp_compute_stop_task'
)

GceSetMachineTypeOperator

Changes the machine type of a stopped instance to the specified machine type.

For parameter definitions, take a look at GceInstanceStartOperator above.

Define the GceSetMachineTypeOperator by passing the required arguments to the constructor.

gce_set_machine_type = GceSetMachineTypeOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    body=SET_MACHINE_TYPE_BODY,
    task_id='gcp_compute_set_machine_type'
)
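The three tasks above can also be combined into a single workflow. The following is a minimal sketch, reusing the variables and default_args defined earlier; the DAG id and the ordering (resize the machine while it is stopped, then start it) are illustrative assumptions, and the import path is the contrib one from Airflow 1.10:

from airflow import models
from airflow.contrib.operators.gcp_compute_operator import (
    GceInstanceStartOperator, GceInstanceStopOperator, GceSetMachineTypeOperator
)

with models.DAG(
    'example_gcp_compute',              # hypothetical DAG id
    default_args=default_args,
    schedule_interval=None,
) as dag:
    gce_instance_stop = GceInstanceStopOperator(
        project_id=PROJECT_ID, zone=LOCATION, resource_id=INSTANCE,
        task_id='gcp_compute_stop_task')
    gce_set_machine_type = GceSetMachineTypeOperator(
        project_id=PROJECT_ID, zone=LOCATION, resource_id=INSTANCE,
        body=SET_MACHINE_TYPE_BODY, task_id='gcp_compute_set_machine_type')
    gce_instance_start = GceInstanceStartOperator(
        project_id=PROJECT_ID, zone=LOCATION, resource_id=INSTANCE,
        task_id='gcp_compute_start_task')

    # The machine type can only be changed while the instance is stopped
    gce_instance_stop >> gce_set_machine_type >> gce_instance_start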

GcfFunctionDeleteOperator

Use the default_args dict to pass arguments to the operator.

PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}

Use the GcfFunctionDeleteOperator to delete a function from Google Cloud Functions.

    t1 = GcfFunctionDeleteOperator(
        task_id="gcf_delete_task",
        name=FUNCTION_NAME
    )
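The indented snippet above is meant to sit inside a DAG definition, which is how the default_args dict (here, just start_date) is applied to the task. A minimal sketch of the surrounding DAG; the DAG id is hypothetical, and the import path is the contrib one from Airflow 1.10:

from airflow import models
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator

with models.DAG(
    'example_gcp_function_delete',      # hypothetical DAG id
    default_args=default_args,
    schedule_interval=None,
) as dag:
    t1 = GcfFunctionDeleteOperator(
        task_id="gcf_delete_task",
        name=FUNCTION_NAME
    )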

Troubleshooting

If you run or deploy an operator using a service account and get “forbidden 403” errors, your service account does not have the correct Cloud IAM permissions. To fix this:

  1. Assign your Service Account the Cloud Functions Developer role.
  2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.

The typical way of assigning Cloud IAM permissions with gcloud is shown below. Replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.

gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"

See Adding the IAM service agent user role to the runtime service for details.

GcfFunctionDeployOperator

Use the GcfFunctionDeployOperator to deploy a function to Google Cloud Functions.

The following examples show various combinations of Airflow variables and default_args that you can use. The variables are defined as follows:

PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
RUNTIME = 'nodejs6'
VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)

With those variables you can define the body of the request:

body = {
    "name": FUNCTION_NAME,
    "entryPoint": ENTRYPOINT,
    "runtime": RUNTIME,
    "httpsTrigger": {}
}

When you create a DAG, the default_args dictionary can be used to pass the body and other arguments:

default_args = {
    'start_date': dates.days_ago(1),
    'project_id': PROJECT_ID,
    'location': LOCATION,
    'body': body,
    'validate_body': VALIDATE_BODY
}

Note that neither the body nor the default_args dict is complete in the above examples. Depending on which variables are set, the source-code-related fields can be passed in different ways. Currently, you can pass either sourceArchiveUrl, sourceRepository, or sourceUploadUrl, as described in the Cloud Functions API specification. Additionally, default_args may contain a zip_path parameter to run the extra step of uploading the source code before deploying it. In that case, you also need to provide an empty sourceUploadUrl parameter in the body.

Based on the variables defined above, example logic for setting the source-code-related fields is shown here:

if SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': SOURCE_REPOSITORY
    }
elif ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = ZIP_PATH
elif SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")

The code to create the operator:

    deploy_task = GcfFunctionDeployOperator(
        task_id="gcf_deploy_task",
        name=FUNCTION_NAME
    )
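The operator above passes only task_id and name explicitly; the remaining arguments (project_id, location, body, validate_body) are filled in from the DAG's default_args, since Airflow applies default_args to every operator created within the DAG. A minimal sketch of the surrounding DAG, with a hypothetical DAG id and the Airflow 1.10 contrib import path:

from airflow import models
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeployOperator

with models.DAG(
    'example_gcp_function_deploy',      # hypothetical DAG id
    default_args=default_args,
    schedule_interval=None,
) as dag:
    deploy_task = GcfFunctionDeployOperator(
        task_id="gcf_deploy_task",
        name=FUNCTION_NAME
    )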

Troubleshooting

If you run or deploy an operator using a service account and get “forbidden 403” errors, your service account does not have the correct Cloud IAM permissions. To fix this:

  1. Assign your Service Account the Cloud Functions Developer role.
  2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.

The typical way of assigning Cloud IAM permissions with gcloud is shown below. Replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.

gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"

See Adding the IAM service agent user role to the runtime service for details.

If the source code for your function is stored in a Google Cloud Source Repository, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary.

CloudSqlInstanceDatabaseCreateOperator

Creates a new database inside a Cloud SQL instance.

For parameter definitions, take a look at CloudSqlInstanceDatabaseCreateOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
    project_id=PROJECT_ID,
    body=db_create_body,
    instance=INSTANCE_NAME,
    task_id='sql_db_create_task'
)

Example request body:

db_create_body = {
    "instance": INSTANCE_NAME,
    "name": DB_NAME,
    "project": PROJECT_ID
}

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlInstanceDatabaseDeleteOperator

Deletes a database from a Cloud SQL instance.

For parameter definitions, take a look at CloudSqlInstanceDatabaseDeleteOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_delete_task'
)

Templating

template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')

CloudSqlInstanceDatabasePatchOperator

Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

For parameter definitions, take a look at CloudSqlInstanceDatabasePatchOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
    project_id=PROJECT_ID,
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_patch_task'
)

Example request body:

db_patch_body = {
    "charset": "utf16",
    "collation": "utf16_general_ci"
}

Templating

template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')

CloudSqlInstanceDeleteOperator

Deletes a Cloud SQL instance in Google Cloud Platform.

For parameter definitions, take a look at CloudSqlInstanceDeleteOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    task_id='sql_instance_delete_task'
)

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlInstanceCreateOperator

Creates a new Cloud SQL instance in Google Cloud Platform.

For parameter definitions, take a look at CloudSqlInstanceCreateOperator.

If an instance with the same name exists, no action will be taken and the operator will succeed.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Example body defining the instance:

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {
            "binaryLogEnabled": True,
            "enabled": True,
            "startTime": "05:00"
        },
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {
            "zone": "europe-west4-a"
        },
        "maintenanceWindow": {
            "hour": 5,
            "day": 7,
            "updateTrack": "canary"
        },
        "pricingPlan": "PER_USE",
        "replicationType": "ASYNCHRONOUS",
        "storageAutoResize": False,
        "storageAutoResizeLimit": 0,
        "userLabels": {
            "my-key": "my-value"
        }
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

Using the operator

sql_instance_create_task = CloudSqlInstanceCreateOperator(
    project_id=PROJECT_ID,
    body=body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_create_task'
)

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlInstancePatchOperator

Updates settings of a Cloud SQL instance in Google Cloud Platform (partial update).

For parameter definitions, take a look at CloudSqlInstancePatchOperator.

This is a partial update, so only values for the settings specified in the body will be set or updated. The rest of the existing instance’s configuration will remain unchanged.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Example body defining the settings to patch:

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {
            "hour": 3,
            "day": 6,
            "updateTrack": "canary"
        },
        "userLabels": {
            "my-key-patch": "my-value-patch"
        }
    }
}

Using the operator

sql_instance_patch_task = CloudSqlInstancePatchOperator(
    project_id=PROJECT_ID,
    body=patch_body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_patch_task'
)

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')