#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the DingdingOperator.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.dingding.operators.dingding import DingdingOperator
# [START howto_operator_dingding_failure_callback]
def failure_callback(context):
"""
The function that will be executed on failure.
:param context: The context of the executed task.
"""
message = (
f"AIRFLOW TASK FAILURE TIPS:\n"
f"DAG: {context['task_instance'].dag_id}\n"
f"TASKS: {context['task_instance'].task_id}\n"
f"Reason: {context['exception']}\n"
)
return DingdingOperator(
task_id='dingding_success_callback',
message_type='text',
message=message,
at_all=True,
).execute(context)
# [END howto_operator_dingding_failure_callback]
with DAG(
dag_id='example_dingding_operator',
default_args={'retries': 3, 'on_failure_callback': failure_callback},
schedule_interval='@once',
dagrun_timeout=timedelta(minutes=60),
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
# [START howto_operator_dingding]
text_msg_remind_none = DingdingOperator(
task_id='text_msg_remind_none',
message_type='text',
message='Airflow dingding text message remind none',
at_mobiles=None,
at_all=False,
)
# [END howto_operator_dingding]
text_msg_remind_specific = DingdingOperator(
task_id='text_msg_remind_specific',
message_type='text',
message='Airflow dingding text message remind specific users',
at_mobiles=['156XXXXXXXX', '130XXXXXXXX'],
at_all=False,
)
text_msg_remind_include_invalid = DingdingOperator(
task_id='text_msg_remind_include_invalid',
message_type='text',
message='Airflow dingding text message remind users including invalid',
# 123 is invalid user or user not in the group
at_mobiles=['156XXXXXXXX', '123'],
at_all=False,
)
# [START howto_operator_dingding_remind_users]
text_msg_remind_all = DingdingOperator(
task_id='text_msg_remind_all',
message_type='text',
message='Airflow dingding text message remind all users in group',
# list of user phone/email here in the group
# when at_all is specific will cover at_mobiles
at_mobiles=['156XXXXXXXX', '130XXXXXXXX'],
at_all=True,
)
# [END howto_operator_dingding_remind_users]
link_msg = DingdingOperator(
task_id='link_msg',
message_type='link',
message={
'title': 'Airflow dingding link message',
'text': 'Airflow official documentation link',
'messageUrl': 'https://airflow.apache.org',
'picURL': 'https://airflow.apache.org/_images/pin_large.png',
},
)
# [START howto_operator_dingding_rich_text]
markdown_msg = DingdingOperator(
task_id='markdown_msg',
message_type='markdown',
message={
'title': 'Airflow dingding markdown message',
'text': '# Markdown message title\n'
'content content .. \n'
'### sub-title\n'
'![logo](https://airflow.apache.org/_images/pin_large.png)',
},
at_mobiles=['156XXXXXXXX'],
at_all=False,
)
# [END howto_operator_dingding_rich_text]
single_action_card_msg = DingdingOperator(
task_id='single_action_card_msg',
message_type='actionCard',
message={
'title': 'Airflow dingding single actionCard message',
'text': 'Airflow dingding single actionCard message\n'
'![logo](https://airflow.apache.org/_images/pin_large.png)\n'
'This is a official logo in Airflow website.',
'hideAvatar': '0',
'btnOrientation': '0',
'singleTitle': 'read more',
'singleURL': 'https://airflow.apache.org',
},
)
multi_action_card_msg = DingdingOperator(
task_id='multi_action_card_msg',
message_type='actionCard',
message={
'title': 'Airflow dingding multi actionCard message',
'text': 'Airflow dingding multi actionCard message\n'
'![logo](https://airflow.apache.org/_images/pin_large.png)\n'
'Airflow documentation and GitHub',
'hideAvatar': '0',
'btnOrientation': '0',
'btns': [
{'title': 'Airflow Documentation', 'actionURL': 'https://airflow.apache.org'},
{'title': 'Airflow GitHub', 'actionURL': 'https://github.com/apache/airflow'},
],
},
)
feed_card_msg = DingdingOperator(
task_id='feed_card_msg',
message_type='feedCard',
message={
"links": [
{
"title": "Airflow DAG feed card",
"messageURL": "https://airflow.apache.org/docs/apache-airflow/stable/ui.html",
"picURL": "https://airflow.apache.org/_images/dags.png",
},
{
"title": "Airflow grid feed card",
"messageURL": "https://airflow.apache.org/docs/apache-airflow/stable/ui.html",
"picURL": "https://airflow.apache.org/_images/grid.png",
},
{
"title": "Airflow graph feed card",
"messageURL": "https://airflow.apache.org/docs/apache-airflow/stable/ui.html",
"picURL": "https://airflow.apache.org/_images/graph.png",
},
]
},
)
msg_failure_callback = DingdingOperator(
task_id='msg_failure_callback',
message_type='not_support_msg_type',
message="",
)
(
[
text_msg_remind_none,
text_msg_remind_specific,
text_msg_remind_include_invalid,
text_msg_remind_all,
]
>> link_msg
>> markdown_msg
>> [
single_action_card_msg,
multi_action_card_msg,
]
>> feed_card_msg
>> msg_failure_callback
)