Amazon Bedrock

Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon via a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI.

Prerequisite Tasks

To use these operators, you must do a few things:

  • Create necessary resources using AWS Console or AWS CLI.

  • Install API libraries via pip:

    pip install 'apache-airflow[amazon]'

  • Setup Connection.

Generic Parameters

aws_conn_id

Reference to the Amazon Web Services Connection ID. If this parameter is set to None, the default boto3 behaviour is used without a connection lookup. Otherwise, the credentials stored in the Connection are used. Default: aws_default

region_name

AWS Region Name. If this parameter is set to None or omitted, the region_name from the AWS Connection Extra Parameter is used. Otherwise, the specified value takes precedence over the connection value. Default: None

verify

Whether or not to verify SSL certificates.

  • False - Do not validate SSL certificates.

  • path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

If this parameter is set to None or is omitted, the verify value from the AWS Connection Extra Parameter is used. Otherwise, the specified value takes precedence over the connection value. Default: None

botocore_config

The provided dictionary is used to construct a botocore.config.Config. This configuration can be used, for example, to avoid throttling exceptions or to set connection and read timeouts.

Example (for more detail about the available parameters, see botocore.config.Config):
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

If this parameter is set to None or omitted, the config_kwargs value from the AWS Connection Extra Parameter is used. Otherwise, the specified value takes precedence over the connection value. Default: None

Note

Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config.
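
As an illustration, the generic parameters above can be supplied to any of the Bedrock operators described below. A minimal sketch with placeholder values for the connection ID, certificate bundle path, model, and prompt:

from airflow.providers.amazon.aws.operators.bedrock import BedrockInvokeModelOperator

invoke_with_overrides = BedrockInvokeModelOperator(
    task_id="invoke_with_overrides",
    model_id="amazon.titan-text-express-v1",  # placeholder model ID
    input_data={"inputText": "Hello Bedrock"},
    aws_conn_id="my_aws_connection",  # placeholder Airflow connection ID
    region_name="us-east-1",  # overrides the region stored in the connection
    verify="path/to/cert/bundle.pem",  # placeholder CA cert bundle path
    botocore_config={"retries": {"mode": "standard", "max_attempts": 10}},
)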

Operators

Invoke an existing Amazon Bedrock Model

To invoke an existing Amazon Bedrock model, you can use BedrockInvokeModelOperator.

Note that every model family has different input and output formats. For example, to invoke a Meta Llama model you would use:

tests/system/providers/amazon/aws/example_bedrock.py[source]

invoke_llama_model = BedrockInvokeModelOperator(
    task_id="invoke_llama",
    model_id=LLAMA_SHORT_MODEL_ID,
    input_data={"prompt": PROMPT},
)

To invoke an Amazon Titan model you would use:

tests/system/providers/amazon/aws/example_bedrock.py[source]

invoke_titan_model = BedrockInvokeModelOperator(
    task_id="invoke_titan",
    model_id=TITAN_SHORT_MODEL_ID,
    input_data={"inputText": PROMPT},
)

For details on the different formats, see Inference parameters for foundation models.
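
As a further illustration, Anthropic Claude models on Bedrock expect a Messages API request body. The sketch below assumes a hypothetical CLAUDE_SHORT_MODEL_ID constant; check the inference parameters guide for the exact fields required by your model version:

invoke_claude_model = BedrockInvokeModelOperator(
    task_id="invoke_claude",
    model_id=CLAUDE_SHORT_MODEL_ID,  # hypothetical constant, e.g. an anthropic.claude-* model ID
    input_data={
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 256,
        "messages": [{"role": "user", "content": [{"type": "text", "text": PROMPT}]}],
    },
)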

Customize an existing Amazon Bedrock Model

To create a fine-tuning job to customize a base model, you can use BedrockCustomizeModelOperator.

Model-customization jobs are asynchronous and the completion time depends on the base model and the training/validation data size. To monitor the state of the job, you can use the “model_customization_job_complete” Waiter, the BedrockCustomizeModelCompletedSensor Sensor, or the BedrockCustomizeModelCompletedTrigger Trigger.

tests/system/providers/amazon/aws/example_bedrock.py[source]

customize_model = BedrockCustomizeModelOperator(
    task_id="customize_model",
    job_name=custom_model_job_name,
    custom_model_name=custom_model_name,
    role_arn=test_context[ROLE_ARN_KEY],
    base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
    hyperparameters=HYPERPARAMETERS,
    training_data_uri=training_data_uri,
    output_data_uri=f"s3://{bucket_name}/myOutputData",
)
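
If you prefer to block inside a TaskFlow task rather than use the sensor or trigger, the “model_customization_job_complete” waiter mentioned above can also be reached through the Bedrock hook. A minimal sketch, assuming the job name submitted above and the waiter's default polling settings:

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.bedrock import BedrockHook


@task
def await_customization_with_waiter(job_name: str):
    # Poll until the customization job reaches a terminal state, then return its final status.
    hook = BedrockHook()
    hook.get_waiter("model_customization_job_complete").wait(jobIdentifier=job_name)
    return hook.conn.get_model_customization_job(jobIdentifier=job_name)["status"]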

Provision Throughput for an existing Amazon Bedrock Model

To create a provisioned throughput with dedicated capacity for a foundation model or a fine-tuned model, you can use BedrockCreateProvisionedModelThroughputOperator.

Provision throughput jobs are asynchronous. To monitor the state of the job, you can use the “provisioned_model_throughput_complete” Waiter, the BedrockProvisionModelThroughputCompletedSensor Sensor, or the BedrockProvisionModelThroughputCompletedSensorTrigger Trigger.

tests/system/providers/amazon/aws/example_bedrock.py[source]

provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
    task_id="provision_throughput",
    model_units=1,
    provisioned_model_name=provisioned_model_name,
    model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
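
Note that Provisioned Throughput capacity continues to accrue charges until it is deleted, and the snippet above does not clean it up. One option is a plain boto API call in a TaskFlow task, in the same spirit as the delete examples further down. A minimal sketch (the hook-based client and the XCom-passed identifier are assumptions, not part of the example above):

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.bedrock import BedrockHook
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_provisioned_throughput(provisioned_model_id: str):
    # Remove the dedicated capacity so it stops accruing charges.
    BedrockHook().conn.delete_provisioned_model_throughput(provisionedModelId=provisioned_model_id)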

Create an Amazon Bedrock Knowledge Base

To create an Amazon Bedrock Knowledge Base, you can use BedrockCreateKnowledgeBaseOperator.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
    task_id="create_knowledge_base",
    name=knowledge_base_name,
    embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1",
    role_arn=test_context[ROLE_ARN_KEY],
    storage_config={
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": {
            "collectionArn": get_collection_arn(collection),
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata",
            },
        },
    },
)

Delete an Amazon Bedrock Knowledge Base

Deleting a Knowledge Base is a simple boto API call and can be done in a TaskFlow task like the example below.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
    """
    Delete the Amazon Bedrock knowledge base created earlier.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteKnowledgeBase`

    :param knowledge_base_id: The unique identifier of the knowledge base to delete.
    """
    log.info("Deleting Knowledge Base %s.", knowledge_base_id)
    bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)


Create an Amazon Bedrock Data Source

To create an Amazon Bedrock Data Source, you can use BedrockCreateDataSourceOperator.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

create_data_source = BedrockCreateDataSourceOperator(
    task_id="create_data_source",
    knowledge_base_id=create_knowledge_base.output,
    name=data_source_name,
    bucket_name=bucket_name,
)

Delete an Amazon Bedrock Data Source

Deleting a Data Source is a simple boto API call and can be done in a TaskFlow task like the example below.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
    """
    Delete the Amazon Bedrock data source created earlier.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteDataSource`

    :param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
    :param data_source_id: The unique identifier of the data source to delete.
    """
    log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
    bedrock_agent_client.delete_data_source(dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id)


Ingest data into an Amazon Bedrock Data Source

To add data from an Amazon S3 bucket into an Amazon Bedrock Data Source, you can use BedrockIngestDataOperator.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

ingest_data = BedrockIngestDataOperator(
    task_id="ingest_data",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
)

Sensors

Wait for an Amazon Bedrock customize model job

To wait on the state of an Amazon Bedrock customize model job until it reaches a terminal state, you can use BedrockCustomizeModelCompletedSensor.

tests/system/providers/amazon/aws/example_bedrock.py[source]

await_custom_model_job = BedrockCustomizeModelCompletedSensor(
    task_id="await_custom_model_job",
    job_name=custom_model_job_name,
)

Wait for an Amazon Bedrock provision model throughput job

To wait on the state of an Amazon Bedrock provision model throughput job until it reaches a terminal state, you can use BedrockProvisionModelThroughputCompletedSensor.

tests/system/providers/amazon/aws/example_bedrock.py[source]

await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
    task_id="await_provision_throughput",
    model_id=provision_throughput.output,
)

Wait for an Amazon Bedrock Knowledge Base

To wait on the state of an Amazon Bedrock Knowledge Base until it reaches a terminal state, you can use BedrockKnowledgeBaseActiveSensor.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
    task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)

Wait for an Amazon Bedrock ingestion job to finish

To wait on the state of an Amazon Bedrock data ingestion job until it reaches a terminal state, you can use BedrockIngestionJobSensor.

tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py[source]

await_ingest = BedrockIngestionJobSensor(
    task_id="await_ingest",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
    ingestion_job_id=ingest_data.output,
)
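
The snippets above pass identifiers between tasks through XCom (the .output references), so the knowledge base pieces are typically wired together in the order they were introduced. A rough sketch of that ordering, assuming the task objects defined in the examples above; see the linked system test files for the complete DAGs:

from airflow.models.baseoperator import chain

chain(
    create_knowledge_base,
    await_knowledge_base,
    create_data_source,
    ingest_data,
    await_ingest,
    delete_data_source(create_knowledge_base.output, create_data_source.output),
    delete_knowledge_base(create_knowledge_base.output),
)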
