#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""DaskExecutor

.. seealso::
    For more information on how the DaskExecutor works, take a look at the guide:
    :ref:`executor:DaskExecutor`
"""
import subprocess
from typing import Any, Dict, Optional

from distributed import Client, Future, as_completed
from distributed.security import Security

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstanceKey

# queue="default" is a special case since this is the base config default queue name;
# with respect to DaskExecutor, treat it as if no queue is provided
_UNDEFINED_QUEUES = {None, 'default'}
class DaskExecutor(BaseExecutor):
    """DaskExecutor submits tasks to a Dask Distributed cluster."""

    def __init__(self, cluster_address=None):
        # parallelism=0 disables BaseExecutor's own concurrency cap; the Dask
        # cluster decides how many tasks run at once
        super().__init__(parallelism=0)
        if cluster_address is None:
            cluster_address = conf.get('dask', 'cluster_address')
        if not cluster_address:
            raise ValueError('Please provide a Dask cluster address in airflow.cfg')
        self.cluster_address = cluster_address
        # ssl / tls parameters
        self.tls_ca = conf.get('dask', 'tls_ca')
        self.tls_key = conf.get('dask', 'tls_key')
        self.tls_cert = conf.get('dask', 'tls_cert')
        self.client: Optional[Client] = None
        self.futures: Optional[Dict[Future, TaskInstanceKey]] = None
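    # start() is not part of this excerpt, yet it is what populates self.client
    # and self.futures checked below. A minimal sketch, assuming only the
    # documented distributed.Client and Security APIs: any TLS settings read in
    # __init__ are wrapped in a Security object, then the client connects to the
    # configured scheduler address.
    def start(self) -> None:
        security = None
        if self.tls_ca or self.tls_key or self.tls_cert:
            security = Security(
                tls_client_key=self.tls_key,
                tls_client_cert=self.tls_cert,
                tls_ca_file=self.tls_ca,
                require_encryption=True,
            )
        self.client = Client(self.cluster_address, security=security)
        self.futures = {}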
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Any] = None,
    ) -> None:
        self.validate_command(command)

        if not self.client:
            raise AirflowException(NOT_STARTED_MESSAGE)

        resources = None
        if queue not in _UNDEFINED_QUEUES:
            # Airflow queues are mapped onto Dask worker resources: a task sent
            # to queue "q" may only run on a worker that advertises a resource "q"
            scheduler_info = self.client.scheduler_info()
            avail_queues = {
                resource for d in scheduler_info['workers'].values() for resource in d['resources']
            }

            if queue not in avail_queues:
                raise AirflowException(f"Attempted to submit task to an unavailable queue: '{queue}'")
            resources = {queue: 1}

        # pure=False: the same command can legitimately be submitted more than
        # once (e.g. task retries), so Dask must not deduplicate it via its
        # result cache
        future = self.client.submit(subprocess.check_call, command, pure=False, resources=resources)
        self.futures[future] = key  # type: ignore
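    # Hypothetical usage of the queue-to-resource mapping above: a worker joins
    # the "highmem" queue by advertising a matching Dask resource, e.g.
    #
    #   dask-worker tcp://dask-scheduler:8786 --resources "highmem=1"
    #
    # (scheduler address illustrative), and a task opts in with the standard
    # operator argument queue='highmem'.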
    def _process_future(self, future: Future) -> None:
        if not self.futures:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if future.done():
            key = self.futures[future]
            if future.exception():
                # subprocess.check_call raises CalledProcessError on a non-zero
                # exit code, which surfaces here as the future's exception
                self.log.error("Failed to execute task: %s", repr(future.exception()))
                self.fail(key)
            elif future.cancelled():
                self.log.error("Failed to execute task")
                self.fail(key)
            else:
                self.success(key)
            self.futures.pop(future)
    def sync(self) -> None:
        if self.futures is None:
            raise AirflowException(NOT_STARTED_MESSAGE)
        # make a copy so futures can be popped during iteration
        for future in self.futures.copy():
            self._process_future(future)
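    # end() and terminate() are likewise not part of this excerpt; as_completed
    # is imported above for that shutdown path. A minimal sketch of end(),
    # assuming only the documented distributed API: drain every outstanding
    # future, record each task's outcome, then disconnect from the scheduler.
    def end(self) -> None:
        if self.client is None or self.futures is None:
            raise AirflowException(NOT_STARTED_MESSAGE)
        # iterate over a snapshot, since _process_future pops entries
        for future in as_completed(list(self.futures)):
            self._process_future(future)
        self.client.close()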