diff --git a/Cargo.lock b/Cargo.lock index 2aeba04..7a13747 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -775,6 +775,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "parking_lot", "rand", "reqwest", "reqwest-middleware", diff --git a/Cargo.toml b/Cargo.toml index 25329d0..eb9d0e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ fuser = { version = "0.16.0", features = ["libfuse"] } libc = "0.2" mesa-dev = "1.11.0" num-traits = "0.2" +parking_lot = "0.12" reqwest = { version = "0.12", default-features = false } reqwest-middleware = "0.4" serde_path_to_error = "0.1" diff --git a/src/fs/fuser.rs b/src/fs/fuser.rs index 86ddabb..e0aaf69 100644 --- a/src/fs/fuser.rs +++ b/src/fs/fuser.rs @@ -1,7 +1,9 @@ use std::ffi::OsStr; +use std::sync::Arc; use crate::fs::r#trait::{CommonFileAttr, DirEntryType, FileAttr, Fs, LockOwner, OpenFlags}; -use tracing::{debug, error, instrument}; +use tracing::Instrument as _; +use tracing::{debug, error}; impl From for fuser::FileAttr { fn from(val: FileAttr) -> Self { @@ -106,55 +108,47 @@ impl From for OpenFlags { } } -pub struct FuserAdapter +const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1); + +pub struct FuserAdapter where - F::LookupError: Into, - F::GetAttrError: Into, - F::OpenError: Into, - F::ReadError: Into, - F::ReaddirError: Into, - F::ReleaseError: Into, + F::LookupError: Into + Send + 'static, + F::GetAttrError: Into + Send + 'static, + F::OpenError: Into + Send + 'static, + F::ReadError: Into + Send + 'static, + F::ReaddirError: Into + Send + 'static, + F::ReleaseError: Into + Send + 'static, { - fs: F, + fs: Arc, runtime: tokio::runtime::Handle, } -impl FuserAdapter +impl FuserAdapter where - F::LookupError: Into, - F::GetAttrError: Into, - F::OpenError: Into, - F::ReadError: Into, - F::ReaddirError: Into, - F::ReleaseError: Into, + F::LookupError: Into + Send + 'static, + F::GetAttrError: Into + Send + 'static, + F::OpenError: Into + Send + 'static, + F::ReadError: Into + Send + 'static, + F::ReaddirError: Into + Send + 'static, + F::ReleaseError: Into + Send + 'static, { - // TODO(markovejnovic): This low TTL is really not ideal. It slows us down a lot, since the - // kernel has to ask us for every single lookup all the time. - // - // I think a better implementation is to implement - // - // notify_inval_inode(ino, offset, len) - // notify_inval_entry(parent_ino, name) - // - // These two functions can be used to invalidate specific entries in the kernel cache when we - // know they have changed. This would allow us to set a much higher TTL here. - const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1); - pub fn new(fs: F, runtime: tokio::runtime::Handle) -> Self { - Self { fs, runtime } + Self { + fs: Arc::new(fs), + runtime, + } } } -impl fuser::Filesystem for FuserAdapter +impl fuser::Filesystem for FuserAdapter where - F::LookupError: Into, - F::GetAttrError: Into, - F::OpenError: Into, - F::ReadError: Into, - F::ReaddirError: Into, - F::ReleaseError: Into, + F::LookupError: Into + Send + 'static, + F::GetAttrError: Into + Send + 'static, + F::OpenError: Into + Send + 'static, + F::ReadError: Into + Send + 'static, + F::ReaddirError: Into + Send + 'static, + F::ReleaseError: Into + Send + 'static, { - #[instrument(name = "FuserAdapter::lookup", skip(self, _req, reply))] fn lookup( &mut self, _req: &fuser::Request<'_>, @@ -162,23 +156,27 @@ where name: &OsStr, reply: fuser::ReplyEntry, ) { - match self.runtime.block_on(self.fs.lookup(parent, name)) { - Ok(attr) => { - // TODO(markovejnovic): Passing generation = 0 here is a recipe for disaster. - // Someone with A LOT of files will likely see inode reuse which will lead to a - // disaster. - let f_attr: fuser::FileAttr = attr.into(); - debug!(?f_attr, "replying..."); - reply.entry(&Self::SHAMEFUL_TTL, &f_attr, 0); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let name = name.to_owned(); + let span = tracing::debug_span!("FuserAdapter::lookup", parent, ?name); + self.runtime.spawn( + async move { + match fs.lookup(parent, &name).await { + Ok(attr) => { + let f_attr: fuser::FileAttr = attr.into(); + debug!(?f_attr, "replying..."); + reply.entry(&SHAMEFUL_TTL, &f_attr, 0); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } + } } - } + .instrument(span), + ); } - #[instrument(name = "FuserAdapter::getattr", skip(self, _req, fh, reply))] fn getattr( &mut self, _req: &fuser::Request<'_>, @@ -186,19 +184,25 @@ where fh: Option, reply: fuser::ReplyAttr, ) { - match self.runtime.block_on(self.fs.getattr(ino, fh)) { - Ok(attr) => { - debug!(?attr, "replying..."); - reply.attr(&Self::SHAMEFUL_TTL, &attr.into()); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let span = tracing::debug_span!("FuserAdapter::getattr", ino); + self.runtime.spawn( + async move { + match fs.getattr(ino, fh).await { + Ok(attr) => { + debug!(?attr, "replying..."); + reply.attr(&SHAMEFUL_TTL, &attr.into()); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } + } } - } + .instrument(span), + ); } - #[instrument(name = "FuserAdapter::readdir", skip(self, _req, _fh, offset, reply))] fn readdir( &mut self, _req: &fuser::Request<'_>, @@ -207,60 +211,70 @@ where offset: i64, mut reply: fuser::ReplyDirectory, ) { - let entries = match self.runtime.block_on(self.fs.readdir(ino)) { - Ok(entries) => entries, - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); - return; - } - }; + let fs = Arc::clone(&self.fs); + let span = tracing::debug_span!("FuserAdapter::readdir", ino); + self.runtime.spawn( + async move { + let entries = match fs.readdir(ino).await { + Ok(entries) => entries, + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + return; + } + }; - #[expect( - clippy::cast_possible_truncation, - reason = "fuser offset is i64 but always non-negative" - )] - for (i, entry) in entries - .iter() - .enumerate() - .skip(offset.cast_unsigned() as usize) - { - let kind: fuser::FileType = entry.kind.into(); - let Ok(idx): Result = (i + 1).try_into() else { - error!("Directory entry index {} too large for fuser", i + 1); - reply.error(libc::EIO); - return; - }; + #[expect( + clippy::cast_possible_truncation, + reason = "fuser offset is i64 but always non-negative" + )] + for (i, entry) in entries + .iter() + .enumerate() + .skip(offset.cast_unsigned() as usize) + { + let kind: fuser::FileType = entry.kind.into(); + let Ok(idx): Result = (i + 1).try_into() else { + error!("Directory entry index {} too large for fuser", i + 1); + reply.error(libc::EIO); + return; + }; - debug!(?entry, "adding entry to reply..."); - if reply.add(entry.ino, idx, kind, &entry.name) { - debug!("buffer full for now, stopping readdir"); - break; - } - } + debug!(?entry, "adding entry to reply..."); + if reply.add(entry.ino, idx, kind, &entry.name) { + debug!("buffer full for now, stopping readdir"); + break; + } + } - debug!("finalizing reply..."); - reply.ok(); + debug!("finalizing reply..."); + reply.ok(); + } + .instrument(span), + ); } - #[instrument(name = "FuserAdapter::open", skip(self, _req, flags, reply))] fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { - match self.runtime.block_on(self.fs.open(ino, flags.into())) { - Ok(open_file) => { - debug!(handle = open_file.handle, "replying..."); - reply.opened(open_file.handle, 0); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let flags: OpenFlags = flags.into(); + let span = tracing::debug_span!("FuserAdapter::open", ino); + self.runtime.spawn( + async move { + match fs.open(ino, flags).await { + Ok(open_file) => { + debug!(handle = open_file.handle, "replying..."); + reply.opened(open_file.handle, 0); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } + } } - } + .instrument(span), + ); } - #[instrument( - name = "FuserAdapter::read", - skip(self, _req, fh, offset, size, flags, lock_owner, reply) - )] fn read( &mut self, _req: &fuser::Request<'_>, @@ -272,28 +286,30 @@ where lock_owner: Option, reply: fuser::ReplyData, ) { + let fs = Arc::clone(&self.fs); let flags: OpenFlags = flags.into(); let lock_owner = lock_owner.map(LockOwner); - match self.runtime.block_on(self.fs.read( - ino, - fh, - offset.cast_unsigned(), - size, - flags, - lock_owner, - )) { - Ok(data) => { - debug!(read_bytes = data.len(), "replying..."); - reply.data(&data); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let span = tracing::debug_span!("FuserAdapter::read", ino); + self.runtime.spawn( + async move { + match fs + .read(ino, fh, offset.cast_unsigned(), size, flags, lock_owner) + .await + { + Ok(data) => { + debug!(read_bytes = data.len(), "replying..."); + reply.data(&data); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } + } } - } + .instrument(span), + ); } - #[instrument(name = "FuserAdapter::release", skip(self, _req, _lock_owner, reply))] fn release( &mut self, _req: &fuser::Request<'_>, @@ -304,48 +320,63 @@ where flush: bool, reply: fuser::ReplyEmpty, ) { - match self - .runtime - .block_on(self.fs.release(ino, fh, flags.into(), flush)) - { - Ok(()) => { - debug!("replying ok"); - reply.ok(); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let flags: OpenFlags = flags.into(); + let span = tracing::debug_span!("FuserAdapter::release", ino, fh); + self.runtime.spawn( + async move { + match fs.release(ino, fh, flags, flush).await { + Ok(()) => { + debug!("replying ok"); + reply.ok(); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } + } } - } + .instrument(span), + ); } - #[instrument(name = "FuserAdapter::forget", skip(self, _req, nlookup))] fn forget(&mut self, _req: &fuser::Request<'_>, ino: u64, nlookup: u64) { - self.runtime.block_on(self.fs.forget(ino, nlookup)); + let fs = Arc::clone(&self.fs); + let span = tracing::debug_span!("FuserAdapter::forget", ino, nlookup); + self.runtime.spawn( + async move { + fs.forget(ino, nlookup).await; + } + .instrument(span), + ); } - #[instrument(name = "FuserAdapter::statfs", skip(self, _req, _ino, reply))] fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) { - self.runtime.block_on(async { - match self.fs.statfs().await { - Ok(statvfs) => { - debug!(?statvfs, "replying..."); - reply.statfs( - statvfs.total_blocks, - statvfs.free_blocks, - statvfs.available_blocks, - statvfs.total_inodes, - statvfs.free_inodes, - statvfs.block_size, - statvfs.max_filename_length, - 0, - ); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.raw_os_error().unwrap_or(libc::EIO)); + let fs = Arc::clone(&self.fs); + let span = tracing::debug_span!("FuserAdapter::statfs"); + self.runtime.spawn( + async move { + match fs.statfs().await { + Ok(statvfs) => { + debug!(?statvfs, "replying..."); + reply.statfs( + statvfs.total_blocks, + statvfs.free_blocks, + statvfs.available_blocks, + statvfs.total_inodes, + statvfs.free_inodes, + statvfs.block_size, + statvfs.max_filename_length, + 0, + ); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.raw_os_error().unwrap_or(libc::EIO)); + } } } - }); + .instrument(span), + ); } } diff --git a/src/fs/icache/bridge.rs b/src/fs/icache/bridge.rs index e674a56..66faf1a 100644 --- a/src/fs/icache/bridge.rs +++ b/src/fs/icache/bridge.rs @@ -1,72 +1,104 @@ +use parking_lot::RwLock; + use crate::fs::r#trait::{FileAttr, FileHandle, Inode}; /// Bidirectional bridge for both inodes and file handles between two Fs layers. /// /// Convention: **left = outer (caller), right = inner (callee)**. -/// `forward(left)` → right, `backward(right)` → left. +/// `forward(left)` -> right, `backward(right)` -> left. +/// +/// All methods take `&self`; internal synchronization via `parking_lot::RwLock`. pub struct HashMapBridge { - inode_map: bimap::BiMap, - fh_map: bimap::BiMap, + inode_map: RwLock>, + fh_map: RwLock>, } impl HashMapBridge { pub fn new() -> Self { Self { - inode_map: bimap::BiMap::new(), - fh_map: bimap::BiMap::new(), + inode_map: RwLock::new(bimap::BiMap::new()), + fh_map: RwLock::new(bimap::BiMap::new()), } } - // ── Inode methods ──────────────────────────────────────────────────── + /// Clear both maps, resetting the bridge to its initial empty state. + pub fn reset(&self) { + self.inode_map.write().clear(); + self.fh_map.write().clear(); + } - pub fn insert_inode(&mut self, left: Inode, right: Inode) { - self.inode_map.insert(left, right); + // Inode methods + + pub fn insert_inode(&self, left: Inode, right: Inode) { + self.inode_map.write().insert(left, right); } - /// Look up right→left, or allocate a new left inode if unmapped. + /// Look up right->left, or allocate a new left inode if unmapped. + /// + /// Uses double-checked locking: read-lock first, then write-lock only if + /// the mapping is missing. pub fn backward_or_insert_inode( - &mut self, + &self, right: Inode, allocate: impl FnOnce() -> Inode, ) -> Inode { - if let Some(&left) = self.inode_map.get_by_right(&right) { + // Fast path: read-lock. + { + let map = self.inode_map.read(); + if let Some(&left) = map.get_by_right(&right) { + return left; + } + } + // Slow path: write-lock with re-check. + let mut map = self.inode_map.write(); + if let Some(&left) = map.get_by_right(&right) { left } else { let left = allocate(); - self.inode_map.insert(left, right); + map.insert(left, right); left } } - /// Look up left→right, or allocate a new right inode if unmapped. - pub fn forward_or_insert_inode( - &mut self, - left: Inode, - allocate: impl FnOnce() -> Inode, - ) -> Inode { - if let Some(&right) = self.inode_map.get_by_left(&left) { + /// Look up left->right, or allocate a new right inode if unmapped. + /// + /// Uses double-checked locking: read-lock first, then write-lock only if + /// the mapping is missing. + pub fn forward_or_insert_inode(&self, left: Inode, allocate: impl FnOnce() -> Inode) -> Inode { + // Fast path: read-lock. + { + let map = self.inode_map.read(); + if let Some(&right) = map.get_by_left(&left) { + return right; + } + } + // Slow path: write-lock with re-check. + let mut map = self.inode_map.write(); + if let Some(&right) = map.get_by_left(&left) { right } else { let right = allocate(); - self.inode_map.insert(left, right); + map.insert(left, right); right } } /// Remove an inode mapping by its left (outer) key. - pub fn remove_inode_by_left(&mut self, left: Inode) { - self.inode_map.remove_by_left(&left); + pub fn remove_inode_by_left(&self, left: Inode) { + self.inode_map.write().remove_by_left(&left); } - /// Look up left→right directly. - pub fn inode_map_get_by_left(&self, left: Inode) -> Option<&Inode> { - self.inode_map.get_by_left(&left) + /// Look up left->right directly. Returns an owned `Inode` (copied out of + /// the lock guard) so the caller does not hold a borrow into the map. + pub fn inode_map_get_by_left(&self, left: Inode) -> Option { + self.inode_map.read().get_by_left(&left).copied() } /// Rewrite the `ino` field in a [`FileAttr`] from right (inner) to left (outer) namespace. pub fn attr_backward(&self, attr: FileAttr) -> FileAttr { + let map = self.inode_map.read(); let backward = |ino: Inode| -> Inode { - if let Some(&left) = self.inode_map.get_by_right(&ino) { + if let Some(&left) = map.get_by_right(&ino) { left } else { tracing::warn!( @@ -79,19 +111,19 @@ impl HashMapBridge { rewrite_attr_ino(attr, backward) } - // ── File handle methods ────────────────────────────────────────────── + // File handle methods - pub fn insert_fh(&mut self, left: FileHandle, right: FileHandle) { - self.fh_map.insert(left, right); + pub fn insert_fh(&self, left: FileHandle, right: FileHandle) { + self.fh_map.write().insert(left, right); } pub fn fh_forward(&self, left: FileHandle) -> Option { - self.fh_map.get_by_left(&left).copied() + self.fh_map.read().get_by_left(&left).copied() } /// Remove a file handle mapping by its left (outer) key. - pub fn remove_fh_by_left(&mut self, left: FileHandle) { - self.fh_map.remove_by_left(&left); + pub fn remove_fh_by_left(&self, left: FileHandle) { + self.fh_map.write().remove_by_left(&left); } } diff --git a/src/fs/mescloud/composite.rs b/src/fs/mescloud/composite.rs index 6dbac25..bb97316 100644 --- a/src/fs/mescloud/composite.rs +++ b/src/fs/mescloud/composite.rs @@ -1,7 +1,9 @@ -use std::collections::HashMap; use std::ffi::OsStr; +use std::sync::Arc; use bytes::Bytes; +use parking_lot::RwLock; +use scc::HashMap as ConcurrentHashMap; use tracing::{instrument, trace, warn}; use crate::fs::icache::bridge::HashMapBridge; @@ -42,12 +44,11 @@ where { pub icache: MescloudICache, pub file_table: FileTable, - pub readdir_buf: Vec, /// Maps outer inode to index into `slots` for child-root inodes. - pub child_inodes: HashMap, + pub child_inodes: ConcurrentHashMap, /// Maps every translated outer inode to its owning slot index. - pub inode_to_slot: HashMap, - pub slots: Vec>, + pub inode_to_slot: ConcurrentHashMap, + pub slots: RwLock>>>, } impl CompositeFs @@ -67,14 +68,18 @@ where /// Look up which child slot owns an inode via direct map. #[instrument(name = "CompositeFs::slot_for_inode", skip(self))] pub fn slot_for_inode(&self, ino: Inode) -> Option { - self.inode_to_slot.get(&ino).copied() + self.inode_to_slot.read_sync(&ino, |_, &v| v) } /// Allocate an outer file handle and map it through the bridge. #[must_use] - pub fn alloc_fh(&mut self, slot_idx: usize, inner_fh: FileHandle) -> FileHandle { + pub fn alloc_fh(&self, slot_idx: usize, inner_fh: FileHandle) -> FileHandle { let fh = self.file_table.allocate(); - self.slots[slot_idx].bridge.insert_fh(fh, inner_fh); + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[slot_idx]) + }; + slot.bridge.insert_fh(fh, inner_fh); fh } @@ -82,16 +87,20 @@ where /// Also inserts a stub ICB into the outer icache when the inode is new. #[instrument(name = "CompositeFs::translate_inner_ino", skip(self, name))] pub async fn translate_inner_ino( - &mut self, + &self, slot_idx: usize, inner_ino: Inode, parent_outer_ino: Inode, name: &OsStr, ) -> Inode { - let outer_ino = self.slots[slot_idx] + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[slot_idx]) + }; + let outer_ino = slot .bridge .backward_or_insert_inode(inner_ino, || self.icache.allocate_inode()); - self.inode_to_slot.insert(outer_ino, slot_idx); + let _ = self.inode_to_slot.insert_async(outer_ino, slot_idx).await; self.icache .entry_or_insert_icb( outer_ino, @@ -120,7 +129,7 @@ where /// Find slot, forward inode, delegate to inner, allocate outer file handle. #[instrument(name = "CompositeFs::delegated_open", skip(self))] pub async fn delegated_open( - &mut self, + &self, ino: Inode, flags: OpenFlags, ) -> Result { @@ -128,10 +137,14 @@ where warn!(ino, "open on inode not belonging to any child"); OpenError::InodeNotFound })?; - let inner_ino = self.slots[idx] + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + let inner_ino = slot .bridge .forward_or_insert_inode(ino, || unreachable!("open: ino should be mapped")); - let inner_open = self.slots[idx].inner.open(inner_ino, flags).await?; + let inner_open = slot.inner.open(inner_ino, flags).await?; let outer_fh = self.alloc_fh(idx, inner_open.handle); trace!( ino, @@ -149,7 +162,7 @@ where #[expect(clippy::too_many_arguments, reason = "mirrors fuser read API")] #[instrument(name = "CompositeFs::delegated_read", skip(self))] pub async fn delegated_read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -161,15 +174,18 @@ where warn!(ino, "read on inode not belonging to any child"); ReadError::InodeNotFound })?; - let inner_ino = self.slots[idx] + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + let inner_ino = slot .bridge .forward_or_insert_inode(ino, || unreachable!("read: ino should be mapped")); - let inner_fh = self.slots[idx].bridge.fh_forward(fh).ok_or_else(|| { + let inner_fh = slot.bridge.fh_forward(fh).ok_or_else(|| { warn!(fh, "read: no fh mapping found"); ReadError::FileNotOpen })?; - self.slots[idx] - .inner + slot.inner .read(inner_ino, inner_fh, offset, size, flags, lock_owner) .await } @@ -178,7 +194,7 @@ where /// then clean up the file handle mapping. #[instrument(name = "CompositeFs::delegated_release", skip(self))] pub async fn delegated_release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -188,18 +204,19 @@ where warn!(ino, "release on inode not belonging to any child"); ReleaseError::FileNotOpen })?; - let inner_ino = self.slots[idx] + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + let inner_ino = slot .bridge .forward_or_insert_inode(ino, || unreachable!("release: ino should be mapped")); - let inner_fh = self.slots[idx].bridge.fh_forward(fh).ok_or_else(|| { + let inner_fh = slot.bridge.fh_forward(fh).ok_or_else(|| { warn!(fh, "release: no fh mapping found"); ReleaseError::FileNotOpen })?; - let result = self.slots[idx] - .inner - .release(inner_ino, inner_fh, flags, flush) - .await; - self.slots[idx].bridge.remove_fh_by_left(fh); + let result = slot.inner.release(inner_ino, inner_fh, flags, flush).await; + slot.bridge.remove_fh_by_left(fh); trace!(ino, fh, "release: cleaned up fh mapping"); result } @@ -213,20 +230,27 @@ where /// evict the inner root, breaking all subsequent operations on that child. #[must_use] #[instrument(name = "CompositeFs::delegated_forget", skip(self))] - pub async fn delegated_forget(&mut self, ino: Inode, nlookups: u64) -> bool { + pub async fn delegated_forget(&self, ino: Inode, nlookups: u64) -> bool { let slot_idx = self.slot_for_inode(ino); - let is_child_root = self.child_inodes.contains_key(&ino); - if !is_child_root - && let Some(idx) = slot_idx - && let Some(&inner_ino) = self.slots[idx].bridge.inode_map_get_by_left(ino) - { - self.slots[idx].inner.forget(inner_ino, nlookups).await; + let is_child_root = self.child_inodes.contains_sync(&ino); + if !is_child_root && let Some(idx) = slot_idx { + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + if let Some(inner_ino) = slot.bridge.inode_map_get_by_left(ino) { + slot.inner.forget(inner_ino, nlookups).await; + } } if self.icache.forget(ino, nlookups).await.is_some() { - self.child_inodes.remove(&ino); - self.inode_to_slot.remove(&ino); + self.child_inodes.remove_async(&ino).await; + self.inode_to_slot.remove_async(&ino).await; if let Some(idx) = slot_idx { - self.slots[idx].bridge.remove_inode_by_left(ino); + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + slot.bridge.remove_inode_by_left(ino); } true } else { @@ -243,23 +267,25 @@ where /// Delegation branch for lookup when the parent is owned by a child slot. #[instrument(name = "CompositeFs::delegated_lookup", skip(self, name))] pub async fn delegated_lookup( - &mut self, + &self, parent: Inode, name: &OsStr, ) -> Result { let idx = self .slot_for_inode(parent) .ok_or(LookupError::InodeNotFound)?; - let inner_parent = self.slots[idx] + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + let inner_parent = slot .bridge .forward_or_insert_inode(parent, || unreachable!("lookup: parent should be mapped")); - let inner_attr = self.slots[idx].inner.lookup(inner_parent, name).await?; + let inner_attr = slot.inner.lookup(inner_parent, name).await?; let inner_ino = inner_attr.common().ino; let outer_ino = self.translate_inner_ino(idx, inner_ino, parent, name).await; - let outer_attr = self.slots[idx].bridge.attr_backward(inner_attr); + let outer_attr = slot.bridge.attr_backward(inner_attr); self.icache.cache_attr(outer_ino, outer_attr).await; - // None means the entry was concurrently evicted; fail the lookup so - // the kernel doesn't hold a ref the cache no longer tracks. let rc = self .icache .inc_rc(outer_ino) @@ -271,29 +297,36 @@ where /// Delegation branch for readdir when the inode is owned by a child slot. #[instrument(name = "CompositeFs::delegated_readdir", skip(self))] - pub async fn delegated_readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + pub async fn delegated_readdir(&self, ino: Inode) -> Result, ReadDirError> { let idx = self .slot_for_inode(ino) .ok_or(ReadDirError::InodeNotFound)?; - let inner_ino = self.slots[idx] + let slot = { + let slots = self.slots.read(); + Arc::clone(&slots[idx]) + }; + let inner_ino = slot .bridge .forward_or_insert_inode(ino, || unreachable!("readdir: ino should be mapped")); - let inner_entries = self.slots[idx].inner.readdir(inner_ino).await?; - let inner_entries: Vec = inner_entries.to_vec(); + let inner_entries = slot.inner.readdir(inner_ino).await?; let evicted = self.icache.evict_zero_rc_children(ino).await; for evicted_ino in evicted { - if let Some(slot) = self.inode_to_slot.remove(&evicted_ino) { - self.slots[slot].bridge.remove_inode_by_left(evicted_ino); + if let Some((_, slot_idx)) = self.inode_to_slot.remove_async(&evicted_ino).await { + let evict_slot = { + let slots = self.slots.read(); + Arc::clone(&slots[slot_idx]) + }; + evict_slot.bridge.remove_inode_by_left(evicted_ino); } - self.child_inodes.remove(&evicted_ino); + self.child_inodes.remove_async(&evicted_ino).await; } let mut outer_entries = Vec::with_capacity(inner_entries.len()); for entry in &inner_entries { let outer_child_ino = self .translate_inner_ino(idx, entry.ino, ino, &entry.name) .await; - if let Some(inner_attr) = self.slots[idx].inner.peek_attr(entry.ino).await { - let outer_attr = self.slots[idx].bridge.attr_backward(inner_attr); + if let Some(inner_attr) = slot.inner.peek_attr(entry.ino).await { + let outer_attr = slot.bridge.attr_backward(inner_attr); self.icache.cache_attr(outer_child_ino, outer_attr).await; } outer_entries.push(DirEntry { @@ -302,7 +335,6 @@ where kind: entry.kind, }); } - self.readdir_buf = outer_entries; - Ok(&self.readdir_buf) + Ok(outer_entries) } } diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index 15f1f5d..7fb3307 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -82,6 +82,7 @@ pub struct MescloudICache> { inode_factory: InodeFactory, fs_owner: (u32, u32), block_size: u32, + child_ino_lock: tokio::sync::Mutex<()>, } impl> MescloudICache { @@ -92,6 +93,7 @@ impl> MescloudICache { inode_factory: InodeFactory::new(root_ino + 1), fs_owner, block_size, + child_ino_lock: tokio::sync::Mutex::new(()), }; // Set root directory attr synchronously during initialization @@ -218,13 +220,11 @@ impl> MescloudICache { /// If new, inserts a stub ICB (parent+path set, attr=None, children=None, rc=0). /// Does NOT bump rc. Returns the inode number. /// - /// # Safety invariant - /// - /// The `for_each` scan and `insert_icb` are **not** atomic. If two callers - /// race with the same `(parent, name)`, both may allocate distinct inodes - /// for the same logical child. This is currently safe because all callers - /// go through `&mut self` on the owning `Fs` implementation. + /// A `tokio::sync::Mutex` serializes access so that concurrent callers + /// with the same `(parent, name)` do not allocate duplicate inodes. pub async fn ensure_child_ino(&self, parent: Inode, name: &OsStr) -> Inode { + let _guard = self.child_ino_lock.lock().await; + // Search for existing child by parent + name let mut existing_ino = None; self.inner @@ -434,4 +434,20 @@ mod tests { "child of different parent should survive" ); } + + #[tokio::test] + async fn ensure_child_ino_concurrent_same_name_returns_same_ino() { + let cache = test_mescloud_cache(); + let parent = 1; // root + + let futs: Vec<_> = (0..50) + .map(|_| cache.ensure_child_ino(parent, OsStr::new("same_child"))) + .collect(); + let results = futures::future::join_all(futs).await; + + let first = results[0]; + for (i, &ino) in results.iter().enumerate() { + assert_eq!(ino, first, "concurrent call {i} returned different inode"); + } + } } diff --git a/src/fs/mescloud/mod.rs b/src/fs/mescloud/mod.rs index 0e32933..b0f3b04 100644 --- a/src/fs/mescloud/mod.rs +++ b/src/fs/mescloud/mod.rs @@ -1,10 +1,12 @@ -use std::collections::HashMap; use std::ffi::OsStr; use std::future::Future; +use std::sync::Arc; use std::time::SystemTime; use bytes::Bytes; use mesa_dev::MesaClient; +use parking_lot::RwLock; +use scc::HashMap as ConcurrentHashMap; use secrecy::ExposeSecret as _; use tracing::{Instrument as _, instrument, trace, warn}; @@ -117,22 +119,22 @@ impl MesaFS { Self::BLOCK_SIZE, ), file_table: FileTable::new(), - readdir_buf: Vec::new(), - child_inodes: HashMap::new(), - inode_to_slot: HashMap::new(), - slots: orgs - .map(|org_conf| { + child_inodes: ConcurrentHashMap::new(), + inode_to_slot: ConcurrentHashMap::new(), + slots: RwLock::new( + orgs.map(|org_conf| { let client = MesaClient::builder() .with_api_key(org_conf.api_key.expose_secret()) .with_base_path(MESA_API_BASE_URL) .build(); let org = OrgFs::new(org_conf.name, client, fs_owner); - ChildSlot { + Arc::new(ChildSlot { inner: org, bridge: HashMapBridge::new(), - } + }) }) .collect(), + ), }, } } @@ -142,7 +144,7 @@ impl MesaFS { if ino == Self::ROOT_NODE_INO { return Some(InodeRole::Root); } - if self.composite.child_inodes.contains_key(&ino) { + if self.composite.child_inodes.contains_sync(&ino) { return Some(InodeRole::OrgOwned); } if self.composite.slot_for_inode(ino).is_some() { @@ -151,63 +153,83 @@ impl MesaFS { None } - /// Ensure a mesa-level inode exists for the org at `org_idx`. - /// Seeds the bridge with (`mesa_org_ino`, `OrgFs::ROOT_INO`). - /// Does NOT bump rc. - async fn ensure_org_inode(&mut self, org_idx: usize) -> (Inode, FileAttr) { - // Check if an inode already exists. - let existing_ino = self - .composite + /// Try to reuse an existing inode for `org_idx`. Returns `Some` if a valid + /// inode was found (or rebuilt), `None` if allocation is needed. + async fn try_reuse_org_inode(&self, org_idx: usize) -> Option<(Inode, FileAttr)> { + let mut existing_ino = None; + self.composite .child_inodes - .iter() - .find(|&(_, &idx)| idx == org_idx) - .map(|(&ino, _)| ino); - - if let Some(existing_ino) = existing_ino { - if let Some(attr) = self.composite.icache.get_attr(existing_ino).await { - let rc = self - .composite - .icache - .get_icb(existing_ino, |icb| icb.rc) - .await - .unwrap_or(0); - trace!( - ino = existing_ino, - org_idx, rc, "ensure_org_inode: reusing existing inode" - ); - return (existing_ino, attr); - } - if self.composite.icache.contains(existing_ino) { - // ICB exists but attr missing — rebuild and cache. - warn!( - ino = existing_ino, - org_idx, "ensure_org_inode: attr missing, rebuilding" - ); - let now = SystemTime::now(); - let attr = FileAttr::Directory { - common: mescloud_icache::make_common_file_attr( - existing_ino, - 0o755, - now, - now, - self.composite.icache.fs_owner(), - self.composite.icache.block_size(), - ), - }; - self.composite.icache.cache_attr(existing_ino, attr).await; - return (existing_ino, attr); - } - // ICB was evicted — clean up stale tracking entries. + .any_async(|&ino, &idx| { + if idx == org_idx { + existing_ino = Some(ino); + true + } else { + false + } + }) + .await; + let existing_ino = existing_ino?; + + if let Some(attr) = self.composite.icache.get_attr(existing_ino).await { + let rc = self + .composite + .icache + .get_icb(existing_ino, |icb| icb.rc) + .await + .unwrap_or(0); + trace!( + ino = existing_ino, + org_idx, rc, "ensure_org_inode: reusing existing inode" + ); + return Some((existing_ino, attr)); + } + if self.composite.icache.contains(existing_ino) { warn!( ino = existing_ino, - org_idx, "ensure_org_inode: ICB evicted, cleaning up stale entry" + org_idx, "ensure_org_inode: attr missing, rebuilding" ); - self.composite.child_inodes.remove(&existing_ino); - self.composite.inode_to_slot.remove(&existing_ino); + let now = SystemTime::now(); + let attr = FileAttr::Directory { + common: mescloud_icache::make_common_file_attr( + existing_ino, + 0o755, + now, + now, + self.composite.icache.fs_owner(), + self.composite.icache.block_size(), + ), + }; + self.composite.icache.cache_attr(existing_ino, attr).await; + return Some((existing_ino, attr)); + } + warn!( + ino = existing_ino, + org_idx, "ensure_org_inode: ICB evicted, cleaning up stale entry" + ); + self.composite + .child_inodes + .remove_async(&existing_ino) + .await; + self.composite + .inode_to_slot + .remove_async(&existing_ino) + .await; + None + } + + /// Ensure a mesa-level inode exists for the org at `org_idx`. + /// Seeds the bridge with (`mesa_org_ino`, `OrgFs::ROOT_INO`). + /// Does NOT bump rc. + async fn ensure_org_inode(&self, org_idx: usize) -> (Inode, FileAttr) { + if let Some(result) = self.try_reuse_org_inode(org_idx).await { + return result; } // Allocate new. - let org_name = self.composite.slots[org_idx].inner.name().to_owned(); + let org_name = { + let slots = self.composite.slots.read(); + slots[org_idx].inner.name().to_owned() + }; let ino = self.composite.icache.allocate_inode(); trace!(ino, org_idx, org = %org_name, "ensure_org_inode: allocated new inode"); @@ -226,15 +248,21 @@ impl MesaFS { ) .await; - self.composite.child_inodes.insert(ino, org_idx); - self.composite.inode_to_slot.insert(ino, org_idx); + let _ = self.composite.child_inodes.insert_async(ino, org_idx).await; + let _ = self + .composite + .inode_to_slot + .insert_async(ino, org_idx) + .await; // Reset bridge (may have stale mappings from a previous eviction cycle) // and seed: mesa org-root <-> OrgFs::ROOT_INO. - self.composite.slots[org_idx].bridge = HashMapBridge::new(); - self.composite.slots[org_idx] - .bridge - .insert_inode(ino, OrgFs::ROOT_INO); + let slot = { + let slots = self.composite.slots.read(); + Arc::clone(&slots[org_idx]) + }; + slot.bridge.reset(); + slot.bridge.insert_inode(ino, OrgFs::ROOT_INO); let attr = FileAttr::Directory { common: mescloud_icache::make_common_file_attr( @@ -261,17 +289,18 @@ impl Fs for MesaFS { type ReleaseError = ReleaseError; #[instrument(name = "MesaFS::lookup", skip(self))] - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result { let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?; match role { InodeRole::Root => { let org_name = name.to_str().ok_or(LookupError::InodeNotFound)?; - let org_idx = self - .composite - .slots - .iter() - .position(|s| s.inner.name() == org_name) - .ok_or(LookupError::InodeNotFound)?; + let org_idx = { + let slots = self.composite.slots.read(); + slots + .iter() + .position(|s| s.inner.name() == org_name) + .ok_or(LookupError::InodeNotFound)? + }; trace!(org = org_name, "lookup: matched org"); let (ino, attr) = self.ensure_org_inode(org_idx).await; @@ -289,26 +318,23 @@ impl Fs for MesaFS { } #[instrument(name = "MesaFS::getattr", skip(self))] - async fn getattr( - &mut self, - ino: Inode, - _fh: Option, - ) -> Result { + async fn getattr(&self, ino: Inode, _fh: Option) -> Result { self.composite.delegated_getattr(ino).await } #[instrument(name = "MesaFS::readdir", skip(self))] - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + async fn readdir(&self, ino: Inode) -> Result, ReadDirError> { let role = self.inode_role(ino).ok_or(ReadDirError::InodeNotFound)?; match role { InodeRole::Root => { - let org_info: Vec<(usize, String)> = self - .composite - .slots - .iter() - .enumerate() - .map(|(idx, s)| (idx, s.inner.name().to_owned())) - .collect(); + let org_info: Vec<(usize, String)> = { + let slots = self.composite.slots.read(); + slots + .iter() + .enumerate() + .map(|(idx, s)| (idx, s.inner.name().to_owned())) + .collect() + }; let mut entries = Vec::with_capacity(org_info.len()); for (org_idx, name) in &org_info { @@ -321,21 +347,20 @@ impl Fs for MesaFS { } trace!(entry_count = entries.len(), "readdir: listing orgs"); - self.composite.readdir_buf = entries; - Ok(&self.composite.readdir_buf) + Ok(entries) } InodeRole::OrgOwned => self.composite.delegated_readdir(ino).await, } } #[instrument(name = "MesaFS::open", skip(self))] - async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result { + async fn open(&self, ino: Inode, flags: OpenFlags) -> Result { self.composite.delegated_open(ino, flags).await } #[instrument(name = "MesaFS::read", skip(self))] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -350,7 +375,7 @@ impl Fs for MesaFS { #[instrument(name = "MesaFS::release", skip(self))] async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -362,12 +387,12 @@ impl Fs for MesaFS { } #[instrument(name = "MesaFS::forget", skip(self))] - async fn forget(&mut self, ino: Inode, nlookups: u64) { + async fn forget(&self, ino: Inode, nlookups: u64) { // MesaFS has no extra state to clean up on eviction (unlike OrgFs::owner_inodes). let _ = self.composite.delegated_forget(ino, nlookups).await; } - async fn statfs(&mut self) -> Result { + async fn statfs(&self) -> Result { Ok(self.composite.delegated_statfs()) } } diff --git a/src/fs/mescloud/org.rs b/src/fs/mescloud/org.rs index 968c748..de37078 100644 --- a/src/fs/mescloud/org.rs +++ b/src/fs/mescloud/org.rs @@ -1,11 +1,12 @@ -use std::collections::HashMap; use std::ffi::OsStr; use std::future::Future; +use std::sync::Arc; use std::time::SystemTime; use bytes::Bytes; use futures::TryStreamExt as _; use mesa_dev::MesaClient; +use scc::HashMap as ConcurrentHashMap; use secrecy::SecretString; use tracing::{Instrument as _, instrument, trace, warn}; @@ -93,7 +94,7 @@ pub struct OrgFs { client: MesaClient, composite: CompositeFs, /// Maps org-level owner-dir inodes to owner name (github only). - owner_inodes: HashMap, + owner_inodes: ConcurrentHashMap, } impl OrgFs { @@ -121,37 +122,46 @@ impl OrgFs { /// Ensure an inode exists for a virtual owner directory (github only). Does NOT bump rc. /// TODO(MES-674): Cleanup "special" casing for github. - async fn ensure_owner_inode(&mut self, owner: &str) -> (Inode, FileAttr) { + async fn ensure_owner_inode(&self, owner: &str) -> (Inode, FileAttr) { // Check existing let mut stale_ino = None; - for (&ino, existing_owner) in &self.owner_inodes { - if existing_owner == owner { - if let Some(attr) = self.composite.icache.get_attr(ino).await { - return (ino, attr); - } - if self.composite.icache.contains(ino) { - // ICB exists but attr missing — rebuild and cache - let now = SystemTime::now(); - let attr = FileAttr::Directory { - common: mescloud_icache::make_common_file_attr( - ino, - 0o755, - now, - now, - self.composite.icache.fs_owner(), - self.composite.icache.block_size(), - ), - }; - self.composite.icache.cache_attr(ino, attr).await; - return (ino, attr); + let mut found = None; + self.owner_inodes + .any_async(|&ino, existing_owner| { + if existing_owner == owner { + found = Some(ino); + true + } else { + false } - // ICB was evicted — mark for cleanup - stale_ino = Some(ino); - break; + }) + .await; + + if let Some(ino) = found { + if let Some(attr) = self.composite.icache.get_attr(ino).await { + return (ino, attr); + } + if self.composite.icache.contains(ino) { + // ICB exists but attr missing — rebuild and cache + let now = SystemTime::now(); + let attr = FileAttr::Directory { + common: mescloud_icache::make_common_file_attr( + ino, + 0o755, + now, + now, + self.composite.icache.fs_owner(), + self.composite.icache.block_size(), + ), + }; + self.composite.icache.cache_attr(ino, attr).await; + return (ino, attr); } + // ICB was evicted — mark for cleanup + stale_ino = Some(ino); } if let Some(ino) = stale_ino { - self.owner_inodes.remove(&ino); + self.owner_inodes.remove_async(&ino).await; } // Allocate new @@ -170,7 +180,7 @@ impl OrgFs { }, ) .await; - self.owner_inodes.insert(ino, owner.to_owned()); + drop(self.owner_inodes.insert_async(ino, owner.to_owned()).await); let attr = FileAttr::Directory { common: mescloud_icache::make_common_file_attr( ino, @@ -197,12 +207,11 @@ impl OrgFs { composite: CompositeFs { icache: MescloudICache::new(resolver, Self::ROOT_INO, fs_owner, Self::BLOCK_SIZE), file_table: FileTable::new(), - readdir_buf: Vec::new(), - child_inodes: HashMap::new(), - inode_to_slot: HashMap::new(), - slots: Vec::new(), + child_inodes: ConcurrentHashMap::new(), + inode_to_slot: ConcurrentHashMap::new(), + slots: parking_lot::RwLock::new(Vec::new()), }, - owner_inodes: HashMap::new(), + owner_inodes: ConcurrentHashMap::new(), } } @@ -211,10 +220,10 @@ impl OrgFs { if ino == Self::ROOT_INO { return Some(InodeRole::OrgRoot); } - if self.owner_inodes.contains_key(&ino) { + if self.owner_inodes.contains_sync(&ino) { return Some(InodeRole::OwnerDir); } - if self.composite.child_inodes.contains_key(&ino) { + if self.composite.child_inodes.contains_sync(&ino) { return Some(InodeRole::RepoOwned); } if self.composite.slot_for_inode(ino).is_some() { @@ -230,41 +239,55 @@ impl OrgFs { /// - `display_name`: name shown in filesystem ("linux" for github, same as `repo_name` otherwise) /// - `parent_ino`: owner-dir inode for github, `ROOT_INO` otherwise async fn ensure_repo_inode( - &mut self, + &self, repo_name: &str, display_name: &str, default_branch: &str, parent_ino: Inode, ) -> (Inode, FileAttr) { // Check existing repos. - for (&ino, &idx) in &self.composite.child_inodes { - if self.composite.slots[idx].inner.repo_name() == repo_name { - if let Some(attr) = self.composite.icache.get_attr(ino).await { - let rc = self - .composite - .icache - .get_icb(ino, |icb| icb.rc) - .await - .unwrap_or(0); - trace!(ino, repo = repo_name, rc, "ensure_repo_inode: reusing"); - return (ino, attr); + let mut found = None; + self.composite + .child_inodes + .any_async(|&ino, &idx| { + let slot = { + let slots = self.composite.slots.read(); + Arc::clone(&slots[idx]) + }; + if slot.inner.repo_name() == repo_name { + found = Some((ino, idx)); + true + } else { + false } - warn!( - ino, - repo = repo_name, - "ensure_repo_inode: attr missing, rebuilding" - ); - return self.make_repo_dir_attr(ino).await; + }) + .await; + + if let Some((ino, _idx)) = found { + if let Some(attr) = self.composite.icache.get_attr(ino).await { + let rc = self + .composite + .icache + .get_icb(ino, |icb| icb.rc) + .await + .unwrap_or(0); + trace!(ino, repo = repo_name, rc, "ensure_repo_inode: reusing"); + return (ino, attr); } + warn!( + ino, + repo = repo_name, + "ensure_repo_inode: attr missing, rebuilding" + ); + return self.make_repo_dir_attr(ino).await; } // Check for orphaned slot (slot exists but not in child_inodes). - if let Some(idx) = self - .composite - .slots - .iter() - .position(|s| s.inner.repo_name() == repo_name) - { + let orphan_idx = { + let slots = self.composite.slots.read(); + slots.iter().position(|s| s.inner.repo_name() == repo_name) + }; + if let Some(idx) = orphan_idx { return self.register_repo_slot(idx, display_name, parent_ino).await; } @@ -298,16 +321,20 @@ impl OrgFs { self.composite.icache.fs_owner(), ); - let mut bridge = HashMapBridge::new(); + let bridge = HashMapBridge::new(); bridge.insert_inode(ino, RepoFs::ROOT_INO); - let idx = self.composite.slots.len(); - self.composite.slots.push(ChildSlot { - inner: repo, - bridge, - }); - self.composite.child_inodes.insert(ino, idx); - self.composite.inode_to_slot.insert(ino, idx); + let idx = { + let mut slots = self.composite.slots.write(); + let idx = slots.len(); + slots.push(Arc::new(ChildSlot { + inner: repo, + bridge, + })); + idx + }; + let _ = self.composite.child_inodes.insert_async(ino, idx).await; + let _ = self.composite.inode_to_slot.insert_async(ino, idx).await; self.make_repo_dir_attr(ino).await } @@ -315,7 +342,7 @@ impl OrgFs { /// Allocate a new inode, register it in an existing (orphaned) slot, and /// return `(ino, attr)`. async fn register_repo_slot( - &mut self, + &self, idx: usize, display_name: &str, parent_ino: Inode, @@ -337,18 +364,20 @@ impl OrgFs { ) .await; + let slot = { + let slots = self.composite.slots.read(); + Arc::clone(&slots[idx]) + }; warn!( ino, idx, "register_repo_slot: resetting bridge for orphaned slot; \ inner filesystem will not receive forget for stale inode mappings" ); - self.composite.slots[idx].bridge = HashMapBridge::new(); - self.composite.slots[idx] - .bridge - .insert_inode(ino, RepoFs::ROOT_INO); - self.composite.child_inodes.insert(ino, idx); - self.composite.inode_to_slot.insert(ino, idx); + slot.bridge.reset(); + slot.bridge.insert_inode(ino, RepoFs::ROOT_INO); + let _ = self.composite.child_inodes.insert_async(ino, idx).await; + let _ = self.composite.inode_to_slot.insert_async(ino, idx).await; self.make_repo_dir_attr(ino).await } @@ -402,7 +431,7 @@ impl Fs for OrgFs { type ReleaseError = ReleaseError; #[instrument(name = "OrgFs::lookup", skip(self), fields(org = %self.name))] - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result { let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?; match role { InodeRole::OrgRoot => { @@ -444,9 +473,9 @@ impl Fs for OrgFs { // Parent is an owner dir, name is a repo like "linux". let owner = self .owner_inodes - .get(&parent) - .ok_or(LookupError::InodeNotFound)? - .clone(); + .read_async(&parent, |_, v| v.clone()) + .await + .ok_or(LookupError::InodeNotFound)?; let repo_name_str = name.to_str().ok_or(LookupError::InodeNotFound)?; let full_decoded = format!("{owner}/{repo_name_str}"); let encoded = Self::encode_github_repo_name(&full_decoded); @@ -476,16 +505,12 @@ impl Fs for OrgFs { } #[instrument(name = "OrgFs::getattr", skip(self), fields(org = %self.name))] - async fn getattr( - &mut self, - ino: Inode, - _fh: Option, - ) -> Result { + async fn getattr(&self, ino: Inode, _fh: Option) -> Result { self.composite.delegated_getattr(ino).await } #[instrument(name = "OrgFs::readdir", skip(self), fields(org = %self.name))] - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + async fn readdir(&self, ino: Inode) -> Result, ReadDirError> { let role = self.inode_role(ino).ok_or(ReadDirError::InodeNotFound)?; match role { InodeRole::OrgRoot => { @@ -526,8 +551,7 @@ impl Fs for OrgFs { }); } - self.composite.readdir_buf = entries; - Ok(&self.composite.readdir_buf) + Ok(entries) } InodeRole::OwnerDir if self.is_github() => { // TODO(MES-674): Cleanup "special" casing for github. @@ -539,13 +563,13 @@ impl Fs for OrgFs { } #[instrument(name = "OrgFs::open", skip(self), fields(org = %self.name))] - async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result { + async fn open(&self, ino: Inode, flags: OpenFlags) -> Result { self.composite.delegated_open(ino, flags).await } #[instrument(name = "OrgFs::read", skip(self), fields(org = %self.name))] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -560,7 +584,7 @@ impl Fs for OrgFs { #[instrument(name = "OrgFs::release", skip(self), fields(org = %self.name))] async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -572,14 +596,14 @@ impl Fs for OrgFs { } #[instrument(name = "OrgFs::forget", skip(self), fields(org = %self.name))] - async fn forget(&mut self, ino: Inode, nlookups: u64) { + async fn forget(&self, ino: Inode, nlookups: u64) { let evicted = self.composite.delegated_forget(ino, nlookups).await; if evicted { - self.owner_inodes.remove(&ino); + self.owner_inodes.remove_async(&ino).await; } } - async fn statfs(&mut self) -> Result { + async fn statfs(&self) -> Result { Ok(self.composite.delegated_statfs()) } } diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 0d22196..03014dc 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -3,13 +3,14 @@ //! This module directly accesses the mesa repo through the Rust SDK, on a per-repo basis. use std::future::Future; -use std::{collections::HashMap, ffi::OsStr, path::PathBuf, time::SystemTime}; +use std::{ffi::OsStr, path::PathBuf, time::SystemTime}; use base64::Engine as _; use bytes::Bytes; use mesa_dev::MesaClient; use mesa_dev::low_level::content::{Content, DirEntry as MesaDirEntry}; use num_traits::cast::ToPrimitive as _; +use scc::HashMap as ConcurrentHashMap; use tracing::{Instrument as _, instrument, trace, warn}; use crate::fs::icache::{AsyncICache, FileTable, IcbResolver}; @@ -181,8 +182,7 @@ pub struct RepoFs { icache: MescloudICache, file_table: FileTable, - readdir_buf: Vec, - open_files: HashMap, + open_files: ConcurrentHashMap, } impl RepoFs { @@ -212,8 +212,7 @@ impl RepoFs { ref_, icache: MescloudICache::new(resolver, Self::ROOT_INO, fs_owner, Self::BLOCK_SIZE), file_table: FileTable::new(), - readdir_buf: Vec::new(), - open_files: HashMap::new(), + open_files: ConcurrentHashMap::new(), } } @@ -277,7 +276,7 @@ impl Fs for RepoFs { type ReleaseError = ReleaseError; #[instrument(name = "RepoFs::lookup", skip(self), fields(repo = %self.repo_name))] - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result { debug_assert!( self.icache.contains(parent), "lookup: parent inode {parent} not in inode table" @@ -300,11 +299,7 @@ impl Fs for RepoFs { } #[instrument(name = "RepoFs::getattr", skip(self), fields(repo = %self.repo_name))] - async fn getattr( - &mut self, - ino: Inode, - _fh: Option, - ) -> Result { + async fn getattr(&self, ino: Inode, _fh: Option) -> Result { self.icache.get_attr(ino).await.ok_or_else(|| { warn!(ino, "getattr on unknown inode"); GetAttrError::InodeNotFound @@ -312,7 +307,7 @@ impl Fs for RepoFs { } #[instrument(name = "RepoFs::readdir", skip(self), fields(repo = %self.repo_name))] - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + async fn readdir(&self, ino: Inode) -> Result, ReadDirError> { debug_assert!( self.icache.contains(ino), "readdir: inode {ino} not in inode table" @@ -367,12 +362,11 @@ impl Fs for RepoFs { }); } - self.readdir_buf = entries; - Ok(&self.readdir_buf) + Ok(entries) } #[instrument(name = "RepoFs::open", skip(self), fields(repo = %self.repo_name))] - async fn open(&mut self, ino: Inode, _flags: OpenFlags) -> Result { + async fn open(&self, ino: Inode, _flags: OpenFlags) -> Result { if !self.icache.contains(ino) { warn!(ino, "open on unknown inode"); return Err(OpenError::InodeNotFound); @@ -385,7 +379,7 @@ impl Fs for RepoFs { "open: inode {ino} has non-file cached attr" ); let fh = self.file_table.allocate(); - self.open_files.insert(fh, ino); + let _ = self.open_files.insert_async(fh, ino).await; trace!(ino, fh, "assigned file handle"); Ok(OpenFile { handle: fh, @@ -395,7 +389,7 @@ impl Fs for RepoFs { #[instrument(name = "RepoFs::read", skip(self), fields(repo = %self.repo_name))] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -403,10 +397,14 @@ impl Fs for RepoFs { _flags: OpenFlags, _lock_owner: Option, ) -> Result { - let &file_ino = self.open_files.get(&fh).ok_or_else(|| { - warn!(fh, "read on unknown file handle"); - ReadError::FileNotOpen - })?; + let file_ino = self + .open_files + .read_async(&fh, |_, &v| v) + .await + .ok_or_else(|| { + warn!(fh, "read on unknown file handle"); + ReadError::FileNotOpen + })?; debug_assert!( file_ino == ino, "read: file handle {fh} maps to inode {file_ino}, but caller passed inode {ino}" @@ -457,13 +455,13 @@ impl Fs for RepoFs { #[instrument(name = "RepoFs::release", skip(self), fields(repo = %self.repo_name))] async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, _flags: OpenFlags, _flush: bool, ) -> Result<(), ReleaseError> { - let released_ino = self.open_files.remove(&fh).ok_or_else(|| { + let (_, released_ino) = self.open_files.remove_async(&fh).await.ok_or_else(|| { warn!(fh, "release on unknown file handle"); ReleaseError::FileNotOpen })?; @@ -476,7 +474,7 @@ impl Fs for RepoFs { } #[instrument(name = "RepoFs::forget", skip(self), fields(repo = %self.repo_name))] - async fn forget(&mut self, ino: Inode, nlookups: u64) { + async fn forget(&self, ino: Inode, nlookups: u64) { debug_assert!( self.icache.contains(ino), "forget: inode {ino} not in inode table" @@ -485,7 +483,7 @@ impl Fs for RepoFs { self.icache.forget(ino, nlookups).await; } - async fn statfs(&mut self) -> Result { + async fn statfs(&self) -> Result { Ok(self.icache.statfs()) } } diff --git a/src/fs/trait.rs b/src/fs/trait.rs index f4d9852..c215e26 100644 --- a/src/fs/trait.rs +++ b/src/fs/trait.rs @@ -330,26 +330,26 @@ pub trait Fs { /// For each lookup call made by the kernel, it expects the icache to be updated with the /// returned `FileAttr`. - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result; + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result; /// Can be called in two contexts -- the file is not open (in which case `fh` is `None`), /// or the file is open (in which case `fh` is `Some`). async fn getattr( - &mut self, + &self, ino: Inode, fh: Option, ) -> Result; /// Read the contents of a directory. - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], Self::ReaddirError>; + async fn readdir(&self, ino: Inode) -> Result, Self::ReaddirError>; /// Open a file for reading. - async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result; + async fn open(&self, ino: Inode, flags: OpenFlags) -> Result; /// Read data from an open file. #[expect(clippy::too_many_arguments, reason = "mirrors fuser read API")] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -360,7 +360,7 @@ pub trait Fs { /// Called when the kernel closes a file handle. async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -368,8 +368,8 @@ pub trait Fs { ) -> Result<(), Self::ReleaseError>; /// Called when the kernel is done with an inode. - async fn forget(&mut self, ino: Inode, nlookups: u64); + async fn forget(&self, ino: Inode, nlookups: u64); /// Get filesystem statistics. - async fn statfs(&mut self) -> Result; + async fn statfs(&self) -> Result; }