Source code for airflow.hooks.zendesk_hook

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import time
from zdesk import Zendesk, RateLimitError, ZendeskError
from airflow.hooks.base_hook import BaseHook


[docs]class ZendeskHook(BaseHook): """ A hook to talk to Zendesk """ def __init__(self, zendesk_conn_id): self.__zendesk_conn_id = zendesk_conn_id self.__url = None def get_conn(self): conn = self.get_connection(self.__zendesk_conn_id) self.__url = "https://" + conn.host return Zendesk(zdesk_url=self.__url, zdesk_email=conn.login, zdesk_password=conn.password, zdesk_token=True) def __handle_rate_limit_exception(self, rate_limit_exception): """ Sleep for the time specified in the exception. If not specified, wait for 60 seconds. """ retry_after = int( rate_limit_exception.response.headers.get('Retry-After', 60)) self.log.info( "Hit Zendesk API rate limit. Pausing for %s seconds", retry_after ) time.sleep(retry_after)
[docs] def call(self, path, query=None, get_all_pages=True, side_loading=False): """ Call Zendesk API and return results :param path: The Zendesk API to call :param query: Query parameters :param get_all_pages: Accumulate results over all pages before returning. Due to strict rate limiting, this can often timeout. Waits for recommended period between tries after a timeout. :param side_loading: Retrieve related records as part of a single request. In order to enable side-loading, add an 'include' query parameter containing a comma-separated list of resources to load. For more information on side-loading see https://developer.zendesk.com/rest_api/docs/core/side_loading """ zendesk = self.get_conn() first_request_successful = False while not first_request_successful: try: results = zendesk.call(path, query) first_request_successful = True except RateLimitError as rle: self.__handle_rate_limit_exception(rle) # Find the key with the results keys = [path.split("/")[-1].split(".json")[0]] next_page = results['next_page'] if side_loading: keys += query['include'].split(',') results = {key: results[key] for key in keys} if get_all_pages: while next_page is not None: try: # Need to split because the next page URL has # `github.zendesk...` # in it, but the call function needs it removed. next_url = next_page.split(self.__url)[1] self.log.info("Calling %s", next_url) more_res = zendesk.call(next_url) for key in results: results[key].extend(more_res[key]) if next_page == more_res['next_page']: # Unfortunately zdesk doesn't always throw ZendeskError # when we are done getting all the data. Sometimes the # next just refers to the current set of results. # Hence, need to deal with this special case break else: next_page = more_res['next_page'] except RateLimitError as rle: self.__handle_rate_limit_exception(rle) except ZendeskError as ze: if b"Use a start_time older than 5 minutes" in ze.msg: # We have pretty up to date data break else: raise ze return results