Source code for airflow.providers.amazon.aws.utils.sqs
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportloggingfromtypingimportAny,Literalfromjsonpath_ngimportparse
[docs]defprocess_response(response:Any,message_filtering:Literal["literal","jsonpath"]|None=None,message_filtering_match_values:Any=None,message_filtering_config:Any=None,)->Any:""" Process the response from SQS. :param response: The response from SQS :return: The processed response """ifnotisinstance(response,dict):return[]elif"Messages"notinresponse:return[]messages=response["Messages"]num_messages=len(messages)log.info("Received %d messages",num_messages)ifnum_messagesandmessage_filtering:messages=filter_messages(messages,message_filtering,message_filtering_match_values,message_filtering_config)num_messages=len(messages)log.info("There are %d messages left after filtering",num_messages)returnmessages
def filter_messages(
    messages, message_filtering, message_filtering_match_values, message_filtering_config
) -> list[Any]:
    """
    Dispatch to the filter implementation selected by ``message_filtering``.

    :param messages: The messages to filter.
    :param message_filtering: ``"literal"`` selects ``filter_messages_literal``;
        ``"jsonpath"`` selects ``filter_messages_jsonpath``.
    :param message_filtering_match_values: Passed through to the selected filter.
    :param message_filtering_config: Passed through to the JSONPath filter only.
    :return: Whatever the selected filter returns.
    :raises NotImplementedError: If ``message_filtering`` is neither
        ``"literal"`` nor ``"jsonpath"``.
    """
    if message_filtering == "literal":
        selected = filter_messages_literal(messages, message_filtering_match_values)
    elif message_filtering == "jsonpath":
        selected = filter_messages_jsonpath(
            messages, message_filtering_match_values, message_filtering_config
        )
    else:
        raise NotImplementedError("Override this method to define custom filters")
    return selected
def filter_messages_jsonpath(
    messages, message_filtering_match_values, message_filtering_config
) -> list[Any]:
    """
    Keep the messages whose JSON body matches a JSONPath expression.

    :param messages: Iterable of messages; each one is indexed with ``"Body"``
        and that value is decoded with ``json.loads``.
    :param message_filtering_match_values: When ``None``, any message with at
        least one JSONPath match is kept. Otherwise a message is kept only if
        the value of at least one match is contained in this collection.
    :param message_filtering_config: The expression handed to ``parse``.
    :return: The messages that passed the filter, in their original order.
    """
    expression = parse(message_filtering_config)
    accept_any_match = message_filtering_match_values is None

    kept = []
    for candidate in messages:
        # The body arrives as a string, so decode it before applying the expression.
        matches = expression.find(json.loads(candidate["Body"]))
        if not matches:
            continue
        if accept_any_match or any(
            match.value in message_filtering_match_values for match in matches
        ):
            kept.append(candidate)
    return kept