Source code for airflow.providers.databricks.hooks.databricks
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Databricks hook.

This hook enables the submitting and running of jobs to the Databricks platform. Internally the
operators talk to the ``api/2.1/jobs/run-now``
`endpoint <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>`_
or the ``api/2.1/jobs/runs/submit``
`endpoint <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit>`_.
"""
import json
from typing import Any, Dict, List, Optional

from requests import exceptions as requests_exceptions

from airflow.exceptions import AirflowException
from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook
class RunState:
    """
    Utility class for the run state concept of Databricks runs.

    :param life_cycle_state: Life cycle state of the run.
    :param result_state: Result state of the run; defaults to an empty string.
    :param state_message: Message attached to the state; defaults to an empty string.
    """

    def __init__(
        self, life_cycle_state: str, result_state: str = '', state_message: str = '', *args, **kwargs
    ) -> None:
        # Extra positional and keyword arguments are accepted and ignored.
        # NOTE(review): presumably so that ``RunState(**state)`` keeps working
        # when the state dictionary carries additional keys -- confirm.
        self.life_cycle_state = life_cycle_state
        self.result_state = result_state
        self.state_message = state_message

    @property
    def is_terminal(self) -> bool:
        """
        True if the current state is a terminal state.

        :raises AirflowException: if ``life_cycle_state`` is not one of ``RUN_LIFE_CYCLE_STATES``.
        """
        if self.life_cycle_state not in RUN_LIFE_CYCLE_STATES:
            raise AirflowException(
                (
                    'Unexpected life cycle state: {}: If the state has '
                    'been introduced recently, please check the Databricks user '
                    'guide for troubleshooting information'
                ).format(self.life_cycle_state)
            )
        return self.life_cycle_state in ('TERMINATED', 'SKIPPED', 'INTERNAL_ERROR')

    @property
    def is_successful(self) -> bool:
        """True if the result state is SUCCESS."""
        return self.result_state == 'SUCCESS'

    def __repr__(self) -> str:
        """Return a readable representation listing the three state fields."""
        return (
            f"{type(self).__name__}("
            f"life_cycle_state={self.life_cycle_state!r}, "
            f"result_state={self.result_state!r}, "
            f"state_message={self.state_message!r})"
        )
[docs]classDatabricksHook(BaseDatabricksHook):""" Interact with Databricks. :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`. :param timeout_seconds: The amount of time in seconds the requests library will wait before timing-out. :param retry_limit: The number of times to retry the connection in case of service outages. :param retry_delay: The number of seconds to wait between retries (it might be a floating point number). :param retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class. """
def run_now(self, json: dict) -> int:
    """
    Call the ``run-now`` endpoint (``RUN_NOW_ENDPOINT``) and return the id of the run.

    :param json: The data used in the body of the request to the ``run-now`` endpoint.
    :return: The ``run_id`` entry of the API response.
    """
    return self._do_api_call(RUN_NOW_ENDPOINT, json)['run_id']
def submit_run(self, json: dict) -> int:
    """
    Call the ``submit`` endpoint (``SUBMIT_RUN_ENDPOINT``) and return the id of the run.

    :param json: The data used in the body of the request to the ``submit`` endpoint.
    :return: The ``run_id`` entry of the API response.
    """
    return self._do_api_call(SUBMIT_RUN_ENDPOINT, json)['run_id']
def list_jobs(self, limit: int = 25, offset: int = 0, expand_tasks: bool = False) -> List[Dict[str, Any]]:
    """
    Lists the jobs in the Databricks Job Service.

    Pages through ``LIST_JOBS_ENDPOINT`` until the response no longer reports
    ``has_more``, accumulating the ``jobs`` entry of every page.

    :param limit: The limit/batch size used to retrieve jobs.
    :param offset: The offset of the first job to return, relative to the most recently created job.
    :param expand_tasks: Whether to include task and cluster details in the response.
    :return: A list of jobs.
    """
    jobs: List[Dict[str, Any]] = []
    while True:
        # Named ``payload`` rather than ``json`` to avoid shadowing the json module.
        payload = {
            'limit': limit,
            'offset': offset,
            'expand_tasks': expand_tasks,
        }
        response = self._do_api_call(LIST_JOBS_ENDPOINT, payload)
        page = response.get('jobs', [])
        jobs += page
        # Stop on the last page. Also stop when a page is empty even though
        # ``has_more`` is set: the offset could not advance, so the same
        # request would be repeated forever (and a missing ``jobs`` key used
        # to raise KeyError here).
        if not response.get('has_more', False) or not page:
            break
        offset += len(page)
    return jobs
def find_job_id_by_name(self, job_name: str) -> Optional[int]:
    """
    Look up the id of the job whose ``settings.name`` equals ``job_name``.

    The candidates come from ``self.list_jobs()`` called with its default arguments.

    :param job_name: The name of the job to look up.
    :return: The ``job_id`` of the single matching job, or None if no job was found.
    :raises AirflowException: if more than one job carries the given name.
    """
    candidates = [job for job in self.list_jobs() if job['settings']['name'] == job_name]

    if len(candidates) > 1:
        raise AirflowException(
            f"There are more than one job with name {job_name}. Please delete duplicated jobs first"
        )

    if candidates:
        return candidates[0]['job_id']
    return None
def get_run_page_url(self, run_id: int) -> str:
    """
    Fetch the run via ``GET_RUN_ENDPOINT`` and return its ``run_page_url``.

    :param run_id: id of the run
    :return: URL of the run page
    """
    run_info = self._do_api_call(GET_RUN_ENDPOINT, {'run_id': run_id})
    return run_info['run_page_url']
async def a_get_run_page_url(self, run_id: int) -> str:
    """
    Async version of `get_run_page_url()`.

    Awaits ``self._a_do_api_call`` on ``GET_RUN_ENDPOINT`` and returns the
    ``run_page_url`` entry of the response.

    :param run_id: id of the run
    :return: URL of the run page
    """
    run_info = await self._a_do_api_call(GET_RUN_ENDPOINT, {'run_id': run_id})
    return run_info['run_page_url']
def get_job_id(self, run_id: int) -> int:
    """
    Fetch the run via ``GET_RUN_ENDPOINT`` and return the ``job_id`` it belongs to.

    :param run_id: id of the run
    :return: Job id for given Databricks run
    """
    run_info = self._do_api_call(GET_RUN_ENDPOINT, {'run_id': run_id})
    return run_info['job_id']
def get_run_state(self, run_id: int) -> RunState:
    """
    Retrieves run state of the run.

    The ``state`` entry of the ``GET_RUN_ENDPOINT`` response is unpacked into a
    :class:`RunState`.

    Please note that any Airflow tasks that call the ``get_run_state`` method will result in
    failure unless you have enabled xcom pickling. This can be done using the following
    environment variable: ``AIRFLOW__CORE__ENABLE_XCOM_PICKLING``

    If you do not want to enable xcom pickling, use the ``get_run_state_str`` method to get
    a string describing state, or ``get_run_state_lifecycle``, ``get_run_state_result``, or
    ``get_run_state_message`` to get individual components of the run state.

    :param run_id: id of the run
    :return: state of the run
    """
    run_info = self._do_api_call(GET_RUN_ENDPOINT, {'run_id': run_id})
    return RunState(**run_info['state'])
async def a_get_run_state(self, run_id: int) -> RunState:
    """
    Async version of `get_run_state()`.

    Awaits ``self._a_do_api_call`` on ``GET_RUN_ENDPOINT`` and unpacks the
    ``state`` entry of the response into a :class:`RunState`.

    :param run_id: id of the run
    :return: state of the run
    """
    run_info = await self._a_do_api_call(GET_RUN_ENDPOINT, {'run_id': run_id})
    return RunState(**run_info['state'])
def get_run_state_str(self, run_id: int) -> str:
    """
    Return the string representation of RunState.

    The string combines the life cycle state, the result state and the state
    message obtained from ``self.get_run_state(run_id)``.

    :param run_id: id of the run
    :return: string describing run state
    """
    state = self.get_run_state(run_id)
    return f"State: {state.life_cycle_state}. Result: {state.result_state}. {state.state_message}"
def get_run_state_lifecycle(self, run_id: int) -> str:
    """
    Returns the lifecycle state of the run.

    :param run_id: id of the run
    :return: string with lifecycle state
    """
    run_state = self.get_run_state(run_id)
    return run_state.life_cycle_state
def get_run_state_result(self, run_id: int) -> str:
    """
    Returns the resulting state of the run.

    :param run_id: id of the run
    :return: string with resulting state
    """
    run_state = self.get_run_state(run_id)
    return run_state.result_state
def get_run_state_message(self, run_id: int) -> str:
    """
    Returns the state message for the run.

    :param run_id: id of the run
    :return: string with state message
    """
    run_state = self.get_run_state(run_id)
    return run_state.state_message
def get_run_output(self, run_id: int) -> dict:
    """
    Retrieves run output of the run.

    The response of ``OUTPUT_RUNS_JOB_ENDPOINT`` is returned unchanged.

    :param run_id: id of the run
    :return: output of the run
    """
    return self._do_api_call(OUTPUT_RUNS_JOB_ENDPOINT, {'run_id': run_id})
def cancel_run(self, run_id: int) -> None:
    """
    Cancels the run by calling ``CANCEL_RUN_ENDPOINT``; the response is discarded.

    :param run_id: id of the run
    """
    self._do_api_call(CANCEL_RUN_ENDPOINT, {'run_id': run_id})
def install(self, json: dict) -> None:
    """
    Install libraries on the cluster.

    Utility function to call the ``2.0/libraries/install`` endpoint
    (``INSTALL_LIBS_ENDPOINT``); the response is discarded.

    :param json: json dictionary containing cluster_id and an array of library
    """
    request_body = json
    self._do_api_call(INSTALL_LIBS_ENDPOINT, request_body)
def uninstall(self, json: dict) -> None:
    """
    Uninstall libraries on the cluster.

    Utility function to call the ``2.0/libraries/uninstall`` endpoint
    (``UNINSTALL_LIBS_ENDPOINT``); the response is discarded.

    :param json: json dictionary containing cluster_id and an array of library
    """
    request_body = json
    self._do_api_call(UNINSTALL_LIBS_ENDPOINT, request_body)
def update_repo(self, repo_id: str, json: Dict[str, Any]) -> dict:
    """
    Updates given Databricks Repos.

    Issues a ``PATCH`` request against ``api/2.0/repos/<repo_id>``.

    :param repo_id: ID of Databricks Repos
    :param json: payload
    :return: metadata from update
    """
    return self._do_api_call(('PATCH', f'api/2.0/repos/{repo_id}'), json)
def delete_repo(self, repo_id: str) -> None:
    """
    Deletes given Databricks Repos.

    Issues a ``DELETE`` request against ``api/2.0/repos/<repo_id>``; the
    response is discarded.

    :param repo_id: ID of Databricks Repos
    :return: None
    """
    repos_endpoint = ('DELETE', f'api/2.0/repos/{repo_id}')
    self._do_api_call(repos_endpoint)
def create_repo(self, json: Dict[str, Any]) -> dict:
    """
    Creates a Databricks Repos.

    Issues a ``POST`` request against ``api/2.0/repos`` and returns the response.

    :param json: payload
    :return: the response of the API call
    """
    return self._do_api_call(('POST', 'api/2.0/repos'), json)
def get_repo_by_path(self, path: str) -> Optional[str]:
    """
    Obtains Repos ID by path.

    Queries ``WORKSPACE_GET_STATUS_ENDPOINT`` with HTTP error wrapping disabled
    so that a 404 response can be told apart from other failures.

    :param path: path to a repository
    :return: Repos ID (as a string) if the path exists and its ``object_type`` is ``REPO``,
        None if the path is not found (HTTP 404) or is not a repo.
    :raises requests.exceptions.HTTPError: for any HTTP error other than 404.
    """
    try:
        status = self._do_api_call(WORKSPACE_GET_STATUS_ENDPOINT, {'path': path}, wrap_http_errors=False)
    except requests_exceptions.HTTPError as http_error:
        # A missing path is an expected outcome; everything else propagates.
        if http_error.response.status_code == 404:
            return None
        raise

    if status.get('object_type', '') == 'REPO':
        return str(status['object_id'])
    return None
def test_connection(self):
    """
    Test the Databricks connectivity from UI.

    A new ``DatabricksHook`` is created for the same ``databricks_conn_id`` and
    used to call ``LIST_ZONES_ENDPOINT``. Any exception raised by that call is
    caught and reported through the returned message instead of propagating.

    :return: tuple ``(status, message)``: ``(True, 'Connection successfully tested')`` on
        success, ``(False, <exception text>)`` on failure.
    """
    hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)
    try:
        hook._do_api_call(endpoint_info=LIST_ZONES_ENDPOINT).get('zones')
    except Exception as error:
        return False, str(error)
    return True, 'Connection successfully tested'