From 7e5077eb9839853110d5e2435aa1bde34c0b7878 Mon Sep 17 00:00:00 2001
From: Dmitry Khalanskiy
Date: Wed, 10 Dec 2025 13:05:05 +0100
Subject: [PATCH 1/3] Remove an outdated workaround for a compiler bug

Diff in the JS code (the `<` line is the code generated before the change,
the `>` lines are the code generated after it):

diff build?/compileSync/js/test/testDevelopmentExecutable/kotlin/kotlinx-coroutines-core.js
13594c13594,13595
< if (compare(element.index_1, new Long(0, 0)) >= 0 && compare(element.index_1, newMinCollectorIndex) < 0)
---
> var containsArg = element.index_1;
> if (compare(new Long(0, 0), containsArg) <= 0 ? compare(containsArg, newMinCollectorIndex) < 0 : false)
---
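Not part of the change: a minimal, self-contained Kotlin sketch of the equivalence this relies on.
The workaround being removed kept two explicit comparisons (and suppressed the inspection suggesting
a range check) because of an old JS backend issue; the range check that replaces them is semantically
identical. The names below are made up for illustration.

    fun sameResult(index: Long, limit: Long): Boolean {
        val twoComparisons = index >= 0 && index < limit // the old, workaround-era form
        val rangeCheck = index in 0L..<limit             // the form used after this patch (needs a Kotlin version with `..<`)
        return twoComparisons == rangeCheck              // always true, including for index == -1L (a free slot)
    }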
 kotlinx-coroutines-core/common/src/flow/SharedFlow.kt | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 690eb29281..9b86d00800 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -542,8 +542,7 @@ internal open class SharedFlowImpl<T>(
         // take into account a special case of sync shared flow that can go past 1st queued emitter
         if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
         forEachSlotLocked { slot ->
-            @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
-            if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
+            if (slot.index in 0..<newMinCollectorIndex) newMinCollectorIndex = slot.index
         }
         assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
         if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes

From 0352ccafae54949ae31b4124bb95c5d0dab9a67d Mon Sep 17 00:00:00 2001
From: Dmitry Khalanskiy
Date: Wed, 10 Dec 2025 16:29:10 +0100
Subject: [PATCH 2/3] Improve the documentation of SharedFlow internals

---
 .../common/src/flow/SharedFlow.kt           | 81 +++++++++++++------
 .../common/src/flow/StateFlow.kt            |  2 +-
 .../src/flow/internal/AbstractSharedFlow.kt | 71 +++++++++++++---
 3 files changed, 119 insertions(+), 35 deletions(-)

diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 9b86d00800..42996639b8 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -58,7 +58,7 @@ import kotlin.jvm.*
  * get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
  * can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
  * using the `extraBufferCapacity` parameter.
- * 
+ *
  * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
  * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
  * than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
@@ -292,11 +292,23 @@ public fun <T> MutableSharedFlow(
 // ------------------------------------ Implementation ------------------------------------
 
 internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
+    /**
+     * The first index in the buffer that was not yet consumed by this collector,
+     * or -1 if the slot is not allocated to a collector.
+     */
     @JvmField
-    var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
+    var index = -1L
 
+    /**
+     * The current continuation of the collector.
+     *
+     * The collector sets this field each time it cannot acquire the next value immediately and has to
+     * suspend while waiting for one.
+     * `null` means that either the collector is not suspended but is processing some value,
+     * or that this slot is not allocated to a collector.
+     */
     @JvmField
-    var cont: Continuation<Unit>? = null // collector waiting for new value
+    var cont: Continuation<Unit>? = null
 
     override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
         if (index >= 0) return false // not free
@@ -318,7 +330,7 @@ internal open class SharedFlowImpl<T>(
     private val replay: Int,
     private val bufferCapacity: Int,
     private val onBufferOverflow: BufferOverflow
-) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
+) : AbstractSharedFlow<SharedFlowImpl<T>, SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
     /*
         Logical structure of the buffer
 
@@ -350,7 +362,7 @@ internal open class SharedFlowImpl<T>(
 
     // Stored state
     private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
-    private var replayIndex = 0L // minimal index from which new collector gets values
+    private var replayIndex = 0L // minimal index from which a new collector gets values
     private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
     private var bufferSize = 0 // number of buffered values
     private var queueSize = 0 // number of queued emitters
@@ -532,13 +544,27 @@ internal open class SharedFlowImpl<T>(
         return index
     }
 
-    // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
+    /**
+     * Handles a collector having received the next value from the buffer or disappearing.
+     *
+     * In either case, the *index* of the collector--that is, the index of the next value in the buffer this
+     * collector intends to consume--changes, so the internal invariants need updating.
+     *
+     * In addition, it may turn out that either
+     * - This collector was the slowest one, and now that it has processed a value in the buffer,
+     *   the buffer has one more free slot to accommodate more values, so new emitters can be resumed.
+     * - This collector was the last one, and now that it's gone, emitters don't need to distribute values among
+     *   collectors anymore and can all continue executing immediately.
+     *
+     * In both cases, an array with the continuations of the waiting emitters that can now proceed is returned
+     * so that they can be resumed after the lock is released.
+     */
     internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
         assert { oldIndex >= minCollectorIndex }
         if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
-        // start computing new minimal index of active collectors
+        // start computing the new minimal index of active collectors
         val head = head
-        var newMinCollectorIndex = head + bufferSize
+        var newMinCollectorIndex = head + bufferSize // = this.bufferEndIndex
         // take into account a special case of sync shared flow that can go past 1st queued emitter
         if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
         forEachSlotLocked { slot ->
@@ -546,21 +572,27 @@ internal open class SharedFlowImpl<T>(
         }
         assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
         if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
-        // Compute new buffer size if we drop items we no longer need and no emitter is resumed:
-        // We must keep all the items from newMinIndex to the end of buffer
-        var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
+        /* The new minimal collector index has increased, so new emitters can be resumed,
+         * with their values now moving to the buffer. */
+        // Initially `bufferEndIndex`; grows by one for every emitter that gets resumed.
+        var newBufferEndIndex = bufferEndIndex
+        // Represents how many emitters we can awaken at most.
        val maxResumeCount = if (nCollectors > 0) {
-            // If we have collectors we can resume up to maxResumeCount waiting emitters
-            // a) queueSize -> that's how many waiting emitters we have
-            // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity
+            // Represents how many elements would be in the buffer if we didn't resume any emitters.
             val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
+            /** a) We can't awaken more than [queueSize] emitters, as only that many have registered.
+                b) If we awaken more than `bufferCapacity - newBufferSize0` emitters, we'll exceed [bufferCapacity]. */
             minOf(queueSize, bufferCapacity - newBufferSize0)
         } else {
-            // If we don't have collectors anymore we must resume all waiting emitters
-            queueSize // that's how many waiting emitters we have (at most)
+            // If we don't have collectors anymore, we must resume all waiting emitters and drop the buffer.
+            /** [queueSize] is the number of emitters that registered (there may, in fact, be fewer than that). */
+            queueSize
         }
+        /* Walk through at most `maxResumeCount` emitters, moving their values to the buffer,
+         * zeroing out the cells the emitters were occupying, and storing their continuations in an array that
+         * will be returned from this function and processed once the lock gets released. */
         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
-        val newQueueEndIndex = newBufferEndIndex + queueSize
+        val newQueueEndIndex = newBufferEndIndex + queueSize // = this.queueEndIndex
         if (maxResumeCount > 0) { // collect emitters to resume if we have them
             resumes = arrayOfNulls(maxResumeCount)
             var resumeCount = 0
@@ -577,13 +609,13 @@ internal open class SharedFlowImpl<T>(
                 }
             }
         }
-        // Compute new buffer size -> how many values we now actually have after resume
+        /** Represents how many values are stored in the buffer after processing the emitters.
+         * The value may be larger than [bufferCapacity] if [nCollectors] was 0,
+         * but `minOf(replay, newBufferSize1)` below will clamp this value to a valid one, since `replay <= bufferCapacity`. */
        val newBufferSize1 = (newBufferEndIndex - head).toInt()
-        // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
-        // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
-        // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
-        // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
-        // expression, which coerces values that are too big anyway.
+        /** If [nCollectors] was 0, there are no active collector slots,
+         * and the invariant dictates that [minCollectorIndex] should be [bufferEndIndex] (`replayIndex + bufferSize`).
+         * Ensure this directly. */
         if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
         // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
         var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
@@ -618,6 +650,7 @@ internal open class SharedFlowImpl<T>(
         bufferSize = (newBufferEndIndex - newHead).toInt()
         queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
         // check our key invariants (just in case)
+        assert { bufferSize <= bufferCapacity }
         assert { bufferSize >= 0 }
         assert { queueSize >= 0 }
         assert { replayIndex <= this.head + bufferSize }
@@ -712,7 +745,7 @@ internal open class SharedFlowImpl<T>(
 
     override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
         fuseSharedFlow(context, capacity, onBufferOverflow)
-    
+
     private class Emitter(
         @JvmField val flow: SharedFlowImpl<*>,
         @JvmField var index: Long,
diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
index 981b9e5e94..ad63971728 100644
--- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
@@ -312,7 +312,7 @@ private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
 @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
 private class StateFlowImpl<T>(
     initialState: Any // T | NULL
-) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
+) : AbstractSharedFlow<StateFlowImpl<T>, StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
     private val _state = atomic(initialState) // T | NULL
     private var sequence = 0 // serializes updates, value update is in process when sequence is odd
 
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt
index 6831ad7d72..d18e7bfe5f 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt
@@ -10,19 +10,66 @@ import kotlin.jvm.*
 @JvmField
 internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
 
+/**
+ * A slot allocated to a collector when it subscribes to a shared flow and freed when the collector unsubscribes.
+ */
 internal abstract class AbstractSharedFlowSlot<F> {
+    /**
+     * Try marking this slot as allocated for the given [flow]. Only call this under the [flow]'s lock.
+     *
+     * Returns `false` if the slot is already allocated to some other collector.
+     */
     abstract fun allocateLocked(flow: F): Boolean
-    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> // returns continuations to resume after lock
+
+    /**
+     * Mark this slot as available for reuse. Only call this under the [flow]'s lock.
+     *
+     * Returns an array of continuations that need to be resumed after the lock is released.
+     * These continuations represent suspended emitters that were waiting for the slowest collector to move on
+     * so that the next value can be placed into the buffer.
+     */
+    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
 }
 
-internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
-    protected var slots: Array<S?>? = null // allocated when needed
+/**
+ * A common data structure for `StateFlow` and `SharedFlow`.
+ */
+internal abstract class AbstractSharedFlow<This, S : AbstractSharedFlowSlot<in This>> : SynchronizedObject() {
+    /**
+     * Array of slots for collectors of the shared flow.
+     *
+     * `null` by default, created on demand.
+     * Each cell is also `null` by default, and the specific slot object is [created][createSlot] on demand.
+     * The whole array being `null` or a cell being `null` is equivalent to the cell not being
+     * [*allocated*][AbstractSharedFlowSlot.allocateLocked]--not to be confused with memory allocation, this means
+     * that a specific collector inhabits the slot.
+     */
+    protected var slots: Array<S?>? = null
         private set
-    protected var nCollectors = 0 // number of allocated (!free) slots
+
+    /**
+     * The number of [*allocated*][AbstractSharedFlowSlot.allocateLocked] slots in [slots].
+     */
+    protected var nCollectors = 0
         private set
-    private var nextIndex = 0 // oracle for the next free slot index
-    private var _subscriptionCount: SubscriptionCountStateFlow? = null // init on first need
+    /**
+     * A good starting index for looking for the next non-*allocated* slot in [slots].
+     *
+     * It is not guaranteed that this slot will not be *allocated*, nor is it guaranteed that it will be the first
+     * non-*allocated* slot.
+     * This is just a heuristic to have a better guess in common scenarios.
+     */
+    private var nextIndex = 0
+
+    /**
+     * The backing field for [subscriptionCount].
+     *
+     * Will not be initialized until [subscriptionCount] is accessed for the first time.
+     */
+    private var _subscriptionCount: SubscriptionCountStateFlow? = null
 
+    /** A `StateFlow` representing [nCollectors], potentially with some delay. A user-visible API. */
     val subscriptionCount: StateFlow<Int>
         get() = synchronized(this) {
             // allocate under lock in sync with nCollectors variable
@@ -31,11 +78,13 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
             }
         }
 
+    /** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */
     protected abstract fun createSlot(): S
 
+    /** Equivalent to [arrayOfNulls]. */
     protected abstract fun createSlotArray(size: Int): Array<S?>
 
-    @Suppress("UNCHECKED_CAST")
+    /** Register a new collector and return its newly allocated slot. A slot may be [created][createSlot] or reused. */
     protected fun allocateSlot(): S {
         // Actually create slot under lock
         val subscriptionCount: SubscriptionCountStateFlow?
@@ -54,7 +103,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
                 slot = slots[index] ?: createSlot().also { slots[index] = it }
                 index++
                 if (index >= slots.size) index = 0
-                if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
+                @Suppress("UNCHECKED_CAST")
+                if (slot.allocateLocked(this as This)) break // break when found and allocated free slot
             }
             nextIndex = index
             nCollectors++
@@ -66,7 +116,7 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
         return slot
     }
 
-    @Suppress("UNCHECKED_CAST")
+    /** Deregisters a collector and marks its slot as available for reuse. */
     protected fun freeSlot(slot: S) {
         // Release slot under lock
         val subscriptionCount: SubscriptionCountStateFlow?
@@ -75,7 +125,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
             subscriptionCount = _subscriptionCount // retrieve under lock if initialized
             // Reset next index oracle if we have no more active collectors for more predictable behavior next time
             if (nCollectors == 0) nextIndex = 0
-            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
+            @Suppress("UNCHECKED_CAST")
+            slot.freeLocked(this as This)
         }
         /*
          * Resume suspended coroutines.
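Not part of the patches: a small, self-contained Kotlin example (JVM) of the user-visible
`subscriptionCount` property that the new documentation above describes. It is a `StateFlow`
tracking the number of active collectors, possibly with a slight delay, and a common use is
waiting for a subscriber before emitting so that a value is not dropped.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val shared = MutableSharedFlow<Int>()
        check(shared.subscriptionCount.value == 0)   // no collectors yet
        val collector = launch { shared.collect { /* consume values */ } }
        shared.subscriptionCount.first { it > 0 }    // wait until the collector is registered
        shared.emit(1)                               // the value now reaches the collector instead of being dropped
        collector.cancel()
    }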

From 9cb2a05b112eabcfe123a7ce56e72c4d0f5d94a7 Mon Sep 17 00:00:00 2001
From: Dmitry Khalanskiy
Date: Wed, 10 Dec 2025 17:18:19 +0100
Subject: [PATCH 3/3] Fix `SharedFlow` entering an invalid state

Fixes #4583

See the `SharedFlow.testCancellingSubscriberAndEmitterWithNoBuffer` test
for the problematic scenario, which may arise when the next emitter and
the last collector get cancelled simultaneously.

After the first iteration of the test, the internal state of the
`SharedFlow` was:

* buffer = [null, NO_VALUE]
* replayIndex = 2
* minCollectorIndex = 1
* bufferSize = 1
* queueSize = 0
* nCollectors = 0
* nextIndex = 0

This is clearly invalid, as `bufferSize` cannot be anything but 0 with
the default `MutableSharedFlow` configuration.

The problem was in the logic that handles unsubscription from a
`SharedFlow` without a buffer when the first emitter is also cancelled.
There were two separate checks for the cancellation cases--importantly,
in that order:

- If the last collector was cancelled, the part of the buffer outside
  the replay buffer was erased, since no one was going to consume it.
  This was achieved by moving the start of the buffer to the *right
  buffer boundary*.
- If the next emitter was cancelled *and* the buffer is disabled, the
  *right boundary* of the empty buffer was moved past the cancelled
  emitter.

Taken together, these two checks conspired to break the invariants:
if *both* conditions are true, then first, the buffer was set to be
empty, and then, its right boundary was shifted, resulting in a
non-empty buffer when that should be logically impossible.
---
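Not part of the change: a tiny standalone sketch of the invariant that the state quoted above
violates; it mirrors the `assert { bufferSize <= bufferCapacity }` check added in the previous
patch. The names are illustrative only.

    fun bufferInvariantHolds(bufferCapacity: Int, bufferSize: Int, queueSize: Int): Boolean =
        bufferSize in 0..bufferCapacity && queueSize >= 0

    fun main() {
        // The state from the failing scenario: a default (zero-capacity) MutableSharedFlow
        // ended up with bufferSize = 1, which this check rejects.
        println(bufferInvariantHolds(bufferCapacity = 0, bufferSize = 1, queueSize = 0)) // false
    }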
 .../common/src/flow/SharedFlow.kt           |  8 ++---
 .../test/flow/sharing/SharedFlowTest.kt     | 32 +++++++++++++++++++
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 42996639b8..dad10fdef7 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -613,10 +613,6 @@ internal open class SharedFlowImpl<T>(
          * The value may be larger than [bufferCapacity] if [nCollectors] was 0,
          * but `minOf(replay, newBufferSize1)` below will clamp this value to a valid one, since `replay <= bufferCapacity`. */
         val newBufferSize1 = (newBufferEndIndex - head).toInt()
-        /** If [nCollectors] was 0, there are no active collector slots,
-         * and the invariant dictates that [minCollectorIndex] should be [bufferEndIndex] (`replayIndex + bufferSize`).
-         * Ensure this directly. */
-        if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
         // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
         var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
         // adjustment for synchronous case with cancelled emitter (NO_VALUE)
@@ -624,6 +620,10 @@ internal open class SharedFlowImpl<T>(
             newBufferEndIndex++
             newReplayIndex++
         }
+        /** If [nCollectors] was 0, there are no active collector slots,
+         * and the invariant dictates that [minCollectorIndex] should be [bufferEndIndex] (`replayIndex + bufferSize`).
+         * Ensure this directly. */
+        if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
         // Update buffer state
         updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
         // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt
index 1f7c22d9c0..e8b347dbd9 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt
@@ -5,6 +5,7 @@ import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlin.random.*
 import kotlin.test.*
+import kotlin.time.Duration.Companion.milliseconds
 
 /**
  * This test suite contains some basic tests for [SharedFlow]. There are some scenarios here written
@@ -820,6 +821,37 @@ class SharedFlowTest : TestBase() {
     fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest {
         testSubscriptionByFirstSuspensionInCollect(MutableSharedFlow()) { emit(it) }
     }
+
+    /** Tests that cancelling a subscriber and an emitter simultaneously
+     * does not break the invariants of a [SharedFlow] with no buffer. */
+    @Test
+    fun testCancellingSubscriberAndEmitterWithNoBuffer() = runTest {
+        val mutableSharedFlow = MutableSharedFlow<Unit>()
+        repeat(10) {
+            launch {
+                val valueObtained = CompletableDeferred<Unit>()
+                val j1 = launch {
+                    mutableSharedFlow.collect {
+                        valueObtained.complete(Unit)
+                    }
+                }
+                val j2 =
+                    launch {
+                        while (isActive) {
+                            mutableSharedFlow.emit(Unit)
+                        }
+                    }
+                val successfullyAwaited = withTimeoutOrNull(100.milliseconds) {
+                    valueObtained.await()
+                }
+                if (successfullyAwaited == null) {
+                    throw AssertionError("The collector failed to obtain a value from a non-empty shared flow")
+                }
+                j1.cancel()
+                j2.cancel()
+            }.join()
+        }
+    }
 }
 
 /**