Google Cloud Functions Operators

Prerequisite Tasks

To use these operators you need a Google Cloud Platform project with the Cloud Functions API enabled, the GCP API libraries installed in your Airflow environment, and a configured Google Cloud connection.

GcfFunctionDeleteOperator

Use the operator to delete a function from Google Cloud Functions.

For parameter definition, take a look at GcfFunctionDeleteOperator.

Arguments

The following examples of OS environment variables show how you can build the function name used in the operator:

airflow/contrib/example_dags/example_gcp_function.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
    replace("-", "_")  # make sure there are no dashes in function name (!)
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
                                                               GCP_LOCATION,
                                                               GCF_SHORT_FUNCTION_NAME)
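Since a malformed name only fails at call time, it can help to validate the fully-qualified resource path locally first. A minimal stdlib-only sketch; the helper is hypothetical, and the pattern mirrors the example's no-dash convention for the function id rather than the exact rule the API enforces:

```python
import re

# Assumed shape of a fully-qualified Cloud Functions resource name,
# following the example's convention of underscores (no dashes) in the id.
FUNCTION_NAME_RE = re.compile(
    r'^projects/[^/]+/locations/[^/]+/functions/[A-Za-z0-9_]+$')


def is_valid_function_name(name):
    """Return True if name looks like projects/<p>/locations/<l>/functions/<id>."""
    return bool(FUNCTION_NAME_RE.match(name))
```

With the defaults above, `is_valid_function_name(FUNCTION_NAME)` would return True, while a name with a dash in the final segment would be rejected.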

Using the operator

airflow/contrib/example_dags/example_gcp_function.py

delete_task = GcfFunctionDeleteOperator(
    task_id="gcf_delete_task",
    name=FUNCTION_NAME
)

Templating

template_fields = ('name', 'gcp_conn_id', 'api_version')

More information

See the Google Cloud Functions API documentation on deleting a function.

GcfFunctionDeployOperator

Use the operator to deploy a function to Google Cloud Functions. If a function with this name already exists, it will be updated.

For parameter definition, take a look at GcfFunctionDeployOperator.

Arguments

In the example DAG the following environment variables are used to parameterize the operator’s definition:

airflow/contrib/example_dags/example_gcp_function.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
    replace("-", "_")  # make sure there are no dashes in function name (!)
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
                                                               GCP_LOCATION,
                                                               GCF_SHORT_FUNCTION_NAME)

airflow/contrib/example_dags/example_gcp_function.py

GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '')
GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '')
GCF_SOURCE_REPOSITORY = os.environ.get(
    'GCF_SOURCE_REPOSITORY',
    'https://source.developers.google.com/'
    'projects/{}/repos/hello-world/moveable-aliases/master'.format(GCP_PROJECT_ID))
GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
GCF_RUNTIME = 'nodejs6'
GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', True)
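One thing to watch in the snippet above: `os.environ.get` returns a string whenever the variable is set, so `GCP_VALIDATE_BODY` is only a real bool when the variable is absent. A hedged helper for parsing boolean environment variables; the accepted spellings are a convention of this sketch, not anything Airflow prescribes:

```python
import os


def env_bool(name, default=True):
    """Parse a boolean environment variable; unset falls back to default."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    # Treat common truthy spellings as True, everything else as False.
    return raw.strip().lower() in ('1', 'true', 'yes', 'on')


# e.g. GCP_VALIDATE_BODY = env_bool('GCP_VALIDATE_BODY', True)
```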

Some of those variables are used to create the request’s body:

airflow/contrib/example_dags/example_gcp_function.py

body = {
    "name": FUNCTION_NAME,
    "entryPoint": GCF_ENTRYPOINT,
    "runtime": GCF_RUNTIME,
    "httpsTrigger": {}
}

When a DAG is created, the default_args dictionary can be used to pass arguments shared with other tasks:

airflow/contrib/example_dags/example_gcp_function.py

default_args = {
    'start_date': dates.days_ago(1)
}

Note that neither the body nor the default args are complete in the above examples. Depending on the variables set, there are different ways to pass the source-code-related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository or sourceUploadUrl, as described in the Cloud Functions API specification.

Additionally, default_args or the operator's own arguments may contain the zip_path parameter to run the extra step of uploading the source code before deploying it. In that case, you also need to provide an empty sourceUploadUrl parameter in the body.
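Before handing the body to the operator you can assert up front that exactly one source field is present. A minimal sketch; `check_source` is a hypothetical helper, while the field names are the ones from the Cloud Functions API:

```python
# Exactly one of these source fields may appear in the request body.
SOURCE_FIELDS = ('sourceArchiveUrl', 'sourceRepository', 'sourceUploadUrl')


def check_source(body):
    """Raise ValueError unless exactly one source field is set in body."""
    present = [f for f in SOURCE_FIELDS if f in body]
    if len(present) != 1:
        raise ValueError(
            'Expected exactly one of %s, got %s' % (SOURCE_FIELDS, present))
```

The check tests key membership rather than truthiness on purpose, so the empty sourceUploadUrl used with zip_path still counts as the one source field.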

Using the operator

Depending on the combination of parameters, the function's source code can be obtained from different sources:

airflow/contrib/example_dags/example_gcp_function.py

if GCF_SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = GCF_SOURCE_ARCHIVE_URL
elif GCF_SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': GCF_SOURCE_REPOSITORY
    }
elif GCF_ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = GCF_ZIP_PATH
elif GCF_SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = GCF_SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")
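The same precedence can be captured as a function that returns the body updates and the extra operator arguments together. `pick_source` below is a hypothetical refactoring of the branch above, not part of Airflow:

```python
def pick_source(archive_url='', repository='', zip_path='', upload_url=''):
    """Return (body_updates, operator_args) following the precedence above."""
    if archive_url:
        return {'sourceArchiveUrl': archive_url}, {}
    if repository:
        return {'sourceRepository': {'url': repository}}, {}
    if zip_path:
        # Uploading a local zip requires an empty sourceUploadUrl in the
        # body plus the zip_path argument on the operator.
        return {'sourceUploadUrl': ''}, {'zip_path': zip_path}
    if upload_url:
        return {'sourceUploadUrl': upload_url}, {}
    raise ValueError('Please provide one of the source code parameters')
```

A refactoring like this keeps the precedence rules in one place and makes them straightforward to unit-test.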

The code to create the operator:

airflow/contrib/example_dags/example_gcp_function.py

deploy_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=body,
    validate_body=GCP_VALIDATE_BODY
)

You can also create the operator without specifying the project id; in that case the project id is retrieved from the GCP connection used:

airflow/contrib/example_dags/example_gcp_function.py

deploy2_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy2_task",
    location=GCP_LOCATION,
    body=body,
    validate_body=GCP_VALIDATE_BODY
)

Templating

template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')

Troubleshooting

If, during deployment, you see an error similar to the following:

“HttpError 403: Missing necessary permission iam.serviceAccounts.actAs for on resource project-name@appspot.gserviceaccount.com. Please grant the roles/iam.serviceAccountUser role.”

it means that your service account does not have the correct Cloud IAM permissions.

  1. Assign your Service Account the Cloud Functions Developer role.

  2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.

The typical way of assigning Cloud IAM permissions with gcloud is shown below. Replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.

gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/iam.serviceAccountUser"

You can also do that via the GCP Web console.

See Adding the IAM service agent user role to the runtime service for details.

If the source code for your function is in Google Cloud Source Repositories, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary.

More information

See the Google Cloud Functions API documentation on creating a function.

Reference

For further information, look at:
