# Source code for airflow.models.pool
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, Integer, String, Text, func
from airflow.models.base import Base
from airflow.utils.state import State
from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
from airflow.utils.db import provide_session
class Pool(Base):
    """
    ORM model for a row of the ``slot_pool`` table.

    A pool has a unique name (``pool``), a slot count (``slots``) and a
    free-text ``description``.  A ``slots`` value of ``-1`` is treated as
    "infinite" by :meth:`open_slots`.
    """

    __tablename__ = "slot_pool"

    id = Column(Integer, primary_key=True)
    pool = Column(String(50), unique=True)
    # -1 for infinite
    slots = Column(Integer, default=0)
    description = Column(Text)

    # Name looked up by get_default_pool().
    DEFAULT_POOL_NAME = 'default_pool'

    def __repr__(self):
        # ``pool`` is None on an instance whose name has not been set yet;
        # __repr__ must always return a string, so coerce explicitly instead
        # of letting repr() raise TypeError.
        return str(self.pool)

    @staticmethod
    @provide_session
    def get_pool(pool_name, session=None):
        """
        Return the first Pool whose name equals ``pool_name``.

        :param pool_name: value matched against the ``pool`` column
        :param session: SQLAlchemy session used for the query; when omitted
            the ``provide_session`` decorator is expected to supply one
        :return: the matching Pool, or None when the query yields no row
        """
        return session.query(Pool).filter(Pool.pool == pool_name).first()

    @staticmethod
    @provide_session
    def get_default_pool(session=None):
        """
        Return the Pool named ``Pool.DEFAULT_POOL_NAME``.

        :param session: SQLAlchemy session forwarded to :meth:`get_pool`
        :return: the default Pool, or None when no such row exists
        """
        return Pool.get_pool(Pool.DEFAULT_POOL_NAME, session=session)

    def to_json(self):
        """
        Return the column values of this pool as a plain dict.

        :return: dict with the keys ``id``, ``pool``, ``slots`` and
            ``description``
        """
        return {
            'id': self.id,
            'pool': self.pool,
            'slots': self.slots,
            'description': self.description,
        }

    @provide_session
    def occupied_slots(self, session=None):
        """
        Returns the number of slots used by running/queued tasks at the moment.

        Sums ``pool_slots`` over the task instances assigned to this pool
        whose state is one of ``STATES_TO_COUNT_AS_RUNNING``.

        :param session: SQLAlchemy session used for the query
        :return: the summed slot count, or 0 when the sum is empty
        """
        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
        return (
            session
            .query(func.sum(TaskInstance.pool_slots))
            .filter(TaskInstance.pool == self.pool)
            .filter(TaskInstance.state.in_(STATES_TO_COUNT_AS_RUNNING))
            .scalar()
        ) or 0  # scalar() yields None when no row matches

    @provide_session
    def used_slots(self, session=None):
        """
        Returns the number of slots used by running tasks at the moment.

        :param session: SQLAlchemy session used for the query
        :return: the summed ``pool_slots`` of this pool's task instances in
            state ``State.RUNNING``, or 0 when the sum is empty
        """
        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
        return (
            session
            .query(func.sum(TaskInstance.pool_slots))
            .filter(TaskInstance.pool == self.pool)
            .filter(TaskInstance.state == State.RUNNING)
            .scalar()
        ) or 0  # scalar() yields None when no row matches

    @provide_session
    def queued_slots(self, session=None):
        """
        Returns the number of slots used by queued tasks at the moment.

        :param session: SQLAlchemy session used for the query
        :return: the summed ``pool_slots`` of this pool's task instances in
            state ``State.QUEUED``, or 0 when the sum is empty
        """
        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
        return (
            session
            .query(func.sum(TaskInstance.pool_slots))
            .filter(TaskInstance.pool == self.pool)
            .filter(TaskInstance.state == State.QUEUED)
            .scalar()
        ) or 0  # scalar() yields None when no row matches

    @provide_session
    def open_slots(self, session=None):
        """
        Returns the number of slots open at the moment

        :param session: SQLAlchemy session forwarded to
            :meth:`occupied_slots`
        :return: ``float('inf')`` when ``slots`` is -1, otherwise ``slots``
            minus the currently occupied slots
        """
        if self.slots == -1:
            # -1 marks a pool without a slot limit.
            return float('inf')
        return self.slots - self.occupied_slots(session)