Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,14 @@ impl<T: Encode> Commitlog<T> {
{
self.inner.read().unwrap().fold_transactions_from(offset, de)
}

pub fn fold_transactions_range<D>(&self, range: impl RangeBounds<u64>, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal>,
{
self.inner.read().unwrap().fold_transaction_range(range, de)
}
}

/// Extract the most recently written [`segment::Metadata`] from the commitlog
Expand Down
38 changes: 9 additions & 29 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::{
};
use anyhow::{anyhow, Context};
use core::{cell::RefCell, ops::RangeBounds};
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use spacetimedb_commitlog::payload::txdata;
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap, IntMap};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics};
Expand Down Expand Up @@ -992,7 +992,7 @@ pub struct Replay<F> {

impl<F> Replay<F> {
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T {
let mut committed_state = self.committed_state.write_arc();
let mut committed_state = self.committed_state.write();
let mut visitor = ReplayVisitor {
database_identity: &self.database_identity,
committed_state: &mut committed_state,
Expand All @@ -1006,10 +1006,14 @@ impl<F> Replay<F> {
pub fn next_tx_offset(&self) -> u64 {
self.committed_state.read_arc().next_tx_offset
}

pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
self.committed_state.read()
}
}

impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for Replay<F> {
type Record = Txdata<ProductValue>;
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
type Record = txdata::Txdata<ProductValue>;
type Error = txdata::DecoderError<ReplayError>;

fn decode_record<'a, R: BufReader<'a>>(
Expand Down Expand Up @@ -1040,30 +1044,6 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for Replay<F> {
}
}

impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
type Record = txdata::Txdata<ProductValue>;
type Error = txdata::DecoderError<ReplayError>;

#[inline]
fn decode_record<'a, R: BufReader<'a>>(
&self,
version: u8,
tx_offset: u64,
reader: &mut R,
) -> std::result::Result<Self::Record, Self::Error> {
spacetimedb_commitlog::Decoder::decode_record(&**self, version, tx_offset, reader)
}

fn skip_record<'a, R: BufReader<'a>>(
&self,
version: u8,
tx_offset: u64,
reader: &mut R,
) -> std::result::Result<(), Self::Error> {
spacetimedb_commitlog::Decoder::skip_record(&**self, version, tx_offset, reader)
}
}

// n.b. (Tyler) We actually **do not** want to check constraints at replay
// time because not only is it a pain, but actually **subtly wrong** the
// way we have it implemented. It's wrong because the actual constraints of
Expand Down