Source code for airflow.providers.slack.transfers.base_sql_to_slack
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any
from airflow.exceptions import AirflowException
from airflow.providers.slack.version_compat import BaseHook, BaseOperator
if TYPE_CHECKING:
    import pandas as pd
    from slack_sdk.http_retry import RetryHandler
    from airflow.providers.common.sql.hooks.sql import DbApiHook
[docs]
class BaseSqlToSlackOperator(BaseOperator):
    """
    Operator implements base sql methods for SQL to Slack Transfer operators.
    :param sql: The SQL query to be executed
    :param sql_conn_id: reference to a specific DB-API Connection.
    :param sql_hook_params: Extra config params to be passed to the underlying hook.
        Should match the desired hook constructor params.
    :param parameters: The parameters to pass to the SQL query.
    :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls. Optional
    :param slack_timeout: The maximum number of seconds the client will wait to connect
        and receive a response from Slack. Optional
    :param slack_retry_handlers: List of handlers to customize retry logic. Optional
    """
    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        sql_hook_params: dict | None = None,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
[docs]
        self.sql_conn_id = sql_conn_id 
[docs]
        self.sql_hook_params = sql_hook_params 
[docs]
        self.parameters = parameters 
[docs]
        self.slack_proxy = slack_proxy 
[docs]
        self.slack_timeout = slack_timeout 
[docs]
        self.slack_retry_handlers = slack_retry_handlers 
    def _get_hook(self) -> DbApiHook:
        self.log.debug("Get connection for %s", self.sql_conn_id)
        conn = BaseHook.get_connection(self.sql_conn_id)
        hook = conn.get_hook(hook_params=self.sql_hook_params)
        if not callable(getattr(hook, "get_df", None)):
            raise AirflowException("This hook is not supported. The hook class must have get_df method.")
        return hook
    def _get_query_results(self) -> pd.DataFrame:
        sql_hook = self._get_hook()
        self.log.info("Running SQL query: %s", self.sql)
        df = sql_hook.get_df(self.sql, parameters=self.parameters)
        return df