# Source code for airflow.providers.amazon.aws.hooks.lambda_function

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains AWS Lambda hook."""
from __future__ import annotations

import base64
from typing import Any

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils import trim_none_values
from airflow.providers.amazon.aws.utils.suppress import return_on_error


class LambdaHook(AwsBaseHook):
    """
    Interact with AWS Lambda.

    Provide thin wrapper around :external+boto3:py:class:`boto3.client("lambda") <Lambda.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(self, *args, **kwargs) -> None:
        # Always target the Lambda service; any caller-supplied ``client_type`` is overwritten.
        kwargs["client_type"] = "lambda"
        super().__init__(*args, **kwargs)

    def invoke_lambda(
        self,
        *,
        function_name: str,
        invocation_type: str | None = None,
        log_type: str | None = None,
        client_context: str | None = None,
        payload: bytes | str | None = None,
        qualifier: str | None = None,
    ):
        """
        Invoke Lambda Function.

        .. seealso::
            - :external+boto3:py:meth:`Lambda.Client.invoke`

        :param function_name: AWS Lambda Function Name
        :param invocation_type: AWS Lambda Invocation Type (RequestResponse, Event etc)
        :param log_type: Set to Tail to include the execution log in the response.
            Applies to synchronously invoked functions only.
        :param client_context: Up to 3,583 bytes of base64-encoded data about the invoking client
            to pass to the function in the context object.
        :param payload: The JSON that you want to provide to your Lambda function as input.
            A ``str`` payload is encoded to bytes (default ``str.encode()`` encoding) before sending.
        :param qualifier: AWS Lambda Function Version or Alias Name
        :return: Whatever ``self.conn.invoke`` returns.
        """
        if isinstance(payload, str):
            payload = payload.encode()

        invoke_args = {
            "FunctionName": function_name,
            "InvocationType": invocation_type,
            "LogType": log_type,
            "ClientContext": client_context,
            "Payload": payload,
            "Qualifier": qualifier,
        }
        # NOTE(review): trim_none_values is imported from airflow.providers.amazon.aws.utils and is
        # presumably what keeps unset (None) arguments out of the API call -- confirm against helper.
        return self.conn.invoke(**trim_none_values(invoke_args))

    def create_lambda(
        self,
        *,
        function_name: str,
        runtime: str | None = None,
        role: str,
        handler: str | None = None,
        code: dict,
        description: str | None = None,
        timeout: int | None = None,
        memory_size: int | None = None,
        publish: bool | None = None,
        vpc_config: Any | None = None,
        package_type: str | None = None,
        dead_letter_config: Any | None = None,
        environment: Any | None = None,
        kms_key_arn: str | None = None,
        tracing_config: Any | None = None,
        tags: Any | None = None,
        layers: list | None = None,
        file_system_configs: list[Any] | None = None,
        image_config: Any | None = None,
        code_signing_config_arn: str | None = None,
        architectures: list[str] | None = None,
    ) -> dict:
        """
        Create a Lambda function.

        .. seealso::
            - :external+boto3:py:meth:`Lambda.Client.create_function`
            - `Configuring a Lambda function to access resources in a VPC \
                <https://docs.aws.amazon.com/lambda/latest/dg/configuration-vpc.html>`__

        :param function_name: AWS Lambda Function Name
        :param runtime: The identifier of the function's runtime.
            Runtime is required if the deployment package is a .zip file archive.
        :param role: The Amazon Resource Name (ARN) of the function's execution role.
        :param handler: The name of the method within your code that Lambda calls to run your function.
            Handler is required if the deployment package is a .zip file archive.
        :param code: The code for the function.
        :param description: A description of the function.
        :param timeout: The amount of time (in seconds) that Lambda
            allows a function to run before stopping it.
        :param memory_size: The amount of memory available to the function at runtime.
            Increasing the function memory also increases its CPU allocation.
        :param publish: Set to true to publish the first version of the function during creation.
        :param vpc_config: For network connectivity to Amazon Web Services resources in a VPC,
            specify a list of security groups and subnets in the VPC.
        :param package_type: The type of deployment package.
            Set to ``Image`` for container image and set to ``Zip`` for .zip file archive.
        :param dead_letter_config: A dead-letter queue configuration that specifies the queue or topic
            where Lambda sends asynchronous events when they fail processing.
        :param environment: Environment variables that are accessible from function code during execution.
        :param kms_key_arn: The ARN of the Key Management Service (KMS) key that's used to
            encrypt your function's environment variables.
            If it's not provided, Lambda uses a default service key.
        :param tracing_config: Set ``Mode`` to ``Active`` to sample and trace
            a subset of incoming requests with X-Ray.
        :param tags: A list of tags to apply to the function.
        :param layers: A list of function layers to add to the function's execution environment.
            Specify each layer by its ARN, including the version.
        :param file_system_configs: Connection settings for an Amazon EFS file system.
        :param image_config: Container image configuration values that override
            the values in the container image Dockerfile.
        :param code_signing_config_arn: To enable code signing for this function,
            specify the ARN of a code-signing configuration.
            A code-signing configuration includes a set of signing profiles,
            which define the trusted publishers for this function.
        :param architectures: The instruction set architecture that the function supports.
        :raises TypeError: If ``package_type`` is ``"Zip"`` and ``handler`` or ``runtime`` is ``None``.
        :return: Whatever ``self.conn.create_function`` returns.
        """
        # Fail fast, before any API call, on the combinations the docstring marks as required.
        # The check only fires when package_type is passed explicitly as "Zip".
        if package_type == "Zip":
            if handler is None:
                raise TypeError("Parameter 'handler' is required if 'package_type' is 'Zip'")
            if runtime is None:
                raise TypeError("Parameter 'runtime' is required if 'package_type' is 'Zip'")

        create_function_args = {
            "FunctionName": function_name,
            "Runtime": runtime,
            "Role": role,
            "Handler": handler,
            "Code": code,
            "Description": description,
            "Timeout": timeout,
            "MemorySize": memory_size,
            "Publish": publish,
            "VpcConfig": vpc_config,
            "PackageType": package_type,
            "DeadLetterConfig": dead_letter_config,
            "Environment": environment,
            "KMSKeyArn": kms_key_arn,
            "TracingConfig": tracing_config,
            "Tags": tags,
            "Layers": layers,
            "FileSystemConfigs": file_system_configs,
            "ImageConfig": image_config,
            "CodeSigningConfigArn": code_signing_config_arn,
            "Architectures": architectures,
        }
        # NOTE(review): as in invoke_lambda, trim_none_values presumably strips the None entries so
        # only explicitly supplied settings reach the API -- confirm against the helper.
        return self.conn.create_function(**trim_none_values(create_function_args))

    @staticmethod
    @return_on_error(None)
    def encode_log_result(log_result: str, *, keep_empty_lines: bool = True) -> list[str] | None:
        """
        Encode execution log from the response and return list of log records.

        Returns ``None`` on error, e.g. invalid base64-encoded string

        :param log_result: base64-encoded string which contain Lambda execution Log.
        :param keep_empty_lines: Whether or not keep empty lines.
        :return: The decoded log split into lines, with empty lines dropped
            when ``keep_empty_lines`` is false.
        """
        # Despite the method name, this step *decodes*: base64 text -> bytes -> str.
        decoded_log = base64.b64decode(log_result.encode("ascii")).decode()
        return [log_row for log_row in decoded_log.splitlines() if keep_empty_lines or log_row]

Was this entry helpful?