Source code for airflow.models.pool
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, Integer, String, Text, func
from airflow.models.base import Base
from airflow.utils.state import State
from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
from airflow.utils.db import provide_session
[docs]class Pool(Base):
[docs] __tablename__ = "slot_pool"
[docs] id = Column(Integer, primary_key=True)
[docs] pool = Column(String(50), unique=True)
# -1 for infinite
[docs] slots = Column(Integer, default=0)
[docs] description = Column(Text)
[docs] DEFAULT_POOL_NAME = 'default_pool'
[docs] def __repr__(self):
return self.pool
@staticmethod
@provide_session
[docs] def get_pool(pool_name, session=None):
return session.query(Pool).filter(Pool.pool == pool_name).first()
@staticmethod
@provide_session
[docs] def get_default_pool(session=None):
return Pool.get_pool(Pool.DEFAULT_POOL_NAME, session=session)
[docs] def to_json(self):
return {
'id': self.id,
'pool': self.pool,
'slots': self.slots,
'description': self.description,
}
@provide_session
[docs] def occupied_slots(self, session):
"""
Returns the number of slots used by running/queued tasks at the moment.
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import
return (
session
.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state.in_(STATES_TO_COUNT_AS_RUNNING))
.scalar()
) or 0
@provide_session
[docs] def used_slots(self, session):
"""
Returns the number of slots used by running tasks at the moment.
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import
running = (
session
.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.RUNNING)
.scalar()
) or 0
return running
@provide_session
[docs] def queued_slots(self, session):
"""
Returns the number of slots used by queued tasks at the moment.
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import
return (
session
.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.QUEUED)
.scalar()
) or 0
@provide_session
[docs] def open_slots(self, session):
"""
Returns the number of slots open at the moment
"""
if self.slots == -1:
return float('inf')
else:
return self.slots - self.occupied_slots(session)