
Operators

Use the GithubOperator to execute operations in GitHub.

You can build your own operator by passing github_method and github_method_args to GithubOperator, where github_method is the name of any top-level PyGithub client method.

You can further process the result by passing a callable as result_processor.
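To make the mechanism concrete, the sketch below imitates how a method name, its arguments, and a result_processor could fit together. It is not the operator's source code: FakeClient, FakeUser, and call_github_method are hypothetical stand-ins, chosen so the snippet runs without Airflow, PyGithub, or network access.

```python
from typing import Any, Callable, Optional


class FakeUser:
    """Hypothetical stand-in for the object returned by a client's get_user()."""

    def get_repos(self) -> list[str]:
        return ["example/repo-a", "example/repo-b"]


class FakeClient:
    """Hypothetical stand-in for a PyGithub client."""

    def get_user(self) -> FakeUser:
        return FakeUser()


def call_github_method(
    client: Any,
    github_method: str,
    github_method_args: Optional[dict] = None,
    result_processor: Optional[Callable[[Any], Any]] = None,
) -> Any:
    """Look up a top-level client method by name, call it, then post-process."""
    method = getattr(client, github_method)
    result = method(**(github_method_args or {}))
    return result_processor(result) if result_processor else result


repos = call_github_method(
    FakeClient(),
    github_method="get_user",
    result_processor=lambda user: list(user.get_repos()),
)
print(repos)
```

The point of the sketch is the split of responsibilities: the method name and arguments select what to fetch, and the processor decides what to do with the returned object.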

For example, listing all repositories owned by a user, client.get_user().get_repos(), can be implemented as follows:

tests/system/github/example_github.py


github_list_repos = GithubOperator(
    task_id="github_list_repos",
    github_method="get_user",
    result_processor=lambda user: logger.info(list(user.get_repos())),
)

For example, listing the tags in a repository, client.get_repo(full_name_or_id="apache/airflow").get_tags(), can be implemented as follows:

tests/system/github/example_github.py


list_repo_tags = GithubOperator(
    task_id="list_repo_tags",
    github_method="get_repo",
    github_method_args={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: logger.info(list(repo.get_tags())),
)
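The lambdas in these examples only log what they receive. If the processed value is needed elsewhere, a named function that returns plain data may be easier to test and reuse. The following is a minimal sketch: tag_names is a hypothetical helper, and fake_repo is a stand-in that only mimics the get_tags() call and the name attribute used in this document.

```python
from types import SimpleNamespace
from typing import Any


def tag_names(repo: Any) -> list[str]:
    """Return tag names as plain strings instead of PyGithub objects."""
    return [tag.name for tag in repo.get_tags()]


# Hypothetical stand-in for a PyGithub repository, used only for this demo.
fake_repo = SimpleNamespace(
    get_tags=lambda: [SimpleNamespace(name="v1.0"), SimpleNamespace(name="v1.1")]
)
names = tag_names(fake_repo)
print(names)
```

Such a function could then be passed as result_processor=tag_names in place of the lambda.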

Sensors

You can build your own sensor using GithubSensor.

You can also implement your own repository-level sensor on top of BaseGithubRepositorySensor; GithubTagSensor is an example of this.

Use the GithubTagSensor to wait for the creation of a tag in GitHub.

An example for tag v1.0:

tests/system/github/example_github.py


tag_sensor = GithubTagSensor(
    task_id="example_tag_sensor",
    tag_name="v1.0",
    repository_name="apache/airflow",
    timeout=60,
    poke_interval=10,
)
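The timeout and poke_interval arguments, both in seconds, control how long and how often the sensor checks. The sketch below is a simplified model of that polling loop, not Airflow's implementation: wait_for and tag_exists are hypothetical names, elapsed time is counted rather than measured, and the loop returns False on timeout where a real sensor would fail the task.

```python
from typing import Callable


def wait_for(
    poke: Callable[[], bool],
    timeout: float,
    poke_interval: float,
    sleep: Callable[[float], None],
) -> bool:
    """Call poke() every poke_interval seconds until it succeeds or timeout passes."""
    elapsed = 0.0
    while True:
        if poke():
            return True
        if elapsed + poke_interval > timeout:
            return False
        sleep(poke_interval)
        elapsed += poke_interval


# Simulated check: the tag "appears" on the third poke.
attempts = {"count": 0}


def tag_exists() -> bool:
    attempts["count"] += 1
    return attempts["count"] >= 3


slept: list[float] = []  # records requested sleeps instead of actually waiting
found = wait_for(tag_exists, timeout=60, poke_interval=10, sleep=slept.append)
print(found, slept)
```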

Similar functionality can be achieved by using GithubSensor directly:

tests/system/github/example_github.py


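from typing import Any

from github import GithubException

from airflow.exceptions import AirflowException


def tag_checker(repo: Any, tag_name: str) -> bool | None:
    """Return whether tag_name exists in repo, or None if either input is missing."""
    result = None
    try:
        if repo is not None and tag_name is not None:
            all_tags = [x.name for x in repo.get_tags()]
            result = tag_name in all_tags

    except GithubException as github_error:
        # Chain the original exception so the GitHub traceback is preserved.
        raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}") from github_error
    except Exception as e:
        raise AirflowException(f"GitHub operator error: {e}") from e
    return result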

github_sensor = GithubSensor(
    task_id="example_sensor",
    method_name="get_repo",
    method_params={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: tag_checker(repo, "v1.0"),
    timeout=60,
    poke_interval=10,
)
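The lambda passed as result_processor exists only to bind the tag name. As an alternative, functools.partial can do the same binding and yields a callable that takes just the repository object. This is a minimal sketch: the tag_checker here is a simplified copy without error handling, and fake_repo is a hypothetical stand-in for a PyGithub repository.

```python
from functools import partial
from types import SimpleNamespace
from typing import Any


def tag_checker(repo: Any, tag_name: str) -> bool:
    """Simplified checker: True if tag_name is among the repository's tags."""
    return tag_name in [tag.name for tag in repo.get_tags()]


# Bind the tag name up front; the result takes only the repository object.
check_v1 = partial(tag_checker, tag_name="v1.0")

# Hypothetical stand-in repository exposing a single tag.
fake_repo = SimpleNamespace(get_tags=lambda: [SimpleNamespace(name="v1.0")])
print(check_v1(fake_repo))
```

A callable built this way could be supplied as result_processor=check_v1, which keeps the sensor definition free of inline lambdas.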
