Operators
Use the GithubOperator to execute operations in GitHub.
You can build your own operator by using GithubOperator and passing github_method and github_method_args, where github_method is the name of a top-level PyGithub client method.
You can further process the result with a result_processor callable.
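The way these three parameters fit together can be pictured with a small stand-alone sketch. It uses neither Airflow nor PyGithub: FakeClient and run_github_method are hypothetical names, and the getattr-based dispatch is an assumption about how such an operator can be built, not a copy of the provider's code.

```python
from typing import Any, Callable, Optional


class FakeClient:
    """Hypothetical stand-in for a PyGithub client (not the real class)."""

    def get_repo(self, full_name_or_id: str) -> dict:
        return {"full_name": full_name_or_id}


def run_github_method(
    client: Any,
    github_method: str,
    github_method_args: Optional[dict] = None,
    result_processor: Optional[Callable[[Any], Any]] = None,
) -> Any:
    """Look up a top-level client method by name, call it, then post-process."""
    method = getattr(client, github_method)
    result = method(**(github_method_args or {}))
    return result_processor(result) if result_processor else result


repo_name = run_github_method(
    FakeClient(),
    github_method="get_repo",
    github_method_args={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: repo["full_name"],
)
print(repo_name)
```

When no result_processor is given, this sketch simply returns the raw result of the method call.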
For example, listing all repositories owned by a user, client.get_user().get_repos(), can be implemented as follows:
github_list_repos = GithubOperator(
    task_id="github_list_repos",
    github_method="get_user",
    result_processor=lambda user: logger.info(list(user.get_repos())),
)
For example, listing the tags in a repository, client.get_repo(full_name_or_id='apache/airflow').get_tags(), can be implemented as follows:
list_repo_tags = GithubOperator(
    task_id="list_repo_tags",
    github_method="get_repo",
    github_method_args={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: logger.info(list(repo.get_tags())),
)
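A lambda that only logs returns None, so nothing useful is passed on from the processor. A named function can be supplied as result_processor instead. The sketch below is a minimal illustration: tag_names is a hypothetical helper, and FakeTag and FakeRepo are stand-ins that exist only so the snippet runs without network access.

```python
from dataclasses import dataclass
from typing import Any, List


def tag_names(repo: Any) -> List[str]:
    """Return sorted tag names from any object exposing get_tags()."""
    return sorted(tag.name for tag in repo.get_tags())


@dataclass
class FakeTag:
    """Hypothetical stand-in for a PyGithub tag object."""

    name: str


class FakeRepo:
    """Hypothetical stand-in for a PyGithub repository object."""

    def get_tags(self) -> List[FakeTag]:
        return [FakeTag("v1.1"), FakeTag("v1.0")]


print(tag_names(FakeRepo()))
```

With the real operator, such a function would be passed as result_processor=tag_names in place of the lambda.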
Sensors
You can build your own sensor using GithubSensor.
You can also implement your own repository-level sensor using BaseGithubRepositorySensor; an example of this is GithubTagSensor.
Use the GithubTagSensor to wait for the creation of a tag in GitHub.
An example for tag v1.0:
tag_sensor = GithubTagSensor(
    task_id="example_tag_sensor",
    tag_name="v1.0",
    repository_name="apache/airflow",
    timeout=60,
    poke_interval=10,
)
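The timeout and poke_interval arguments are standard Airflow sensor settings, both in seconds: the sensor re-checks roughly every 10 seconds and fails once 60 seconds have passed. The following sketch imitates that loop in plain Python with a simulated clock. The function poll_until and its arguments are hypothetical and only approximate how a sensor schedules its checks.

```python
from typing import Callable


def poll_until(check: Callable[[], bool], timeout: float, poke_interval: float) -> int:
    """Call check() every poke_interval simulated seconds; return the attempt count.

    Raises TimeoutError once the simulated elapsed time exceeds timeout.
    """
    elapsed = 0.0
    attempts = 0
    while True:
        attempts += 1
        if check():
            return attempts
        elapsed += poke_interval  # a real sensor would wait here
        if elapsed > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")


# Simulate a tag that appears on the third check.
answers = iter([False, False, True])
attempts_needed = poll_until(lambda: next(answers), timeout=60, poke_interval=10)
print(attempts_needed)
```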
Similar functionality can be achieved by using GithubSensor directly.
from typing import Any

from airflow.exceptions import AirflowException
from github import GithubException


def tag_checker(repo: Any, tag_name: str) -> bool | None:
    """Return True if tag_name exists in repo, False if not, None if an input is missing."""
    result = None
    try:
        if repo is not None and tag_name is not None:
            all_tags = [x.name for x in repo.get_tags()]
            result = tag_name in all_tags
    except GithubException as github_error:
        raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}")
    except Exception as e:
        raise AirflowException(f"GitHub operator error: {e}")
    return result

github_sensor = GithubSensor(
    task_id="example_sensor",
    method_name="get_repo",
    method_params={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: tag_checker(repo, "v1.0"),
    timeout=60,
    poke_interval=10,
)
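Because result_processor is an ordinary callable, the check itself can be exercised without Airflow or network access. The sketch below uses a simplified checker built by a hypothetical factory, make_tag_checker, and a stub repository, StubRepo, in place of a PyGithub object. Error handling is omitted for brevity.

```python
from types import SimpleNamespace
from typing import Any, Callable, List


def make_tag_checker(tag_name: str) -> Callable[[Any], bool]:
    """Build a processor that reports whether tag_name exists on a repo."""

    def check(repo: Any) -> bool:
        return any(tag.name == tag_name for tag in repo.get_tags())

    return check


class StubRepo:
    """Minimal stand-in exposing only get_tags()."""

    def __init__(self, names: List[str]) -> None:
        self._names = names

    def get_tags(self) -> list:
        return [SimpleNamespace(name=n) for n in self._names]


has_v1 = make_tag_checker("v1.0")
print(has_v1(StubRepo(["v0.9", "v1.0"])))
print(has_v1(StubRepo(["v0.9"])))
```

A factory like this avoids repeating the tag name inside a lambda when several sensors check different tags.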