Source code for airflow.providers.alibaba.cloud.hooks.oss
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromfunctoolsimportwrapsfrominspectimportsignaturefromtypingimportTYPE_CHECKING,Callable,TypeVar,castfromurllib.parseimporturlsplitimportoss2fromoss2.exceptionsimportClientErrorfromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookifTYPE_CHECKING:fromairflow.models.connectionimportConnection
[docs]defprovide_bucket_name(func:T)->T:"""Unify bucket name and key if a key is provided but not a bucket name."""function_signature=signature(func)@wraps(func)defwrapper(*args,**kwargs)->T:bound_args=function_signature.bind(*args,**kwargs)self=args[0]ifbound_args.arguments.get("bucket_name")isNoneandself.oss_conn_id:connection=self.get_connection(self.oss_conn_id)ifconnection.schema:bound_args.arguments["bucket_name"]=connection.schemareturnfunc(*bound_args.args,**bound_args.kwargs)returncast("T",wrapper)
[docs]defunify_bucket_name_and_key(func:T)->T:"""Unify bucket name and key if a key is provided but not a bucket name."""function_signature=signature(func)@wraps(func)defwrapper(*args,**kwargs)->T:bound_args=function_signature.bind(*args,**kwargs)defget_key()->str:if"key"inbound_args.arguments:return"key"raiseValueError("Missing key parameter!")key_name=get_key()ifbound_args.arguments.get("bucket_name")isNone:bound_args.arguments["bucket_name"],bound_args.arguments["key"]=OSSHook.parse_oss_url(bound_args.arguments[key_name])returnfunc(*bound_args.args,**bound_args.kwargs)returncast("T",wrapper)
[docs]classOSSHook(BaseHook):"""Interact with Alibaba Cloud OSS, using the oss2 library."""
[docs]defget_conn(self)->Connection:"""Return connection for the hook."""returnself.oss_conn
@staticmethod
[docs]defparse_oss_url(ossurl:str)->tuple:""" Parse the OSS Url into a bucket name and key. :param ossurl: The OSS Url to parse. :return: the parsed bucket name and key """parsed_url=urlsplit(ossurl)ifnotparsed_url.netloc:raiseAirflowException(f'Please provide a bucket_name instead of "{ossurl}"')bucket_name=parsed_url.netlockey=parsed_url.path.lstrip("/")returnbucket_name,key
@provide_bucket_name@unify_bucket_name_and_key
[docs]defobject_exists(self,key:str,bucket_name:str|None=None)->bool:""" Check if object exists. :param key: the path of the object :param bucket_name: the name of the bucket :return: True if it exists and False if not. """try:returnself.get_bucket(bucket_name).object_exists(key)exceptClientErrorase:self.log.error(e.message)returnFalse
@provide_bucket_name
[docs]defget_bucket(self,bucket_name:str|None=None)->oss2.api.Bucket:""" Return a oss2.Bucket object. :param bucket_name: the name of the bucket :return: the bucket object to the bucket name. """auth=self.get_credential()returnoss2.Bucket(auth,f"https://oss-{self.region}.aliyuncs.com",bucket_name)
@provide_bucket_name@unify_bucket_name_and_key
[docs]defload_string(self,key:str,content:str,bucket_name:str|None=None)->None:""" Load a string to OSS. :param key: the path of the object :param content: str to set as content for the key. :param bucket_name: the name of the bucket """try:self.get_bucket(bucket_name).put_object(key,content)exceptExceptionase:raiseAirflowException(f"Errors: {e}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defupload_local_file(self,key:str,file:str,bucket_name:str|None=None,)->None:""" Upload a local file to OSS. :param key: the OSS path of the object :param file: local file to upload. :param bucket_name: the name of the bucket """try:self.get_bucket(bucket_name).put_object_from_file(key,file)exceptExceptionase:raiseAirflowException(f"Errors when upload file: {e}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defdownload_file(self,key:str,local_file:str,bucket_name:str|None=None,)->str|None:""" Download file from OSS. :param key: key of the file-like object to download. :param local_file: local path + file name to save. :param bucket_name: the name of the bucket :return: the file name. """try:self.get_bucket(bucket_name).get_object_to_file(key,local_file)exceptExceptionase:self.log.error(e)returnNonereturnlocal_file
@provide_bucket_name@unify_bucket_name_and_key
[docs]defdelete_object(self,key:str,bucket_name:str|None=None,)->None:""" Delete object from OSS. :param key: key of the object to delete. :param bucket_name: the name of the bucket """try:self.get_bucket(bucket_name).delete_object(key)exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when deleting: {key}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defdelete_objects(self,key:list,bucket_name:str|None=None,)->None:""" Delete objects from OSS. :param key: keys list of the objects to delete. :param bucket_name: the name of the bucket """try:self.get_bucket(bucket_name).batch_delete_objects(key)exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when deleting: {key}")
@provide_bucket_name
[docs]defdelete_bucket(self,bucket_name:str|None=None,)->None:""" Delete bucket from OSS. :param bucket_name: the name of the bucket """try:self.get_bucket(bucket_name).delete_bucket()exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when deleting: {bucket_name}")
@provide_bucket_name
[docs]defcreate_bucket(self,bucket_name:str|None=None,)->None:""" Create bucket. :param bucket_name: the name of the bucket """try:self.get_bucket(bucket_name).create_bucket()exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when create bucket: {bucket_name}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defappend_string(self,bucket_name:str|None,content:str,key:str,pos:int)->None:""" Append string to a remote existing file. :param bucket_name: the name of the bucket :param content: content to be appended :param key: oss bucket key :param pos: position of the existing file where the content will be appended """self.log.info("Write oss bucket. key: %s, pos: %s",key,pos)try:self.get_bucket(bucket_name).append_object(key,pos,content)exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when append string for object: {key}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defread_key(self,bucket_name:str|None,key:str)->str:""" Read oss remote object content with the specified key. :param bucket_name: the name of the bucket :param key: oss bucket key """self.log.info("Read oss key: %s",key)try:returnself.get_bucket(bucket_name).get_object(key).read().decode("utf-8")exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when read bucket object: {key}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defhead_key(self,bucket_name:str|None,key:str)->oss2.models.HeadObjectResult:""" Get meta info of the specified remote object. :param bucket_name: the name of the bucket :param key: oss bucket key """self.log.info("Head Object oss key: %s",key)try:returnself.get_bucket(bucket_name).head_object(key)exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when head bucket object: {key}")
@provide_bucket_name@unify_bucket_name_and_key
[docs]defkey_exist(self,bucket_name:str|None,key:str)->bool:""" Find out whether the specified key exists in the oss remote storage. :param bucket_name: the name of the bucket :param key: oss bucket key """# full_path = Noneself.log.info("Looking up oss bucket %s for bucket key %s ...",bucket_name,key)try:returnself.get_bucket(bucket_name).object_exists(key)exceptExceptionase:self.log.error(e)raiseAirflowException(f"Errors when check bucket object existence: {key}")
[docs]defget_credential(self)->oss2.auth.Auth:extra_config=self.oss_conn.extra_dejsonauth_type=extra_config.get("auth_type",None)ifnotauth_type:raiseValueError("No auth_type specified in extra_config. ")ifauth_type!="AK":raiseValueError(f"Unsupported auth_type: {auth_type}")oss_access_key_id=extra_config.get("access_key_id",None)oss_access_key_secret=extra_config.get("access_key_secret",None)ifnotoss_access_key_id:raiseValueError(f"No access_key_id is specified for connection: {self.oss_conn_id}")ifnotoss_access_key_secret:raiseValueError(f"No access_key_secret is specified for connection: {self.oss_conn_id}")returnoss2.Auth(oss_access_key_id,oss_access_key_secret)
[docs]defget_default_region(self)->str:extra_config=self.oss_conn.extra_dejsonauth_type=extra_config.get("auth_type",None)ifnotauth_type:raiseValueError("No auth_type specified in extra_config. ")ifauth_type!="AK":raiseValueError(f"Unsupported auth_type: {auth_type}")default_region=extra_config.get("region",None)ifnotdefault_region:raiseValueError(f"No region is specified for connection: {self.oss_conn_id}")returndefault_region