Amazon Bedrock¶
Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon via a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Apache Airflow®.
Generic Parameters¶
- aws_conn_id
- Reference to Amazon Web Services Connection ID. If this parameter is set to None then the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default: aws_default
- region_name
- AWS Region Name. If this parameter is set to None or omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
- verify
- Whether or not to verify SSL certificates.
  - False - Do not validate SSL certificates.
  - path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
  If this parameter is set to None or is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
- botocore_config
- The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to avoid throttling exceptions, set timeouts, etc. For more detail about the parameters, see botocore.config.Config. Example:
    {
        "signature_version": "unsigned",
        "s3": {
            "us_east_1_regional_endpoint": True,
        },
        "retries": {
            "mode": "standard",
            "max_attempts": 10,
        },
        "connect_timeout": 300,
        "read_timeout": 300,
        "tcp_keepalive": True,
    }
  If this parameter is set to None or omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
  Note: Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config.
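The precedence described for region_name, verify, and botocore_config can be illustrated with a minimal sketch. The helper name resolve_setting and the sample connection_config are hypothetical and exist only for this illustration; this is not how the Amazon provider implements the lookup internally. Note that aws_conn_id follows a different rule (None selects the default boto3 behaviour).

```python
from typing import Any, Optional


def resolve_setting(operator_value: Optional[Any], connection_value: Optional[Any]) -> Optional[Any]:
    """Return the value that takes effect for a generic parameter (hypothetical helper)."""
    # Only None counts as "not set"; an empty dict is still an explicit value.
    if operator_value is None:
        return connection_value
    return operator_value


# Assumed config_kwargs stored in the AWS Connection Extra Parameter.
connection_config = {"retries": {"mode": "standard", "max_attempts": 10}}

from_connection = resolve_setting(None, connection_config)
overridden = resolve_setting({"read_timeout": 300}, connection_config)
emptied = resolve_setting({}, connection_config)
```

The last call shows why an empty dictionary matters: it is not None, so it replaces the connection configuration instead of falling back to it.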
Operators¶
Invoke an existing Amazon Bedrock Model¶
To invoke an existing Amazon Bedrock model, you can use
BedrockInvokeModelOperator.
Note that every model family has different input and output formats. Some examples are included below, but for details on the different formats, see Inference parameters for foundation models.
For example, to invoke a Meta Llama model you would use:
invoke_llama_model = BedrockInvokeModelOperator(
    task_id="invoke_llama",
    model_id=LLAMA_SHORT_MODEL_ID,
    input_data={"prompt": PROMPT},
)
To invoke an Amazon Titan model you would use:
invoke_titan_model = BedrockInvokeModelOperator(
    task_id="invoke_titan",
    model_id=TITAN_SHORT_MODEL_ID,
    input_data={"inputText": PROMPT},
)
To invoke a Claude V2 model using the Completions API you would use:
invoke_claude_completions = BedrockInvokeModelOperator(
    task_id="invoke_claude_completions",
    model_id=CLAUDE_MODEL_ID,
    input_data={"max_tokens_to_sample": 4000, "prompt": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
)
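Because each model family expects a different request body, it can help to keep the payload shapes in one place. The following is a minimal sketch that reproduces only the three formats shown above; the function name build_input_data and the family labels are hypothetical, and other model families need their own formats.

```python
from typing import Any, Dict


def build_input_data(model_family: str, prompt: str, max_tokens: int = 4000) -> Dict[str, Any]:
    """Build the input_data dict for the model families shown in this guide (hypothetical helper)."""
    if model_family == "llama":
        return {"prompt": prompt}
    if model_family == "titan":
        return {"inputText": prompt}
    if model_family == "claude_completions":
        # The Completions API expects the Human/Assistant turn markers around the prompt.
        return {
            "max_tokens_to_sample": max_tokens,
            "prompt": f"\n\nHuman: {prompt}\n\nAssistant:",
        }
    raise ValueError(f"Unsupported model family: {model_family!r}")


llama_payload = build_input_data("llama", "What color is an orange?")
titan_payload = build_input_data("titan", "What color is an orange?")
claude_payload = build_input_data("claude_completions", "What color is an orange?")
```

The resulting dictionary would be passed as input_data to BedrockInvokeModelOperator.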
Customize an existing Amazon Bedrock Model¶
To create a fine-tuning job to customize a base model, you can use
BedrockCustomizeModelOperator.
Model-customization jobs are asynchronous and the completion time depends on the base model
and the training/validation data size. To monitor the state of the job, you can use the
“model_customization_job_complete” Waiter, the
BedrockCustomizeModelCompletedSensor Sensor,
or the BedrockCustomizeModelCompletedTrigger Trigger.
customize_model = BedrockCustomizeModelOperator(
    task_id="customize_model",
    job_name=custom_model_job_name,
    custom_model_name=custom_model_name,
    role_arn=test_context[ROLE_ARN_KEY],
    base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
    hyperparameters=HYPERPARAMETERS,
    training_data_uri=training_data_uri,
    output_data_uri=f"s3://{bucket_name}/myOutputData",
)
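The example above does not show what HYPERPARAMETERS contains. The Bedrock model customization API takes hyperparameters as a map of string keys to string values, so numeric settings need to be converted first. A minimal sketch follows; the helper name and the key names (epochCount, batchSize, learningRate) are assumptions for illustration, and the supported keys depend on the base model.

```python
from typing import Dict, Mapping


def stringify_hyperparameters(values: Mapping[str, object]) -> Dict[str, str]:
    """Convert hyperparameter values to strings (hypothetical helper)."""
    return {name: str(value) for name, value in values.items()}


# Assumed key names; check the documentation for the base model being customized.
hyperparameters = stringify_hyperparameters(
    {"epochCount": 1, "batchSize": 1, "learningRate": "0.0005"}
)
```

Passing very small floats as strings, as done for learningRate here, avoids Python rendering them in scientific notation.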
Provision Throughput for an existing Amazon Bedrock Model¶
To create a provisioned throughput with dedicated capacity for a foundation
model or a fine-tuned model, you can use
BedrockCreateProvisionedModelThroughputOperator.
Provision throughput jobs are asynchronous. To monitor the state of the job, you can use the
“provisioned_model_throughput_complete” Waiter, the
BedrockProvisionModelThroughputCompletedSensor Sensor,
or the BedrockProvisionModelThroughputCompletedTrigger
Trigger.
provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
    task_id="provision_throughput",
    model_units=1,
    provisioned_model_name=provisioned_model_name,
    model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
Create an Amazon Bedrock Knowledge Base¶
To create an Amazon Bedrock Knowledge Base, you can use
BedrockCreateKnowledgeBaseOperator.
For more information on which models support embedding data into a vector store, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
    task_id="create_knowledge_base",
    name=knowledge_base_name,
    embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
    role_arn=test_context[ROLE_ARN_KEY],
    storage_config={
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": {
            "collectionArn": get_collection_arn(collection),
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata",
            },
        },
    },
)
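Several examples in this guide build a foundation model ARN inline with an f-string. A small helper, sketched here under the assumption that the standard "aws" partition is used, keeps that format in one place. The function name foundation_model_arn is hypothetical.

```python
def foundation_model_arn(region_name: str, model_id: str) -> str:
    """Build a Bedrock foundation model ARN in the format used in this guide (hypothetical helper)."""
    if not region_name or not model_id:
        raise ValueError("Both region_name and model_id are required.")
    # Assumes the standard "aws" partition; other partitions use a different prefix.
    return f"arn:aws:bedrock:{region_name}::foundation-model/{model_id}"


claude_arn = foundation_model_arn("us-east-1", "anthropic.claude-3-sonnet-20240229-v1:0")
```

The returned string can be used wherever an embedding_model_arn or model_arn is expected.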
Delete an Amazon Bedrock Knowledge Base¶
Deleting a Knowledge Base is a simple boto API call and can be done in a TaskFlow task like the example below.
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
    """
    Delete the Amazon Bedrock knowledge base created earlier.
    .. seealso::
        For more information on how to use this task, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteKnowledgeBase`
    :param knowledge_base_id: The unique identifier of the knowledge base to delete.
    """
    log.info("Deleting Knowledge Base %s.", knowledge_base_id)
    bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
Create an Amazon Bedrock Data Source¶
To create an Amazon Bedrock Data Source, you can use
BedrockCreateDataSourceOperator.
create_data_source = BedrockCreateDataSourceOperator(
    task_id="create_data_source",
    knowledge_base_id=create_knowledge_base.output,
    name=data_source_name,
    bucket_name=bucket_name,
)
Delete an Amazon Bedrock Data Source¶
Deleting a Data Source is a simple boto API call and can be done in a TaskFlow task like the example below.
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
    """
    Delete the Amazon Bedrock data source created earlier.
    .. seealso::
        For more information on how to use this task, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteDataSource`
    :param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
    :param data_source_id: The unique identifier of the data source to delete.
    """
    log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
    bedrock_agent_client.delete_data_source(dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id)
Ingest data into an Amazon Bedrock Data Source¶
To add data from an Amazon S3 bucket into an Amazon Bedrock Data Source, you can use
BedrockIngestDataOperator.
ingest_data = BedrockIngestDataOperator(
    task_id="ingest_data",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
)
Amazon Bedrock Retrieve¶
To query a knowledge base, you can use BedrockRetrieveOperator.
The response will only contain citations to sources that are relevant to the query. If you
would like to pass the results through an LLM in order to generate a text response, see
BedrockRaGOperator.
For more information on which models support retrieving information from a knowledge base, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
retrieve = BedrockRetrieveOperator(
    task_id="retrieve",
    knowledge_base_id=create_knowledge_base.output,
    retrieval_query="Who was the CEO of Amazon in 1997?",
)
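A downstream task will usually want the retrieved passages rather than the raw response. The sketch below assumes the task result has the shape of the Bedrock Retrieve API response (a retrievalResults list whose items carry content, location, and score). The function name top_passages and the sample data are made up for illustration.

```python
from typing import Any, Dict, List, Tuple


def top_passages(response: Dict[str, Any], min_score: float = 0.0) -> List[Tuple[float, str, str]]:
    """Return (score, text, source URI) tuples sorted by descending score (hypothetical helper)."""
    passages = []
    for result in response.get("retrievalResults", []):
        score = result.get("score", 0.0)
        if score < min_score:
            continue
        text = result.get("content", {}).get("text", "")
        # Only S3 locations are handled here; other location types yield an empty URI.
        uri = result.get("location", {}).get("s3Location", {}).get("uri", "")
        passages.append((score, text, uri))
    return sorted(passages, key=lambda passage: passage[0], reverse=True)


# Made-up response used only to exercise the helper.
sample_response = {
    "retrievalResults": [
        {
            "content": {"text": "Passage A"},
            "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/a.pdf"}},
            "score": 0.42,
        },
        {
            "content": {"text": "Passage B"},
            "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/b.pdf"}},
            "score": 0.91,
        },
    ]
}

all_passages = top_passages(sample_response)
strong_passages = top_passages(sample_response, min_score=0.5)
```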
Amazon Bedrock Retrieve and Generate (RaG)¶
To query a knowledge base or external sources and generate a text response based on the retrieved
results, you can use BedrockRaGOperator.
The response will contain citations to sources that are relevant to the query as well as a generated text reply. For more information on which models support retrieving information from a knowledge base, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
NOTE: Support for “external sources” was added in boto 1.34.90
Example using an Amazon Bedrock Knowledge Base:
knowledge_base_rag = BedrockRaGOperator(
    task_id="knowledge_base_rag",
    input="Who was the CEO of Amazon in 2022?",
    source_type="KNOWLEDGE_BASE",
    model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
    knowledge_base_id=create_knowledge_base.output,
)
Example using a PDF file in an Amazon S3 Bucket:
external_sources_rag = BedrockRaGOperator(
    task_id="external_sources_rag",
    input="Who was the CEO of Amazon in 2022?",
    source_type="EXTERNAL_SOURCES",
    model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
    sources=[
        {
            "sourceType": "S3",
            "s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
        }
    ],
)
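When several files are involved, the sources list can be generated instead of written by hand. This is a minimal sketch that follows the entry format of the example above; the helper name s3_sources and the bucket and file names are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def s3_sources(uris: Iterable[str]) -> List[Dict[str, Any]]:
    """Build the sources list for source_type="EXTERNAL_SOURCES" from S3 URIs (hypothetical helper)."""
    sources = []
    for uri in uris:
        if not uri.startswith("s3://"):
            raise ValueError(f"Not an S3 URI: {uri!r}")
        sources.append({"sourceType": "S3", "s3Location": {"uri": uri}})
    return sources


# Made-up bucket and object names.
sources = s3_sources(["s3://example-bucket/report-2021.pdf", "s3://example-bucket/report-2022.pdf"])
```

The result can be passed as the sources argument of BedrockRaGOperator.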
Sensors¶
Wait for an Amazon Bedrock customize model job¶
To wait on the state of an Amazon Bedrock customize model job until it reaches a terminal state, you can use
BedrockCustomizeModelCompletedSensor.
await_custom_model_job = BedrockCustomizeModelCompletedSensor(
    task_id="await_custom_model_job",
    job_name=custom_model_job_name,
)
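Conceptually, waiting for a terminal state means polling the job status until it either succeeds or fails. The sketch below illustrates that loop with an injected status function so it runs without AWS access. It is not the sensor's implementation; the function name and the grouping of Stopping and Stopped with the failure states are assumptions of this sketch.

```python
import time
from typing import Callable

# Status values reported for Bedrock model customization jobs.
SUCCESS_STATES = {"Completed"}
FAILURE_STATES = {"Failed", "Stopping", "Stopped"}  # grouping assumed for this sketch


def wait_for_terminal_state(
    get_status: Callable[[], str],
    poke_interval: float = 0.0,
    max_attempts: int = 10,
) -> str:
    """Poll get_status until it reports a terminal state (hypothetical helper)."""
    for _ in range(max_attempts):
        status = get_status()
        if status in SUCCESS_STATES:
            return status
        if status in FAILURE_STATES:
            raise RuntimeError(f"Job ended in state {status}")
        # Any other status (for example InProgress) means the job is still running.
        time.sleep(poke_interval)
    raise TimeoutError(f"Job did not reach a terminal state after {max_attempts} attempts")


# Simulated status sequence standing in for real API calls.
statuses = iter(["InProgress", "InProgress", "Completed"])
final_state = wait_for_terminal_state(lambda: next(statuses))
```

In a real DAG the sensor handles this loop, including the wait between checks, so a helper like this is only useful for reasoning about the behaviour.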
Wait for an Amazon Bedrock provision model throughput job¶
To wait on the state of an Amazon Bedrock provision model throughput job until it reaches a
terminal state, you can use
BedrockProvisionModelThroughputCompletedSensor.
await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
    task_id="await_provision_throughput",
    model_id=provision_throughput.output,
)
Wait for an Amazon Bedrock Knowledge Base¶
To wait on the state of an Amazon Bedrock Knowledge Base until it reaches a terminal state, you can use
BedrockKnowledgeBaseActiveSensor.
await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
    task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)
Wait for an Amazon Bedrock ingestion job to finish¶
To wait on the state of an Amazon Bedrock data ingestion job until it reaches a terminal state, you can use
BedrockIngestionJobSensor.
await_ingest = BedrockIngestionJobSensor(
    task_id="await_ingest",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
    ingestion_job_id=ingest_data.output,
)