From 3453832ec9ca756aa3184551dc02a372b6ce2d20 Mon Sep 17 00:00:00 2001 From: kould Date: Fri, 17 Apr 2026 02:49:49 +0800 Subject: [PATCH 1/3] feat: add rc and rr transaction isolation --- README.md | 1 + docs/features.md | 1 + docs/transaction-isolation.md | 173 +++++++++++++++++++++ src/db.rs | 285 ++++++++++++++++++++++++++++------ src/docs.rs | 4 + src/lib.rs | 1 + src/storage/lmdb.rs | 15 +- src/storage/memory.rs | 14 +- src/storage/mod.rs | 170 +++++++++++++++++++- src/storage/rocksdb.rs | 91 ++++++++++- 10 files changed, 695 insertions(+), 60 deletions(-) create mode 100644 docs/transaction-isolation.md create mode 100644 src/docs.rs diff --git a/README.md b/README.md index 36932aaf..a8bf0644 100755 --- a/README.md +++ b/README.md @@ -135,6 +135,7 @@ fn main() -> Result<(), DatabaseError> { - `build_in_memory()` opens an in-memory database for tests, examples, and temporary workloads. - `build_optimistic()` is available on native targets when you specifically want optimistic transactions on top of RocksDB. - `Database::checkpoint(path)` creates a local consistent snapshot when the selected storage backend supports it. +- Transaction isolation is documented in [`docs/transaction-isolation.md`](docs/transaction-isolation.md). - Cargo features: - `rocksdb` is enabled by default - `lmdb` is optional diff --git a/docs/features.md b/docs/features.md index 2317733a..b832269f 100644 --- a/docs/features.md +++ b/docs/features.md @@ -57,6 +57,7 @@ let kite_sql = DataBaseBuilder::path("./data") ### MVCC Transaction - Pessimistic (Default) - Optimistic +- Isolation levels: see [Transaction Isolation](transaction-isolation.md) ### Checkpoint KiteSQL exposes checkpoint as a storage capability rather than a full backup workflow. A checkpoint only creates a consistent local snapshot directory; compressing, uploading, retaining, and pruning backups should stay in application code. 
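The features.md note above says a checkpoint only creates a consistent local snapshot directory, and that compressing, uploading, retaining, and pruning belong in application code. As an illustration of that split, here is a minimal std-only sketch of an application-side retention policy; the `prune_checkpoints` helper and the `ckpt-…` naming scheme are hypothetical, not part of KiteSQL (the snapshot itself would come from `Database::checkpoint(path)`):

```rust
use std::fs;
use std::path::{Path, PathBuf};

// Hypothetical retention policy: keep only the newest `keep` checkpoint
// directories under `root`, assuming directory names sort chronologically
// (e.g. "ckpt-20260417-0249"). Everything here is plain application code.
fn prune_checkpoints(root: &Path, keep: usize) -> std::io::Result<Vec<PathBuf>> {
    let mut dirs: Vec<PathBuf> = fs::read_dir(root)?
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.is_dir())
        .collect();
    dirs.sort(); // lexicographic == chronological under the assumed naming scheme

    // Delete everything older than the newest `keep` checkpoints.
    let cutoff = dirs.len().saturating_sub(keep);
    let (old, kept) = dirs.split_at(cutoff);
    for dir in old {
        fs::remove_dir_all(dir)?;
    }
    Ok(kept.to_vec())
}

fn main() -> std::io::Result<()> {
    let root = std::env::temp_dir().join("kite_ckpt_demo");
    let _ = fs::remove_dir_all(&root);
    for name in ["ckpt-001", "ckpt-002", "ckpt-003"] {
        fs::create_dir_all(root.join(name))?;
    }

    let kept = prune_checkpoints(&root, 2)?;
    assert_eq!(kept.len(), 2);
    assert!(root.join("ckpt-003").exists());
    assert!(!root.join("ckpt-001").exists());

    fs::remove_dir_all(&root)?;
    Ok(())
}
```

The point of the sketch is the boundary, not the policy: the storage backend guarantees a consistent directory, and everything lifecycle-related stays outside the storage contract.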
diff --git a/docs/transaction-isolation.md b/docs/transaction-isolation.md new file mode 100644 index 00000000..dbc231e0 --- /dev/null +++ b/docs/transaction-isolation.md @@ -0,0 +1,173 @@ +# Transaction Isolation + +KiteSQL currently exposes two transaction isolation levels: + +- `ReadCommitted` +- `RepeatableRead` + +The isolation level is selected through `DataBaseBuilder` and is validated by +the chosen storage backend. + +```rust +use kite_sql::db::DataBaseBuilder; +use kite_sql::errors::DatabaseError; +use kite_sql::storage::TransactionIsolationLevel; + +fn main() -> Result<(), DatabaseError> { + let db = DataBaseBuilder::path("./data") + .transaction_isolation(TransactionIsolationLevel::RepeatableRead) + .build_rocksdb()?; + + assert_eq!( + db.transaction_isolation(), + TransactionIsolationLevel::RepeatableRead + ); + Ok(()) +} +``` + +## Support Matrix + +Current storage support is: + +| Storage | Default | Supported Levels | +| --- | --- | --- | +| RocksDB `build_rocksdb()` | `ReadCommitted` | `ReadCommitted`, `RepeatableRead` | +| Optimistic RocksDB `build_optimistic()` | `ReadCommitted` | `ReadCommitted`, `RepeatableRead` | +| LMDB `build_lmdb()` | `RepeatableRead` | `RepeatableRead` only | +| Memory `build_in_memory()` | `ReadCommitted` | `ReadCommitted` only | + +If a storage backend does not support the requested level, builder creation +fails with an explicit error. + +## Semantics + +KiteSQL defines isolation in terms of the read snapshot used by ordinary SQL +statements. + +### Read Committed + +`ReadCommitted` uses one snapshot per statement. + +That means: + +- every statement sees only data committed before that statement starts +- a later statement in the same transaction may see changes committed by other + transactions after the earlier statement completed +- the transaction still sees its own writes because reads go through the + storage transaction object rather than bypassing it + +Example: + +1. Transaction `T1` starts. +2. 
`T1` runs `SELECT ...` and reads snapshot `S1`. +3. Transaction `T2` commits an update. +4. `T1` runs another `SELECT ...` and reads a new snapshot `S2`. +5. The second statement may see `T2`'s committed update. + +### Repeatable Read + +`RepeatableRead` uses one fixed snapshot per transaction. + +That means: + +- the first read view chosen for the transaction is reused by all statements +- re-running the same query inside the same transaction returns the same + committed view unless the transaction itself modified the rows +- range reads also stay stable because they are evaluated against the same + snapshot + +Example: + +1. Transaction `T1` starts and receives snapshot `S`. +2. `T1` runs `SELECT ...`. +3. Transaction `T2` commits an update. +4. `T1` runs the same `SELECT ...` again. +5. `T1` still reads snapshot `S`, so it does not see `T2`'s newly committed + row versions. + +## How KiteSQL Implements RC and RR + +The implementation is storage-driven, but the public API is storage-agnostic. + +The common abstraction lives in `Storage::transaction_with_isolation(...)` and +the statement hooks `Transaction::begin_statement_scope()` and +`Transaction::end_statement_scope()`. + +### RocksDB + +RocksDB is where both levels are currently implemented. + +KiteSQL does not rely on RocksDB transaction options to define read visibility. +Instead, it explicitly attaches a database snapshot to `ReadOptions` for each +read operation. + +The key idea is: + +- `ReadCommitted`: create a database snapshot at statement start, attach it to + all reads in that statement, and drop it when the statement finishes +- `RepeatableRead`: create a database snapshot when the transaction starts and + attach it to all reads in all statements of that transaction + +This is why the isolation difference is concentrated in the statement-scope +hooks and the current snapshot field inside the RocksDB transaction wrapper. 
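The statement-scope rule described above can be modeled in a few lines of plain Rust. This is a toy model with hypothetical names, not KiteSQL's real `Transaction` trait or RocksDB types: `ReadCommitted` pins a fresh snapshot at statement start and drops it at statement end, while `RepeatableRead` pins one snapshot when the transaction begins and never refreshes it.

```rust
#[derive(Clone, Copy, PartialEq)]
enum Isolation {
    ReadCommitted,
    RepeatableRead,
}

// Toy transaction: `snapshot` stands in for the pinned database snapshot,
// holding the committed value that was visible when the snapshot was taken.
struct Txn {
    isolation: Isolation,
    snapshot: Option<i32>,
}

impl Txn {
    fn begin(isolation: Isolation, committed: i32) -> Self {
        // RepeatableRead pins one snapshot for the whole transaction at start.
        let snapshot = (isolation == Isolation::RepeatableRead).then_some(committed);
        Txn { isolation, snapshot }
    }

    fn begin_statement_scope(&mut self, committed: i32) {
        // ReadCommitted takes a fresh snapshot per statement.
        if self.isolation == Isolation::ReadCommitted {
            self.snapshot = Some(committed);
        }
    }

    fn end_statement_scope(&mut self) {
        // ReadCommitted drops the per-statement snapshot when the statement ends.
        if self.isolation == Isolation::ReadCommitted {
            self.snapshot = None;
        }
    }

    fn read(&self) -> i32 {
        self.snapshot.expect("reads happen inside a pinned snapshot")
    }
}

fn main() {
    let mut committed = 10;
    let rr = Txn::begin(Isolation::RepeatableRead, committed);
    let mut rc = Txn::begin(Isolation::ReadCommitted, committed);

    rc.begin_statement_scope(committed);
    assert_eq!(rc.read(), 10);
    assert_eq!(rr.read(), 10);
    rc.end_statement_scope();

    committed = 20; // another transaction commits an update

    rc.begin_statement_scope(committed);
    assert_eq!(rc.read(), 20); // RC's next statement sees the newer commit
    assert_eq!(rr.read(), 10); // RR keeps its transaction-start snapshot
    rc.end_statement_scope();
}
```

The only difference between the two levels in this model is which lifecycle event pins the snapshot, which mirrors why the real implementation concentrates the isolation difference in the statement-scope hooks.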
+ +Reads still execute through `rocksdb::Transaction`, not through the raw +database handle. That keeps "read your own writes" behavior intact while also +ensuring a statement or transaction uses one consistent committed view for +index scans and table lookups. + +### LMDB + +LMDB already provides a natural fixed snapshot view for a transaction, so +KiteSQL currently exposes only `RepeatableRead` there. + +KiteSQL intentionally does not emulate `ReadCommitted` on LMDB with extra +plumbing because that would complicate the storage contract and diverge from the +minimal implementation model used today. + +### Memory + +The in-memory storage currently exposes only `ReadCommitted`. + +This backend mainly exists for tests, examples, and temporary workloads, so the +implementation stays intentionally simple. + +## Why This Matches RC and RR + +For ordinary SQL reads, the difference between `ReadCommitted` and +`RepeatableRead` is exactly the lifetime of the read snapshot: + +- `ReadCommitted`: snapshot lifetime is one statement +- `RepeatableRead`: snapshot lifetime is one transaction + +That is the rule KiteSQL enforces. + +As a result: + +- `ReadCommitted` prevents dirty reads because every statement reads from a + committed snapshot +- `ReadCommitted` allows non-repeatable reads across statements because a later + statement may use a newer snapshot +- `RepeatableRead` prevents non-repeatable reads because every statement uses + the same snapshot +- `RepeatableRead` also keeps repeated range reads stable because the visible + key space is evaluated against the same snapshot + +In other words, KiteSQL's current implementation satisfies the normal MVCC +definition of RC and RR for plain reads. + +## Scope and Non-Goals + +This document describes the guarantees for ordinary statement reads. + +It does not claim support for: + +- `Serializable` +- `SELECT ... 
FOR UPDATE` +- explicit row-lock or range-lock APIs +- lock-based writer scheduling semantics beyond what the underlying storage + already provides + +Those features can be added later without changing the core visibility model +used here for `ReadCommitted` and `RepeatableRead`. diff --git a/src/db.rs b/src/db.rs index 66b02649..3bafefb0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -38,7 +38,8 @@ use crate::storage::memory::MemoryStorage; #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] use crate::storage::rocksdb::{OptimisticRocksStorage, RocksStorage, StorageConfig}; use crate::storage::{ - CheckpointableStorage, StatisticsMetaCache, Storage, TableCache, Transaction, ViewCache, + CheckpointableStorage, StatisticsMetaCache, Storage, TableCache, Transaction, + TransactionIsolationLevel, ViewCache, }; use crate::types::tuple::{SchemaRef, Tuple}; use crate::types::value::DataValue; @@ -120,6 +121,7 @@ pub struct DataBaseBuilder { scala_functions: ScalaFunctions, table_functions: TableFunctions, histogram_buckets: Option, + transaction_isolation: Option, #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] storage_config: StorageConfig, #[cfg(all(not(target_arch = "wasm32"), feature = "lmdb"))] @@ -137,6 +139,7 @@ impl DataBaseBuilder { scala_functions: Default::default(), table_functions: Default::default(), histogram_buckets: None, + transaction_isolation: None, #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] storage_config: Default::default(), #[cfg(all(not(target_arch = "wasm32"), feature = "lmdb"))] @@ -160,6 +163,12 @@ impl DataBaseBuilder { self } + /// Sets the transaction isolation level used by database-created transactions. + pub fn transaction_isolation(mut self, isolation: TransactionIsolationLevel) -> Self { + self.transaction_isolation = Some(isolation); + self + } + /// Registers a user-defined scalar function on the database builder. 
pub fn register_scala_function(mut self, function: Arc) -> Self { let summary = function.summary().clone(); @@ -237,6 +246,7 @@ impl DataBaseBuilder { self.scala_functions, self.table_functions, self.histogram_buckets, + self.transaction_isolation, ) } @@ -250,6 +260,7 @@ impl DataBaseBuilder { self.scala_functions, self.table_functions, self.histogram_buckets, + self.transaction_isolation, ) } @@ -263,6 +274,7 @@ impl DataBaseBuilder { self.scala_functions, self.table_functions, self.histogram_buckets, + self.transaction_isolation, ) } @@ -277,6 +289,7 @@ impl DataBaseBuilder { self.scala_functions, self.table_functions, self.histogram_buckets, + self.transaction_isolation, ) } @@ -290,6 +303,7 @@ impl DataBaseBuilder { self.scala_functions, self.table_functions, self.histogram_buckets, + self.transaction_isolation, ) } @@ -304,6 +318,7 @@ impl DataBaseBuilder { self.scala_functions, self.table_functions, self.histogram_buckets, + self.transaction_isolation, ) } @@ -312,18 +327,23 @@ impl DataBaseBuilder { scala_functions: ScalaFunctions, table_functions: TableFunctions, histogram_buckets: Option, + transaction_isolation: Option, ) -> Result, DatabaseError> { if matches!(histogram_buckets, Some(0)) { return Err(DatabaseError::InvalidValue( "histogram buckets must be >= 1".to_string(), )); } + let transaction_isolation = + transaction_isolation.unwrap_or_else(|| storage.default_transaction_isolation()); + storage.validate_transaction_isolation(transaction_isolation)?; let meta_cache = SharedLruCache::new(256, 8, RandomState::new())?; let table_cache = SharedLruCache::new(48, 4, RandomState::new())?; let view_cache = SharedLruCache::new(12, 4, RandomState::new())?; Ok(Database { storage, + transaction_isolation, mdl: Default::default(), state: Arc::new(State { scala_functions, @@ -512,24 +532,34 @@ impl State { where S: 'txn, { - let mut plan = self.build_plan(stmt, params, transaction)?; - let schema = plan.output_schema().clone(); - let mut arena = 
ExecArena::default(); - let root = build_write( - &mut arena, - plan, - (&self.table_cache, &self.view_cache, &self.meta_cache), - transaction, - ); - let executor = Executor::new(arena, root); - - Ok((schema, executor)) + transaction.begin_statement_scope()?; + match (|| { + let mut plan = self.build_plan(stmt, params, transaction)?; + let schema = plan.output_schema().clone(); + let mut arena = ExecArena::default(); + let root = build_write( + &mut arena, + plan, + (&self.table_cache, &self.view_cache, &self.meta_cache), + transaction, + ); + let executor = Executor::new(arena, root); + + Ok((schema, executor)) + })() { + Ok(result) => Ok(result), + Err(err) => { + transaction.end_statement_scope()?; + Err(err) + } + } } } /// Main database handle for executing SQL and creating transactions. pub struct Database { pub(crate) storage: S, + transaction_isolation: TransactionIsolationLevel, mdl: Arc>, pub(crate) state: Arc>, } @@ -574,29 +604,30 @@ impl Database { MetaDataLock::Read(self.mdl.read_arc()) }; - let transaction = Box::into_raw(Box::new(self.storage.transaction()?)); + let transaction = Box::into_raw(Box::new( + self.storage + .transaction_with_isolation(self.transaction_isolation)?, + )); let mut statements = statements.into_iter().peekable(); while let Some(statement) = statements.next() { let (schema, executor) = - match self - .state - .execute(unsafe { &mut (*transaction) }, &statement, &[]) - { - Ok(result) => result, - Err(err) => { - unsafe { drop(Box::from_raw(transaction)) }; - return Err(err.with_sql_context(sql)); - } + match self.state.execute(unsafe { &mut *transaction }, &statement, &[]) { + Ok(result) => result, + Err(err) => { + unsafe { drop(Box::from_raw(transaction)) }; + return Err(err.with_sql_context(sql)); + } }; if statements.peek().is_some() { - if let Err(err) = TransactionIter::new(schema, executor).done() { + if let Err(err) = TransactionIter::new(schema, executor, transaction).done() { unsafe { 
drop(Box::from_raw(transaction)) }; return Err(err.with_sql_context(sql)); } } else { - let inner = Box::into_raw(Box::new(TransactionIter::new(schema, executor))); + let inner = + Box::into_raw(Box::new(TransactionIter::new(schema, executor, transaction))); return Ok(DatabaseIter { transaction, inner, @@ -620,19 +651,23 @@ impl Database { } else { MetaDataLock::Read(self.mdl.read_arc()) }; - let transaction = Box::into_raw(Box::new(self.storage.transaction()?)); + let transaction = Box::into_raw(Box::new( + self.storage + .transaction_with_isolation(self.transaction_isolation)?, + )); let (schema, executor) = - match self - .state - .execute(unsafe { &mut (*transaction) }, statement, params) - { - Ok(result) => result, - Err(err) => { - unsafe { drop(Box::from_raw(transaction)) }; - return Err(err); - } + match self.state.execute(unsafe { &mut *transaction }, statement, params) { + Ok(result) => result, + Err(err) => { + unsafe { drop(Box::from_raw(transaction)) }; + return Err(err); + } }; - let inner = Box::into_raw(Box::new(TransactionIter::new(schema, executor))); + let inner = Box::into_raw(Box::new(TransactionIter::new( + schema, + executor, + transaction, + ))); Ok(DatabaseIter { transaction, inner, @@ -646,7 +681,9 @@ impl Database { /// transactional context until [`DBTransaction::commit`] is called. 
pub fn new_transaction(&self) -> Result, DatabaseError> { let guard = self.mdl.read_arc(); - let transaction = self.storage.transaction()?; + let transaction = self + .storage + .transaction_with_isolation(self.transaction_isolation)?; let state = self.state.clone(); Ok(DBTransaction { @@ -666,6 +703,11 @@ impl Database { pub fn storage_metrics(&self) -> Option { self.storage.metrics() } + + #[inline] + pub fn transaction_isolation(&self) -> TransactionIsolationLevel { + self.transaction_isolation + } } impl Database @@ -930,8 +972,10 @@ impl<'txn, S: Storage> DBTransaction<'txn, S> { "`DDL` is not allowed to execute within a transaction".to_string(), )); } - let (schema, executor) = self.state.execute(&mut self.inner, statement, params)?; - Ok(TransactionIter::new(schema, executor)) + let transaction = std::ptr::from_mut(&mut self.inner); + let (schema, executor) = + self.state.execute(unsafe { &mut *transaction }, statement, params)?; + Ok(TransactionIter::new(schema, executor, transaction)) } /// Commits the current transaction. @@ -944,13 +988,31 @@ impl<'txn, S: Storage> DBTransaction<'txn, S> { /// Raw result iterator returned by [`DBTransaction::run`] and [`DBTransaction::execute`]. 
pub struct TransactionIter<'a, T: Transaction + 'a> { - executor: Executor<'a, T>, + executor: Option>, schema: SchemaRef, + transaction: *mut T, + statement_scope_active: bool, } impl<'a, T: Transaction + 'a> TransactionIter<'a, T> { - fn new(schema: SchemaRef, executor: Executor<'a, T>) -> Self { - Self { executor, schema } + fn new(schema: SchemaRef, executor: Executor<'a, T>, transaction: *mut T) -> Self { + Self { + executor: Some(executor), + schema, + transaction, + statement_scope_active: true, + } + } + + #[inline] + fn finish_statement_scope(&mut self) -> Result<(), DatabaseError> { + if !self.statement_scope_active { + return Ok(()); + } + + self.executor.take(); + self.statement_scope_active = false; + unsafe { (*self.transaction).end_statement_scope() } } #[inline] @@ -960,7 +1022,21 @@ impl<'a, T: Transaction + 'a> TransactionIter<'a, T> { #[inline] pub fn next_borrowed_tuple(&mut self) -> Result, DatabaseError> { - self.executor.next_tuple() + let Some(executor) = self.executor.as_mut() else { + return Ok(None); + }; + let executor_ptr = std::ptr::from_mut(executor); + match unsafe { (*executor_ptr).next_tuple() } { + Ok(Some(tuple)) => Ok(Some(tuple)), + Ok(None) => { + self.finish_statement_scope()?; + Ok(None) + } + Err(err) => { + self.finish_statement_scope()?; + Err(err) + } + } } #[inline] @@ -970,14 +1046,30 @@ impl<'a, T: Transaction + 'a> TransactionIter<'a, T> { } } +impl Drop for TransactionIter<'_, T> { + fn drop(&mut self) { + let _ = self.finish_statement_scope(); + } +} + impl Iterator for TransactionIter<'_, T> { type Item = Result; fn next(&mut self) -> Option { - match self.executor.next_tuple() { + let result = { + let executor = self.executor.as_mut()?; + executor.next_tuple() + }; + match result { Ok(Some(tuple)) => Some(Ok(tuple.clone())), - Ok(None) => None, - Err(err) => Some(Err(err)), + Ok(None) => match self.finish_statement_scope() { + Ok(()) => None, + Err(err) => Some(Err(err)), + }, + Err(err) => match 
self.finish_statement_scope() { + Ok(()) => Some(Err(err)), + Err(scope_err) => Some(Err(scope_err)), + }, } } } @@ -1000,11 +1092,11 @@ impl BorrowResultIter for TransactionIter<'_, T> { pub(crate) mod test { use crate::binder::{Binder, BinderContext}; use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef}; - use crate::db::{DataBaseBuilder, DatabaseError}; + use crate::db::{BorrowResultIter, DataBaseBuilder, DatabaseError}; use crate::expression::ScalarExpression; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::Operator; - use crate::storage::{Storage, TableCache, Transaction}; + use crate::storage::{Storage, TableCache, Transaction, TransactionIsolationLevel}; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; @@ -1046,6 +1138,21 @@ pub(crate) mod test { Ok(()) } + fn read_single_i32(mut iter: I) -> Result + where + I: BorrowResultIter + Iterator>, + { + let value = match iter.next().transpose()?.map(|tuple| tuple.values) { + Some(values) => match values.as_slice() { + [DataValue::Int32(value)] => *value, + other => panic!("expected a single Int32 column, got {other:?}"), + }, + None => panic!("expected one result row"), + }; + iter.done()?; + Ok(value) + } + #[cfg(feature = "unsafe_txdb_checkpoint")] fn query_i32( database: &crate::db::Database, @@ -1985,6 +2092,70 @@ pub(crate) mod test { Ok(()) } + #[test] + fn test_read_committed_refreshes_snapshot_each_statement() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let kite_sql = DataBaseBuilder::path(temp_dir.path()) + .transaction_isolation(TransactionIsolationLevel::ReadCommitted) + .build_rocksdb()?; + + kite_sql + .run("create table t_rc (a int primary key, b int)")? 
+ .done()?; + kite_sql.run("insert into t_rc values (1, 10)")?.done()?; + + let mut reader = kite_sql.new_transaction()?; + let mut writer = kite_sql.new_transaction()?; + + assert_eq!( + read_single_i32(reader.run("select b from t_rc where a = 1")?)?, + 10 + ); + + writer.run("update t_rc set b = 20 where a = 1")?.done()?; + writer.commit()?; + + assert_eq!( + read_single_i32(reader.run("select b from t_rc where a = 1")?)?, + 20 + ); + reader.commit()?; + + Ok(()) + } + + #[test] + fn test_repeatable_read_keeps_transaction_snapshot() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let kite_sql = DataBaseBuilder::path(temp_dir.path()) + .transaction_isolation(TransactionIsolationLevel::RepeatableRead) + .build_rocksdb()?; + + kite_sql + .run("create table t_rr (a int primary key, b int)")? + .done()?; + kite_sql.run("insert into t_rr values (1, 10)")?.done()?; + + let mut reader = kite_sql.new_transaction()?; + let mut writer = kite_sql.new_transaction()?; + + assert_eq!( + read_single_i32(reader.run("select b from t_rr where a = 1")?)?, + 10 + ); + + writer.run("update t_rr set b = 20 where a = 1")?.done()?; + writer.commit()?; + + assert_eq!( + read_single_i32(reader.run("select b from t_rr where a = 1")?)?, + 10 + ); + reader.commit()?; + + Ok(()) + } + #[test] fn test_optimistic_transaction_sql() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); @@ -2050,6 +2221,24 @@ pub(crate) mod test { )); } + #[cfg(feature = "lmdb")] + #[test] + fn test_lmdb_rejects_read_committed_isolation() { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let db_path = temp_dir.path().join("kite_sql.lmdb"); + let result = DataBaseBuilder::path(db_path) + .transaction_isolation(TransactionIsolationLevel::ReadCommitted) + .build_lmdb(); + + match result { + Err(DatabaseError::UnsupportedStmt(message)) => { + 
assert!(message.contains("read committed")); + } + Ok(_) => panic!("lmdb should reject read committed isolation"), + Err(err) => panic!("unexpected error: {err}"), + } + } + #[cfg(feature = "unsafe_txdb_checkpoint")] #[test] fn test_checkpoint_restores_snapshot() -> Result<(), DatabaseError> { diff --git a/src/docs.rs b/src/docs.rs new file mode 100644 index 00000000..f74b7b60 --- /dev/null +++ b/src/docs.rs @@ -0,0 +1,4 @@ +#![allow(missing_docs)] + +#[doc = include_str!("../docs/transaction-isolation.md")] +pub mod transaction_isolation {} diff --git a/src/lib.rs b/src/lib.rs index 04ff5651..40b161cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ extern crate core; pub mod binder; pub mod catalog; pub mod db; +pub mod docs; pub mod errors; pub mod execution; pub mod expression; diff --git a/src/storage/lmdb.rs b/src/storage/lmdb.rs index 5b916537..6c8685e6 100644 --- a/src/storage/lmdb.rs +++ b/src/storage/lmdb.rs @@ -14,7 +14,10 @@ use crate::errors::DatabaseError; use crate::storage::table_codec::{Bytes, TableCodec}; -use crate::storage::{reuse_bound_as_excluded, InnerIter, KeyValueRef, Storage, Transaction}; +use crate::storage::{ + reuse_bound_as_excluded, InnerIter, KeyValueRef, Storage, Transaction, + TransactionIsolationLevel, +}; use lmdb::{ Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, RoCursor, RwTransaction, Transaction as _, WriteFlags, @@ -125,7 +128,11 @@ impl Storage for LmdbStorage { where Self: 'a; - fn transaction(&self) -> Result, DatabaseError> { + fn transaction_with_isolation( + &self, + isolation: TransactionIsolationLevel, + ) -> Result, DatabaseError> { + self.validate_transaction_isolation(isolation)?; let tx = self.env.begin_rw_txn().map_err(map_lmdb_err)?; Ok(LmdbTransaction { @@ -135,6 +142,10 @@ impl Storage for LmdbStorage { }) } + fn default_transaction_isolation(&self) -> TransactionIsolationLevel { + TransactionIsolationLevel::RepeatableRead + } + fn metrics(&self) -> Option { if 
!self.config.enable_statistics { return None; diff --git a/src/storage/memory.rs b/src/storage/memory.rs index ba8f44b9..5fe9d583 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -14,7 +14,9 @@ use crate::errors::DatabaseError; use crate::storage::table_codec::{Bytes, TableCodec}; -use crate::storage::{EmptyStorageMetrics, InnerIter, Storage, Transaction}; +use crate::storage::{ + EmptyStorageMetrics, InnerIter, Storage, Transaction, TransactionIsolationLevel, +}; use std::cell::{Ref, RefCell}; use std::collections::{BTreeMap, Bound, VecDeque}; use std::rc::Rc; @@ -38,12 +40,20 @@ impl Storage for MemoryStorage { where Self: 'a; - fn transaction(&self) -> Result, DatabaseError> { + fn transaction_with_isolation( + &self, + isolation: TransactionIsolationLevel, + ) -> Result, DatabaseError> { + self.validate_transaction_isolation(isolation)?; Ok(MemoryTransaction { inner: self.inner.clone(), table_codec: Default::default(), }) } + + fn default_transaction_isolation(&self) -> TransactionIsolationLevel { + TransactionIsolationLevel::ReadCommitted + } } pub struct MemoryTransaction { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6a7c8b37..f6da370e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -56,6 +56,27 @@ pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), Optio pub(crate) type TableCache = SharedLruCache; pub(crate) type ViewCache = SharedLruCache; +/// Transaction isolation levels supported by KiteSQL. +/// +/// See [`crate::docs::transaction_isolation`] for the storage support matrix +/// and the detailed visibility rules used by KiteSQL. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TransactionIsolationLevel { + /// Statement-level snapshot isolation for ordinary reads. + ReadCommitted, + /// Transaction-level fixed snapshot for ordinary reads. 
+ RepeatableRead, +} + +impl Display for TransactionIsolationLevel { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + TransactionIsolationLevel::ReadCommitted => f.write_str("read committed"), + TransactionIsolationLevel::RepeatableRead => f.write_str("repeatable read"), + } + } +} + pub(crate) fn index_value_type( table: &TableCatalog, column_ids: &[ColumnId], @@ -93,7 +114,31 @@ pub trait Storage: Clone { where Self: 'a; - fn transaction(&self) -> Result, DatabaseError>; + fn transaction(&self) -> Result, DatabaseError> { + self.transaction_with_isolation(self.default_transaction_isolation()) + } + + fn transaction_with_isolation( + &self, + isolation: TransactionIsolationLevel, + ) -> Result, DatabaseError>; + + fn default_transaction_isolation(&self) -> TransactionIsolationLevel { + TransactionIsolationLevel::ReadCommitted + } + + fn validate_transaction_isolation( + &self, + isolation: TransactionIsolationLevel, + ) -> Result<(), DatabaseError> { + if isolation == self.default_transaction_isolation() { + Ok(()) + } else { + Err(DatabaseError::UnsupportedStmt(format!( + "transaction isolation `{isolation}` is not supported by this storage" + ))) + } + } fn metrics(&self) -> Option { None @@ -120,6 +165,14 @@ pub trait Transaction: Sized { fn table_codec(&self) -> *const TableCodec; + fn begin_statement_scope(&mut self) -> Result<(), DatabaseError> { + Ok(()) + } + + fn end_statement_scope(&mut self) -> Result<(), DatabaseError> { + Ok(()) + } + /// The bounds is applied to the whole data batches, not per batch. /// /// The projections is column indices. 
@@ -2381,6 +2434,121 @@ mod test { Ok(()) } + #[test] + fn test_reader_transaction_can_mix_index_and_heap_views() -> Result<(), DatabaseError> { + let table_codec = TableCodec::default(); + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = RocksStorage::new(temp_dir.path())?; + let table_cache = Arc::new(SharedLruCache::new(4, 1, RandomState::new())?); + let serializers = [ + LogicalType::Integer.serializable(), + LogicalType::Boolean.serializable(), + LogicalType::Integer.serializable(), + ]; + + let initial_tuple = Tuple::new( + Some(DataValue::Int32(0)), + vec![ + DataValue::Int32(0), + DataValue::Boolean(true), + DataValue::Int32(0), + ], + ); + let updated_tuple = Tuple::new( + Some(DataValue::Int32(0)), + vec![ + DataValue::Int32(0), + DataValue::Boolean(true), + DataValue::Int32(1), + ], + ); + + let index_id = { + let mut setup_tx = storage.transaction()?; + build_table(&table_cache, &mut setup_tx)?; + let table = setup_tx + .table(&table_cache, "t1".to_string().into())? + .unwrap(); + let c3_column_id = *table.get_column_id_by_name("c3").unwrap(); + let index_id = setup_tx.add_index_meta( + &table_cache, + &"t1".to_string().into(), + "i1".to_string(), + vec![c3_column_id], + IndexType::Normal, + )?; + + setup_tx.add_index( + "t1", + Index::new(index_id, &initial_tuple.values[2], IndexType::Normal), + initial_tuple.pk.as_ref().unwrap(), + )?; + setup_tx.append_tuple("t1", initial_tuple.clone(), &serializers, false)?; + setup_tx.commit()?; + + index_id + }; + + let reader_tx = storage.transaction()?; + let tuple_id = { + let mut index_iter = table_codec.with_index_bound("t1", index_id, |min, max| { + reader_tx.range(Bound::Included(min), Bound::Included(max)) + })?; + let (_, value) = index_iter.try_next()?.unwrap(); + + TableCodec::decode_index(value)? 
+ }; + + let before_update = table_codec.with_tuple_key("t1", &tuple_id, |key| { + let bytes = reader_tx.get_borrowed(key)?.expect("tuple should exist"); + let mut tuple = Tuple::default(); + + TableCodec::decode_tuple_into( + &mut tuple, + &serializers, + Some(tuple_id.clone()), + bytes.as_ref(), + 3, + )?; + Ok(tuple) + })?; + assert_eq!(before_update.values[2], DataValue::Int32(0)); + + let mut writer_tx = storage.transaction()?; + writer_tx.del_index( + "t1", + &Index::new(index_id, &initial_tuple.values[2], IndexType::Normal), + initial_tuple.pk.as_ref().unwrap(), + )?; + writer_tx.add_index( + "t1", + Index::new(index_id, &updated_tuple.values[2], IndexType::Normal), + updated_tuple.pk.as_ref().unwrap(), + )?; + writer_tx.append_tuple("t1", updated_tuple.clone(), &serializers, true)?; + writer_tx.commit()?; + + let after_update = table_codec.with_tuple_key("t1", &tuple_id, |key| { + let bytes = reader_tx.get_borrowed(key)?.expect("tuple should exist"); + let mut tuple = Tuple::default(); + + TableCodec::decode_tuple_into( + &mut tuple, + &serializers, + Some(tuple_id.clone()), + bytes.as_ref(), + 3, + )?; + Ok(tuple) + })?; + + // The same reader transaction first observed the old secondary-index entry, + // then observed the updated base-row contents after another transaction committed. 
+ assert_eq!(after_update.values[2], DataValue::Int32(1)); + + Ok(()) + } + #[test] fn test_column_add_drop() -> Result<(), DatabaseError> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 429d05a9..a3e478f2 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -14,14 +14,16 @@ use crate::errors::DatabaseError; use crate::storage::table_codec::{Bytes, TableCodec}; -use crate::storage::{CheckpointableStorage, InnerIter, Storage, Transaction}; +use crate::storage::{ + CheckpointableStorage, InnerIter, Storage, Transaction, TransactionIsolationLevel, +}; #[cfg(feature = "unsafe_txdb_checkpoint")] use librocksdb_sys as ffi; use rocksdb::{ checkpoint::Checkpoint, statistics::{StatsLevel, Ticker}, - DBPinnableSlice, DBRawIteratorWithThreadMode, OptimisticTransactionDB, Options, ReadOptions, - SliceTransform, TransactionDB, + DBPinnableSlice, DBRawIteratorWithThreadMode, OptimisticTransactionDB, + Options, ReadOptions, SliceTransform, SnapshotWithThreadMode, TransactionDB, }; use std::collections::Bound; #[cfg(feature = "unsafe_txdb_checkpoint")] @@ -434,13 +436,35 @@ impl Storage for OptimisticRocksStorage { where Self: 'a; - fn transaction(&self) -> Result, DatabaseError> { + fn transaction_with_isolation( + &self, + isolation: TransactionIsolationLevel, + ) -> Result, DatabaseError> { + self.validate_transaction_isolation(isolation)?; Ok(OptimisticRocksTransaction { + db: self.inner.as_ref(), tx: self.inner.transaction(), + isolation, + current_snapshot: matches!(isolation, TransactionIsolationLevel::RepeatableRead) + .then(|| self.inner.snapshot()), table_codec: Default::default(), }) } + fn default_transaction_isolation(&self) -> TransactionIsolationLevel { + TransactionIsolationLevel::ReadCommitted + } + + fn validate_transaction_isolation( + &self, + isolation: TransactionIsolationLevel, + ) -> Result<(), DatabaseError> { + match isolation { + 
+            TransactionIsolationLevel::ReadCommitted
+            | TransactionIsolationLevel::RepeatableRead => Ok(()),
+        }
+    }
+
     fn metrics(&self) -> Option {
         if !self.config.enable_statistics {
             return None;
@@ -460,13 +484,35 @@ impl Storage for RocksStorage {
     where
         Self: 'a;
 
-    fn transaction(&self) -> Result<Self::TransactionType<'_>, DatabaseError> {
+    fn transaction_with_isolation(
+        &self,
+        isolation: TransactionIsolationLevel,
+    ) -> Result<Self::TransactionType<'_>, DatabaseError> {
+        self.validate_transaction_isolation(isolation)?;
         Ok(RocksTransaction {
+            db: self.inner.as_ref(),
             tx: self.inner.transaction(),
+            isolation,
+            current_snapshot: matches!(isolation, TransactionIsolationLevel::RepeatableRead)
+                .then(|| self.inner.snapshot()),
             table_codec: Default::default(),
         })
     }
 
+    fn default_transaction_isolation(&self) -> TransactionIsolationLevel {
+        TransactionIsolationLevel::ReadCommitted
+    }
+
+    fn validate_transaction_isolation(
+        &self,
+        isolation: TransactionIsolationLevel,
+    ) -> Result<(), DatabaseError> {
+        match isolation {
+            TransactionIsolationLevel::ReadCommitted
+            | TransactionIsolationLevel::RepeatableRead => Ok(()),
+        }
+    }
+
     fn metrics(&self) -> Option {
         if !self.config.enable_statistics {
             return None;
@@ -516,15 +562,31 @@ impl CheckpointableStorage for RocksStorage {
 }
 
 pub struct OptimisticRocksTransaction<'db> {
+    db: &'db OptimisticTransactionDB,
     tx: rocksdb::Transaction<'db, OptimisticTransactionDB>,
+    isolation: TransactionIsolationLevel,
+    current_snapshot: Option<SnapshotWithThreadMode<'db, OptimisticTransactionDB>>,
     table_codec: TableCodec,
 }
 
 pub struct RocksTransaction<'db> {
+    db: &'db TransactionDB,
     tx: rocksdb::Transaction<'db, TransactionDB>,
+    isolation: TransactionIsolationLevel,
+    current_snapshot: Option<SnapshotWithThreadMode<'db, TransactionDB>>,
     table_codec: TableCodec,
 }
 
+fn build_read_options<D: rocksdb::DBAccess>(
+    snapshot: Option<&SnapshotWithThreadMode<'_, D>>,
+) -> ReadOptions {
+    let mut read_opts = ReadOptions::default();
+    if let Some(snapshot) = snapshot {
+        read_opts.set_snapshot(snapshot);
+    }
+    read_opts
+}
+
 #[macro_export]
 macro_rules!
impl_transaction { ($tx:ident, $iter:ident) => { @@ -544,12 +606,27 @@ macro_rules! impl_transaction { &self.table_codec } + fn begin_statement_scope(&mut self) -> Result<(), DatabaseError> { + if self.isolation == TransactionIsolationLevel::ReadCommitted { + self.current_snapshot = Some(self.db.snapshot()); + } + Ok(()) + } + + fn end_statement_scope(&mut self) -> Result<(), DatabaseError> { + if self.isolation == TransactionIsolationLevel::ReadCommitted { + self.current_snapshot = None; + } + Ok(()) + } + #[inline] fn get_borrowed<'a>( &'a self, key: &[u8], ) -> Result>, DatabaseError> { - Ok(self.tx.get_pinned(key)?) + let read_opts = build_read_options(self.current_snapshot.as_ref()); + Ok(self.tx.get_pinned_opt(key, &read_opts)?) } #[inline] @@ -573,7 +650,7 @@ macro_rules! impl_transaction { min: Bound<&'key [u8]>, max: Bound<&'key [u8]>, ) -> Result, DatabaseError> { - let mut read_opts = ReadOptions::default(); + let mut read_opts = build_read_options(self.current_snapshot.as_ref()); if let ( Bound::Included(min_bytes) | Bound::Excluded(min_bytes), Bound::Included(max_bytes) | Bound::Excluded(max_bytes), From a4aadbf1a715e201e38878c5d200c5c356759275 Mon Sep 17 00:00:00 2001 From: kould Date: Fri, 17 Apr 2026 02:54:19 +0800 Subject: [PATCH 2/3] chore: codefmt --- src/db.rs | 40 +++++++++++++++++++++++++--------------- src/storage/rocksdb.rs | 4 ++-- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/db.rs b/src/db.rs index 3bafefb0..92be6d27 100644 --- a/src/db.rs +++ b/src/db.rs @@ -612,12 +612,15 @@ impl Database { while let Some(statement) = statements.next() { let (schema, executor) = - match self.state.execute(unsafe { &mut *transaction }, &statement, &[]) { - Ok(result) => result, - Err(err) => { - unsafe { drop(Box::from_raw(transaction)) }; - return Err(err.with_sql_context(sql)); - } + match self + .state + .execute(unsafe { &mut *transaction }, &statement, &[]) + { + Ok(result) => result, + Err(err) => { + unsafe { 
drop(Box::from_raw(transaction)) }; + return Err(err.with_sql_context(sql)); + } }; if statements.peek().is_some() { @@ -626,8 +629,11 @@ impl Database { return Err(err.with_sql_context(sql)); } } else { - let inner = - Box::into_raw(Box::new(TransactionIter::new(schema, executor, transaction))); + let inner = Box::into_raw(Box::new(TransactionIter::new( + schema, + executor, + transaction, + ))); return Ok(DatabaseIter { transaction, inner, @@ -656,12 +662,15 @@ impl Database { .transaction_with_isolation(self.transaction_isolation)?, )); let (schema, executor) = - match self.state.execute(unsafe { &mut *transaction }, statement, params) { - Ok(result) => result, - Err(err) => { - unsafe { drop(Box::from_raw(transaction)) }; - return Err(err); - } + match self + .state + .execute(unsafe { &mut *transaction }, statement, params) + { + Ok(result) => result, + Err(err) => { + unsafe { drop(Box::from_raw(transaction)) }; + return Err(err); + } }; let inner = Box::into_raw(Box::new(TransactionIter::new( schema, @@ -974,7 +983,8 @@ impl<'txn, S: Storage> DBTransaction<'txn, S> { } let transaction = std::ptr::from_mut(&mut self.inner); let (schema, executor) = - self.state.execute(unsafe { &mut *transaction }, statement, params)?; + self.state + .execute(unsafe { &mut *transaction }, statement, params)?; Ok(TransactionIter::new(schema, executor, transaction)) } diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index a3e478f2..9a8f5e82 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -22,8 +22,8 @@ use librocksdb_sys as ffi; use rocksdb::{ checkpoint::Checkpoint, statistics::{StatsLevel, Ticker}, - DBPinnableSlice, DBRawIteratorWithThreadMode, OptimisticTransactionDB, - Options, ReadOptions, SliceTransform, SnapshotWithThreadMode, TransactionDB, + DBPinnableSlice, DBRawIteratorWithThreadMode, OptimisticTransactionDB, Options, ReadOptions, + SliceTransform, SnapshotWithThreadMode, TransactionDB, }; use std::collections::Bound; #[cfg(feature = 
"unsafe_txdb_checkpoint")] From 6a4baeb0829ba6410973cd2561af8382f533b3bb Mon Sep 17 00:00:00 2001 From: kould Date: Fri, 17 Apr 2026 03:24:17 +0800 Subject: [PATCH 3/3] docs: clarify transaction isolation conflict detection --- docs/transaction-isolation.md | 39 +++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/docs/transaction-isolation.md b/docs/transaction-isolation.md index dbc231e0..464d7b89 100644 --- a/docs/transaction-isolation.md +++ b/docs/transaction-isolation.md @@ -133,6 +133,35 @@ The in-memory storage currently exposes only `ReadCommitted`. This backend mainly exists for tests, examples, and temporary workloads, so the implementation stays intentionally simple. +## Conflict Detection in Current KiteSQL + +KiteSQL's current conflict detection is primarily key-based. + +In the RocksDB-backed implementation, table rows are stored under concrete KV +keys derived from the primary key, and write operations such as `INSERT`, +`UPDATE`, and `DELETE` ultimately modify those concrete keys through the storage +transaction. + +That means KiteSQL already has a solid baseline conflict detection capability +for cases like: + +- two transactions writing the same primary-key row +- two transactions rewriting the same concrete storage entry + +This is the most important transactional conflict detection foundation in the +current design: conflicts are naturally detected at the physical key level by +the underlying storage transaction mechanism. + +What KiteSQL does not currently provide is predicate-level or range-level +conflict detection such as: + +- "I read `a > 10`, so inserts into that range must now conflict" +- "I evaluated this SQL predicate, so future writes matching the predicate must + be blocked or rejected" + +Those stronger behaviors require explicit range locking, predicate locking, or +other higher-level concurrency control beyond today's key-based model. 
+
 ## Why This Matches RC and RR
 
 For ordinary SQL reads, the difference between `ReadCommitted` and
@@ -154,6 +183,16 @@ As a result:
 - `RepeatableRead` also keeps repeated range reads stable because the visible
   key space is evaluated against the same snapshot
 
+This is why KiteSQL's current key-level conflict detection is sufficient for
+its present `ReadCommitted` and `RepeatableRead` support:
+
+- basic write/write correctness comes from the storage transaction's
+  concrete-key conflict handling
+- the `RC` / `RR` distinction itself comes from snapshot lifetime, not from
+  extra predicate-level conflict detection
+- ordinary `RC` / `RR` reads do not require "I read this range, therefore other
+  transactions may not insert into it" semantics
+
 In other words, KiteSQL's current implementation satisfies the normal MVCC
 definition of RC and RR for plain reads.
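The snapshot-lifetime distinction above can be illustrated with a toy multi-version store. The names here (`VersionedStore`, `commit`, `snapshot`, `get`) are assumptions for the sketch, not KiteSQL APIs: `RepeatableRead` pins one snapshot for the whole transaction, while `ReadCommitted` takes a fresh snapshot per statement and therefore sees later commits.

```rust
use std::collections::HashMap;

/// Toy multi-version store: each key maps to a list of (commit_ts, value)
/// versions, and a snapshot is simply a commit timestamp.
#[derive(Default)]
struct VersionedStore {
    versions: HashMap<String, Vec<(u64, i32)>>,
    commit_ts: u64,
}

impl VersionedStore {
    /// Commit a new version of `key` at the next timestamp.
    fn commit(&mut self, key: &str, value: i32) {
        self.commit_ts += 1;
        self.versions
            .entry(key.to_string())
            .or_default()
            .push((self.commit_ts, value));
    }

    /// Capture the current commit timestamp as a snapshot.
    fn snapshot(&self) -> u64 {
        self.commit_ts
    }

    /// Read the newest version of `key` visible at `snapshot_ts`.
    fn get(&self, key: &str, snapshot_ts: u64) -> Option<i32> {
        self.versions
            .get(key)?
            .iter()
            .rev()
            .find(|(ts, _)| *ts <= snapshot_ts)
            .map(|(_, v)| *v)
    }
}

fn main() {
    let mut store = VersionedStore::default();
    store.commit("x", 0);

    // RepeatableRead: one snapshot held for the whole transaction.
    let rr_snapshot = store.snapshot();

    // Another transaction commits in between.
    store.commit("x", 1);

    // ReadCommitted: a fresh snapshot taken for the next statement.
    let rc_snapshot = store.snapshot();

    assert_eq!(store.get("x", rr_snapshot), Some(0)); // RR still sees 0
    assert_eq!(store.get("x", rc_snapshot), Some(1)); // RC sees the new commit
}
```

Nothing in this sketch tracks what ranges were read, which matches the point above: RC and RR for plain reads come from snapshot lifetime alone, with no predicate-level conflict detection.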