Source code for airflow.providers.alibaba.cloud.operators.analyticdb_spark
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.alibaba.cloud.hooks.analyticdb_spark import AnalyticDBSparkHook, AppState

if TYPE_CHECKING:
    from airflow.utils.context import Context
class AnalyticDBSparkBaseOperator(BaseOperator):
    """Abstract base class that defines how users develop AnalyticDB Spark."""

    def __init__(
        self,
        *,
        adb_spark_conn_id: str = "adb_spark_default",
        region: str | None = None,
        polling_interval: int = 0,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)

        self.app_id: str | None = None
        self.polling_interval = polling_interval
        self._adb_spark_conn_id = adb_spark_conn_id
        self._region = region

    @cached_property
    def hook(self) -> AnalyticDBSparkHook:
        """Get valid hook."""
        return AnalyticDBSparkHook(adb_spark_conn_id=self._adb_spark_conn_id, region=self._region)
    def monitor_application(self):
        self.log.info("Monitoring application with id %s", self.app_id)

        if self.polling_interval > 0:
            self.poll_for_termination(self.app_id)
    def poll_for_termination(self, app_id: str) -> None:
        """
        Poll for Spark application termination.

        :param app_id: id of the spark application to monitor
        """
        state = self.hook.get_spark_state(app_id)
        while AppState(state) not in AnalyticDBSparkHook.TERMINAL_STATES:
            self.log.debug("Application with id %s is in state: %s", app_id, state)
            time.sleep(self.polling_interval)
            state = self.hook.get_spark_state(app_id)
        self.log.info("Application with id %s terminated with state: %s", app_id, state)
        self.log.info(
            "Web ui address is %s for application with id %s",
            self.hook.get_spark_web_ui_address(app_id),
            app_id,
        )
        self.log.info(self.hook.get_spark_log(app_id))

        if AppState(state) != AppState.COMPLETED:
            raise AirflowException(f"Application {app_id} did not succeed")
    def kill(self) -> None:
        """Delete the specified application."""
        if self.app_id is not None:
            self.hook.kill_spark_app(self.app_id)
class AnalyticDBSparkSQLOperator(AnalyticDBSparkBaseOperator):
    """
    Submits a Spark SQL application to the underlying cluster; wraps the AnalyticDB Spark REST API.

    :param sql: The SQL query to execute.
    :param conf: Spark configuration properties.
    :param driver_resource_spec: The resource specifications of the Spark driver.
    :param executor_resource_spec: The resource specifications of each Spark executor.
    :param num_executors: number of executors to launch for this application.
    :param name: name of this application.
    :param cluster_id: The cluster ID of AnalyticDB MySQL 3.0 Data Lakehouse.
    :param rg_name: The name of the resource group in the AnalyticDB MySQL 3.0 Data Lakehouse cluster.
    """
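A minimal usage sketch, not part of the module source, showing how AnalyticDBSparkSQLOperator could be wired into a DAG (assuming the Airflow 2.x DAG API). The DAG id, cluster_id, and rg_name values are placeholder assumptions; a positive polling_interval makes the operator poll until the application reaches a terminal state.

from airflow import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator

with DAG(dag_id="adb_spark_sql_example", schedule=None) as dag:
    run_sql = AnalyticDBSparkSQLOperator(
        task_id="run_sql",
        sql="SHOW DATABASES;",
        cluster_id="amv-example-cluster",  # placeholder: your AnalyticDB MySQL cluster ID
        rg_name="spark",  # placeholder: your resource group name
        polling_interval=5,  # poll every 5 seconds until the application terminates
    )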
class AnalyticDBSparkBatchOperator(AnalyticDBSparkBaseOperator):
    """
    Submits a Spark batch application to the underlying cluster; wraps the AnalyticDB Spark REST API.

    :param file: path of the file containing the application to execute.
    :param class_name: name of the application's Java/Spark main class.
    :param args: application command line arguments.
    :param conf: Spark configuration properties.
    :param jars: jars to be used in this application.
    :param py_files: python files to be used in this application.
    :param files: files to be used in this application.
    :param driver_resource_spec: The resource specifications of the Spark driver.
    :param executor_resource_spec: The resource specifications of each Spark executor.
    :param num_executors: number of executors to launch for this application.
    :param archives: archives to be used in this application.
    :param name: name of this application.
    :param cluster_id: The cluster ID of AnalyticDB MySQL 3.0 Data Lakehouse.
    :param rg_name: The name of the resource group in the AnalyticDB MySQL 3.0 Data Lakehouse cluster.
    """
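A companion sketch for the batch operator, again not part of the module source. The OSS path, main class, cluster_id, and rg_name are placeholder assumptions to illustrate the parameter shapes described in the docstring above.

from airflow import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

with DAG(dag_id="adb_spark_batch_example", schedule=None) as dag:
    run_batch = AnalyticDBSparkBatchOperator(
        task_id="run_batch",
        file="oss://your-bucket/jars/spark-examples.jar",  # placeholder: path to the application artifact
        class_name="org.apache.spark.examples.SparkPi",  # placeholder: Java/Spark main class
        args=["100"],  # command line arguments passed to the application
        cluster_id="amv-example-cluster",  # placeholder: your AnalyticDB MySQL cluster ID
        rg_name="spark",  # placeholder: your resource group name
        polling_interval=10,  # poll every 10 seconds until the application terminates
    )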