Open
Labels
A-networking: Related to networking in general
C-enhancement: New feature or request
C-perf: A change motivated by improving speed, memory usage or disk footprint
D-good-first-issue: Nice and easy! A great choice to get started
M-prevent-stale: Prevents old inactive issues/PRs from being closed due to inactivity
Description
Describe the feature
The `Sink` impl for `RlpxSatelliteStream` in
reth/crates/net/eth-wire/src/multiplex.rs
Lines 579 to 585 in bcf4f1b
    impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
    where
        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
        Primary: Sink<T> + Unpin,
        P2PStreamError: Into<<Primary as Sink<T>>::Error>,
    {
        type Error = <Primary as Sink<T>>::Error;
has a small flaw that could starve RLPx satellite protocol impls, because the following `poll_ready` and `start_send` always bypass the satellite messages that are currently buffered:
reth/crates/net/eth-wire/src/multiplex.rs
Lines 587 to 600 in bcf4f1b
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
            return Poll::Ready(Err(err.into()))
        }
        if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
            return Poll::Ready(Err(err))
        }
        Poll::Ready(Ok(()))
    }
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.get_mut().primary.st.start_send_unpin(item)
    }
To fix this, we should move the sink logic that is currently embedded in the `Stream` impl into the `Sink` impl:
reth/crates/net/eth-wire/src/multiplex.rs
Lines 469 to 479 in bcf4f1b
    loop {
        match this.inner.conn.poll_ready_unpin(cx) {
            Poll::Ready(Ok(())) => {
                if let Some(msg) = this.inner.out_buffer.pop_front() {
                    if let Err(err) = this.inner.conn.start_send_unpin(msg) {
                        return Poll::Ready(Some(Err(err.into())))
                    }
                } else {
                    break
                }
            }
This way, `poll_ready` always drains the output buffer of satellite messages first, so all messages are handled first come, first served (FCFS).
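A rough, std-only sketch of the intended ordering, not the actual reth code. `MockConn` and `Multiplexer` are hypothetical stand-ins for `inner.conn` and `RlpxSatelliteStream`. The real `Sink` methods take `Pin<&mut Self>` and a `Context`, which are omitted here, and the primary message is written straight to the mock connection instead of going through `primary.st`. The only point is that `poll_ready` reports ready once `out_buffer` is empty, and not before.

```rust
use std::collections::VecDeque;
use std::io;
use std::task::Poll;

/// Hypothetical stand-in for the shared connection (`inner.conn` in the issue).
/// It accepts at most `capacity` messages until `flush` is called.
struct MockConn {
    capacity: usize,
    in_flight: usize,
    sent: Vec<&'static str>,
}

impl MockConn {
    fn new(capacity: usize) -> Self {
        Self { capacity, in_flight: 0, sent: Vec::new() }
    }

    fn poll_ready(&mut self) -> Poll<io::Result<()>> {
        if self.in_flight < self.capacity {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    fn start_send(&mut self, msg: &'static str) -> io::Result<()> {
        self.in_flight += 1;
        self.sent.push(msg);
        Ok(())
    }

    /// Simulates the underlying transport writing everything out.
    fn flush(&mut self) {
        self.in_flight = 0;
    }
}

/// Hypothetical, simplified model of the satellite stream's sink half.
struct Multiplexer {
    conn: MockConn,
    /// Buffered satellite messages waiting to be written to `conn`.
    out_buffer: VecDeque<&'static str>,
}

impl Multiplexer {
    /// Proposed behaviour: drain buffered satellite messages before
    /// signalling that a primary message may be sent.
    fn poll_ready(&mut self) -> Poll<io::Result<()>> {
        loop {
            match self.conn.poll_ready() {
                Poll::Ready(Ok(())) => match self.out_buffer.pop_front() {
                    Some(msg) => {
                        if let Err(err) = self.conn.start_send(msg) {
                            return Poll::Ready(Err(err));
                        }
                    }
                    // Nothing buffered any more: the caller may send.
                    None => return Poll::Ready(Ok(())),
                },
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                // Connection is full: buffered messages keep their place in line.
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    /// Simplification: the primary message goes directly to the mock connection.
    fn start_send(&mut self, item: &'static str) -> io::Result<()> {
        self.conn.start_send(item)
    }
}
```

In this model a primary message can only be sent after `poll_ready` has returned `Poll::Ready(Ok(()))`, which happens only once every previously buffered satellite message has been handed to the connection.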
FYI @0xvanbeethoven @Will-Smith11
Additional context
No response
0xvanbeethoven
Metadata
Assignees
Status
Backlog