# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
import pickle
from sqlalchemy import Column, Integer, String, Index, LargeBinary, and_
from sqlalchemy.orm import reconstructor
from airflow.configuration import conf
from airflow.models.base import Base, ID_LEN
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.helpers import as_tuple
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime
# Module-level logger used by the (de)serialization helpers below.
log = logging.getLogger(__name__)

# MAX XCOM Size is 48KB
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
# NOTE(review): no size constant or size check is visible in this module;
# confirm where (or whether) the limit above is enforced.

# Key name exported for callers; presumably the key under which a task's
# return value is stored -- confirm against the call sites.
XCOM_RETURN_KEY = 'return_value'
class BaseXCom(Base, LoggingMixin):
    """
    Base class for XCom objects.

    Each row stores one serialized ``value`` under ``key`` for a given
    ``dag_id`` / ``task_id`` / ``execution_date``. Values are written as
    UTF-8 encoded JSON unless ``[core] enable_xcom_pickling`` is enabled, in
    which case they are pickled (see :meth:`serialize_value`).
    """

    # NOTE(review): no ``__tablename__`` is visible in this class; presumably
    # it is supplied elsewhere -- confirm before relying on the mapping.

    id = Column(Integer, primary_key=True)
    key = Column(String(512))
    value = Column(LargeBinary)
    timestamp = Column(
        UtcDateTime, default=timezone.utcnow, nullable=False)
    execution_date = Column(UtcDateTime, nullable=False)

    # source information
    task_id = Column(String(ID_LEN), nullable=False)
    dag_id = Column(String(ID_LEN), nullable=False)

    __table_args__ = (
        Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False),
    )

    # TODO: "pickling" has been deprecated and JSON is preferred.
    # "pickling" will be removed in Airflow 2.0.
    @reconstructor
    def init_on_load(self):
        """
        Deserialize ``self.value`` in place when the ORM loads the instance.

        With pickling enabled the stored bytes are unpickled. Otherwise they
        are decoded as UTF-8 JSON, falling back to pickle when that fails.
        """
        enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
        if enable_pickling:
            self.value = pickle.loads(self.value)
        else:
            try:
                self.value = json.loads(self.value.decode('UTF-8'))
            except ValueError:
                # UnicodeDecodeError and JSON decoding errors are both
                # ValueError subclasses, so this covers every decode failure.
                #
                # For backward-compatibility.
                # Preventing errors in webserver
                # due to XComs mixed with pickled and unpickled.
                #
                # NOTE(security): this unpickles stored bytes even though
                # pickling is disabled in the config; unpickling data that is
                # not trusted can execute arbitrary code.
                self.value = pickle.loads(self.value)

    def __repr__(self):
        """Return a short description with the key, task id and execution date."""
        return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
            key=self.key,
            task_id=self.task_id,
            execution_date=self.execution_date)

    @classmethod
    @provide_session
    def set(
            cls,
            key,
            value,
            execution_date,
            task_id,
            dag_id,
            session=None):
        """
        Store an XCom value.

        Any existing rows with the same key, execution date, task id and
        dag id are deleted before the new row is inserted.

        :param key: key to store the value under
        :param value: value to store; serialized via ``XCom.serialize_value``
        :param execution_date: execution date the value belongs to
        :param task_id: id of the task that produced the value
        :param dag_id: id of the DAG that produced the value
        :param session: database session (supplied by ``provide_session``
            when omitted)
        :return: None
        """
        # Detach everything currently tracked by the session before writing.
        session.expunge_all()

        value = XCom.serialize_value(value)

        # remove any duplicate XComs
        session.query(cls).filter(
            cls.key == key,
            cls.execution_date == execution_date,
            cls.task_id == task_id,
            cls.dag_id == dag_id).delete()

        session.commit()

        # insert new XCom
        session.add(XCom(
            key=key,
            value=value,
            execution_date=execution_date,
            task_id=task_id,
            dag_id=dag_id))

        session.commit()

    @classmethod
    @provide_session
    def get_one(cls,
                execution_date,
                key=None,
                task_id=None,
                dag_id=None,
                include_prior_dates=False,
                session=None):
        """
        Retrieve an XCom value, optionally meeting certain criteria.

        The newest matching row (ordered by execution date, then timestamp,
        both descending) is used.

        TODO: "pickling" has been deprecated and JSON is preferred.
        "pickling" will be removed in Airflow 2.0.

        :param execution_date: execution date to match
        :param key: only match this key, when given
        :param task_id: only match this task id, when given
        :param dag_id: only match this dag id, when given
        :param include_prior_dates: when true, match rows whose execution
            date is less than or equal to ``execution_date`` instead of
            requiring an exact match
        :param session: database session (supplied by ``provide_session``
            when omitted)
        :return: the deserialized XCom value, or None when no row matches
        :raises ValueError: when pickling is disabled and the stored bytes
            cannot be decoded as JSON
        """
        filters = []
        if key:
            filters.append(cls.key == key)
        if task_id:
            filters.append(cls.task_id == task_id)
        if dag_id:
            filters.append(cls.dag_id == dag_id)
        if include_prior_dates:
            filters.append(cls.execution_date <= execution_date)
        else:
            filters.append(cls.execution_date == execution_date)

        query = (
            session.query(cls.value).filter(and_(*filters))
                   .order_by(cls.execution_date.desc(), cls.timestamp.desc()))

        result = query.first()
        if not result:
            return None

        enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
        if enable_pickling:
            return pickle.loads(result.value)

        try:
            return json.loads(result.value.decode('UTF-8'))
        except ValueError:
            log.error("Could not deserialize the XCOM value from JSON. "
                      "If you are using pickles instead of JSON "
                      "for XCOM, then you need to enable pickle "
                      "support for XCOM in your airflow config.")
            raise

    @classmethod
    @provide_session
    def get_many(cls,
                 execution_date,
                 key=None,
                 task_ids=None,
                 dag_ids=None,
                 include_prior_dates=False,
                 limit=100,
                 session=None):
        """
        Retrieve XCom rows, optionally meeting certain criteria.

        TODO: "pickling" has been deprecated and JSON is preferred.
        "pickling" will be removed in Airflow 2.0.

        :param execution_date: execution date to match
        :param key: only match this key, when given
        :param task_ids: only match these task ids, when given (passed
            through ``as_tuple``)
        :param dag_ids: only match these dag ids, when given (passed
            through ``as_tuple``)
        :param include_prior_dates: when true, match rows whose execution
            date is less than or equal to ``execution_date`` instead of
            requiring an exact match
        :param limit: maximum number of rows to return
        :param session: database session (supplied by ``provide_session``
            when omitted)
        :return: list of matching rows, newest first (execution date, then
            timestamp, both descending)
        """
        filters = []
        if key:
            filters.append(cls.key == key)
        if task_ids:
            filters.append(cls.task_id.in_(as_tuple(task_ids)))
        if dag_ids:
            filters.append(cls.dag_id.in_(as_tuple(dag_ids)))
        if include_prior_dates:
            filters.append(cls.execution_date <= execution_date)
        else:
            filters.append(cls.execution_date == execution_date)

        query = (
            session.query(cls).filter(and_(*filters))
                   .order_by(cls.execution_date.desc(), cls.timestamp.desc())
                   .limit(limit))
        return query.all()

    @classmethod
    @provide_session
    def delete(cls, xcoms, session=None):
        """
        Delete one or more XCom rows and commit.

        :param xcoms: a single ``XCom`` instance or an iterable of them
        :param session: database session (supplied by ``provide_session``
            when omitted)
        :raises TypeError: when any item is not an ``XCom`` instance
        """
        if isinstance(xcoms, XCom):
            xcoms = [xcoms]
        for xcom in xcoms:
            if not isinstance(xcom, XCom):
                raise TypeError(
                    'Expected XCom; received {}'.format(xcom.__class__.__name__)
                )
            session.delete(xcom)
        session.commit()

    @staticmethod
    def serialize_value(value):
        """
        Serialize ``value`` to bytes for storage.

        :param value: the object to serialize
        :return: pickled bytes when ``[core] enable_xcom_pickling`` is
            enabled, otherwise UTF-8 encoded JSON
        :raises TypeError: when the value is not JSON serializable
        :raises ValueError: when JSON encoding otherwise fails
        """
        # TODO: "pickling" has been deprecated and JSON is preferred.
        # "pickling" will be removed in Airflow 2.0.
        if conf.getboolean('core', 'enable_xcom_pickling'):
            return pickle.dumps(value)

        try:
            return json.dumps(value).encode('UTF-8')
        except (TypeError, ValueError):
            # json.dumps raises TypeError for objects it cannot serialize and
            # ValueError for e.g. circular references; log the hint for both.
            log.error("Could not serialize the XCOM value into JSON. "
                      "If you are using pickles instead of JSON "
                      "for XCOM, then you need to enable pickle "
                      "support for XCOM in your airflow config.")
            raise
def resolve_xcom_backend():
    """
    Resolves custom XCom class.

    Reads ``[core] xcom_backend`` through ``conf.getimport``, falling back to
    the import path of ``BaseXCom`` in ``airflow.models.xcom``.

    :return: the configured class when it is a subclass of ``BaseXCom``,
        or ``BaseXCom`` itself when the lookup yields a falsy value
    :raises TypeError: when the configured object is not a subclass of
        ``BaseXCom``
    """
    clazz = conf.getimport("core", "xcom_backend", fallback="airflow.models.xcom.{}"
                           .format(BaseXCom.__name__))
    if not clazz:
        return BaseXCom

    # Check for a class first: issubclass() itself raises an unhelpful
    # TypeError when handed a non-class object (e.g. a function or module).
    if not (isinstance(clazz, type) and issubclass(clazz, BaseXCom)):
        raise TypeError(
            "Your custom XCom class `{class_name}` is not a subclass of `{base_name}`."
            .format(class_name=getattr(clazz, "__name__", repr(clazz)),
                    base_name=BaseXCom.__name__)
        )
    return clazz
# Public XCom class: resolved once, at import time, to either the configured
# custom backend or BaseXCom.
XCom = resolve_xcom_backend()