Source code for airflow.models.pool

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from sqlalchemy import Column, Integer, String, Text, func

from airflow.models.base import Base
from airflow.utils.state import State
from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
from airflow.utils.db import provide_session


class Pool(Base):
    """
    ORM model mapped to the ``slot_pool`` table.

    Each row has a unique name (``pool``), a slot count (``slots``, where
    ``-1`` means infinite) and a free-text ``description``.
    """

    __tablename__ = "slot_pool"

    id = Column(Integer, primary_key=True)
    pool = Column(String(50), unique=True)
    # -1 for infinite
    slots = Column(Integer, default=0)
    description = Column(Text)

    DEFAULT_POOL_NAME = 'default_pool'

    def __repr__(self):
        # ``pool`` is None until a name has been assigned, and ``repr()``
        # raises TypeError if ``__repr__`` returns a non-string, so coerce.
        return str(self.pool)

    @staticmethod
    @provide_session
    def get_pool(pool_name, session=None):
        """
        Return the first ``Pool`` whose name equals ``pool_name``.

        :param pool_name: value matched against the ``pool`` column
        :param session: session used to run the query
        :return: the matching ``Pool``, or ``None`` if the query has no rows
        """
        return session.query(Pool).filter(Pool.pool == pool_name).first()

    @staticmethod
    @provide_session
    def get_default_pool(session=None):
        """
        Return the pool named ``Pool.DEFAULT_POOL_NAME``.

        :param session: session forwarded to :meth:`get_pool`
        :return: the default ``Pool``, or ``None`` if it is not found
        """
        return Pool.get_pool(Pool.DEFAULT_POOL_NAME, session=session)

    def to_json(self):
        """
        Return the pool's column values as a plain dictionary with the keys
        ``id``, ``pool``, ``slots`` and ``description``.
        """
        return {
            'id': self.id,
            'pool': self.pool,
            'slots': self.slots,
            'description': self.description,
        }

    @provide_session
    def occupied_slots(self, session=None):
        """
        Returns the number of slots used by running/queued tasks at the moment.

        Sums ``pool_slots`` over the task instances of this pool whose state
        is in ``STATES_TO_COUNT_AS_RUNNING``; yields ``0`` when the sum is
        empty.

        :param session: session used to run the query
        """
        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
        return (
            session
            .query(func.sum(TaskInstance.pool_slots))
            .filter(TaskInstance.pool == self.pool)
            .filter(TaskInstance.state.in_(STATES_TO_COUNT_AS_RUNNING))
            .scalar()
        ) or 0  # SUM over no rows comes back as None

    @provide_session
    def used_slots(self, session=None):
        """
        Returns the number of slots used by running tasks at the moment.

        Sums ``pool_slots`` over the task instances of this pool whose state
        is ``State.RUNNING``; yields ``0`` when the sum is empty.

        :param session: session used to run the query
        """
        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
        return (
            session
            .query(func.sum(TaskInstance.pool_slots))
            .filter(TaskInstance.pool == self.pool)
            .filter(TaskInstance.state == State.RUNNING)
            .scalar()
        ) or 0  # SUM over no rows comes back as None

    @provide_session
    def queued_slots(self, session=None):
        """
        Returns the number of slots used by queued tasks at the moment.

        Sums ``pool_slots`` over the task instances of this pool whose state
        is ``State.QUEUED``; yields ``0`` when the sum is empty.

        :param session: session used to run the query
        """
        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
        return (
            session
            .query(func.sum(TaskInstance.pool_slots))
            .filter(TaskInstance.pool == self.pool)
            .filter(TaskInstance.state == State.QUEUED)
            .scalar()
        ) or 0  # SUM over no rows comes back as None

    @provide_session
    def open_slots(self, session=None):
        """
        Returns the number of slots open at the moment.

        :param session: session forwarded to :meth:`occupied_slots`
        :return: ``float('inf')`` when ``slots`` is ``-1`` (infinite pool),
            otherwise ``slots`` minus the currently occupied slots
        """
        if self.slots == -1:
            return float('inf')
        # Reuse the caller's session so both reads happen in the same session.
        return self.slots - self.occupied_slots(session=session)

Was this entry helpful?