# Source code for airflow.providers.airbyte.hooks.airbyte

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import Any, Optional

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook

class AirbyteHook(HttpHook):
    """
    Hook for Airbyte API

    :param airbyte_conn_id: Required. The name of the Airflow connection to get
        connection information for Airbyte.
    :type airbyte_conn_id: str
    :param api_version: Optional. Airbyte API version.
    :type api_version: str
    """

    conn_name_attr = 'airbyte_conn_id'
    default_conn_name = 'airbyte_default'
    conn_type = 'airbyte'
    hook_name = 'Airbyte'

    # Job status strings compared against ``["job"]["status"]`` of the job response
    # in ``wait_for_job``.
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
    PENDING = "pending"
    FAILED = "failed"
    ERROR = "error"
    INCOMPLETE = "incomplete"

    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
        super().__init__(http_conn_id=airbyte_conn_id)
        # Interpolated into every endpoint path as ``api/{api_version}/...``.
        self.api_version: str = api_version

    def wait_for_job(
        self, job_id: str, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600
    ) -> None:
        """
        Helper method which polls a job to check if it finishes.

        Returns normally once the job reports the ``succeeded`` status.

        :param job_id: Required. Id of the Airbyte job
        :type job_id: str
        :param wait_seconds: Optional. Number of seconds between checks.
        :type wait_seconds: float
        :param timeout: Optional. How many seconds wait for job to be ready.
            A falsy value (``None`` or ``0``) disables the timeout check.
        :type timeout: float
        :raises AirflowException: if the timeout elapses, or the job reports
            the ``error``, ``failed`` or ``cancelled`` status.
        :raises Exception: if the job reports a status this hook does not know.
        """
        start = time.monotonic()
        while True:
            if timeout and start + timeout < time.monotonic():
                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
            # Sleep before every poll, including the first one.
            time.sleep(wait_seconds)
            try:
                job = self.get_job(job_id=job_id)
                state = job.json()["job"]["status"]
            except AirflowException as err:
                # Best-effort polling: a failed request is logged and retried until
                # the timeout check at the top of the loop gives up.
                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                continue

            if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
                continue
            if state == self.SUCCEEDED:
                break
            # ``failed`` is handled together with ``error`` so that a failed job is
            # reported as a job failure instead of as an unexpected state.
            if state in (self.ERROR, self.FAILED):
                raise AirflowException(f"Job failed:\n{job}")
            if state == self.CANCELLED:
                raise AirflowException(f"Job was cancelled:\n{job}")
            raise Exception(f"Encountered unexpected state `{state}` for job_id `{job_id}`")

    def submit_sync_connection(self, connection_id: str) -> Any:
        """
        Submits a job to an Airbyte server.

        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
        :type connection_id: str
        :return: whatever ``self.run`` returns for the ``connections/sync`` request.
        """
        return self.run(
            endpoint=f"api/{self.api_version}/connections/sync",
            json={"connectionId": connection_id},
            headers={"accept": "application/json"},
        )

    def get_job(self, job_id: int) -> Any:
        """
        Gets the resource representation for a job in Airbyte.

        :param job_id: Required. Id of the Airbyte job
        :type job_id: int
        :return: whatever ``self.run`` returns for the ``jobs/get`` request;
            ``wait_for_job`` reads ``.json()["job"]["status"]`` from it.
        """
        return self.run(
            endpoint=f"api/{self.api_version}/jobs/get",
            json={"id": job_id},
            headers={"accept": "application/json"},
        )

    def test_connection(self):
        """
        Tests the Airbyte connection by hitting the health API

        :return: ``(True, message)`` when the health endpoint answers with HTTP 200,
            otherwise ``(False, detail)`` where ``detail`` is the response text or
            the text of the exception that was raised.
        """
        # The health check is issued as a GET; the method is switched back in ``finally``.
        self.method = 'GET'
        try:
            res = self.run(
                endpoint=f"api/{self.api_version}/health",
                headers={"accept": "application/json"},
                # Do not raise on a non-2xx status; it is inspected below instead.
                extra_options={'check_response': False},
            )
            if res.status_code == 200:
                return True, 'Connection successfully tested'
            return False, res.text
        except Exception as e:
            # Any failure is reported to the caller as a negative test result
            # rather than propagated.
            return False, str(e)
        finally:
            # Always reset to POST so later calls on this hook are not sent as GET.
            self.method = 'POST'

