Google Cloud Memorystore Operators¶
The Cloud Memorystore for Redis is a fully managed Redis service for the Google Cloud. Applications running on Google Cloud can achieve extreme performance by leveraging the highly scalable, available, secure Redis service without the burden of managing complex Redis deployments.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'Detailed information is available for Installation.
Instance¶
Operators uses a Instance
for representing instance. The object can be
presented as a compatible dictionary also.
Here is an example of instance
FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
Configuration of bucket permissions for import / export¶
It is necessary to configure permissions for the bucket to import and export data. Too find the service
account for your instance, run the CloudMemorystoreCreateInstanceOperator
or
CloudMemorystoreGetInstanceOperator
and
make a use of the service account listed under persistenceIamIdentity
.
You can use GCSBucketCreateAclEntryOperator
operator to set permissions.
set_acl_permission = GCSBucketCreateAclEntryOperator(
task_id="gcs-set-acl-permission",
bucket=BUCKET_NAME,
entity="user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity']"
".split(':', 2)[1] }}",
role="OWNER",
)
For further information look at: Granting restricted permissions for import and export
Create instance¶
Create a instance is performed with the
CloudMemorystoreCreateInstanceOperator
operator.
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
project_id=PROJECT_ID,
)
You can use Jinja templating with
location
, instance_id
, instance
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values. The result is saved to XCom, which allows it
to be used by other operators.
create_instance_result = BashOperator(
task_id="create-instance-result",
bash_command=f"echo {create_instance.output}",
)
Delete instance¶
Delete a instance is performed with the
CloudMemorystoreDeleteInstanceOperator
operator.
delete_instance = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
)
You can use Jinja templating with
location
, instance
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
Export instance¶
Delete a instance is performed with the
CloudMemorystoreExportInstanceOperator
operator.
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=PROJECT_ID,
)
You can use Jinja templating with
location
, instance
, output_config
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
Failover instance¶
Delete a instance is performed with the
CloudMemorystoreFailoverInstanceOperator
operator.
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode(
FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS
),
project_id=PROJECT_ID,
)
You can use Jinja templating with
location
, instance
, data_protection_mode
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
Get instance¶
Delete a instance is performed with the
CloudMemorystoreGetInstanceOperator
operator.
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
do_xcom_push=True,
)
You can use Jinja templating with
location
, instance
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
Import instance¶
Delete a instance is performed with the
CloudMemorystoreImportOperator
operator.
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=PROJECT_ID,
)
You can use Jinja templating with
location
, instance
, input_config
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
List instances¶
List a instances is performed with the
CloudMemorystoreListInstancesOperator
operator.
list_instances = CloudMemorystoreListInstancesOperator(
task_id="list-instances", location="-", page_size=100, project_id=PROJECT_ID
)
You can use Jinja templating with
location
, page_size
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values. The result is saved to XCom, which allows it
to be used by other operators.
list_instances_result = BashOperator(
task_id="list-instances-result", bash_command=f"echo {get_instance.output}"
)
Update instance¶
Update a instance is performed with the
CloudMemorystoreUpdateInstanceOperator
operator.
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
)
You can use Jinja templating with
update_mask
, instance
, location
, instance_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
Scale instance¶
Scale a instance is performed with the
CloudMemorystoreScaleInstanceOperator
operator.
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
project_id=PROJECT_ID,
memory_size_gb=3,
)
You can use Jinja templating with
memory_size_gb
, location
, instance_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.
Create instance and import¶
If you want to create instances with imported data, you can use
CloudMemorystoreCreateInstanceAndImportOperator
operator.
Export and delete instance¶
If you want to export data and immediately delete instances then you can use
CloudMemorystoreExportAndDeleteInstanceOperator
operator.
You can use Jinja templating with
memory_size_gb
, location
, instance_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
parameters which allows you to dynamically determine values.