Source code for airflow.providers.github.sensors.github
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromtypingimportTYPE_CHECKING,Any,CallablefromgithubimportGithubExceptionfromairflowimportAirflowExceptionfromairflow.providers.github.hooks.githubimportGithubHookfromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.utils.contextimportContext
[docs]classGithubSensor(BaseSensorOperator):""" Base GithubSensor which can monitor for any change. :param github_conn_id: reference to a pre-defined GitHub Connection :param method_name: method name from PyGithub to be executed :param method_params: parameters for the method method_name :param result_processor: function that return boolean and act as a sensor response """def__init__(self,*,method_name:str,github_conn_id:str='github_default',method_params:dict|None=None,result_processor:Callable|None=None,**kwargs,)->None:super().__init__(**kwargs)self.github_conn_id=github_conn_idself.result_processor=Noneifresult_processorisnotNone:self.result_processor=result_processorself.method_name=method_nameself.method_params=method_params
[docs]classBaseGithubRepositorySensor(GithubSensor):""" Base GitHub sensor at Repository level. :param github_conn_id: reference to a pre-defined GitHub Connection :param repository_name: full qualified name of the repository to be monitored, ex. "apache/airflow" """def__init__(self,*,github_conn_id:str='github_default',repository_name:str|None=None,result_processor:Callable|None=None,**kwargs,)->None:super().__init__(github_conn_id=github_conn_id,result_processor=result_processor,method_name="get_repo",method_params={'full_name_or_id':repository_name},**kwargs,)
[docs]defpoke(self,context:Context)->bool:""" Function that the sensors defined while deriving this class should override. """raiseAirflowException('Override me.')
[docs]classGithubTagSensor(BaseGithubRepositorySensor):""" Monitors a github tag for its creation. :param github_conn_id: reference to a pre-defined GitHub Connection :param tag_name: name of the tag to be monitored :param repository_name: fully qualified name of the repository to be monitored, ex. "apache/airflow" """
[docs]defpoke(self,context:Context)->bool:self.log.info('Poking for tag: %s in repository: %s',self.tag_name,self.repository_name)returnGithubSensor.poke(self,context=context)
[docs]deftag_checker(self,repo:Any)->bool|None:"""Checking existence of Tag in a Repository"""result=Nonetry:ifrepoisnotNoneandself.tag_nameisnotNone:all_tags=[x.nameforxinrepo.get_tags()]result=self.tag_nameinall_tagsexceptGithubExceptionasgithub_error:# type: ignore[misc]raiseAirflowException(f"Failed to execute GithubSensor, error: {str(github_error)}")exceptExceptionase:raiseAirflowException(f"GitHub operator error: {str(e)}")ifresultisTrue:self.log.info("Tag %s exists in %s repository, Success.",self.tag_name,self.repository_name)else:self.log.info("Tag %s doesn't exists in %s repository yet.",self.tag_name,self.repository_name)returnresult