Source code for airflow.providers.ydb.hooks._vendor.dbapi
from .connection import AsyncConnection, Connection, IsolationLevel  # noqa: F401
from .cursor import AsyncCursor, Cursor, YdbQuery  # noqa: F401
from .errors import (
    DatabaseError,
    DataError,
    Error,
    IntegrityError,
    InterfaceError,
    InternalError,
    NotSupportedError,
    OperationalError,
    ProgrammingError,
    Warning,
)
class YdbDBApi:
    """Object exposing a DB-API style module surface for YDB.

    Each instance carries the ``paramstyle``, ``threadsafety`` and
    ``apilevel`` attributes, the exception classes imported from
    ``.errors`` (attached under their own names), and factory methods
    that build the synchronous and asynchronous connection classes.

    NOTE(review): the attribute names follow the module globals described
    by PEP 249; the values are reproduced exactly as declared here.
    """

    def __init__(self) -> None:
        """Set the descriptive attributes and attach the exception classes."""
        self.paramstyle = "pyformat"
        self.threadsafety = 0
        self.apilevel = "1.0"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Expose every exception class as an instance attribute.

        After this call ``self.Error``, ``self.DatabaseError`` and so on
        refer to the classes imported from ``.errors``.
        """
        # Explicit name -> class mapping: the attribute name is fixed here
        # rather than derived from the class, so it does not depend on
        # how the classes are named or aliased in ``.errors``.
        for name, value in {
            "Warning": Warning,
            "Error": Error,
            "InterfaceError": InterfaceError,
            "DatabaseError": DatabaseError,
            "DataError": DataError,
            "OperationalError": OperationalError,
            "IntegrityError": IntegrityError,
            "InternalError": InternalError,
            "ProgrammingError": ProgrammingError,
            "NotSupportedError": NotSupportedError,
        }.items():
            setattr(self, name, value)

    def connect(self, *args, **kwargs) -> Connection:
        """Return a new ``Connection`` built from the given arguments.

        All positional and keyword arguments are forwarded unchanged to
        the ``Connection`` constructor.
        """
        return Connection(*args, **kwargs)

    def async_connect(self, *args, **kwargs) -> AsyncConnection:
        """Return a new ``AsyncConnection`` built from the given arguments.

        All positional and keyword arguments are forwarded unchanged to
        the ``AsyncConnection`` constructor.
        """
        return AsyncConnection(*args, **kwargs)