#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""DaskExecutor.

.. seealso::
    For more information on how the DaskExecutor works, take a look at the guide:
    :ref:`executor:DaskExecutor`
"""
from __future__ import annotations

import subprocess
from typing import TYPE_CHECKING, Any

from distributed import Client, Future, as_completed
from distributed.security import Security

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstanceKey

# queue="default" is a special case since this is the base config default queue name,
# with respect to DaskExecutor, treat it as if no queue is provided
_UNDEFINED_QUEUES = {None, "default"}
class DaskExecutor(BaseExecutor):
    """DaskExecutor submits tasks to a Dask Distributed cluster."""

    def __init__(self, cluster_address=None):
        # parallelism=0: the executor imposes no local cap on concurrently
        # submitted tasks; the Dask cluster manages its own capacity.
        super().__init__(parallelism=0)
        # Fall back to the airflow.cfg [dask] section when no address is given
        # explicitly; an empty/missing address is a configuration error.
        resolved = conf.get("dask", "cluster_address") if cluster_address is None else cluster_address
        if not resolved:
            raise ValueError("Please provide a Dask cluster address in airflow.cfg")
        self.cluster_address = resolved
        # TLS material for connecting to a secured scheduler (may be empty).
        self.tls_ca = conf.get("dask", "tls_ca")
        self.tls_key = conf.get("dask", "tls_key")
        self.tls_cert = conf.get("dask", "tls_cert")
        # Both are populated later (outside this chunk, presumably in start()):
        # client is the distributed.Client, futures maps each submitted Future
        # back to its TaskInstanceKey.
        self.client: Client | None = None
        self.futures: dict[Future, TaskInstanceKey] | None = None
[docs]defexecute_async(self,key:TaskInstanceKey,command:CommandType,queue:str|None=None,executor_config:Any|None=None,)->None:ifTYPE_CHECKING:assertself.clientself.validate_airflow_tasks_run_command(command)defairflow_run():returnsubprocess.check_call(command,close_fds=True)resources=Noneifqueuenotin_UNDEFINED_QUEUES:scheduler_info=self.client.scheduler_info()avail_queues={resourcefordinscheduler_info["workers"].values()forresourceind["resources"]}ifqueuenotinavail_queues:raiseAirflowException(f"Attempted to submit task to an unavailable queue: '{queue}'")resources={queue:1}future=self.client.submit(subprocess.check_call,command,pure=False,resources=resources)self.futures[future]=key# type: ignore
def_process_future(self,future:Future)->None:ifTYPE_CHECKING:assertself.futuresiffuture.done():key=self.futures[future]iffuture.exception():self.log.error("Failed to execute task: %s",repr(future.exception()))self.fail(key)eliffuture.cancelled():self.log.error("Failed to execute task")self.fail(key)else:self.success(key)self.futures.pop(future)
[docs]defsync(self)->None:ifTYPE_CHECKING:assertself.futures# make a copy so futures can be popped during iterationforfutureinself.futures.copy():self._process_future(future)