Source code for airflow.providers.google.cloud.sensors.looker
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains Google Cloud Looker sensors."""from__future__importannotationsfromtypingimportTYPE_CHECKINGfromairflow.exceptionsimportAirflowException,AirflowSkipExceptionfromairflow.providers.google.cloud.hooks.lookerimportJobStatus,LookerHookfromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.utils.contextimportContext
[docs]classLookerCheckPdtBuildSensor(BaseSensorOperator):""" Check for the state of a previously submitted PDT materialization job. :param materialization_id: Required. The materialization job ID to poll. (templated) :param looker_conn_id: Required. The connection ID to use connecting to Looker. :param cancel_on_kill: Optional. Flag which indicates whether cancel the hook's job or not, when on_kill is called. """
[docs]defpoke(self,context:Context)->bool:self.hook=LookerHook(looker_conn_id=self.looker_conn_id)ifnotself.materialization_id:# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1message="Invalid `materialization_id`."ifself.soft_fail:raiseAirflowSkipException(message)raiseAirflowException(message)# materialization_id is templated var pulling output from start taskstatus_dict=self.hook.pdt_build_status(materialization_id=self.materialization_id)status=status_dict["status"]ifstatus==JobStatus.ERROR.value:msg=status_dict["message"]# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1message=f'PDT materialization job failed. Job id: {self.materialization_id}. Message:\n"{msg}"'ifself.soft_fail:raiseAirflowSkipException(message)raiseAirflowException(message)elifstatus==JobStatus.CANCELLED.value:# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1message=f"PDT materialization job was cancelled. Job id: {self.materialization_id}."ifself.soft_fail:raiseAirflowSkipException(message)raiseAirflowException(message)elifstatus==JobStatus.UNKNOWN.value:# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1message=f"PDT materialization job has unknown status. Job id: {self.materialization_id}."ifself.soft_fail:raiseAirflowSkipException(message)raiseAirflowException(message)elifstatus==JobStatus.DONE.value:self.log.debug("PDT materialization job completed successfully. Job id: %s.",self.materialization_id)returnTrueself.log.info("Waiting for PDT materialization job to complete. Job id: %s.",self.materialization_id)returnFalse