# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import subprocess
import tempfile
from airflow import AirflowException
from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class GKEClusterDeleteOperator(BaseOperator):
    """
    Deletes the cluster, including the Kubernetes endpoint and all worker nodes.

    To delete a certain cluster, you must specify the ``project_id``, the ``name``
    of the cluster, the ``location`` that the cluster is in, and the ``task_id``.

    **Operator Creation**: ::

        operator = GKEClusterDeleteOperator(
                    task_id='cluster_delete',
                    project_id='my-project',
                    location='cluster-location',
                    name='cluster-name')

    .. seealso::
        For more detail about deleting clusters have a look at the reference:
        https://google-cloud-python.readthedocs.io/en/latest/container/gapic/v1/api.html#google.cloud.container_v1.ClusterManagerClient.delete_cluster

    :param project_id: The Google Developers Console [project ID or project number]
    :type project_id: str
    :param name: The name of the resource to delete, in this case cluster name
    :type name: str
    :param location: The name of the Google Compute Engine zone in which the cluster
        resides.
    :type location: str
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: The api version to use
    :type api_version: str
    """
    template_fields = ['project_id', 'gcp_conn_id', 'name', 'location', 'api_version']

    @apply_defaults
    def __init__(self,
                 project_id,
                 name,
                 location,
                 gcp_conn_id='google_cloud_default',
                 api_version='v2',
                 *args,
                 **kwargs):
        super(GKEClusterDeleteOperator, self).__init__(*args, **kwargs)

        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.location = location
        # NOTE(review): api_version is stored (and templated) but is not passed
        # to the hook by this operator.
        self.api_version = api_version
        self.name = name

    def _check_input(self):
        """
        Verify that every field required to identify the cluster is set.

        Runs at execute time rather than in ``__init__`` because the fields are
        templated and only hold their final values once rendered.

        :raises AirflowException: if ``project_id``, ``name`` or ``location``
            is empty.
        """
        if not all([self.project_id, self.name, self.location]):
            self.log.error(
                'One of (project_id, name, location) is missing or incorrect')
            raise AirflowException('Operator has incorrect or missing input.')

    def execute(self, context):
        """
        Validate the operator input and delete the cluster through the hook.

        :param context: The Airflow task context (unused here).
        :return: Whatever ``GKEClusterHook.delete_cluster`` returns.
        :raises AirflowException: if a required field is missing.
        """
        self._check_input()
        hook = GKEClusterHook(gcp_conn_id=self.gcp_conn_id, location=self.location)
        delete_result = hook.delete_cluster(name=self.name, project_id=self.project_id)
        return delete_result
class GKEClusterCreateOperator(BaseOperator):
    """
    Create a Google Kubernetes Engine Cluster of specified dimensions
    The operator will wait until the cluster is created.

    The **minimum** required to define a cluster to create is:

    ``dict()`` ::

        cluster_def = {'name': 'my-cluster-name',
                       'initial_node_count': 1}

    or

    ``Cluster`` proto ::

        from google.cloud.container_v1.types import Cluster

        cluster_def = Cluster(name='my-cluster-name', initial_node_count=1)

    **Operator Creation**: ::

        operator = GKEClusterCreateOperator(
                    task_id='cluster_create',
                    project_id='my-project',
                    location='my-location',
                    body=cluster_def)

    .. seealso::
        For more detail about creating clusters have a look at the reference:
        :class:`google.cloud.container_v1.types.Cluster`

    :param project_id: The Google Developers Console [project ID or project number]
    :type project_id: str
    :param location: The name of the Google Compute Engine zone in which the cluster
        resides.
    :type location: str
    :param body: The Cluster definition to create, can be protobuf or python dict, if
        dict it must match protobuf message Cluster
    :type body: dict or google.cloud.container_v1.types.Cluster
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: The api version to use
    :type api_version: str
    """
    template_fields = ['project_id', 'gcp_conn_id', 'location', 'api_version', 'body']

    @apply_defaults
    def __init__(self,
                 project_id,
                 location,
                 body=None,
                 gcp_conn_id='google_cloud_default',
                 api_version='v2',
                 *args,
                 **kwargs):
        super(GKEClusterCreateOperator, self).__init__(*args, **kwargs)

        # Use a fresh dict per instance instead of a shared mutable default.
        if body is None:
            body = {}
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.location = location
        # NOTE(review): api_version is stored (and templated) but is not passed
        # to the hook by this operator.
        self.api_version = api_version
        self.body = body

    def _check_input(self):
        """
        Verify that the operator holds the minimum needed to create a cluster.

        ``project_id``, ``location`` and ``body`` must all be set, and ``body``
        must carry both a ``name`` and an ``initial_node_count`` (as dict keys
        for a dict body, as attributes otherwise).

        :raises AirflowException: if any of the above is missing.
        """
        if all([self.project_id, self.location, self.body]):
            if isinstance(self.body, dict):
                if 'name' in self.body and 'initial_node_count' in self.body:
                    return
            # Not a dict, so it is expected to be a Cluster-like object exposing
            # the two fields as attributes; getattr keeps an unexpected type
            # from surfacing as an AttributeError instead of the error below.
            elif (getattr(self.body, 'name', None) and
                    getattr(self.body, 'initial_node_count', None)):
                return

        self.log.error(
            'One of (project_id, location, body, body[\'name\'], '
            'body[\'initial_node_count\']) is missing or incorrect')
        raise AirflowException('Operator has incorrect or missing input.')

    def execute(self, context):
        """
        Validate the operator input and create the cluster through the hook.

        :param context: The Airflow task context (unused here).
        :return: Whatever ``GKEClusterHook.create_cluster`` returns.
        :raises AirflowException: if a required field is missing.
        """
        self._check_input()
        hook = GKEClusterHook(gcp_conn_id=self.gcp_conn_id, location=self.location)
        create_op = hook.create_cluster(cluster=self.body, project_id=self.project_id)
        return create_op
# Environment variable gcloud consults to decide where to write the
# Kubernetes config file.
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
# Environment variable gcloud consults to locate a service account key file.
G_APP_CRED = "GOOGLE_APPLICATION_CREDENTIALS"
class GKEPodOperator(KubernetesPodOperator):
    """
    Executes a task in a Kubernetes pod in the specified Google Kubernetes
    Engine cluster

    This Operator assumes that the system has gcloud installed and either
    has working default application credentials or has configured a
    connection id with a service account.

    The **minimum** required to define a cluster to create are the variables
    ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
    ``namespace``, and ``image``

    **Operator Creation**: ::

        operator = GKEPodOperator(task_id='pod_op',
                                  project_id='my-project',
                                  location='us-central1-a',
                                  cluster_name='my-cluster-name',
                                  name='task-name',
                                  namespace='default',
                                  image='perl')

    .. seealso::
        For more detail about application authentication have a look at the reference:
        https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application

    :param project_id: The Google Developers Console project id
    :type project_id: str
    :param location: The name of the Google Kubernetes Engine zone in which the
        cluster resides, e.g. 'us-central1-a'
    :type location: str
    :param cluster_name: The name of the Google Kubernetes Engine cluster the pod
        should be spawned in
    :type cluster_name: str
    :param gcp_conn_id: The google cloud connection id to use. This allows for
        users to specify a service account.
    :type gcp_conn_id: str
    """
    template_fields = ('project_id', 'location',
                       'cluster_name') + KubernetesPodOperator.template_fields

    @apply_defaults
    def __init__(self,
                 project_id,
                 location,
                 cluster_name,
                 gcp_conn_id='google_cloud_default',
                 *args,
                 **kwargs):
        super(GKEPodOperator, self).__init__(*args, **kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Fetch cluster credentials with gcloud, then run the pod.

        :param context: The Airflow task context, forwarded to
            ``KubernetesPodOperator.execute``.
        :return: Whatever ``KubernetesPodOperator.execute`` returns.
        :raises subprocess.CalledProcessError: if the gcloud command exits with
            a non-zero status.
        """
        # Specifying a service account file allows the user to use non default
        # authentication for creating a Kubernetes Pod. This is done by setting the
        # environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud looks at.
        key_file = None

        # If gcp_conn_id is not specified gcloud will use the default
        # service account credentials.
        if self.gcp_conn_id:
            from airflow.hooks.base_hook import BaseHook
            # extras is a deserialized json object
            extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson
            # key_file only gets set if a json file is created from a JSON string in
            # the web ui, else none
            key_file = self._set_env_from_extras(extras=extras)

        # Write config to a temp file and set the environment variable to point to it.
        # This is to avoid race conditions of reading/writing a single file
        with tempfile.NamedTemporaryFile() as conf_file:
            os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name
            try:
                # Attempt to get/update credentials
                # We call gcloud directly instead of using google-cloud-python api
                # because there is no way to write kubernetes config to a file,
                # which is required by KubernetesPodOperator.
                # The gcloud command looks at the env variable `KUBECONFIG` for
                # where to save the kubernetes config file.
                subprocess.check_call(
                    ["gcloud", "container", "clusters", "get-credentials",
                     self.cluster_name,
                     "--zone", self.location,
                     "--project", self.project_id])
            finally:
                # Closing the temporary key file deletes it from the file system.
                # Do it even when gcloud fails so the service account key is
                # never left behind on disk.
                if key_file:
                    key_file.close()

            # Tell `KubernetesPodOperator` where the config file is located
            self.config_file = os.environ[KUBE_CONFIG_ENV_VAR]
            # Propagate the parent's result instead of silently dropping it.
            return super(GKEPodOperator, self).execute(context)

    def _set_env_from_extras(self, extras):
        """
        Point ``GOOGLE_APPLICATION_CREDENTIALS`` at the connection's key, if any.

        - If the connection extras hold a ``key_path``, the variable is set to it.
        - Otherwise, if they hold a ``keyfile_dict`` JSON string, that string is
          written to a temporary file and the variable is set to the file's path.
        - If neither is present the environment is left untouched, so gcloud
          falls back to the application default credentials.

        :param extras: The deserialized ``extra`` JSON of the connection.
        :type extras: dict
        :return: The open temporary key file when one was created (closing it
            deletes it), otherwise ``None``.
        """
        key_path = self._get_field(extras, 'key_path', False)
        keyfile_json_str = self._get_field(extras, 'keyfile_dict', False)

        if key_path:
            os.environ[G_APP_CRED] = key_path
            return None

        if not keyfile_json_str:
            self.log.info('Using gcloud with application default credentials.')
            return None

        # The temporary file is opened in binary mode, so text must be encoded.
        if not isinstance(keyfile_json_str, bytes):
            keyfile_json_str = keyfile_json_str.encode('utf-8')
        service_key = tempfile.NamedTemporaryFile()
        service_key.write(keyfile_json_str)
        # Flush so that gcloud, a separate process, sees the full content.
        service_key.flush()
        os.environ[G_APP_CRED] = service_key.name
        # Hand the file object back so the caller can close (and thereby
        # delete) it once it is no longer needed.
        return service_key

    def _get_field(self, extras, field, default=None):
        """
        Fetches a field from extras, and returns it. This is some Airflow
        magic. The google_cloud_platform hook type adds custom UI elements
        to the hook page, which allow admins to specify service_account,
        key_path, etc. They get formatted as shown below.

        :param extras: The deserialized ``extra`` JSON of the connection.
        :type extras: dict
        :param field: The short field name, without the
            ``extra__google_cloud_platform__`` prefix.
        :type field: str
        :param default: The value returned when the field is absent.
        :return: The stored value, or ``default`` if the field is not present.
        """
        long_f = 'extra__google_cloud_platform__{}'.format(field)
        if long_f in extras:
            return extras[long_f]
        else:
            self.log.info('Field %s not found in extras.', field)
            return default