Source code for airflow.providers.tableau.hooks.tableau
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import time
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from tableauserverclient import JWTAuth, Pager, Server, TableauAuth
from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseHook
from airflow.utils.helpers import exactly_one
if TYPE_CHECKING:
    from tableauserverclient.server import Auth
[docs]
def parse_boolean(val: str) -> str | bool:
    """
    Try to parse a string into boolean.
    The string is returned as-is if it does not look like a boolean value.
    """
    val = val.lower()
    if val in ("y", "yes", "t", "true", "on", "1"):
        return True
    if val in ("n", "no", "f", "false", "off", "0"):
        return False
    return val 
[docs]
class TableauJobFailedException(AirflowException):
    """An exception that indicates that a Job failed to complete.""" 
[docs]
class TableauJobFinishCode(Enum):
    """
    The finish code indicates the status of the job.
    .. seealso:: https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref.htm#query_job
    """
 
[docs]
class TableauHook(BaseHook):
    """
    Connects to the Tableau Server Instance and allows to communicate with it.
    Can be used as a context manager: automatically authenticates the connection
    when opened and signs out when closed.
    .. seealso:: https://tableau.github.io/server-client-python/docs/
    :param site_id: The id of the site where the workbook belongs to.
        It will connect to the default site if you don't provide an id.
    :param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
        containing the credentials to authenticate to the Tableau Server.
    """
[docs]
    conn_name_attr = "tableau_conn_id" 
[docs]
    default_conn_name = "tableau_default" 
    def __init__(self, site_id: str | None = None, tableau_conn_id: str = default_conn_name) -> None:
        super().__init__()
[docs]
        self.tableau_conn_id = tableau_conn_id 
[docs]
        self.conn = self.get_connection(self.tableau_conn_id) 
[docs]
        self.site_id = site_id or self.conn.extra_dejson.get("site_id", "") 
[docs]
        self.server = Server(self.conn.host) 
        verify: Any = self.conn.extra_dejson.get("verify", True)
        if isinstance(verify, str):
            verify = parse_boolean(verify)
        self.server.add_http_options(
            options_dict={"verify": verify, "cert": self.conn.extra_dejson.get("cert", None)}
        )
        self.server.use_server_version()
[docs]
        self.tableau_conn = None 
[docs]
    def __enter__(self):
        if not self.tableau_conn:
            self.tableau_conn = self.get_conn()
        return self 
[docs]
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.server.auth.sign_out() 
[docs]
    def get_conn(self) -> Auth.contextmgr:
        """
        Sign in to the Tableau Server.
        :return: an authorized Tableau Server Context Manager object.
        """
        extra = self.conn.extra_dejson
        password_auth_set = self.conn.login and self.conn.password
        jwt_auth_set = extra.get("auth") == "jwt"
        if password_auth_set and jwt_auth_set:
            raise AirflowException(
                "Username/password authentication and JWT authentication cannot be used simultaneously. Please specify only one authentication method."
            )
        if password_auth_set:
            return self._auth_via_password()
        if jwt_auth_set:
            if not exactly_one(jwt_file := "jwt_file" in extra, jwt_token := "jwt_token" in extra):
                msg = (
                    "When auth set to 'jwt' then expected exactly one parameter 'jwt_file' or 'jwt_token'"
                    " in connection extra, but "
                )
                if jwt_file and jwt_token:
                    msg += "provided both."
                else:
                    msg += "none of them provided."
                raise ValueError(msg)
            if jwt_file:
                self.jwt_token = Path(extra["jwt_file"]).read_text()
            else:
                self.jwt_token = extra["jwt_token"]
            return self._auth_via_jwt()
        raise NotImplementedError("No Authentication method found for given Credentials!") 
    def _auth_via_password(self) -> Auth.contextmgr:
        tableau_auth = TableauAuth(
            username=cast("str", self.conn.login),
            password=cast("str", self.conn.password),
            site_id=self.site_id,
        )
        return self.server.auth.sign_in(tableau_auth)
    def _auth_via_jwt(self) -> Auth.contextmgr:
        jwt_auth = JWTAuth(jwt=self.jwt_token, site_id=self.site_id)
        return self.server.auth.sign_in(jwt_auth)
[docs]
    def get_all(self, resource_name: str) -> Pager:
        """
        Get all items of the given resource.
        .. see also:: https://tableau.github.io/server-client-python/docs/page-through-results
        :param resource_name: The name of the resource to paginate.
            For example: jobs or workbooks.
        :return: all items by returning a Pager.
        """
        try:
            resource = getattr(self.server, resource_name)
        except AttributeError:
            raise ValueError(f"Resource name {resource_name} is not found.")
        return Pager(resource.get) 
[docs]
    def get_job_status(self, job_id: str) -> TableauJobFinishCode:
        """
        Get the current state of a defined Tableau Job.
        .. see also:: https://tableau.github.io/server-client-python/docs/api-ref#jobs
        :param job_id: The id of the job to check.
        :return: An Enum that describe the Tableau job's return code
        """
        return TableauJobFinishCode(int(self.server.jobs.get_by_id(job_id).finish_code)) 
[docs]
    def wait_for_state(self, job_id: str, target_state: TableauJobFinishCode, check_interval: float) -> bool:
        """
        Wait until the current state of a defined Tableau Job is target_state or different from PENDING.
        :param job_id: The id of the job to check.
        :param target_state: Enum that describe the Tableau job's target state
        :param check_interval: time in seconds that the job should wait in
            between each instance state checks until operation is completed
        :return: return True if the job is equal to the target_status, False otherwise.
        """
        finish_code = self.get_job_status(job_id=job_id)
        while finish_code == TableauJobFinishCode.PENDING and finish_code != target_state:
            self.log.info("job state: %s", finish_code)
            time.sleep(check_interval)
            finish_code = self.get_job_status(job_id=job_id)
        return finish_code == target_state