Source code for airflow.providers.databricks.operators.databricks_repos
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Databricks operators."""

from __future__ import annotations

import re
from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook

if TYPE_CHECKING:
    from airflow.utils.context import Context
class DatabricksReposCreateOperator(BaseOperator):
    """
    Creates, and optionally checks out, a Databricks Repo using the POST api/2.0/repos API endpoint.

    See: https://docs.databricks.com/dev-tools/api/latest/repos.html#operation/create-repo

    :param git_url: Required HTTPS URL of a Git repository
    :param git_provider: Optional name of Git provider. Must be provided if we can't guess its
        name from URL.
    :param repo_path: optional path for a repository. Must be in the format
        ``/Repos/{folder}/{repo-name}``. If not specified, it will be created in the user's
        directory.
    :param branch: optional name of branch to check out.
    :param tag: optional name of tag to checkout.
    :param ignore_existing_repo: don't throw exception if repository with given path already exists.
    :param databricks_conn_id: Reference to the :ref:`Databricks connection
        <howto/connection:databricks>`. By default and in the common case this will be
        ``databricks_default``. To use token based authentication, provide the key ``token`` in
        the extra field for the connection and create the key ``host`` and leave the ``host``
        field empty. (templated)
    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
        unreachable. Its value must be greater than or equal to 1.
    :param databricks_retry_delay: Number of seconds to wait between retries (it
        might be a floating point number).
    """

    # Used in airflow.models.BaseOperator
    # NOTE(review): the declaration this comment refers to (presumably
    # ``template_fields``) was lost in extraction — confirm against upstream.

    def __init__(
        self,
        *,
        git_url: str,
        git_provider: str | None = None,
        branch: str | None = None,
        tag: str | None = None,
        repo_path: str | None = None,
        ignore_existing_repo: bool = False,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs,
    ) -> None:
        """Create a new ``DatabricksReposCreateOperator``."""
        super().__init__(**kwargs)
        self.databricks_conn_id = databricks_conn_id
        self.databricks_retry_limit = databricks_retry_limit
        self.databricks_retry_delay = databricks_retry_delay
        self.git_url = git_url
        self.ignore_existing_repo = ignore_existing_repo
        if git_provider is None:
            # NOTE(review): ``__detect_repo_provider__`` is referenced here but its
            # definition (a @staticmethod) was lost in extraction; restore it from
            # the upstream module before running this file.
            self.git_provider = self.__detect_repo_provider__(git_url)
            if self.git_provider is None:
                raise AirflowException(
                    f"git_provider isn't specified and couldn't be guessed for URL {git_url}"
                )
        else:
            self.git_provider = git_provider
        self.repo_path = repo_path
        # branch and tag are mutually exclusive checkout targets
        if branch is not None and tag is not None:
            raise AirflowException("Only one of branch or tag should be provided, but not both")
        self.branch = branch
        self.tag = tag

    # NOTE(review): a dangling ``@staticmethod`` decorator was left here by the
    # extraction (its function body — the provider-detection helper — was lost).
    # It was removed so it does not wrongly decorate the next method. The
    # ``__repos_path_regexp__`` class attribute used by ``execute`` is also
    # missing from this view.

    @cached_property
    def _hook(self) -> DatabricksHook:
        # Restored per the identical pattern in the sibling operators of this
        # module; ``execute`` requires ``self._hook``.
        return DatabricksHook(
            self.databricks_conn_id,
            retry_limit=self.databricks_retry_limit,
            retry_delay=self.databricks_retry_delay,
            caller="DatabricksReposCreateOperator",
        )

    def execute(self, context: Context):
        """
        Create a Databricks Repo.

        :param context: context
        :return: Repo ID
        """
        payload = {
            "url": self.git_url,
            "provider": self.git_provider,
        }
        if self.repo_path is not None:
            # ``__repos_path_regexp__`` validates the /Repos/{folder}/{repo-name}
            # form; its definition is not in view here (lost in extraction).
            if not self.__repos_path_regexp__.match(self.repo_path):
                raise AirflowException(
                    f"repo_path should have form of /Repos/{{folder}}/{{repo-name}}, got '{self.repo_path}'"
                )
            payload["path"] = self.repo_path
        existing_repo_id = None
        if self.repo_path is not None:
            existing_repo_id = self._hook.get_repo_by_path(self.repo_path)
            if existing_repo_id is not None and not self.ignore_existing_repo:
                raise AirflowException(f"Repo with path '{self.repo_path}' already exists")
        if existing_repo_id is None:
            result = self._hook.create_repo(payload)
            repo_id = result["id"]
        else:
            repo_id = existing_repo_id
        # update repo if necessary
        if self.branch is not None:
            self._hook.update_repo(str(repo_id), {"branch": str(self.branch)})
        elif self.tag is not None:
            self._hook.update_repo(str(repo_id), {"tag": str(self.tag)})
        return repo_id
class DatabricksReposUpdateOperator(BaseOperator):
    """
    Updates specified repository to a given branch or tag using the PATCH api/2.0/repos API endpoint.

    See: https://docs.databricks.com/dev-tools/api/latest/repos.html#operation/update-repo

    :param branch: optional name of branch to update to. Should be specified if ``tag`` is omitted
    :param tag: optional name of tag to update to. Should be specified if ``branch`` is omitted
    :param repo_id: optional ID of existing repository. Should be specified if ``repo_path`` is omitted
    :param repo_path: optional path of existing repository. Should be specified if ``repo_id`` is omitted
    :param databricks_conn_id: Reference to the :ref:`Databricks connection
        <howto/connection:databricks>`. By default and in the common case this will be
        ``databricks_default``. To use token based authentication, provide the key ``token`` in
        the extra field for the connection and create the key ``host`` and leave the ``host``
        field empty. (templated)
    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
        unreachable. Its value must be greater than or equal to 1.
    :param databricks_retry_delay: Number of seconds to wait between retries (it
        might be a floating point number).
    """

    # Used in airflow.models.BaseOperator

    def __init__(
        self,
        *,
        branch: str | None = None,
        tag: str | None = None,
        repo_id: str | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs,
    ) -> None:
        """Create a new ``DatabricksReposUpdateOperator``."""
        super().__init__(**kwargs)
        self.databricks_conn_id = databricks_conn_id
        self.databricks_retry_limit = databricks_retry_limit
        self.databricks_retry_delay = databricks_retry_delay
        # Exactly one of branch/tag and exactly one of repo_id/repo_path must be given.
        if branch is not None and tag is not None:
            raise AirflowException("Only one of branch or tag should be provided, but not both")
        if branch is None and tag is None:
            raise AirflowException("One of branch or tag should be provided")
        if repo_id is not None and repo_path is not None:
            raise AirflowException("Only one of repo_id or repo_path should be provided, but not both")
        if repo_id is None and repo_path is None:
            raise AirflowException("One of repo_id or repo_path should be provided")
        self.repo_path = repo_path
        self.repo_id = repo_id
        self.branch = branch
        self.tag = tag

    @cached_property
    def _hook(self) -> DatabricksHook:
        # Hook is constructed lazily, once per operator instance.
        return DatabricksHook(
            self.databricks_conn_id,
            retry_limit=self.databricks_retry_limit,
            retry_delay=self.databricks_retry_delay,
            caller="DatabricksReposUpdateOperator",
        )

    def execute(self, context: Context):
        """Resolve the repo ID (from ``repo_path`` if needed) and update it to the requested branch/tag, returning the resulting head commit ID."""
        if self.repo_path is not None:
            self.repo_id = self._hook.get_repo_by_path(self.repo_path)
            if self.repo_id is None:
                raise AirflowException(f"Can't find Repo ID for path '{self.repo_path}'")
        # __init__ guarantees exactly one of branch/tag is set.
        if self.branch is not None:
            payload = {"branch": str(self.branch)}
        else:
            payload = {"tag": str(self.tag)}
        result = self._hook.update_repo(str(self.repo_id), payload)
        return result["head_commit_id"]
class DatabricksReposDeleteOperator(BaseOperator):
    """
    Deletes specified repository using the DELETE api/2.0/repos API endpoint.

    See: https://docs.databricks.com/dev-tools/api/latest/repos.html#operation/delete-repo

    :param repo_id: optional ID of existing repository. Should be specified if ``repo_path`` is omitted
    :param repo_path: optional path of existing repository. Should be specified if ``repo_id`` is omitted
    :param databricks_conn_id: Reference to the :ref:`Databricks connection
        <howto/connection:databricks>`. By default and in the common case this will be
        ``databricks_default``. To use token based authentication, provide the key ``token`` in
        the extra field for the connection and create the key ``host`` and leave the ``host``
        field empty. (templated)
    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
        unreachable. Its value must be greater than or equal to 1.
    :param databricks_retry_delay: Number of seconds to wait between retries (it
        might be a floating point number).
    """

    # Used in airflow.models.BaseOperator

    def __init__(
        self,
        *,
        repo_id: str | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs,
    ) -> None:
        """Create a new ``DatabricksReposDeleteOperator``."""
        super().__init__(**kwargs)
        self.databricks_conn_id = databricks_conn_id
        self.databricks_retry_limit = databricks_retry_limit
        self.databricks_retry_delay = databricks_retry_delay
        # Exactly one of repo_id/repo_path must be given.
        if repo_id is not None and repo_path is not None:
            raise AirflowException("Only one of repo_id or repo_path should be provided, but not both")
        if repo_id is None and repo_path is None:
            # Fixed garbled message ("One of repo_id repo_path tag should be
            # provided") to match the identical validation in
            # DatabricksReposUpdateOperator.
            raise AirflowException("One of repo_id or repo_path should be provided")
        self.repo_path = repo_path
        self.repo_id = repo_id

    @cached_property
    def _hook(self) -> DatabricksHook:
        # Hook is constructed lazily, once per operator instance.
        return DatabricksHook(
            self.databricks_conn_id,
            retry_limit=self.databricks_retry_limit,
            retry_delay=self.databricks_retry_delay,
            caller="DatabricksReposDeleteOperator",
        )

    def execute(self, context: Context):
        """Resolve the repo ID (from ``repo_path`` if needed) and delete the repository."""
        if self.repo_path is not None:
            self.repo_id = self._hook.get_repo_by_path(self.repo_path)
            if self.repo_id is None:
                raise AirflowException(f"Can't find Repo ID for path '{self.repo_path}'")
        self._hook.delete_repo(str(self.repo_id))