Source code for airflow.providers.databricks.utils.databricks
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#fromtypingimportUnionfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.databricks.hooks.databricksimportRunState
def deep_string_coerce(content, json_path: str = 'json') -> Union[str, list, dict]:
    """
    Recursively convert ``content`` into a structure that holds only strings.

    Strings are returned unchanged, ints and floats are converted with ``str``,
    lists and tuples become lists of coerced elements, and dicts keep their keys
    while every value is coerced. Any other type is rejected.

    This exists because the ``self.json`` field must be a dict with only string
    values: ``render_template`` will fail for numerical values.

    :param content: the value (scalar, list/tuple or dict) to coerce.
    :param json_path: path of ``content`` inside the enclosing structure; it is
        extended with ``[index]`` / ``[key]`` on recursion and is only used to
        build the error message.
    :return: the coerced string, list or dict.
    :raises AirflowException: if ``content`` (or any nested value) is not a
        string, a number, a list/tuple or a dict.
    """
    if isinstance(content, str):
        return content

    # Databricks can tolerate either numeric or string types in the API backend,
    # so numbers are simply stringified.
    if isinstance(content, (int, float)):
        return str(content)

    if isinstance(content, (list, tuple)):
        return [
            deep_string_coerce(item, f'{json_path}[{index}]')
            for index, item in enumerate(content)
        ]

    if isinstance(content, dict):
        return {
            key: deep_string_coerce(value, f'{json_path}[{key}]')
            for key, value in content.items()
        }

    # Nothing matched: report the offending type together with where it was found.
    raise AirflowException(
        f'Type {type(content)} used for parameter {json_path} is not a number or a string'
    )
def validate_trigger_event(event: dict):
    """
    Validate correctness of the event received from
    :class:`~airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger`.

    The event must contain the ``run_id``, ``run_page_url`` and ``run_state``
    keys, and ``run_state`` must be accepted by ``RunState.from_json``.

    :param event: the event payload to validate.
    :raises AirflowException: if a required key is missing or if
        ``RunState.from_json`` fails on ``event['run_state']``.
    """
    keys_to_check = ['run_id', 'run_page_url', 'run_state']
    for key in keys_to_check:
        if key not in event:
            raise AirflowException(f'Could not find `{key}` in the event: {event}')

    try:
        RunState.from_json(event['run_state'])
    except Exception as err:
        # Chain explicitly so the traceback shows the parsing failure as the
        # direct cause instead of an unrelated error raised during handling.
        raise AirflowException(
            f'Run state returned by the Trigger is incorrect: {event["run_state"]}'
        ) from err