Google Cloud Functions Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[gcp]'
See Installation for detailed information.
GcfFunctionDeleteOperator¶
Use the operator to delete a function from Google Cloud Functions.
For parameter definitions, see GcfFunctionDeleteOperator.
Arguments¶
The following example uses OS environment variables to build the function name passed to the operator:
import os

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
    replace("-", "_")  # make sure there are no dashes in function name (!)
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
                                                               GCP_LOCATION,
                                                               GCF_SHORT_FUNCTION_NAME)
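The name assembled above follows the pattern projects/<project>/locations/<location>/functions/<short name>. As a minimal sketch, the same construction can be wrapped in a small helper so that it is easy to reuse and test. The name build_function_name is hypothetical and not part of Airflow; the dash-to-underscore replacement simply mirrors the example above.

```python
import os


def build_function_name(project_id, location, short_name):
    """Return the fully qualified Cloud Functions resource name.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # Mirror the example above: dashes in the short name become underscores.
    short_name = short_name.replace("-", "_")
    return "projects/{}/locations/{}/functions/{}".format(
        project_id, location, short_name
    )


# Same environment variables and defaults as in the example above.
FUNCTION_NAME = build_function_name(
    os.environ.get("GCP_PROJECT_ID", "example-project"),
    os.environ.get("GCP_LOCATION", "europe-west1"),
    os.environ.get("GCF_SHORT_FUNCTION_NAME", "hello"),
)
```

The resulting FUNCTION_NAME can be passed as the name argument of the operator in the same way as the value built inline.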
Using the operator¶
delete_task = GcfFunctionDeleteOperator(
    task_id="gcf_delete_task",
    name=FUNCTION_NAME
)
Templating¶
template_fields = ('name', 'gcp_conn_id', 'api_version')
More information¶
See the Google Cloud Functions API documentation for deleting a function.
GcfFunctionDeployOperator¶
Use the operator to deploy a function to Google Cloud Functions. If a function with this name already exists, it will be updated.
For parameter definitions, see GcfFunctionDeployOperator.
Arguments¶
In the example DAG the following environment variables are used to parameterize the operator’s definition:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
    replace("-", "_")  # make sure there are no dashes in function name (!)
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
                                                               GCP_LOCATION,
                                                               GCF_SHORT_FUNCTION_NAME)
GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '')
GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '')
GCF_SOURCE_REPOSITORY = os.environ.get(
    'GCF_SOURCE_REPOSITORY',
    'https://source.developers.google.com/'
    'projects/{}/repos/hello-world/moveable-aliases/master'.format(GCP_PROJECT_ID))
GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
GCF_RUNTIME = 'nodejs6'
# Environment variables are strings, so parse the flag explicitly;
# os.environ.get(..., True) would treat the string 'False' as truthy.
GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', 'True').lower() == 'true'
Some of those variables are used to create the request’s body:
body = {
    "name": FUNCTION_NAME,
    "entryPoint": GCF_ENTRYPOINT,
    "runtime": GCF_RUNTIME,
    "httpsTrigger": {}
}
When a DAG is created, the default_args dictionary can be used to pass arguments shared with other tasks:
from airflow.utils import dates

default_args = {
    'start_date': dates.days_ago(1)
}
Note that neither the body nor the default args are complete in the above examples. Depending on the variables set, the source code related fields can be passed in different ways. Currently, you can pass either sourceArchiveUrl, sourceRepository or sourceUploadUrl, as described in the Cloud Functions API specification.
Additionally, default_args or direct operator arguments may contain the zip_path parameter, which runs the extra step of uploading the source code before deploying it. In this case, you also need to provide an empty sourceUploadUrl parameter in the body.
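The rules described above can be checked before the operator is created. The following is a minimal sketch under stated assumptions: check_source_fields is a hypothetical helper, not an Airflow function, and it assumes that exactly one of the three source fields should be present and that zip_path is only valid together with an empty sourceUploadUrl. The paths and URLs in the usage lines are placeholders.

```python
SOURCE_FIELDS = ("sourceArchiveUrl", "sourceRepository", "sourceUploadUrl")


def check_source_fields(body, zip_path=None):
    """Return the single source field found in body, or raise ValueError.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    present = [field for field in SOURCE_FIELDS if field in body]
    if len(present) != 1:
        raise ValueError(
            "Expected exactly one of {}, found {}".format(SOURCE_FIELDS, present)
        )
    if zip_path:
        # Uploading a local zip requires an empty sourceUploadUrl in the body.
        if body.get("sourceUploadUrl") != "":
            raise ValueError("zip_path requires an empty sourceUploadUrl in the body")
    return present[0]


# Usage: a body prepared for the zip upload variant (placeholder values).
example_body = {"name": "projects/p/locations/l/functions/f", "sourceUploadUrl": ""}
source_field = check_source_fields(example_body, zip_path="/tmp/function.zip")
```

Raising early in the DAG file keeps a misconfigured body from reaching the deploy step.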
Using the operator¶
Depending on the combination of parameters, the Function’s source code can be obtained from different sources:
if GCF_SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = GCF_SOURCE_ARCHIVE_URL
elif GCF_SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': GCF_SOURCE_REPOSITORY
    }
elif GCF_ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = GCF_ZIP_PATH
elif GCF_SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = GCF_SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")
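Because the branches above are evaluated in order, the first non-empty variable wins. In particular, GCF_SOURCE_REPOSITORY has a non-empty default in the example, so the zip and upload URL branches are only reached when that variable is explicitly set to an empty string. The sketch below restates the same precedence as a function so that it can be exercised in isolation. The name select_source is hypothetical and not part of Airflow.

```python
def select_source(archive_url="", repository="", zip_path="", upload_url=""):
    """Return (body_fields, extra_args) following the precedence shown above.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    if archive_url:
        return {"sourceArchiveUrl": archive_url}, {}
    if repository:
        return {"sourceRepository": {"url": repository}}, {}
    if zip_path:
        # An empty upload URL accompanies zip_path, as described earlier.
        return {"sourceUploadUrl": ""}, {"zip_path": zip_path}
    if upload_url:
        return {"sourceUploadUrl": upload_url}, {}
    raise ValueError("Please provide one of the source_code parameters")


# Usage: only a local zip path is given (placeholder path).
body_fields, extra_args = select_source(zip_path="/tmp/function.zip")
```

The returned dictionaries can then be merged into body and default_args with dict.update.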
The code to create the operator:
deploy_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=body,
    validate_body=GCP_VALIDATE_BODY
)
You can also create the operator without a project ID. In that case, the project ID is retrieved from the GCP connection used:
deploy2_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy2_task",
    location=GCP_LOCATION,
    body=body,
    validate_body=GCP_VALIDATE_BODY
)
Templating¶
template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')
Troubleshooting¶
If you see an error similar to the following during deployment:
“HttpError 403: Missing necessary permission iam.serviceAccounts.actAs for on resource project-name@appspot.gserviceaccount.com. Please grant the roles/iam.serviceAccountUser role.”
it means that your service account does not have the correct Cloud IAM permissions.
Assign your Service Account the Cloud Functions Developer role.
Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.
The typical way of assigning Cloud IAM permissions with gcloud is shown below. Replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.
gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/iam.serviceAccountUser"
You can also do that via the GCP Web console.
See Adding the IAM service agent user role to the runtime service for details.
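If the binding needs to be scripted, the gcloud invocation shown above can be assembled from Python. This is a minimal sketch that only builds and prints the command and does not run it. The name build_binding_command is hypothetical, the project ID and email address are placeholder values, and the flags are the ones from the command above.

```python
import shlex


def build_binding_command(project_id, service_account_email):
    """Return the gcloud argument list for the binding shown above.

    Hypothetical helper for illustration; it only builds the command.
    """
    return [
        "gcloud", "iam", "service-accounts", "add-iam-policy-binding",
        # Account that receives the binding: PROJECT_ID@appspot.gserviceaccount.com
        "{}@appspot.gserviceaccount.com".format(project_id),
        "--member=serviceAccount:{}".format(service_account_email),
        "--role=roles/iam.serviceAccountUser",
    ]


# Placeholder values; replace them with your own project and service account.
command = build_binding_command(
    "example-project", "airflow@example-project.iam.gserviceaccount.com"
)
# Print a shell-quoted version for review before running it.
print(" ".join(shlex.quote(arg) for arg in command))
```

Keeping the command as a list avoids shell quoting problems if it is later passed to subprocess.run.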
If the source code for your function is in Google Source Repository, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary.
More information¶
See the Google Cloud API documentation for creating a function.