Source code for tests.system.google.cloud.gcs.example_mysql_to_gcs
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG using MySQLToGCSOperator.

This DAG relies on the following OS environment variables:

* AIRFLOW__API__GOOGLE_KEY_PATH - Path to service account key file. Note, you can skip this variable if you
  run this DAG in a Composer environment.
"""

from __future__ import annotations

import logging
import os
from datetime import datetime

import pytest

from airflow.decorators import task
from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.google.cloud.hooks.compute import ComputeEngineHook
from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
from airflow.providers.google.cloud.operators.compute import (
    ComputeEngineDeleteInstanceOperator,
    ComputeEngineInsertInstanceOperator,
)
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
)
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule

try:
    from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator
except ImportError:
    pytest.skip("MySQL not available", allow_module_level=True)
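# ---------------------------------------------------------------------------
# Hedged sketch (not part of the excerpt above): the original file defines the
# constants, shell commands, DAG context, and instance-creation task that the
# tasks below reference. The values here are illustrative placeholders so the
# rest of the file reads as one runnable module; the real definitions live in
# the full example file.
# ---------------------------------------------------------------------------

log = logging.getLogger(__name__)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
DAG_ID = "example_mysql_to_gcs"

ZONE = "europe-west2-a"  # placeholder zone
GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}"  # placeholder instance name
GCE_INSTANCE_BODY = {"name": GCE_INSTANCE_NAME}  # placeholder; the original declares the full machine spec

CONNECTION_TYPE = "mysql"
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}".replace("-", "_")  # placeholder connection id
DB_NAME = "test_db"  # placeholder database settings
DB_PORT = 3306
DB_USER_NAME = "root"
DB_USER_PASSWORD = "demo_password"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"  # placeholder bucket name
FILE_NAME = "result.csv"  # placeholder object name

SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {DB_NAME}.test_table (col_1 INT, col_2 VARCHAR(8))"  # placeholder DDL
SQL_INSERT = f"INSERT INTO {DB_NAME}.test_table (col_1, col_2) VALUES (1, 'one'), (2, 'two')"  # placeholder data
SQL_SELECT = f"SELECT * FROM {DB_NAME}.test_table"  # placeholder query

# Placeholder shell commands; the originals create/delete a gcloud firewall
# rule for the MySQL port, install and configure MySQL on the GCE instance,
# and delete the instance's leftover persistent disk.
CREATE_FIREWALL_RULE_COMMAND = "echo 'gcloud compute firewall-rules create (elided)'"
DELETE_FIREWALL_RULE_COMMAND = "echo 'gcloud compute firewall-rules delete (elided)'"
SETUP_MYSQL_COMMAND = "echo 'install and configure MySQL (elided)'"
DELETE_PERSISTENT_DISK_COMMAND = "echo 'gcloud compute disks delete (elided)'"

with DAG(
    DAG_ID,
    schedule="@once",  # assumed: system-test DAGs typically run once
    start_date=datetime(2021, 1, 1),  # assumed start date
    catchup=False,
    tags=["example", "mysql", "gcs"],  # assumed tags
) as dag:
    # Provision the VM that will host the MySQL server.
    create_gce_instance = ComputeEngineInsertInstanceOperator(
        task_id="create_gce_instance",
        zone=ZONE,
        resource_id=GCE_INSTANCE_NAME,
        project_id=PROJECT_ID,
        body=GCE_INSTANCE_BODY,
    )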
    create_firewall_rule = BashOperator(
        task_id="create_firewall_rule",
        bash_command=CREATE_FIREWALL_RULE_COMMAND,
    )

    # Install and configure MySQL on the new instance over SSH.
    setup_mysql = SSHOperator(
        task_id="setup_mysql",
        ssh_hook=ComputeEngineSSHHook(
            user="username",
            instance_name=GCE_INSTANCE_NAME,
            zone=ZONE,
            project_id=PROJECT_ID,
            use_oslogin=False,
            use_iap_tunnel=False,
            cmd_timeout=180,
        ),
        command=SETUP_MYSQL_COMMAND,
        retries=2,
    )

    @task
    def get_public_ip() -> str:
        hook = ComputeEngineHook()
        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
        return address

    get_public_ip_task = get_public_ip()

    @task
    def create_connection(connection_id: str, ip_address: str) -> None:
        # Register an Airflow Connection pointing at the MySQL server's public IP.
        connection = Connection(
            conn_id=connection_id,
            description="Example connection",
            conn_type=CONNECTION_TYPE,
            host=ip_address,
            login=DB_USER_NAME,
            password=DB_USER_PASSWORD,
            port=DB_PORT,
        )
        session = Session()
        log.info("Removing connection %s if it exists", connection_id)
        query = session.query(Connection).filter(Connection.conn_id == connection_id)
        query.delete()

        session.add(connection)
        session.commit()
        log.info("Connection %s created", connection_id)

    create_connection_task = create_connection(connection_id=CONNECTION_ID, ip_address=get_public_ip_task)

    create_sql_table = SQLExecuteQueryOperator(
        task_id="create_sql_table",
        conn_id=CONNECTION_ID,
        sql=SQL_CREATE,
        retries=4,
    )

    insert_sql_data = SQLExecuteQueryOperator(
        task_id="insert_sql_data",
        conn_id=CONNECTION_ID,
        sql=SQL_INSERT,
    )

    create_gcs_bucket = GCSCreateBucketOperator(
        task_id="create_gcs_bucket",
        bucket_name=BUCKET_NAME,
    )

    # [START howto_operator_mysql_to_gcs]
    mysql_to_gcs = MySQLToGCSOperator(
        task_id="mysql_to_gcs",
        mysql_conn_id=CONNECTION_ID,
        sql=SQL_SELECT,
        bucket=BUCKET_NAME,
        filename=FILE_NAME,
        export_format="csv",
    )
    # [END howto_operator_mysql_to_gcs]

    delete_gcs_bucket = GCSDeleteBucketOperator(
        task_id="delete_gcs_bucket",
        bucket_name=BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_firewall_rule = BashOperator(
        task_id="delete_firewall_rule",
        bash_command=DELETE_FIREWALL_RULE_COMMAND,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_gce_instance = ComputeEngineDeleteInstanceOperator(
        task_id="delete_gce_instance",
        resource_id=GCE_INSTANCE_NAME,
        zone=ZONE,
        project_id=PROJECT_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_persistent_disk = BashOperator(
        task_id="delete_persistent_disk",
        bash_command=DELETE_PERSISTENT_DISK_COMMAND,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    @task(task_id="delete_connection")
    def delete_connection(connection_id: str) -> None:
        session = Session()
        log.info("Removing connection %s", connection_id)
        query = session.query(Connection).filter(Connection.conn_id == connection_id)
        query.delete()
        session.commit()

    delete_connection_task = delete_connection(connection_id=CONNECTION_ID)

    (
        # TEST SETUP
        create_gce_instance
        >> setup_mysql
        >> create_firewall_rule
        >> get_public_ip_task
        >> create_connection_task
        >> create_sql_table
        >> insert_sql_data
        >> create_gcs_bucket
        # TEST BODY
        >> mysql_to_gcs
    )

    # TEST TEARDOWN
    mysql_to_gcs >> [
        delete_gcs_bucket,
        delete_firewall_rule,
        delete_gce_instance,
        delete_connection_task,
    ]
    delete_gce_instance >> delete_persistent_disk

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
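# Hedged usage note (assumption, not part of the original file): with test_run
# exposed above, the example can be run through pytest as described in
# tests/system/README.md#run_via_pytest, e.g.:
#
#   pytest tests/system/google/cloud/gcs/example_mysql_to_gcs.py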