# Source code for airflow.providers.amazon.aws.hooks.chime
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a web hook for Chime."""
from __future__ import annotations
import json
import re
from typing import Any
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
class ChimeWebhookHook(HttpHook):
    """Interact with Chime Web Hooks to create notifications.

    .. warning:: This hook is only designed to work with web hooks and not chat bots.

    :param chime_conn_id: Chime connection ID with Endpoint as "https://hooks.chime.aws" and
        the webhook token in the form of ``{webhook.id}?token={webhook.token}``
    """

    conn_name_attr = "chime_conn_id"
    default_conn_name = "chime_default"
    hook_name = "Chime Web Hook"

    def __init__(
        self,
        chime_conn_id: str,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        # The endpoint is resolved here, so an invalid connection raises
        # while the hook is being constructed rather than on first send.
        self.webhook_endpoint = self._get_webhook_endpoint(chime_conn_id)

    def _get_webhook_endpoint(self, conn_id: str) -> str:
        """
        Given a Chime conn_id return the default webhook endpoint.

        The endpoint is ``<schema>://<host><token>``, where the token is read
        from the connection's password field.

        :param conn_id: The provided connection ID.
        :return: Endpoint(str) for chime webhook.
        :raises AirflowException: If the token, schema or host is missing, or
            if the token is not of the form ``{webhook.id}?token={webhook.token}``.
        """
        conn = self.get_connection(conn_id)
        token = conn.get_password()
        if token is None:
            raise AirflowException("Webhook token field is missing and is required.")
        # Fail with a clear message instead of a TypeError from ``None + str``
        # (or a malformed "://..." URL) when the connection is incomplete.
        if not conn.schema:
            raise AirflowException("Webhook schema field is missing and is required.")
        if not conn.host:
            raise AirflowException("Webhook host field is missing and is required.")

        # Check to make sure the token matches what Chime expects.
        # ``fullmatch`` is used because ``$`` in ``re.match`` would also accept
        # a token that ends with a trailing newline.
        if not re.fullmatch(r"[a-zA-Z0-9_-]+\?token=[a-zA-Z0-9_-]+", token):
            raise AirflowException(
                "Expected Chime webhook token in the form of '{webhook.id}?token={webhook.token}'."
            )

        return conn.schema + "://" + conn.host + token

    def _build_chime_payload(self, message: str) -> str:
        """
        Build the payload for Chime and ensure the message does not exceed the max length allowed.

        :param message: The message you want to send to your Chime room.
            (max 4096 characters)
        :return: JSON string of the form ``{"Content": message}``.
        :raises AirflowException: If the message is longer than 4096 characters.
        """
        # We need to make sure that the message does not exceed the max length for Chime
        if len(message) > 4096:
            raise AirflowException("Chime message must be 4096 characters or less.")
        payload: dict[str, Any] = {"Content": message}
        return json.dumps(payload)

    def send_message(self, message: str) -> None:
        """Execute calling the Chime webhook endpoint.

        The message is wrapped in a JSON payload and passed, together with a
        JSON content-type header, to the inherited ``run`` method.

        :param message: The message you want to send to your Chime room.
            (max 4096 characters)
        :raises AirflowException: If the message is longer than 4096 characters.
        """
        chime_payload = self._build_chime_payload(message)
        self.run(
            endpoint=self.webhook_endpoint, data=chime_payload, headers={"Content-type": "application/json"}
        )

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour to only get what is needed for Chime webhooks to function."""
        return {
            "hidden_fields": ["login", "port", "extra"],
            "relabeling": {
                "host": "Chime Webhook Endpoint",
                "password": "Webhook Token",
            },
            "placeholders": {
                "schema": "https",
                "host": "hooks.chime.aws/incomingwebhook/",
                "password": "T00000000?token=XXXXXXXXXXXXXXXXXXXXXXXX",
            },
        }