Source code for airflow.providers.google.cloud.hooks.cloud_sql
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains a Google Cloud SQL Hook."""from__future__importannotationsimporterrnoimportjsonimportosimportplatformimportrandomimportreimportshutilimportsocketimportstringimportsubprocessimporttimeimportuuidfrominspectimportsignaturefrompathlibimportPathfromsubprocessimportPIPE,PopenfromtempfileimportgettempdirfromtypingimportTYPE_CHECKING,Any,Sequencefromurllib.parseimportquote_plusimporthttpxfromaiohttpimportClientSessionfromgcloud.aio.authimportAioSession,Tokenfromgoogleapiclient.discoveryimportResource,buildfromgoogleapiclient.errorsimportHttpError# Number of retries - used by googleapiclient method calls to perform retries# For requests that are "retriable"fromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookfromairflow.modelsimportConnectionfromairflow.providers.google.common.hooks.base_googleimportGoogleBaseAsyncHook,GoogleBaseHook,get_fieldfromairflow.providers.mysql.hooks.mysqlimportMySqlHookfromairflow.providers.postgres.hooks.postgresimportPostgresHookfromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromrequestsimportSession
class CloudSQLHook(GoogleBaseHook):
    """
    Hook for Google Cloud SQL APIs.

    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.

    :param api_version: This is the version of the api.
    :param gcp_conn_id: The Airflow connection used for GCP credentials.
    :param impersonation_chain: This is the optional service account to
        impersonate using short term credentials.
    """

    # NOTE(review): `default_conn_name` is referenced below as the default for
    # `gcp_conn_id` but is not visible in this chunk — presumably a class
    # attribute defined alongside conn_name_attr/conn_type; confirm upstream.
    def __init__(
        self,
        api_version: str,
        gcp_conn_id: str = default_conn_name,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        if kwargs.get("delegate_to") is not None:
            raise RuntimeError(
                "The `delegate_to` parameter has been deprecated before and finally removed in this version"
                " of Google Provider. You MUST convert it to `impersonation_chain`"
            )
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            impersonation_chain=impersonation_chain,
        )
        self.api_version = api_version
        # Lazily-built googleapiclient Resource; see get_conn().
        self._conn = None

    def get_conn(self) -> Resource:
        """
        Retrieve connection to Cloud SQL.

        :return: Google Cloud SQL services object.
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build("sqladmin", self.api_version, http=http_authorized, cache_discovery=False)
        return self._conn

    @GoogleBaseHook.fallback_to_default_project_id
    def get_instance(self, instance: str, project_id: str) -> dict:
        """
        Retrieve a resource containing information about a Cloud SQL instance.

        :param instance: Database instance ID. This does not include the project ID.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: A Cloud SQL instance resource.
        """
        return (
            self.get_conn()
            .instances()
            .get(project=project_id, instance=instance)
            .execute(num_retries=self.num_retries)
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def create_instance(self, body: dict, project_id: str) -> None:
        """
        Create a new Cloud SQL instance.

        :param body: Body required by the Cloud SQL insert API, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        response = (
            self.get_conn()
            .instances()
            .insert(project=project_id, body=body)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def patch_instance(self, body: dict, instance: str, project_id: str) -> None:
        """
        Update settings of a Cloud SQL instance.

        Caution: This is not a partial update, so you must include values for
        all the settings that you want to retain.

        :param body: Body required by the Cloud SQL patch API, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body.
        :param instance: Cloud SQL instance ID. This does not include the project ID.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        response = (
            self.get_conn()
            .instances()
            .patch(project=project_id, instance=instance, body=body)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_instance(self, instance: str, project_id: str) -> None:
        """
        Delete a Cloud SQL instance.

        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :param instance: Cloud SQL instance ID. This does not include the project ID.
        :return: None
        """
        response = (
            self.get_conn()
            .instances()
            .delete(project=project_id, instance=instance)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        # For some delete instance operations, the operation stops being available ~9 seconds after
        # completion, so we need a shorter sleep time to make sure we don't miss the DONE status.
        self._wait_for_operation_to_complete(
            project_id=project_id, operation_name=operation_name, time_to_sleep=5
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def get_database(self, instance: str, database: str, project_id: str) -> dict:
        """
        Retrieve a database resource from a Cloud SQL instance.

        :param instance: Database instance ID. This does not include the project ID.
        :param database: Name of the database in the instance.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: A Cloud SQL database resource, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource.
        """
        return (
            self.get_conn()
            .databases()
            .get(project=project_id, instance=instance, database=database)
            .execute(num_retries=self.num_retries)
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def create_database(self, instance: str, body: dict, project_id: str) -> None:
        """
        Create a new database inside a Cloud SQL instance.

        :param instance: Database instance ID. This does not include the project ID.
        :param body: The request body, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        response = (
            self.get_conn()
            .databases()
            .insert(project=project_id, instance=instance, body=body)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def patch_database(
        self,
        instance: str,
        database: str,
        body: dict,
        project_id: str,
    ) -> None:
        """
        Update a database resource inside a Cloud SQL instance.

        This method supports patch semantics.
        See https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch.

        :param instance: Database instance ID. This does not include the project ID.
        :param database: Name of the database to be updated in the instance.
        :param body: The request body, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        response = (
            self.get_conn()
            .databases()
            .patch(project=project_id, instance=instance, database=database, body=body)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_database(self, instance: str, database: str, project_id: str) -> None:
        """
        Delete a database from a Cloud SQL instance.

        :param instance: Database instance ID. This does not include the project ID.
        :param database: Name of the database to be deleted in the instance.
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        response = (
            self.get_conn()
            .databases()
            .delete(project=project_id, instance=instance, database=database)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def export_instance(self, instance: str, body: dict, project_id: str):
        """
        Export data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

        :param instance: Database instance ID of the Cloud SQL instance. This does not include the
            project ID.
        :param body: The request body, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: The name of the export operation (the caller is responsible for
            waiting on it; this method does not block).
        """
        response = (
            self.get_conn()
            .instances()
            .export(project=project_id, instance=instance, body=body)
            .execute(num_retries=self.num_retries)
        )
        operation_name = response["name"]
        return operation_name

    @GoogleBaseHook.fallback_to_default_project_id
    def import_instance(self, instance: str, body: dict, project_id: str) -> None:
        """
        Import data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

        :param instance: Database instance ID. This does not include the project ID.
        :param body: The request body, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import#request-body
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        try:
            response = (
                self.get_conn()
                .instances()
                .import_(project=project_id, instance=instance, body=body)
                .execute(num_retries=self.num_retries)
            )
            operation_name = response["name"]
            self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)
        except HttpError as ex:
            raise AirflowException(f"Importing instance {instance} failed: {ex.content}")

    @GoogleBaseHook.fallback_to_default_project_id
    def clone_instance(self, instance: str, body: dict, project_id: str) -> None:
        """
        Clones an instance to a target instance.

        :param instance: Database instance ID to be cloned. This does not include the
            project ID. The destination instance ID is specified inside ``body``.
        :param body: The request body, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1beta4/instances/clone
        :param project_id: Project ID of the project that contains the instance. If set
            to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        """
        try:
            response = (
                self.get_conn()
                .instances()
                .clone(project=project_id, instance=instance, body=body)
                .execute(num_retries=self.num_retries)
            )
            operation_name = response["name"]
            self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)
        except HttpError as ex:
            raise AirflowException(f"Cloning of instance {instance} failed: {ex.content}")

    @GoogleBaseHook.fallback_to_default_project_id
    def _wait_for_operation_to_complete(
        self, project_id: str, operation_name: str, time_to_sleep: int = TIME_TO_SLEEP_IN_SECONDS
    ) -> None:
        """
        Wait for the named operation to complete - checks status of the asynchronous call.

        :param project_id: Project ID of the project that contains the instance.
        :param operation_name: Name of the operation.
        :param time_to_sleep: Time to sleep between active checks of the operation results.
        :return: None
        """
        service = self.get_conn()
        while True:
            operation_response = (
                service.operations()
                .get(project=project_id, operation=operation_name)
                .execute(num_retries=self.num_retries)
            )
            if operation_response.get("status") == CloudSqlOperationStatus.DONE:
                error = operation_response.get("error")
                if error:
                    # Extracting the errors list as string and trimming square braces
                    error_msg = str(error.get("errors"))[1:-1]
                    raise AirflowException(error_msg)
                # No meaningful info to return from the response in case of success
                return
            time.sleep(time_to_sleep)
class CloudSqlProxyRunner(LoggingMixin):
    """
    Downloads and runs cloud-sql-proxy as subprocess of the Python process.

    The cloud-sql-proxy needs to be downloaded and started before we can connect to the
    Google Cloud SQL instance via database connection. It establishes secure tunnel
    connection to the database. It authorizes using the Google Cloud credentials that
    are passed by the configuration.

    More details about the proxy can be found here:
    https://cloud.google.com/sql/docs/mysql/sql-proxy

    :param path_prefix: Unique path prefix where proxy will be downloaded and directories
        created for unix sockets.
    :param instance_specification: Specification of the instance to connect the proxy to.
        It should be specified in the form that is described in
        https://cloud.google.com/sql/docs/mysql/sql-proxy#multiple-instances in
        -instances parameter (typically in the form of ``<project>:<region>:<instance>``
        for UNIX socket connections and in the form of
        ``<project>:<region>:<instance>=tcp:<port>`` for TCP connections.
    :param gcp_conn_id: Id of Google Cloud connection to use for authentication
    :param project_id: Optional id of the Google Cloud project to connect to - it overwrites
        default project id taken from the Google Cloud connection.
    :param sql_proxy_version: Specific version of SQL proxy to download (for example
        'v1.13'). By default latest version is downloaded.
    :param sql_proxy_binary_path: If specified, then proxy will be used from the path
        specified rather than dynamically generated. This means that if the binary is
        not present in that path it will also be downloaded.
    """

    def __init__(
        self,
        path_prefix: str,
        instance_specification: str,
        gcp_conn_id: str = "google_cloud_default",
        project_id: str | None = None,
        sql_proxy_version: str | None = None,
        sql_proxy_binary_path: str | None = None,
    ) -> None:
        super().__init__()
        self.path_prefix = path_prefix
        if not self.path_prefix:
            raise AirflowException("The path_prefix must not be empty!")
        self.sql_proxy_was_downloaded = False
        self.sql_proxy_version = sql_proxy_version
        self.download_sql_proxy_dir = None
        self.sql_proxy_process: Popen | None = None
        self.instance_specification = instance_specification
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.command_line_parameters: list[str] = []
        self.cloud_sql_proxy_socket_directory = self.path_prefix
        self.sql_proxy_path = sql_proxy_binary_path or f"{self.path_prefix}_cloud_sql_proxy"
        self.credentials_path = self.path_prefix + "_credentials.json"
        self._build_command_line_parameters()

    def _build_command_line_parameters(self) -> None:
        """Populate the fixed -dir/-instances proxy arguments."""
        self.command_line_parameters.extend(["-dir", self.cloud_sql_proxy_socket_directory])
        self.command_line_parameters.extend(["-instances", self.instance_specification])

    @staticmethod
    def _is_os_64bit() -> bool:
        return platform.machine().endswith("64")

    def _download_sql_proxy_if_needed(self) -> None:
        """Download the proxy binary unless it is already present at sql_proxy_path."""
        if os.path.isfile(self.sql_proxy_path):
            self.log.info("cloud-sql-proxy is already present")
            return
        download_url = self._get_sql_proxy_download_url()
        proxy_path_tmp = self.sql_proxy_path + ".tmp"
        self.log.info("Downloading cloud_sql_proxy from %s to %s", download_url, proxy_path_tmp)
        # httpx has a breaking API change (follow_redirects vs allow_redirects)
        # and this should work with both versions (cf. issue #20088)
        if "follow_redirects" in signature(httpx.get).parameters.keys():
            response = httpx.get(download_url, follow_redirects=True)
        else:
            response = httpx.get(download_url, allow_redirects=True)  # type: ignore[call-arg]
        # Check the status BEFORE writing anything, so a failed download never
        # leaves an error payload on disk pretending to be the binary.
        if response.status_code != 200:
            raise AirflowException(
                "The cloud-sql-proxy could not be downloaded. "
                f"Status code = {response.status_code}. Reason = {response.reason_phrase}"
            )
        # Downloading to .tmp file first to avoid case where partially downloaded
        # binary is used by parallel operator which uses the same fixed binary path
        with open(proxy_path_tmp, "wb") as file:
            file.write(response.content)
        self.log.info("Moving sql_proxy binary from %s to %s", proxy_path_tmp, self.sql_proxy_path)
        shutil.move(proxy_path_tmp, self.sql_proxy_path)
        os.chmod(self.sql_proxy_path, 0o744)  # Set executable bit
        self.sql_proxy_was_downloaded = True

    def _get_sql_proxy_download_url(self):
        """Build the download URL for the current OS/arch and requested version."""
        system = platform.system().lower()
        # NOTE(review): os.uname() is POSIX-only — this assumes a non-Windows host.
        processor = os.uname().machine
        if processor == "x86_64":
            processor = "amd64"
        if not self.sql_proxy_version:
            download_url = CLOUD_SQL_PROXY_DOWNLOAD_URL.format(system, processor)
        else:
            if not CLOUD_SQL_PROXY_VERSION_REGEX.match(self.sql_proxy_version):
                raise ValueError(
                    "The sql_proxy_version should match the regular expression "
                    f"{CLOUD_SQL_PROXY_VERSION_REGEX.pattern}"
                )
            download_url = CLOUD_SQL_PROXY_VERSION_DOWNLOAD_URL.format(
                self.sql_proxy_version, system, processor
            )
        return download_url

    def _get_credential_parameters(self) -> list[str]:
        """Derive -credential_file/-projects proxy arguments from the GCP connection."""
        extras = GoogleBaseHook.get_connection(conn_id=self.gcp_conn_id).extra_dejson
        key_path = get_field(extras, "key_path")
        keyfile_dict = get_field(extras, "keyfile_dict")
        if key_path:
            credential_params = ["-credential_file", key_path]
        elif keyfile_dict:
            keyfile_content = keyfile_dict if isinstance(keyfile_dict, dict) else json.loads(keyfile_dict)
            self.log.info("Saving credentials to %s", self.credentials_path)
            with open(self.credentials_path, "w") as file:
                json.dump(keyfile_content, file)
            credential_params = ["-credential_file", self.credentials_path]
        else:
            self.log.info(
                "The credentials are not supplied by neither key_path nor "
                "keyfile_dict of the gcp connection %s. Falling back to "
                "default activated account",
                self.gcp_conn_id,
            )
            credential_params = []
        if not self.instance_specification:
            project_id = get_field(extras, "project")
            if self.project_id:
                project_id = self.project_id
            if not project_id:
                raise AirflowException(
                    "For forwarding all instances, the project id "
                    "for Google Cloud should be provided either "
                    "by project_id extra in the Google Cloud connection or by "
                    "project_id provided in the operator."
                )
            credential_params.extend(["-projects", project_id])
        return credential_params

    def start_proxy(self) -> None:
        """
        Start Cloud SQL Proxy.

        You have to remember to stop the proxy if you started it!
        """
        self._download_sql_proxy_if_needed()
        if self.sql_proxy_process:
            raise AirflowException(f"The sql proxy is already running: {self.sql_proxy_process}")
        else:
            command_to_run = [self.sql_proxy_path]
            command_to_run.extend(self.command_line_parameters)
            self.log.info("Creating directory %s", self.cloud_sql_proxy_socket_directory)
            Path(self.cloud_sql_proxy_socket_directory).mkdir(parents=True, exist_ok=True)
            command_to_run.extend(self._get_credential_parameters())
            self.log.info("Running the command: `%s`", " ".join(command_to_run))
            self.sql_proxy_process = Popen(command_to_run, stdin=PIPE, stdout=PIPE, stderr=PIPE)
            self.log.info("The pid of cloud_sql_proxy: %s", self.sql_proxy_process.pid)
            # Scan the proxy's stderr until it either reports readiness,
            # reports an error, or exits early.
            while True:
                line = (
                    self.sql_proxy_process.stderr.readline().decode("utf-8")
                    if self.sql_proxy_process.stderr
                    else ""
                )
                return_code = self.sql_proxy_process.poll()
                if line == "" and return_code is not None:
                    self.sql_proxy_process = None
                    raise AirflowException(
                        f"The cloud_sql_proxy finished early with return code {return_code}!"
                    )
                if line != "":
                    self.log.info(line)
                if "googleapi: Error" in line or "invalid instance name:" in line:
                    self.stop_proxy()
                    raise AirflowException(f"Error when starting the cloud_sql_proxy {line}!")
                if "Ready for new connections" in line:
                    return

    def stop_proxy(self) -> None:
        """
        Stop running proxy.

        You should stop the proxy after you stop using it.
        """
        if not self.sql_proxy_process:
            raise AirflowException("The sql proxy is not started yet")
        else:
            self.log.info("Stopping the cloud_sql_proxy pid: %s", self.sql_proxy_process.pid)
            self.sql_proxy_process.kill()
            self.sql_proxy_process = None
        # Cleanup!
        self.log.info("Removing the socket directory: %s", self.cloud_sql_proxy_socket_directory)
        shutil.rmtree(self.cloud_sql_proxy_socket_directory, ignore_errors=True)
        if self.sql_proxy_was_downloaded:
            self.log.info("Removing downloaded proxy: %s", self.sql_proxy_path)
            # Silently ignore if the file has already been removed (concurrency)
            try:
                os.remove(self.sql_proxy_path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
        else:
            self.log.info("Skipped removing proxy - it was not downloaded: %s", self.sql_proxy_path)
        if os.path.isfile(self.credentials_path):
            self.log.info("Removing generated credentials file %s", self.credentials_path)
            # Here file cannot be delete by concurrent task (each task has its own copy)
            os.remove(self.credentials_path)

    def get_proxy_version(self) -> str | None:
        """Return version of the Cloud SQL Proxy."""
        self._download_sql_proxy_if_needed()
        command_to_run = [self.sql_proxy_path]
        command_to_run.extend(["--version"])
        command_to_run.extend(self._get_credential_parameters())
        result = subprocess.check_output(command_to_run).decode("utf-8")
        matched = re.search("[Vv]ersion (.*?);", result)
        if matched:
            return matched.group(1)
        else:
            return None

    def get_socket_path(self) -> str:
        """
        Retrieve UNIX socket path used by Cloud SQL Proxy.

        :return: The dynamically generated path for the socket created by the proxy.
        """
        return self.cloud_sql_proxy_socket_directory + "/" + self.instance_specification
class CloudSQLDatabaseHook(BaseHook):
    """
    Serves DB connection configuration for Google Cloud SQL (Connections of *gcpcloudsqldb://* type).

    The hook is a "meta" one. It does not perform an actual connection.

    It is there to retrieve all the parameters configured in gcpcloudsql:// connection,
    start/stop Cloud SQL Proxy if needed, dynamically generate Postgres or MySQL
    connection in the database and return an actual Postgres or MySQL hook.
    The returned Postgres/MySQL hooks are using direct connection or Cloud SQL
    Proxy socket/TCP as configured.

    Main parameters of the hook are retrieved from the standard URI components:

    * **user** - User name to authenticate to the database (from login of the URI).
    * **password** - Password to authenticate to the database (from password of the URI).
    * **public_ip** - IP to connect to for public connection (from host of the URI).
    * **public_port** - Port to connect to for public connection (from port of the URI).
    * **database** - Database to connect to (from schema of the URI).
    * **sql_proxy_binary_path** - Optional path to Cloud SQL Proxy binary. If the binary
      is not specified or the binary is not present, it is automatically downloaded.

    Remaining parameters are retrieved from the extras (URI query parameters):

    * **project_id** - Optional, Google Cloud project where the Cloud SQL
       instance exists. If missing, default project id passed is used.
    * **instance** -  Name of the instance of the Cloud SQL database instance.
    * **location** - The location of the Cloud SQL instance (for example europe-west1).
    * **database_type** - The type of the database instance (MySQL or Postgres).
    * **use_proxy** - (default False) Whether SQL proxy should be used to connect to Cloud
      SQL DB.
    * **use_ssl** - (default False) Whether SSL should be used to connect to Cloud SQL DB.
      You cannot use proxy and SSL together.
    * **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to connect via
      proxy, otherwise UNIX sockets are used.
    * **sql_proxy_version** -  Specific version of the proxy to download (for example
      v1.13). If not specified, the latest version is downloaded.
    * **sslcert** - Path to client certificate to authenticate when SSL is used.
    * **sslkey** - Path to client private key to authenticate when SSL is used.
    * **sslrootcert** - Path to server's certificate to authenticate when SSL is used.

    :param gcp_cloudsql_conn_id: URL of the connection
    :param gcp_conn_id: The connection ID used to connect to Google Cloud for
        cloud-sql-proxy authentication.
    :param default_gcp_project_id: Default project id used if project_id not specified
        in the connection URL
    """

    def __init__(
        self,
        gcp_cloudsql_conn_id: str = "google_cloud_sql_default",
        gcp_conn_id: str = "google_cloud_default",
        default_gcp_project_id: str | None = None,
        sql_proxy_binary_path: str | None = None,
    ) -> None:
        super().__init__()
        self.gcp_conn_id = gcp_conn_id
        self.gcp_cloudsql_conn_id = gcp_cloudsql_conn_id
        self.cloudsql_connection = self.get_connection(self.gcp_cloudsql_conn_id)
        self.extras = self.cloudsql_connection.extra_dejson
        self.project_id = self.extras.get("project_id", default_gcp_project_id)
        self.instance = self.extras.get("instance")
        self.database = self.cloudsql_connection.schema
        self.location = self.extras.get("location")
        self.database_type = self.extras.get("database_type")
        self.use_proxy = self._get_bool(self.extras.get("use_proxy", "False"))
        self.use_ssl = self._get_bool(self.extras.get("use_ssl", "False"))
        self.sql_proxy_use_tcp = self._get_bool(self.extras.get("sql_proxy_use_tcp", "False"))
        self.sql_proxy_version = self.extras.get("sql_proxy_version")
        self.sql_proxy_binary_path = sql_proxy_binary_path
        self.user = self.cloudsql_connection.login
        self.password = self.cloudsql_connection.password
        self.public_ip = self.cloudsql_connection.host
        self.public_port = self.cloudsql_connection.port
        self.sslcert = self.extras.get("sslcert")
        self.sslkey = self.extras.get("sslkey")
        self.sslrootcert = self.extras.get("sslrootcert")
        # Port and socket path and db_hook are automatically generated
        self.sql_proxy_tcp_port = None
        self.sql_proxy_unique_path: str | None = None
        self.db_hook: PostgresHook | MySqlHook | None = None
        self.reserved_tcp_socket: socket.socket | None = None
        # Generated based on clock + clock sequence. Unique per host (!).
        # This is important as different hosts share the database
        self.db_conn_id = str(uuid.uuid1())
        self._validate_inputs()

    @staticmethod
    def _get_bool(val: Any) -> bool:
        # NOTE: only the exact string "False" (and boolean False) are falsy here —
        # e.g. "false" or "0" evaluate to True. Kept for backward compatibility.
        if val == "False" or val is False:
            return False
        return True

    @staticmethod
    def _check_ssl_file(file_to_check, name) -> None:
        """Raise if the given SSL file parameter is unset or not a regular file."""
        if not file_to_check:
            raise AirflowException(f"SSL connections requires {name} to be set")
        if not os.path.isfile(file_to_check):
            raise AirflowException(f"The {file_to_check} must be a readable file")

    def _validate_inputs(self) -> None:
        """Validate required extras and the proxy/SSL exclusivity constraint."""
        if self.project_id == "":
            raise AirflowException("The required extra 'project_id' is empty")
        if not self.location:
            raise AirflowException("The required extra 'location' is empty or None")
        if not self.instance:
            raise AirflowException("The required extra 'instance' is empty or None")
        if self.database_type not in CLOUD_SQL_VALID_DATABASE_TYPES:
            raise AirflowException(
                f"Invalid database type '{self.database_type}'. "
                f"Must be one of {CLOUD_SQL_VALID_DATABASE_TYPES}"
            )
        if self.use_proxy and self.use_ssl:
            raise AirflowException(
                "Cloud SQL Proxy does not support SSL connections."
                " SSL is not needed as Cloud SQL Proxy "
                "provides encryption on its own"
            )

    def validate_socket_path_length(self) -> None:
        """
        Validate sockets path length.

        :return: None or rises AirflowException
        """
        if self.use_proxy and not self.sql_proxy_use_tcp:
            if self.database_type == "postgres":
                suffix = "/.s.PGSQL.5432"
            else:
                suffix = ""
            expected_path = (
                f"{self._generate_unique_path()}/{self.project_id}:{self.instance}:"
                f"{self.database}{suffix}"
            )
            if len(expected_path) > UNIX_PATH_MAX:
                self.log.info("Too long (%s) path: %s", len(expected_path), expected_path)
                raise AirflowException(
                    f"The UNIX socket path length cannot exceed {UNIX_PATH_MAX} characters on Linux system. "
                    "Either use shorter instance/database name or switch to TCP connection. "
                    f"The socket path for Cloud SQL proxy is now:{expected_path}"
                )

    @staticmethod
    def _generate_unique_path() -> str:
        """Generate a unique path.

        We don't using mkdtemp here since it can generate paths close to 60
        characters. We append project/location/instance to the path, Postgres
        will then appends its own prefix, making the resulting path exceed the
        100 character length limitation of a socket path. This generates a
        shorter path ``${tempdir()}[8 random characters]``.
        """
        random.seed()
        while True:
            candidate = os.path.join(
                gettempdir(), "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
            )
            if not os.path.exists(candidate):
                return candidate

    @staticmethod
    def _quote(value) -> str | None:
        return quote_plus(value) if value else None

    def _generate_connection_uri(self) -> str:
        """Build the Postgres/MySQL connection URI from the configured parameters."""
        if self.use_proxy:
            if self.sql_proxy_use_tcp:
                if not self.sql_proxy_tcp_port:
                    self.reserve_free_tcp_port()
            if not self.sql_proxy_unique_path:
                self.sql_proxy_unique_path = self._generate_unique_path()
        if not self.database_type:
            raise ValueError("The database_type should be set")

        database_uris = CONNECTION_URIS[self.database_type]
        ssl_spec = None
        socket_path = None
        if self.use_proxy:
            proxy_uris = database_uris["proxy"]
            if self.sql_proxy_use_tcp:
                format_string = proxy_uris["tcp"]
            else:
                format_string = proxy_uris["socket"]
                socket_path = f"{self.sql_proxy_unique_path}/{self._get_instance_socket_name()}"
        else:
            public_uris = database_uris["public"]
            if self.use_ssl:
                format_string = public_uris["ssl"]
                ssl_spec = {"cert": self.sslcert, "key": self.sslkey, "ca": self.sslrootcert}
            else:
                format_string = public_uris["non-ssl"]
        if not self.user:
            raise AirflowException("The login parameter needs to be set in connection")
        if not self.public_ip:
            raise AirflowException("The location parameter needs to be set in connection")
        if not self.password:
            raise AirflowException("The password parameter needs to be set in connection")
        if not self.database:
            raise AirflowException("The database parameter needs to be set in connection")

        connection_uri = format_string.format(
            user=quote_plus(self.user) if self.user else "",
            password=quote_plus(self.password) if self.password else "",
            database=quote_plus(self.database) if self.database else "",
            public_ip=self.public_ip,
            public_port=self.public_port,
            proxy_port=self.sql_proxy_tcp_port,
            socket_path=self._quote(socket_path),
            ssl_spec=self._quote(json.dumps(ssl_spec)) if ssl_spec else "",
            # Each SSL file parameter is gated on its own attribute (previously
            # client_key_file and server_ca_file were gated on sslcert, silently
            # dropping sslkey/sslrootcert when no client certificate was set).
            client_cert_file=self._quote(self.sslcert) if self.sslcert else "",
            client_key_file=self._quote(self.sslkey) if self.sslkey else "",
            server_ca_file=self._quote(self.sslrootcert) if self.sslrootcert else "",
        )
        self.log.info(
            "DB connection URI %s",
            connection_uri.replace(
                quote_plus(self.password) if self.password else "PASSWORD", "XXXXXXXXXXXX"
            ),
        )
        return connection_uri

    def _get_instance_socket_name(self) -> str:
        return self.project_id + ":" + self.location + ":" + self.instance

    def _get_sqlproxy_instance_specification(self) -> str:
        instance_specification = self._get_instance_socket_name()
        if self.sql_proxy_use_tcp:
            instance_specification += f"=tcp:{self.sql_proxy_tcp_port}"
        return instance_specification

    def create_connection(self) -> Connection:
        """Create a connection.

        Connection ID will be randomly generated according to whether it uses
        proxy, TCP, UNIX sockets, SSL.
        """
        uri = self._generate_connection_uri()
        connection = Connection(conn_id=self.db_conn_id, uri=uri)
        self.log.info("Creating connection %s", self.db_conn_id)
        return connection

    def get_sqlproxy_runner(self) -> CloudSqlProxyRunner:
        """Retrieve Cloud SQL Proxy runner.

        It is used to manage the proxy lifecycle per task.

        :return: The Cloud SQL Proxy runner.
        """
        if not self.use_proxy:
            raise ValueError("Proxy runner can only be retrieved in case of use_proxy = True")
        if not self.sql_proxy_unique_path:
            raise ValueError("The sql_proxy_unique_path should be set")
        return CloudSqlProxyRunner(
            path_prefix=self.sql_proxy_unique_path,
            instance_specification=self._get_sqlproxy_instance_specification(),
            project_id=self.project_id,
            sql_proxy_version=self.sql_proxy_version,
            sql_proxy_binary_path=self.sql_proxy_binary_path,
            gcp_conn_id=self.gcp_conn_id,
        )

    def get_database_hook(self, connection: Connection) -> PostgresHook | MySqlHook:
        """Retrieve database hook.

        This is the actual Postgres or MySQL database hook that uses proxy or
        connects directly to the Google Cloud SQL database.
        """
        if self.database_type == "postgres":
            db_hook: PostgresHook | MySqlHook = PostgresHook(connection=connection, schema=self.database)
        else:
            db_hook = MySqlHook(connection=connection, schema=self.database)
        self.db_hook = db_hook
        return db_hook

    def cleanup_database_hook(self) -> None:
        """Clean up database hook after it was used."""
        if self.database_type == "postgres":
            if not self.db_hook:
                raise ValueError("The db_hook should be set")
            if not isinstance(self.db_hook, PostgresHook):
                raise ValueError(f"The db_hook should be PostgresHook and is {type(self.db_hook)}")
            conn = getattr(self.db_hook, "conn")
            if conn and conn.notices:
                for output in self.db_hook.conn.notices:
                    self.log.info(output)

    def reserve_free_tcp_port(self) -> None:
        """Reserve free TCP port to be used by Cloud SQL Proxy."""
        # Binding to port 0 lets the OS pick a free ephemeral port; the socket is
        # held open so no other process grabs it before the proxy starts.
        self.reserved_tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.reserved_tcp_socket.bind(("127.0.0.1", 0))
        self.sql_proxy_tcp_port = self.reserved_tcp_socket.getsockname()[1]

    def free_reserved_port(self) -> None:
        """Free TCP port.

        Makes it immediately ready to be used by Cloud SQL Proxy.
        """
        if self.reserved_tcp_socket:
            self.reserved_tcp_socket.close()
            self.reserved_tcp_socket = None