Source code for airflow.contrib.executors.mesos_executor

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from future import standard_library

from builtins import str
from queue import Queue

import mesos.interface
from mesos.interface import mesos_pb2
import mesos.native

from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
from airflow.settings import Session
from airflow.utils.state import State
from airflow.exceptions import AirflowException

standard_library.install_aliases()
[docs]DEFAULT_FRAMEWORK_NAME = 'Airflow'
[docs]FRAMEWORK_CONNID_PREFIX = 'mesos_framework_'
[docs]def get_framework_name(): if not configuration.conf.get('mesos', 'FRAMEWORK_NAME'): return DEFAULT_FRAMEWORK_NAME return configuration.conf.get('mesos', 'FRAMEWORK_NAME')
# AirflowMesosScheduler, implements Mesos Scheduler interface # To schedule airflow jobs on mesos
[docs]class AirflowMesosScheduler(mesos.interface.Scheduler): """ Airflow Mesos scheduler implements mesos scheduler interface to schedule airflow tasks on mesos. Basically, it schedules a command like 'airflow run <dag_id> <task_instance_id> <start_date> --local -p=<pickle>' to run on a mesos slave. """ def __init__(self, task_queue, result_queue, task_cpu=1, task_mem=256): self.task_queue = task_queue self.result_queue = result_queue self.task_cpu = task_cpu self.task_mem = task_mem self.task_counter = 0 self.task_key_map = {} if configuration.get('mesos', 'DOCKER_IMAGE_SLAVE'): self.mesos_slave_docker_image = configuration.get( 'mesos', 'DOCKER_IMAGE_SLAVE' )
[docs] def registered(self, driver, frameworkId, masterInfo): self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value) if configuration.conf.getboolean('mesos', 'CHECKPOINT') and \ configuration.conf.get('mesos', 'FAILOVER_TIMEOUT'): # Import here to work around a circular import error from airflow.models.connection import Connection # Update the Framework ID in the database. session = Session() conn_id = FRAMEWORK_CONNID_PREFIX + get_framework_name() connection = Session.query(Connection).filter_by(conn_id=conn_id).first() if connection is None: connection = Connection(conn_id=conn_id, conn_type='mesos_framework-id', extra=frameworkId.value) else: connection.extra = frameworkId.value session.add(connection) session.commit() Session.remove()
[docs] def reregistered(self, driver, masterInfo): self.log.info("AirflowScheduler re-registered to mesos")
[docs] def disconnected(self, driver): self.log.info("AirflowScheduler disconnected from mesos")
[docs] def offerRescinded(self, driver, offerId): self.log.info("AirflowScheduler offer %s rescinded", str(offerId))
[docs] def frameworkMessage(self, driver, executorId, slaveId, message): self.log.info("AirflowScheduler received framework message %s", message)
[docs] def executorLost(self, driver, executorId, slaveId, status): self.log.warning("AirflowScheduler executor %s lost", str(executorId))
[docs] def slaveLost(self, driver, slaveId): self.log.warning("AirflowScheduler slave %s lost", str(slaveId))
[docs] def error(self, driver, message): self.log.error("AirflowScheduler driver aborted %s", message) raise AirflowException("AirflowScheduler driver aborted %s" % message)
[docs] def resourceOffers(self, driver, offers): for offer in offers: tasks = [] offerCpus = 0 offerMem = 0 for resource in offer.resources: if resource.name == "cpus": offerCpus += resource.scalar.value elif resource.name == "mem": offerMem += resource.scalar.value self.log.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem) remainingCpus = offerCpus remainingMem = offerMem while (not self.task_queue.empty()) and \ remainingCpus >= self.task_cpu and \ remainingMem >= self.task_mem: key, cmd = self.task_queue.get() tid = self.task_counter self.task_counter += 1 self.task_key_map[str(tid)] = key self.log.info("Launching task %d using offer %s", tid, offer.id.value) task = mesos_pb2.TaskInfo() task.task_id.value = str(tid) task.slave_id.value = offer.slave_id.value task.name = "AirflowTask %d" % tid cpus = task.resources.add() cpus.name = "cpus" cpus.type = mesos_pb2.Value.SCALAR cpus.scalar.value = self.task_cpu mem = task.resources.add() mem.name = "mem" mem.type = mesos_pb2.Value.SCALAR mem.scalar.value = self.task_mem command = mesos_pb2.CommandInfo() command.shell = True command.value = " ".join(cmd) task.command.MergeFrom(command) # If docker image for airflow is specified in config then pull that # image before running the above airflow command if self.mesos_slave_docker_image: network = mesos_pb2.ContainerInfo.DockerInfo.Network.Value('BRIDGE') docker = mesos_pb2.ContainerInfo.DockerInfo( image=self.mesos_slave_docker_image, force_pull_image=False, network=network ) container = mesos_pb2.ContainerInfo( type=mesos_pb2.ContainerInfo.DOCKER, docker=docker ) task.container.MergeFrom(container) tasks.append(task) remainingCpus -= self.task_cpu remainingMem -= self.task_mem driver.launchTasks(offer.id, tasks)
[docs] def statusUpdate(self, driver, update): self.log.info( "Task %s is in state %s, data %s", update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data) ) try: key = self.task_key_map[update.task_id.value] except KeyError: # The map may not contain an item if the framework re-registered # after a failover. # Discard these tasks. self.log.warning("Unrecognised task key %s", update.task_id.value) return if update.state == mesos_pb2.TASK_FINISHED: self.result_queue.put((key, State.SUCCESS)) self.task_queue.task_done() if update.state == mesos_pb2.TASK_LOST or \ update.state == mesos_pb2.TASK_KILLED or \ update.state == mesos_pb2.TASK_FAILED: self.result_queue.put((key, State.FAILED)) self.task_queue.task_done()
[docs]class MesosExecutor(BaseExecutor): """ MesosExecutor allows distributing the execution of task instances to multiple mesos workers. Apache Mesos is a distributed systems kernel which abstracts CPU, memory, storage, and other compute resources away from machines (physical or virtual), enabling fault-tolerant and elastic distributed systems to easily be built and run effectively. See http://mesos.apache.org/ """
[docs] def start(self): self.task_queue = Queue() self.result_queue = Queue() framework = mesos_pb2.FrameworkInfo() framework.user = '' if not configuration.conf.get('mesos', 'MASTER'): self.log.error("Expecting mesos master URL for mesos executor") raise AirflowException("mesos.master not provided for mesos executor") master = configuration.conf.get('mesos', 'MASTER') framework.name = get_framework_name() if not configuration.conf.get('mesos', 'TASK_CPU'): task_cpu = 1 else: task_cpu = configuration.conf.getint('mesos', 'TASK_CPU') if not configuration.conf.get('mesos', 'TASK_MEMORY'): task_memory = 256 else: task_memory = configuration.conf.getint('mesos', 'TASK_MEMORY') if configuration.conf.getboolean('mesos', 'CHECKPOINT'): framework.checkpoint = True if configuration.conf.get('mesos', 'FAILOVER_TIMEOUT'): # Import here to work around a circular import error from airflow.models.connection import Connection # Query the database to get the ID of the Mesos Framework, if available. conn_id = FRAMEWORK_CONNID_PREFIX + framework.name session = Session() connection = session.query(Connection).filter_by(conn_id=conn_id).first() if connection is not None: # Set the Framework ID to let the scheduler reconnect # with running tasks. framework.id.value = connection.extra framework.failover_timeout = configuration.conf.getint( 'mesos', 'FAILOVER_TIMEOUT' ) else: framework.checkpoint = False self.log.info( 'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s', master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint) ) implicit_acknowledgements = 1 if configuration.conf.getboolean('mesos', 'AUTHENTICATE'): if not configuration.conf.get('mesos', 'DEFAULT_PRINCIPAL'): self.log.error("Expecting authentication principal in the environment") raise AirflowException( "mesos.default_principal not provided in authenticated mode") if not configuration.conf.get('mesos', 'DEFAULT_SECRET'): self.log.error("Expecting authentication secret in the environment") raise AirflowException( "mesos.default_secret not provided in authenticated mode") credential = mesos_pb2.Credential() credential.principal = configuration.conf.get('mesos', 'DEFAULT_PRINCIPAL') credential.secret = configuration.conf.get('mesos', 'DEFAULT_SECRET') framework.principal = credential.principal driver = mesos.native.MesosSchedulerDriver( AirflowMesosScheduler(self.task_queue, self.result_queue, task_cpu, task_memory), framework, master, implicit_acknowledgements, credential) else: framework.principal = 'Airflow' driver = mesos.native.MesosSchedulerDriver( AirflowMesosScheduler(self.task_queue, self.result_queue, task_cpu, task_memory), framework, master, implicit_acknowledgements) self.mesos_driver = driver self.mesos_driver.start()
[docs] def execute_async(self, key, command, queue=None, executor_config=None): self.task_queue.put((key, command))
[docs] def sync(self): while not self.result_queue.empty(): results = self.result_queue.get() self.change_state(*results)
[docs] def end(self): self.task_queue.join() self.mesos_driver.stop()