[docs]classConnection:_await=staticmethod(util.await_only)_is_async=False_ydb_driver_class=ydb.Driver_ydb_session_pool_class=ydb.SessionPool_ydb_table_client_class=ydb.TableClient_cursor_class=Cursordef__init__(self,host:str="",port:str="",database:str="",**conn_kwargs:Any,):self.endpoint=f"grpc://{host}:{port}"self.database=databaseself.conn_kwargs=conn_kwargsself.credentials=self.conn_kwargs.pop("credentials",None)self.table_path_prefix=self.conn_kwargs.pop("ydb_table_path_prefix","")if"ydb_session_pool"inself.conn_kwargs:# Use session pool managed manuallyself._shared_session_pool=Trueself.session_pool:ydb.SessionPool=self.conn_kwargs.pop("ydb_session_pool")self.driver=(self.session_pool._driverifhasattr(self.session_pool,"_driver")elseself.session_pool._pool_impl._driver)self.driver.table_client=self._ydb_table_client_class(self.driver,self._get_table_client_settings())else:self._shared_session_pool=Falseself.driver=self._create_driver()self.session_pool=self._ydb_session_pool_class(self.driver,size=5)self.interactive_transaction:bool=False# AUTOCOMMITself.tx_mode:ydb.AbstractTransactionModeBuilder=ydb.SerializableReadWrite()self.tx_context:Optional[ydb.TxContext]=None
[docs]defset_isolation_level(self,isolation_level:str):classIsolationSettings(NamedTuple):ydb_mode:ydb.AbstractTransactionModeBuilderinteractive:boolydb_isolation_settings_map={IsolationLevel.AUTOCOMMIT:IsolationSettings(ydb.SerializableReadWrite(),interactive=False),IsolationLevel.SERIALIZABLE:IsolationSettings(ydb.SerializableReadWrite(),interactive=True),IsolationLevel.ONLINE_READONLY:IsolationSettings(ydb.OnlineReadOnly(),interactive=False),IsolationLevel.ONLINE_READONLY_INCONSISTENT:IsolationSettings(ydb.OnlineReadOnly().with_allow_inconsistent_reads(),interactive=False),IsolationLevel.STALE_READONLY:IsolationSettings(ydb.StaleReadOnly(),interactive=False),IsolationLevel.SNAPSHOT_READONLY:IsolationSettings(ydb.SnapshotReadOnly(),interactive=True),}ydb_isolation_settings=ydb_isolation_settings_map[isolation_level]ifself.tx_contextandself.tx_context.tx_id:raiseInternalError("Failed to set transaction mode: transaction is already began")self.tx_mode=ydb_isolation_settings.ydb_modeself.interactive_transaction=ydb_isolation_settings.interactive
[docs]defget_isolation_level(self)->str:ifself.tx_mode.name==ydb.SerializableReadWrite().name:ifself.interactive_transaction:returnIsolationLevel.SERIALIZABLEelse:returnIsolationLevel.AUTOCOMMITelifself.tx_mode.name==ydb.OnlineReadOnly().name:ifself.tx_mode.settings.allow_inconsistent_reads:returnIsolationLevel.ONLINE_READONLY_INCONSISTENTelse:returnIsolationLevel.ONLINE_READONLYelifself.tx_mode.name==ydb.StaleReadOnly().name:returnIsolationLevel.STALE_READONLYelifself.tx_mode.name==ydb.SnapshotReadOnly().name:returnIsolationLevel.SNAPSHOT_READONLYelse:raiseNotSupportedError(f"{self.tx_mode.name} is not supported")
@classmethoddef_maybe_await(cls,callee:collections.abc.Callable,*args,**kwargs)->Any:ifcls._is_async:returncls._await(callee(*args,**kwargs))returncallee(*args,**kwargs)def_get_table_client_settings(self)->ydb.TableClientSettings:return(ydb.TableClientSettings().with_native_date_in_result_sets(True).with_native_datetime_in_result_sets(True).with_native_timestamp_in_result_sets(True).with_native_interval_in_result_sets(True).with_native_json_in_result_sets(False))def_create_driver(self):driver_config=ydb.DriverConfig(endpoint=self.endpoint,database=self.database,table_client_settings=self._get_table_client_settings(),credentials=self.credentials,)driver=self._ydb_driver_class(driver_config)try:self._maybe_await(driver.wait,timeout=5,fail_fast=True)exceptydb.Errorase:raiseInterfaceError(e.message,original_error=e)fromeexceptExceptionase:self._maybe_await(driver.stop)raiseInterfaceError(f"Failed to connect to YDB, details {driver.discovery_debug_details()}")fromereturndriverdef_stop_driver(self):self._maybe_await(self.driver.stop)