Source code for airflow.providers.amazon.aws.hooks.eventbridge
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import json
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils import trim_none_values
def _validate_json(pattern: str) -> None:
try:
json.loads(pattern)
except ValueError:
raise ValueError("`event_pattern` must be a valid JSON string.")
[docs]class EventBridgeHook(AwsBaseHook):
"""Amazon EventBridge Hook."""
def __init__(self, *args, **kwargs):
super().__init__(client_type="events", *args, **kwargs)
[docs] def put_rule(
self,
name: str,
description: str | None = None,
event_bus_name: str | None = None,
event_pattern: str | None = None,
role_arn: str | None = None,
schedule_expression: str | None = None,
state: str | None = None,
tags: list[dict] | None = None,
**kwargs,
):
"""
Create or update an EventBridge rule.
:param name: name of the rule to create or update (required)
:param description: description of the rule
:param event_bus_name: name or ARN of the event bus to associate with this rule
:param event_pattern: pattern of events to be matched to this rule
:param role_arn: the Amazon Resource Name of the IAM role associated with the rule
:param schedule_expression: the scheduling expression (for example, a cron or rate expression)
:param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
:param tags: list of key-value pairs to associate with the rule
"""
if not (event_pattern or schedule_expression):
raise ValueError(
"One of `event_pattern` or `schedule_expression` are required in order to "
"put or update your rule."
)
if state and state not in ["ENABLED", "DISABLED"]:
raise ValueError("`state` must be specified as ENABLED or DISABLED.")
if event_pattern:
_validate_json(event_pattern)
put_rule_kwargs: dict[str, str | list] = {
**trim_none_values(
{
"Name": name,
"Description": description,
"EventBusName": event_bus_name,
"EventPattern": event_pattern,
"RoleArn": role_arn,
"ScheduleExpression": schedule_expression,
"State": state,
"Tags": tags,
}
)
}
return self.conn.put_rule(**put_rule_kwargs)