Source code for airflow.providers.databricks.hooks.databricks
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Databricks hook.This hook enable the submitting and running of jobs to the Databricks platform. Internally theoperators talk to the``api/2.1/jobs/run-now```endpoint <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>_`or the ``api/2.1/jobs/runs/submit```endpoint <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit>`_."""from__future__importannotationsimportjsonfromtypingimportAnyfromrequestsimportexceptionsasrequests_exceptionsfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.databricks.hooks.databricks_baseimportBaseDatabricksHook
[docs]classRunState:"""Utility class for the run state concept of Databricks runs."""def__init__(self,life_cycle_state:str,result_state:str="",state_message:str="",*args,**kwargs)->None:self.life_cycle_state=life_cycle_stateself.result_state=result_stateself.state_message=state_message@property
[docs]defis_terminal(self)->bool:"""True if the current state is a terminal state."""ifself.life_cycle_statenotinRUN_LIFE_CYCLE_STATES:raiseAirflowException(("Unexpected life cycle state: {}: If the state has ""been introduced recently, please check the Databricks user ""guide for troubleshooting information").format(self.life_cycle_state))returnself.life_cycle_statein("TERMINATED","SKIPPED","INTERNAL_ERROR")
@property
[docs]defis_successful(self)->bool:"""True if the result state is SUCCESS."""returnself.result_state=="SUCCESS"
[docs]classDatabricksHook(BaseDatabricksHook):""" Interact with Databricks. :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`. :param timeout_seconds: The amount of time in seconds the requests library will wait before timing-out. :param retry_limit: The number of times to retry the connection in case of service outages. :param retry_delay: The number of seconds to wait between retries (it might be a floating point number). :param retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class. """
[docs]defrun_now(self,json:dict)->int:""" Utility function to call the ``api/2.0/jobs/run-now`` endpoint. :param json: The data used in the body of the request to the ``run-now`` endpoint. :return: the run_id as an int """response=self._do_api_call(RUN_NOW_ENDPOINT,json)returnresponse["run_id"]
[docs]defsubmit_run(self,json:dict)->int:""" Utility function to call the ``api/2.0/jobs/runs/submit`` endpoint. :param json: The data used in the body of the request to the ``submit`` endpoint. :return: the run_id as an int """response=self._do_api_call(SUBMIT_RUN_ENDPOINT,json)returnresponse["run_id"]
[docs]deflist_jobs(self,limit:int=25,offset:int=0,expand_tasks:bool=False,job_name:str|None=None)->list[dict[str,Any]]:""" Lists the jobs in the Databricks Job Service. :param limit: The limit/batch size used to retrieve jobs. :param offset: The offset of the first job to return, relative to the most recently created job. :param expand_tasks: Whether to include task and cluster details in the response. :param job_name: Optional name of a job to search. :return: A list of jobs. """has_more=Trueall_jobs=[]whilehas_more:payload:dict[str,Any]={"limit":limit,"expand_tasks":expand_tasks,"offset":offset,}ifjob_name:payload["name"]=job_nameresponse=self._do_api_call(LIST_JOBS_ENDPOINT,payload)jobs=response.get("jobs",[])ifjob_name:all_jobs+=[jforjinjobsifj["settings"]["name"]==job_name]else:all_jobs+=jobshas_more=response.get("has_more",False)ifhas_more:offset+=len(jobs)returnall_jobs
[docs]deffind_job_id_by_name(self,job_name:str)->int|None:""" Finds job id by its name. If there are multiple jobs with the same name, raises AirflowException. :param job_name: The name of the job to look up. :return: The job_id as an int or None if no job was found. """matching_jobs=self.list_jobs(job_name=job_name)iflen(matching_jobs)>1:raiseAirflowException(f"There are more than one job with name {job_name}. Please delete duplicated jobs first")ifnotmatching_jobs:returnNoneelse:returnmatching_jobs[0]["job_id"]
[docs]defget_run_page_url(self,run_id:int)->str:""" Retrieves run_page_url. :param run_id: id of the run :return: URL of the run page """json={"run_id":run_id}response=self._do_api_call(GET_RUN_ENDPOINT,json)returnresponse["run_page_url"]
[docs]asyncdefa_get_run_page_url(self,run_id:int)->str:""" Async version of `get_run_page_url()`. :param run_id: id of the run :return: URL of the run page """json={"run_id":run_id}response=awaitself._a_do_api_call(GET_RUN_ENDPOINT,json)returnresponse["run_page_url"]
[docs]defget_job_id(self,run_id:int)->int:""" Retrieves job_id from run_id. :param run_id: id of the run :return: Job id for given Databricks run """json={"run_id":run_id}response=self._do_api_call(GET_RUN_ENDPOINT,json)returnresponse["job_id"]
[docs]defget_run_state(self,run_id:int)->RunState:""" Retrieves run state of the run. Please note that any Airflow tasks that call the ``get_run_state`` method will result in failure unless you have enabled xcom pickling. This can be done using the following environment variable: ``AIRFLOW__CORE__ENABLE_XCOM_PICKLING`` If you do not want to enable xcom pickling, use the ``get_run_state_str`` method to get a string describing state, or ``get_run_state_lifecycle``, ``get_run_state_result``, or ``get_run_state_message`` to get individual components of the run state. :param run_id: id of the run :return: state of the run """json={"run_id":run_id}response=self._do_api_call(GET_RUN_ENDPOINT,json)state=response["state"]returnRunState(**state)
[docs]asyncdefa_get_run_state(self,run_id:int)->RunState:""" Async version of `get_run_state()`. :param run_id: id of the run :return: state of the run """json={"run_id":run_id}response=awaitself._a_do_api_call(GET_RUN_ENDPOINT,json)state=response["state"]returnRunState(**state)
[docs]defget_run(self,run_id:int)->dict[str,Any]:""" Retrieve run information. :param run_id: id of the run :return: state of the run """json={"run_id":run_id}response=self._do_api_call(GET_RUN_ENDPOINT,json)returnresponse
[docs]asyncdefa_get_run(self,run_id:int)->dict[str,Any]:""" Async version of `get_run`. :param run_id: id of the run :return: state of the run """json={"run_id":run_id}response=awaitself._a_do_api_call(GET_RUN_ENDPOINT,json)returnresponse
[docs]defget_run_state_str(self,run_id:int)->str:""" Return the string representation of RunState. :param run_id: id of the run :return: string describing run state """state=self.get_run_state(run_id)run_state_str=(f"State: {state.life_cycle_state}. Result: {state.result_state}. {state.state_message}")returnrun_state_str
[docs]defget_run_state_lifecycle(self,run_id:int)->str:""" Returns the lifecycle state of the run. :param run_id: id of the run :return: string with lifecycle state """returnself.get_run_state(run_id).life_cycle_state
[docs]defget_run_state_result(self,run_id:int)->str:""" Returns the resulting state of the run. :param run_id: id of the run :return: string with resulting state """returnself.get_run_state(run_id).result_state
[docs]defget_run_state_message(self,run_id:int)->str:""" Returns the state message for the run. :param run_id: id of the run :return: string with state message """returnself.get_run_state(run_id).state_message
[docs]defget_run_output(self,run_id:int)->dict:""" Retrieves run output of the run. :param run_id: id of the run :return: output of the run """json={"run_id":run_id}run_output=self._do_api_call(OUTPUT_RUNS_JOB_ENDPOINT,json)returnrun_output
[docs]defcancel_run(self,run_id:int)->None:""" Cancels the run. :param run_id: id of the run """json={"run_id":run_id}self._do_api_call(CANCEL_RUN_ENDPOINT,json)
[docs]defcancel_all_runs(self,job_id:int)->None:""" Cancels all active runs of a job. The runs are canceled asynchronously. :param job_id: The canonical identifier of the job to cancel all runs of """json={"job_id":job_id}self._do_api_call(CANCEL_ALL_RUNS_ENDPOINT,json)
[docs]defdelete_run(self,run_id:int)->None:""" Deletes a non-active run. :param run_id: id of the run """json={"run_id":run_id}self._do_api_call(DELETE_RUN_ENDPOINT,json)
[docs]defrepair_run(self,json:dict)->None:""" Re-run one or more tasks. :param json: repair a job run. """self._do_api_call(REPAIR_RUN_ENDPOINT,json)
[docs]definstall(self,json:dict)->None:""" Install libraries on the cluster. Utility function to call the ``2.0/libraries/install`` endpoint. :param json: json dictionary containing cluster_id and an array of library """self._do_api_call(INSTALL_LIBS_ENDPOINT,json)
[docs]defuninstall(self,json:dict)->None:""" Uninstall libraries on the cluster. Utility function to call the ``2.0/libraries/uninstall`` endpoint. :param json: json dictionary containing cluster_id and an array of library """self._do_api_call(UNINSTALL_LIBS_ENDPOINT,json)
[docs]defupdate_repo(self,repo_id:str,json:dict[str,Any])->dict:""" Updates given Databricks Repos. :param repo_id: ID of Databricks Repos :param json: payload :return: metadata from update """repos_endpoint=("PATCH",f"api/2.0/repos/{repo_id}")returnself._do_api_call(repos_endpoint,json)
[docs]defdelete_repo(self,repo_id:str):""" Deletes given Databricks Repos. :param repo_id: ID of Databricks Repos :return: """repos_endpoint=("DELETE",f"api/2.0/repos/{repo_id}")self._do_api_call(repos_endpoint)
[docs]defcreate_repo(self,json:dict[str,Any])->dict:""" Creates a Databricks Repos. :param json: payload :return: """repos_endpoint=("POST","api/2.0/repos")returnself._do_api_call(repos_endpoint,json)
[docs]defget_repo_by_path(self,path:str)->str|None:""" Obtains Repos ID by path. :param path: path to a repository :return: Repos ID if it exists, None if doesn't. """try:result=self._do_api_call(WORKSPACE_GET_STATUS_ENDPOINT,{"path":path},wrap_http_errors=False)ifresult.get("object_type","")=="REPO":returnstr(result["object_id"])exceptrequests_exceptions.HTTPErrorase:ife.response.status_code!=404:raiseereturnNone
[docs]deftest_connection(self)->tuple[bool,str]:"""Test the Databricks connectivity from UI."""hook=DatabricksHook(databricks_conn_id=self.databricks_conn_id)try:hook._do_api_call(endpoint_info=SPARK_VERSIONS_ENDPOINT).get("versions")status=Truemessage="Connection successfully tested"exceptExceptionase:status=Falsemessage=str(e)returnstatus,message