diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index a3fb912d827..1e7a8c0047e 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -526,6 +526,14 @@ impl Commitlog { { self.inner.read().unwrap().fold_transactions_from(offset, de) } + + pub fn fold_transactions_range(&self, range: impl RangeBounds, de: D) -> Result<(), D::Error> + where + D: Decoder, + D::Error: From, + { + self.inner.read().unwrap().fold_transaction_range(range, de) + } } /// Extract the most recently written [`segment::Metadata`] from the commitlog diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 8730e319c2d..4326e2c6249 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -27,8 +27,8 @@ use crate::{ }; use anyhow::{anyhow, Context}; use core::{cell::RefCell, ops::RangeBounds}; -use parking_lot::{Mutex, RwLock}; -use spacetimedb_commitlog::payload::{txdata, Txdata}; +use parking_lot::{Mutex, RwLock, RwLockReadGuard}; +use spacetimedb_commitlog::payload::txdata; use spacetimedb_data_structures::map::{HashCollectionExt, HashMap, IntMap}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics}; @@ -992,7 +992,7 @@ pub struct Replay { impl Replay { fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T { - let mut committed_state = self.committed_state.write_arc(); + let mut committed_state = self.committed_state.write(); let mut visitor = ReplayVisitor { database_identity: &self.database_identity, committed_state: &mut committed_state, @@ -1006,10 +1006,14 @@ impl Replay { pub fn next_tx_offset(&self) -> u64 { self.committed_state.read_arc().next_tx_offset } + + pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> { + self.committed_state.read() + } } -impl spacetimedb_commitlog::Decoder for Replay { - type Record = Txdata; +impl spacetimedb_commitlog::Decoder for &mut Replay { + type Record = txdata::Txdata; type Error = txdata::DecoderError; fn decode_record<'a, R: BufReader<'a>>( @@ -1040,30 +1044,6 @@ impl spacetimedb_commitlog::Decoder for Replay { } } -impl spacetimedb_commitlog::Decoder for &mut Replay { - type Record = txdata::Txdata; - type Error = txdata::DecoderError; - - #[inline] - fn decode_record<'a, R: BufReader<'a>>( - &self, - version: u8, - tx_offset: u64, - reader: &mut R, - ) -> std::result::Result { - spacetimedb_commitlog::Decoder::decode_record(&**self, version, tx_offset, reader) - } - - fn skip_record<'a, R: BufReader<'a>>( - &self, - version: u8, - tx_offset: u64, - reader: &mut R, - ) -> std::result::Result<(), Self::Error> { - spacetimedb_commitlog::Decoder::skip_record(&**self, version, tx_offset, reader) - } -} - // n.b. (Tyler) We actually **do not** want to check constraints at replay // time because not only is it a pain, but actually **subtly wrong** the // way we have it implemented. It's wrong because the actual constraints of