Source code for

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from airflow.models import BaseOperator
from import LevelDBHook

    from airflow.utils.context import Context

[docs]class LevelDBOperator(BaseOperator): """ Execute command in LevelDB. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:LevelDBOperator` :param command: command of plyvel(python wrap for leveldb) for DB object e.g. ``"put"``, ``"get"``, ``"delete"``, ``"write_batch"``. :param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``) :param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``) :param keys: keys for command(write_batch) execution(list[bytes], e.g. ``[b'key', b'another-key'])`` :param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']`` :param leveldb_conn_id: :param create_if_missing: whether a new database should be created if needed :param create_db_extra_options: extra options of creation LevelDBOperator. See more in the link below `Plyvel DB <>`__ """ def __init__( self, *, command: str, key: bytes, value: bytes | None = None, keys: list[bytes] | None = None, values: list[bytes] | None = None, leveldb_conn_id: str = "leveldb_default", name: str = "/tmp/testdb/", create_if_missing: bool = True, create_db_extra_options: dict[str, Any] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) self.command = command self.key = key self.value = value self.keys = keys self.values = values self.leveldb_conn_id = leveldb_conn_id = name self.create_if_missing = create_if_missing self.create_db_extra_options = create_db_extra_options or {}
[docs] def execute(self, context: Context) -> str | None: """ Execute command in LevelDB. :returns: value from get(str, not bytes, to prevent error in json.dumps in serialize_value in or str | None """ leveldb_hook = LevelDBHook(leveldb_conn_id=self.leveldb_conn_id) leveldb_hook.get_conn(, create_if_missing=self.create_if_missing, **self.create_db_extra_options ) value = command=self.command, key=self.key, value=self.value, keys=self.keys, values=self.values, )"Done. Returned value was: %s", value) leveldb_hook.close_conn() str_value = value if value is None else value.decode() return str_value

Was this entry helpful?