Source code for airflow.providers.google.leveldb.hooks.leveldb
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Hook for Level DB"""fromtypingimportList,OptionalimportplyvelfromplyvelimportDBfromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHook
[docs]defget_conn(self,name:str='/tmp/testdb/',create_if_missing:bool=False,**kwargs)->DB:""" Creates `Plyvel DB <https://plyvel.readthedocs.io/en/latest/api.html#DB>`__ :param name: path to create database e.g. `/tmp/testdb/`) :type name: str :param create_if_missing: whether a new database should be created if needed :type create_if_missing: bool :param kwargs: other options of creation plyvel.DB. See more in the link above. :type kwargs: Dict[str, Any] :returns: DB :rtype: plyvel.DB """ifself.dbisnotNone:returnself.dbself.db=plyvel.DB(name=name,create_if_missing=create_if_missing,**kwargs)returnself.db
[docs]defrun(self,command:str,key:bytes,value:bytes=None,keys:List[bytes]=None,values:List[bytes]=None,)->Optional[bytes]:""" Execute operation with leveldb :param command: command of plyvel(python wrap for leveldb) for DB object e.g. ``"put"``, ``"get"``, ``"delete"``, ``"write_batch"``. :type command: str :param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``) :type key: bytes :param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``) :type value: bytes :param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])`` :type keys: List[bytes] :param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']`` :type values: List[bytes] :returns: value from get or None :rtype: Optional[bytes] """ifcommand=='put':returnself.put(key,value)elifcommand=='get':returnself.get(key)elifcommand=='delete':returnself.delete(key)elifcommand=='write_batch':returnself.write_batch(keys,values)else:raiseLevelDBHookException("Unknown command for LevelDB hook")
[docs]defput(self,key:bytes,value:bytes):""" Put a single value into a leveldb db by key :param key: key for put execution, e.g. ``b'key'``, ``b'another-key'`` :type key: bytes :param value: value for put execution e.g. ``b'value'``, ``b'another-value'`` :type value: bytes """self.db.put(key,value)
[docs]defget(self,key:bytes)->bytes:""" Get a single value into a leveldb db by key :param key: key for get execution, e.g. ``b'key'``, ``b'another-key'`` :type key: bytes :returns: value of key from db.get :rtype: bytes """returnself.db.get(key)
[docs]defdelete(self,key:bytes):""" Delete a single value in a leveldb db by key. :param key: key for delete execution, e.g. ``b'key'``, ``b'another-key'`` :type key: bytes """self.db.delete(key)
[docs]defwrite_batch(self,keys:List[bytes],values:List[bytes]):""" Write batch of values in a leveldb db by keys :param keys: keys for write_batch execution e.g. ``[b'key', b'another-key']`` :type keys: List[bytes] :param values: values for write_batch execution e.g. ``[b'value', b'another-value']`` :type values: List[bytes] """withself.db.write_batch()asbatch:fori,keyinenumerate(keys):batch.put(key,values[i])