# Source code for airflow.providers.slack.transfers.sql_to_slack_webhook
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Any

from tabulate import tabulate

from airflow.exceptions import AirflowException
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.slack.transfers.base_sql_to_slack import BaseSqlToSlackOperator

if TYPE_CHECKING:
    try:
        from airflow.sdk.definitions.context import Context
    except ImportError:
        # TODO: Remove once provider drops support for Airflow 2
        from airflow.utils.context import Context


class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
    """
    Executes an SQL statement in a given SQL connection and sends the results to Slack Incoming Webhook.

    The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe
    using a JINJA variable called '{{ results_df }}'. The 'results_df' variable name can be changed
    by specifying a different 'results_df_name' parameter. The Tabulate library is added to the
    JINJA environment as a filter to allow the dataframe to be rendered nicely.
    For example, set 'slack_message' to {{ results_df | tabulate(tablefmt="pretty", headers="keys") }}
    to send the results to Slack as an ascii rendered table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SqlToSlackWebhookOperator`

    .. note::
        You cannot override the default channel (chosen by the user who installed your app).
        Instead, these values will always inherit from the associated Slack App configuration
        (`link <https://api.slack.com/messaging/webhooks#advanced_message_formatting>`_).
        It is possible to change these values only in `Legacy Slack Integration Incoming Webhook
        <https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations>`_.

    .. warning::
        This operator is intended to be used with a `Slack Incoming Webhook` connection
        and might not work correctly with a `Slack API` connection.

    :param sql: The SQL query to be executed (templated)
    :param slack_message: The templated Slack message to send with the data returned from the SQL connection.
        You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the
        SQL results
    :param sql_conn_id: reference to a specific database.
    :param sql_hook_params: Extra config params to be passed to the underlying hook.
        Should match the desired hook constructor params.
    :param slack_webhook_conn_id: :ref:`Slack Incoming Webhook <howto/connection:slack>`
        connection id that has Incoming Webhook token in the password field.
    :param slack_channel: The channel to send message.
    :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
    :param parameters: The parameters to pass to the SQL query
    :raises ValueError: if ``slack_webhook_conn_id`` is empty or not provided.
    """

    # Both fields are documented above as templated; ``render_template_fields``
    # filters this tuple by name to postpone rendering of ``slack_message``.
    template_fields: Sequence[str] = ("sql", "slack_message")

    # Number of completed ``render_template_fields`` passes. The first pass
    # runs before the query results exist, so it must skip ``slack_message``.
    times_rendered = 0

    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        slack_webhook_conn_id: str | None = None,
        sql_hook_params: dict | None = None,
        slack_channel: str | None = None,
        slack_message: str,
        results_df_name: str = "results_df",
        parameters: list | tuple | Mapping[str, Any] | None = None,
        **kwargs,
    ) -> None:
        # Fail fast: without a webhook connection there is nowhere to send to.
        if not slack_webhook_conn_id:
            raise ValueError("Got an empty `slack_webhook_conn_id` value.")
        super().__init__(
            sql=sql,
            sql_conn_id=sql_conn_id,
            sql_hook_params=sql_hook_params,
            parameters=parameters,
            **kwargs,
        )

        # Keep the Slack-specific settings on the instance; the methods below
        # read all of them from ``self``.
        self.slack_webhook_conn_id = slack_webhook_conn_id
        self.slack_channel = slack_channel
        self.slack_message = slack_message
        self.results_df_name = results_df_name

    def _render_and_send_slack_message(self, context, df) -> None:
        """
        Render the message template against the query results and post it to Slack.

        :param context: Template context; it is mutated so that the dataframe is
            available under the name stored in ``results_df_name``.
        :param df: Query results to expose to the template.
        """
        # Put the dataframe into the context and render the JINJA template fields
        context[self.results_df_name] = df
        self.render_template_fields(context)

        slack_hook = self._get_slack_hook()
        self.log.info("Sending slack message: %s", self.slack_message)
        slack_hook.send(text=self.slack_message, channel=self.slack_channel)

    def _get_slack_hook(self) -> SlackWebhookHook:
        """Build the webhook hook from this operator's Slack connection settings."""
        return SlackWebhookHook(
            slack_webhook_conn_id=self.slack_webhook_conn_id,
            proxy=self.slack_proxy,
            timeout=self.slack_timeout,
            retry_handlers=self.slack_retry_handlers,
        )

    def render_template_fields(self, context, jinja_env=None) -> None:
        """
        Render the operator's template fields, deferring ``slack_message`` on the first pass.

        :param context: Template context used for rendering.
        :param jinja_env: Optional JINJA environment; when falsy, the one returned by
            ``get_template_env()`` is used. The ``tabulate`` filter is registered on it.
        """
        # If this is the first render of the template fields, exclude slack_message from rendering since
        # the SQL results haven't been retrieved yet.
        if self.times_rendered == 0:
            fields_to_render: Iterable[str] = (x for x in self.template_fields if x != "slack_message")
        else:
            fields_to_render = self.template_fields

        if not jinja_env:
            jinja_env = self.get_template_env()

        # Add the tabulate library into the JINJA environment
        jinja_env.filters["tabulate"] = tabulate

        self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
        self.times_rendered += 1

    def execute(self, context: Context) -> None:
        """
        Validate the inputs, run the query, and send the rendered message to Slack.

        :param context: Task execution context, forwarded to template rendering.
        :raises AirflowException: if ``sql`` is not a string, or if ``sql`` or
            ``slack_message`` is empty or whitespace-only.
        """
        if not isinstance(self.sql, str):
            raise AirflowException("Expected 'sql' parameter should be a string.")
        # ``sql`` is known to be a str here, so only the blank case remains.
        if not self.sql.strip():
            raise AirflowException("Expected 'sql' parameter is missing.")
        if self.slack_message is None or not self.slack_message.strip():
            raise AirflowException("Expected 'slack_message' parameter is missing.")

        df = self._get_query_results()
        self._render_and_send_slack_message(context, df)

        self.log.debug("Finished sending SQL data to Slack")