Skip to content

Conversation

@dkhalanskyjb
Copy link
Collaborator

Fixes #4583

Diff in the JS code (above is the result before the change):

diff build?/compileSync/js/test/testDevelopmentExecutable/kotlin/kotlinx-coroutines-core.js
13594c13594,13595
<             if (compare(element.index_1, new Long(0, 0)) >= 0 && compare(element.index_1, newMinCollectorIndex) < 0)
---
>             var containsArg = element.index_1;
>             if (compare(new Long(0, 0), containsArg) <= 0 ? compare(containsArg, newMinCollectorIndex) < 0 : false)
Fixes #4583

See the `SharedFlow.testCancellingSubscriberAndEmitterWithNoBuffer`
test for the problematic scenario, which may arise when
the next emitter and the last collector get cancelled simultaneously.

After the first iteration of the test,
the internal state of the `SharedFlow` was:

* buffer = [null, NO_VALUE]
* replayIndex = 2
* minCollectorIndex = 1
* bufferSize = 1
* queueSize = 0
* nCollectors = 0
* nextIndex = 0

This is clearly invalid, as `bufferSize` can not be anything but 0
with the default `MutableSharedFlow` configuration.

The problem was within the logic that handles unsubscription from
a `SharedFlow` without a buffer if the first emitter is also
cancelled.

There were two separate checks for the cancellation case--
importantly, in that order:
- If the last collector was cancelled,
  the part of the buffer outside the replay buffer was erased,
  since no one was going to consume it.
  This was achieved by ordering the start of the buffer to be
  at *right buffer boundary*.
- If the next emitter was cancelled *and* the buffer is disabled,
  the *right boundary* of the empty buffer was moved past the
  cancelled emitter.

Taken together, these two checks conspired to break the invariants:
if *both* conditions are true, then first, the buffer was set to be
empty, and then, its right boundary was shifted, resulting in a
non-empty buffer when it should be impossible logically.
/**
* A common data structure for `StateFlow` and `SharedFlow`.
*/
internal abstract class AbstractSharedFlow<This, S : AbstractSharedFlowSlot<This>> : SynchronizedObject() {
Copy link
Contributor

@LouisCAD LouisCAD Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is This a new Kotlin feature to refer to the type of the instance's subclass?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only a generic parameter name. Could as well be named T. That said, I was also startled when I saw This. Possibly rename into something less misleading.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, you can read it as T, I just decided to use a more accurate name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just love Github's time travelling machine :)

* `null` by default, created on demand.
* Each cell is also `null` by default, and the specific slot object is [created][createSlot] on demand.
* The whole array being `null` or a cell being `null` is equivalent to the cell not being
* [*allocated*][AbstractSharedFlowSlot.allocateLocked]--not to be confused with memory allocation, this means
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allocate/free slot was misleading naming, but at least it was consistent.

The current snapshot of all comments uses: register, inhabit, allocate to mean allocate.

Consider picking one term to describe comments. Possibly even replace it in code as well, so there's no need for a lengthy explanation.

Here's some options:
inhabit / free
register / deregister (a bit tricky because you can both register a collector and a slot)
inhabited slot / registered slot / occupied slot / assigned slot

(Another reason for this is because "allocate" used in its true sense at least twice in this file, on L81 and L87)

protected abstract fun createSlotArray(size: Int): Array<S?>

@Suppress("UNCHECKED_CAST")
/** Register a new collector and return its newly allocated slot. A slot may be [created][createSlot] or reused. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"may be either"

val resumes = synchronized(this) {
nCollectors--
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
// Reset next index oracle if we have no more active collectors for more predictable behavior next time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you removed the other mention/introduction of the term oracle, so now this comment is less clear

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The word oracle was not the best choice, but it was used in the same sense as "hint" in C++ map.emplace_hint. No action, just saying.

// take into account a special case of sync shared flow that can go past 1st queued emitter
if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
forEachSlotLocked { slot ->
@Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you get rid of this now irrelevant suppress in all of the codebase (1 other usage)

/** A `StateFlow` representing [nCollectors], potentially with some delay. A user-visible API. */
val subscriptionCount: StateFlow<Int>
get() = synchronized(this) {
// allocate under lock in sync with nCollectors variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allocate -> initialize

}
}

/** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the function's meaning but I can't parse "implementation-representation"

}
}

/** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid "allocate" since it's implementation defined where the implementer gets their slots from. There may not be any allocations. Also, I'm not sure if it is common to refer to memory allocations in the JVM world?

/**
* A common data structure for `StateFlow` and `SharedFlow`.
*/
internal abstract class AbstractSharedFlow<This, S : AbstractSharedFlowSlot<This>> : SynchronizedObject() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only a generic parameter name. Could as well be named T. That said, I was also startled when I saw This. Possibly rename into something less misleading.

Comment on lines +56 to +63
/**
* A good starting index for looking for the next non-*allocated* slot in [slots].
*
* It is not guaranteed that this slot will not be *allocated*, nor is it guaranteed that it will be the first
* non-*allocated* slot.
* This is just a heuristic to have a better guess in common scenarios.
*/
private var nextIndex = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment says that nextIndex is a "good starting index" / "heuristics" / "guess" but doesn't say how this heuristic is used and what this even means. It does provide two options of what it is definitely not, which is useful, but just would be nice to define the object.

The next collector will be assigned to the first free slot at nextIndex or on the cyclical right of nextIndex.

import kotlin.jvm.*

@JvmField
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First batch of comments. Feel free to address now or wait for the rest to come.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants