Source code for airflow.contrib.operators.gcp_compute_operator

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from copy import deepcopy

from googleapiclient.errors import HttpError

from airflow import AirflowException
from airflow.contrib.hooks.gcp_compute_hook import GceHook
from airflow.contrib.utils.gcp_field_sanitizer import GcpBodyFieldSanitizer
from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from json_merge_patch import merge


[docs]class GceBaseOperator(BaseOperator): """ Abstract base operator for Google Compute Engine operators to inherit from. """ @apply_defaults def __init__(self, zone, resource_id, project_id=None, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs): self.project_id = project_id self.zone = zone self.resource_id = resource_id self.gcp_conn_id = gcp_conn_id self.api_version = api_version self._validate_inputs() self._hook = GceHook(gcp_conn_id=self.gcp_conn_id, api_version=self.api_version) super(GceBaseOperator, self).__init__(*args, **kwargs) def _validate_inputs(self): if self.project_id == '': raise AirflowException("The required parameter 'project_id' is missing") if not self.zone: raise AirflowException("The required parameter 'zone' is missing") if not self.resource_id: raise AirflowException("The required parameter 'resource_id' is missing") def execute(self, context): pass
[docs]class GceInstanceStartOperator(GceBaseOperator): """ Starts an instance in Google Compute Engine. :param zone: Google Cloud Platform zone where the instance exists. :type zone: str :param resource_id: Name of the Compute Engine instance resource. :type resource_id: str :param project_id: Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used. :type project_id: str :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to 'google_cloud_default'. :type gcp_conn_id: str :param api_version: Optional, API version used (for example v1 - or beta). Defaults to v1. :type api_version: str :param validate_body: Optional, If set to False, body validation is not performed. Defaults to False. """ # [START gce_instance_start_template_fields] template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version') # [END gce_instance_start_template_fields] @apply_defaults def __init__(self, zone, resource_id, project_id=None, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs): super(GceInstanceStartOperator, self).__init__( project_id=project_id, zone=zone, resource_id=resource_id, gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) def execute(self, context): return self._hook.start_instance(zone=self.zone, resource_id=self.resource_id, project_id=self.project_id)
[docs]class GceInstanceStopOperator(GceBaseOperator): """ Stops an instance in Google Compute Engine. :param zone: Google Cloud Platform zone where the instance exists. :type zone: str :param resource_id: Name of the Compute Engine instance resource. :type resource_id: str :param project_id: Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used. :type project_id: str :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to 'google_cloud_default'. :type gcp_conn_id: str :param api_version: Optional, API version used (for example v1 - or beta). Defaults to v1. :type api_version: str :param validate_body: Optional, If set to False, body validation is not performed. Defaults to False. """ # [START gce_instance_stop_template_fields] template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version') # [END gce_instance_stop_template_fields] @apply_defaults def __init__(self, zone, resource_id, project_id=None, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs): super(GceInstanceStopOperator, self).__init__( project_id=project_id, zone=zone, resource_id=resource_id, gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) def execute(self, context): self._hook.stop_instance(zone=self.zone, resource_id=self.resource_id, project_id=self.project_id)
SET_MACHINE_TYPE_VALIDATION_SPECIFICATION = [ dict(name="machineType", regexp="^.+$"), ]
[docs]class GceSetMachineTypeOperator(GceBaseOperator): """ Changes the machine type for a stopped instance to the machine type specified in the request. :param zone: Google Cloud Platform zone where the instance exists. :type zone: str :param resource_id: Name of the Compute Engine instance resource. :type resource_id: str :param body: Body required by the Compute Engine setMachineType API, as described in https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType#request-body :type body: dict :param project_id: Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used. :type project_id: str :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to 'google_cloud_default'. :type gcp_conn_id: str :param api_version: Optional, API version used (for example v1 - or beta). Defaults to v1. :type api_version: str :param validate_body: Optional, If set to False, body validation is not performed. Defaults to False. :type validate_body: bool """ # [START gce_instance_set_machine_type_template_fields] template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version') # [END gce_instance_set_machine_type_template_fields] @apply_defaults def __init__(self, zone, resource_id, body, project_id=None, gcp_conn_id='google_cloud_default', api_version='v1', validate_body=True, *args, **kwargs): self.body = body self._field_validator = None if validate_body: self._field_validator = GcpBodyFieldValidator( SET_MACHINE_TYPE_VALIDATION_SPECIFICATION, api_version=api_version) super(GceSetMachineTypeOperator, self).__init__( project_id=project_id, zone=zone, resource_id=resource_id, gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) def _validate_all_body_fields(self): if self._field_validator: self._field_validator.validate(self.body) def execute(self, context): self._validate_all_body_fields() return self._hook.set_machine_type(zone=self.zone, resource_id=self.resource_id, body=self.body, project_id=self.project_id)
GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [ dict(name="name", regexp="^.+$"), dict(name="description", optional=True), dict(name="properties", type='dict', optional=True, fields=[ dict(name="description", optional=True), dict(name="tags", optional=True, fields=[ dict(name="items", optional=True) ]), dict(name="machineType", optional=True), dict(name="canIpForward", optional=True), dict(name="networkInterfaces", optional=True), # not validating deeper dict(name="disks", optional=True), # not validating the array deeper dict(name="metadata", optional=True, fields=[ dict(name="fingerprint", optional=True), dict(name="items", optional=True), dict(name="kind", optional=True), ]), dict(name="serviceAccounts", optional=True), # not validating deeper dict(name="scheduling", optional=True, fields=[ dict(name="onHostMaintenance", optional=True), dict(name="automaticRestart", optional=True), dict(name="preemptible", optional=True), dict(name="nodeAffinitites", optional=True), # not validating deeper ]), dict(name="labels", optional=True), dict(name="guestAccelerators", optional=True), # not validating deeper dict(name="minCpuPlatform", optional=True), ]), ] GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [ "kind", "id", "name", "creationTimestamp", "properties.disks.sha256", "properties.disks.kind", "properties.disks.sourceImageEncryptionKey.sha256", "properties.disks.index", "properties.disks.licenses", "properties.networkInterfaces.kind", "properties.networkInterfaces.accessConfigs.kind", "properties.networkInterfaces.name", "properties.metadata.kind", "selfLink" ]
[docs]class GceInstanceTemplateCopyOperator(GceBaseOperator): """ Copies the instance template, applying specified changes. :param resource_id: Name of the Instance Template :type resource_id: str :param body_patch: Patch to the body of instanceTemplates object following rfc7386 PATCH semantics. The body_patch content follows https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates Name field is required as we need to rename the template, all the other fields are optional. It is important to follow PATCH semantics - arrays are replaced fully, so if you need to update an array you should provide the whole target array as patch element. :type body_patch: dict :param project_id: Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used. :type project_id: str :param request_id: Optional, unique request_id that you might add to achieve full idempotence (for example when client call times out repeating the request with the same request id will not create a new instance template again). It should be in UUID format as defined in RFC 4122. :type request_id: str :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to 'google_cloud_default'. :type gcp_conn_id: str :param api_version: Optional, API version used (for example v1 - or beta). Defaults to v1. :type api_version: str :param validate_body: Optional, If set to False, body validation is not performed. Defaults to False. :type validate_body: bool """ # [START gce_instance_template_copy_operator_template_fields] template_fields = ('project_id', 'resource_id', 'request_id', 'gcp_conn_id', 'api_version') # [END gce_instance_template_copy_operator_template_fields] @apply_defaults def __init__(self, resource_id, body_patch, project_id=None, request_id=None, gcp_conn_id='google_cloud_default', api_version='v1', validate_body=True, *args, **kwargs): self.body_patch = body_patch self.request_id = request_id self._field_validator = None if 'name' not in self.body_patch: raise AirflowException("The body '{}' should contain at least " "name for the new operator in the 'name' field". format(body_patch)) if validate_body: self._field_validator = GcpBodyFieldValidator( GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION, api_version=api_version) self._field_sanitizer = GcpBodyFieldSanitizer( GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE) super(GceInstanceTemplateCopyOperator, self).__init__( project_id=project_id, zone='global', resource_id=resource_id, gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) def _validate_all_body_fields(self): if self._field_validator: self._field_validator.validate(self.body_patch) def execute(self, context): self._validate_all_body_fields() try: # Idempotence check (sort of) - we want to check if the new template # is already created and if is, then we assume it was created by previous run # of CopyTemplate operator - we do not check if content of the template # is as expected. Templates are immutable so we cannot update it anyway # and deleting/recreating is not worth the hassle especially # that we cannot delete template if it is already used in some Instance # Group Manager. We assume success if the template is simply present existing_template = self._hook.get_instance_template( resource_id=self.body_patch['name'], project_id=self.project_id) self.log.info("The {} template already existed. It was likely " "created by previous run of the operator. Assuming success.") return existing_template except HttpError as e: # We actually expect to get 404 / Not Found here as the template should # not yet exist if not e.resp.status == 404: raise e old_body = self._hook.get_instance_template(resource_id=self.resource_id, project_id=self.project_id) new_body = deepcopy(old_body) self._field_sanitizer.sanitize(new_body) new_body = merge(new_body, self.body_patch) self.log.info("Calling insert instance template with updated body: {}". format(new_body)) self._hook.insert_instance_template(body=new_body, request_id=self.request_id, project_id=self.project_id) return self._hook.get_instance_template(resource_id=self.body_patch['name'], project_id=self.project_id)
[docs]class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator): """ Patches the Instance Group Manager, replacing source template URL with the destination one. API V1 does not have update/patch operations for Instance Group Manager, so you must use beta or newer API version. Beta is the default. :param resource_id: Name of the Instance Group Manager :type resource_id: str :param zone: Google Cloud Platform zone where the Instance Group Manager exists. :type zone: str :param source_template: URL of the template to replace. :type source_template: str :param destination_template: URL of the target template. :type destination_template: str :param project_id: Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used. :type project_id: str :param request_id: Optional, unique request_id that you might add to achieve full idempotence (for example when client call times out repeating the request with the same request id will not create a new instance template again). It should be in UUID format as defined in RFC 4122. :type request_id: str :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to 'google_cloud_default'. :type gcp_conn_id: str :param api_version: Optional, API version used (for example v1 - or beta). Defaults to v1. :type api_version: str :param validate_body: Optional, If set to False, body validation is not performed. Defaults to False. :type validate_body: bool """ # [START gce_igm_update_template_operator_template_fields] template_fields = ('project_id', 'resource_id', 'zone', 'request_id', 'source_template', 'destination_template', 'gcp_conn_id', 'api_version') # [END gce_igm_update_template_operator_template_fields] @apply_defaults def __init__(self, resource_id, zone, source_template, destination_template, project_id=None, update_policy=None, request_id=None, gcp_conn_id='google_cloud_default', api_version='beta', *args, **kwargs): self.zone = zone self.source_template = source_template self.destination_template = destination_template self.request_id = request_id self.update_policy = update_policy self._change_performed = False if api_version == 'v1': raise AirflowException("Api version v1 does not have update/patch " "operations for Instance Group Managers. Use beta" " api version or above") super(GceInstanceGroupManagerUpdateTemplateOperator, self).__init__( project_id=project_id, zone=self.zone, resource_id=resource_id, gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) def _possibly_replace_template(self, dictionary): # type: (dict) -> None if dictionary.get('instanceTemplate') == self.source_template: dictionary['instanceTemplate'] = self.destination_template self._change_performed = True def execute(self, context): old_instance_group_manager = self._hook.get_instance_group_manager( zone=self.zone, resource_id=self.resource_id, project_id=self.project_id) patch_body = {} if 'versions' in old_instance_group_manager: patch_body['versions'] = old_instance_group_manager['versions'] if 'instanceTemplate' in old_instance_group_manager: patch_body['instanceTemplate'] = old_instance_group_manager['instanceTemplate'] if self.update_policy: patch_body['updatePolicy'] = self.update_policy self._possibly_replace_template(patch_body) if 'versions' in patch_body: for version in patch_body['versions']: self._possibly_replace_template(version) if self._change_performed or self.update_policy: self.log.info("Calling patch instance template with updated body: {}". format(patch_body)) return self._hook.patch_instance_group_manager( zone=self.zone, resource_id=self.resource_id, body=patch_body, request_id=self.request_id, project_id=self.project_id) else: # Idempotence achieved return True