#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import ast
import json
import re
import time
from collections.abc import Iterable, Mapping, Sequence
from functools import cached_property
from typing import Any
from urllib.error import HTTPError, URLError

import jenkins
from jenkins import Jenkins, JenkinsException
from requests import Request

from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
[docs]
JenkinsRequest = Mapping[str, Any]
[docs]
ParamType = str | dict | list | None
class JenkinsJobTriggerOperator(BaseOperator):
    """
    Trigger a Jenkins Job and monitor its execution.

    This operator depends on the python-jenkins library version >= 0.4.15 to
    communicate with the Jenkins server. You'll also need to configure a Jenkins
    connection in the connections screen.

    :param jenkins_connection_id: The jenkins connection to use for this job
    :param job_name: The name of the job to trigger
    :param parameters: The parameters block provided to jenkins for use in
        the API call when triggering a build. (templated)
    :param sleep_time: How long will the operator sleep between each status
        request for the job (min 1, default 10)
    :param max_try_before_job_appears: The maximum number of requests to make
        while waiting for the job to appear on jenkins server (default 10)
    :param allowed_jenkins_states: Iterable of allowed result jenkins states, default is ``['SUCCESS']``
    """

    template_fields: Sequence[str] = ("parameters",)
    template_ext: Sequence[str] = (".json",)

    def __init__(
        self,
        *,
        jenkins_connection_id: str,
        job_name: str,
        parameters: ParamType = None,
        sleep_time: int = 10,
        max_try_before_job_appears: int = 10,
        allowed_jenkins_states: Iterable[str] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.job_name = job_name
        self.parameters = parameters
        # Never poll more often than once per second.
        self.sleep_time = max(sleep_time, 1)
        self.jenkins_connection_id = jenkins_connection_id
        self.max_try_before_job_appears = max_try_before_job_appears
        # A lone string is itself an iterable of characters; treat it as a
        # single state name instead of exploding it into letters.
        if isinstance(allowed_jenkins_states, str):
            allowed_jenkins_states = [allowed_jenkins_states]
        self.allowed_jenkins_states = list(allowed_jenkins_states) if allowed_jenkins_states else ["SUCCESS"]

    @staticmethod
    def _parse_string_params(raw: str) -> Any:
        """
        Decode a parameters string into a Python object.

        JSON is tried first so that ``true``/``false``/``null`` are accepted;
        anything that is not valid JSON falls back to ``ast.literal_eval``,
        which keeps Python-literal strings (single quotes, tuples, ``None``)
        working as before.

        :param raw: The serialized parameters block.
        :return: The decoded object.
        :raises ValueError, SyntaxError: If the string is neither valid JSON
            nor a valid Python literal (raised by ``ast.literal_eval``).
        """
        try:
            return json.loads(raw)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return ast.literal_eval(raw)

    def build_job(self, jenkins_server: Jenkins, params: ParamType = None) -> JenkinsRequest:
        """
        Trigger a build job.

        This returns a dict with 2 keys ``body`` and ``headers``. ``headers``
        contains also a dict-like object which can be queried to get the
        location to poll in the queue.

        :param jenkins_server: The jenkins server where the job should be triggered
        :param params: The parameters block to provide to jenkins API call.
            A non-empty string is decoded (JSON first, then Python literal)
            before being passed on; dicts and lists are passed through as-is.
        :return: Dict containing the response body (key body)
            and the headers coming along (headers)
        """
        if params and isinstance(params, str):
            params = self._parse_string_params(params)

        build_url = jenkins_server.build_job_url(self.job_name, params, None)
        request = Request(method="POST", url=build_url)
        return jenkins_request_with_headers(jenkins_server, request)

    def poll_job_in_queue(self, location: str, jenkins_server: Jenkins) -> int:
        """
        Poll the jenkins queue until the job is executed.

        When we trigger a job through an API call, the job is first put in the
        queue without having a build number assigned. We have to wait until the
        job exits the queue to know its build number.

        To do so, we use get_queue_item to get information about a queued item
        https://python-jenkins.readthedocs.io/en/latest/api.html#jenkins.Jenkins.get_queue_item

        :param location: Location to poll, returned in the header of the build_job call
        :param jenkins_server: The jenkins server to poll
        :return: The build_number corresponding to the triggered job
        :raises ValueError: If ``location`` does not contain ``/queue/item/<id>``.
        :raises AirflowException: If no build number is available after
            ``max_try_before_job_appears`` polling attempts.
        """
        self.log.info("Polling jenkins queue at the url %s", location)

        if not (match := re.search(r"/queue/item/(\d+)/?", location)):
            raise ValueError(f"Invalid queue location format: {location}")

        queue_id = int(match.group(1))
        self.log.info("Polling Jenkins queue item with ID %s", queue_id)

        for attempt in range(self.max_try_before_job_appears):
            # Sleep between attempts, but not before the very first one.
            if attempt:
                time.sleep(self.sleep_time)

            # Reset each round so a failed request is never confused with
            # the previous attempt's response.
            json_response = None
            try:
                json_response = jenkins_server.get_queue_item(queue_id)
            except (HTTPError, JenkinsException):
                # Best effort: a failed poll just consumes one attempt.
                self.log.warning("polling failed, retrying", exc_info=True)

            if json_response:
                # The returned dict will have an "executable" key if the queued
                # item is running on an executor, or has completed running.
                executable = json_response.get("executable", None)
                if executable is not None and "number" in executable:
                    build_number = executable["number"]
                    self.log.info("Job executed on Jenkins side with the build number %s", build_number)
                    return build_number
            self.log.debug("Job not yet started. Queue item: %s", json_response)

        raise AirflowException(
            f"The job hasn't been executed after polling the queue {self.max_try_before_job_appears} times"
        )

    @cached_property
    def hook(self) -> JenkinsHook:
        """Instantiate the Jenkins hook."""
        return JenkinsHook(self.jenkins_connection_id)

    def execute(self, context: Mapping[Any, Any]) -> str | None:
        """
        Trigger the job, wait for it to finish and return the build URL.

        :param context: The task execution context (not used by this method).
        :return: The ``url`` entry of the finished build's info, for later
            use (like retrieving an artifact).
        :raises AirflowException: If a Jenkins call fails while checking the
            build status, or if the build ends in a state that is not listed
            in ``allowed_jenkins_states``.
        """
        self.log.info(
            "Triggering the job %s on the jenkins : %s with the parameters : %s",
            self.job_name,
            self.jenkins_connection_id,
            self.parameters,
        )
        jenkins_server = self.hook.get_jenkins_server()
        jenkins_response = self.build_job(jenkins_server, self.parameters)
        build_number = self.poll_job_in_queue(jenkins_response["headers"]["Location"], jenkins_server)

        time.sleep(self.sleep_time)
        try:
            # Poll until Jenkins reports a non-null result for the build.
            while True:
                build_info = jenkins_server.get_build_info(name=self.job_name, number=build_number)
                if build_info["result"] is not None:
                    break
                self.log.info("Waiting for job to complete : %s , build %s", self.job_name, build_number)
                time.sleep(self.sleep_time)
        except jenkins.NotFoundException as err:
            # The exception may not carry a ``resp`` attribute; fall back to
            # the exception text so this handler cannot itself fail.
            status = getattr(getattr(err, "resp", None), "status", err)
            raise AirflowException(f"Jenkins job status check failed. Final error was: {status}") from err
        except jenkins.JenkinsException as err:
            raise AirflowException(
                f"Jenkins call failed with error : {err}, if you have parameters "
                "double check them, jenkins sends back "
                "this exception for unknown parameters. "
                "You can also check logs for more details on this exception "
                "(jenkins_url/log/rss)"
            ) from err

        # Check if job ended with not allowed state.
        if build_info["result"] not in self.allowed_jenkins_states:
            raise AirflowException(
                f"Jenkins job failed, final state : {build_info['result']}. "
                f"Find more information on job url : {build_info['url']}"
            )

        # Return the url of the job for later use (like retrieving an artifact).
        return build_info["url"]