diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 690eb29281..dad10fdef7 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -58,7 +58,7 @@ import kotlin.jvm.* * get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers * can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved * using the `extraBufferCapacity` parameter. - * + * * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other * than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend. @@ -292,11 +292,23 @@ public fun MutableSharedFlow( // ------------------------------------ Implementation ------------------------------------ internal class SharedFlowSlot : AbstractSharedFlowSlot>() { + /** + * The first index in the buffer that was not yet consumed by this collector, + * or -1 if the slot is not allocated to a collector. + */ @JvmField - var index = -1L // current "to-be-emitted" index, -1 means the slot is free now + var index = -1L + /** + * The current continuation of the collector. + * + * This field is used by the collector each time when it can not acquire the next value and suspends while + * waiting for the next one. + * `null` means that either the collector is not suspended but is processing some value, + * or that this slot is not allocated to a collector. + */ @JvmField - var cont: Continuation? = null // collector waiting for new value + var cont: Continuation? = null override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean { if (index >= 0) return false // not free @@ -318,7 +330,7 @@ internal open class SharedFlowImpl( private val replay: Int, private val bufferCapacity: Int, private val onBufferOverflow: BufferOverflow -) : AbstractSharedFlow(), MutableSharedFlow, CancellableFlow, FusibleFlow { +) : AbstractSharedFlow, SharedFlowSlot>(), MutableSharedFlow, CancellableFlow, FusibleFlow { /* Logical structure of the buffer @@ -350,7 +362,7 @@ internal open class SharedFlowImpl( // Stored state private var buffer: Array? = null // allocated when needed, allocated size always power of two - private var replayIndex = 0L // minimal index from which new collector gets values + private var replayIndex = 0L // minimal index from which a new collector gets values private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none private var bufferSize = 0 // number of buffered values private var queueSize = 0 // number of queued emitters @@ -532,36 +544,55 @@ internal open class SharedFlowImpl( return index } - // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock + /** + * Handle a collector having receiving the next value from the buffer or disappearing. + * + * As a result of both, the *index* of the collector--that is, the index of the next value in the buffer this + * collector intends to consume--changes, so the internal invariants need updating. + * + * In addition, it may turn out that either + * - This collector was the slowest one, and now that it has processed a value in the buffer, + * the buffer has one more free slot to accommodate more values, so new emitters can be resumed. + * - This collector was the last one, and now that it's gone, emitters don't need to distribute values among + * collectors anymore and can all continue executing immediately. + * + * In both cases, an array of continuations of the correct number of waiting emitters is returned + * so that they can be resumed. + */ internal fun updateCollectorIndexLocked(oldIndex: Long): Array?> { assert { oldIndex >= minCollectorIndex } if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min - // start computing new minimal index of active collectors + // start computing the new minimal index of active collectors val head = head - var newMinCollectorIndex = head + bufferSize + var newMinCollectorIndex = head + bufferSize // = this.bufferEndIndex // take into account a special case of sync shared flow that can go past 1st queued emitter if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++ forEachSlotLocked { slot -> - @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend - if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index + if (slot.index in 0..= minCollectorIndex } // can only grow if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes - // Compute new buffer size if we drop items we no longer need and no emitter is resumed: - // We must keep all the items from newMinIndex to the end of buffer - var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed + /* The new minimal resumer index has increased, so new emitters can be resumed, + * with their values now moving to the buffer. */ + // Initially, `bufferEndIndex`, but later, grown for every resumed emitter. + var newBufferEndIndex = bufferEndIndex + // Represents how many emitters we can awaken at most. val maxResumeCount = if (nCollectors > 0) { - // If we have collectors we can resume up to maxResumeCount waiting emitters - // a) queueSize -> that's how many waiting emitters we have - // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity + // Represents how many elements would be in the buffer if we didn't resume any emitters. val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt() + /** a) We can't awaken more than [queueSize] emitters, as only that many have registered. + b) If we awaken more than `bufferCapacity - newBufferSize0` emitters, we'll exceed [bufferCapacity]. */ minOf(queueSize, bufferCapacity - newBufferSize0) } else { - // If we don't have collectors anymore we must resume all waiting emitters - queueSize // that's how many waiting emitters we have (at most) + // If we don't have collectors anymore, we must resume all waiting emitters and drop the buffer. + /** [queueSize] is the number of emitters that registered (there may, in fact, be fewer than that). */ + queueSize } + /* Walk through at most `maxResumeCount` emitters, moving their values to the buffer, + * zeroing out the cells the emitters were occupying, and storing their continuations in an array that + * will be returned from this function and processed once the lock gets released. */ var resumes: Array?> = EMPTY_RESUMES - val newQueueEndIndex = newBufferEndIndex + queueSize + val newQueueEndIndex = newBufferEndIndex + queueSize // = this.queueEndIndex if (maxResumeCount > 0) { // collect emitters to resume if we have them resumes = arrayOfNulls(maxResumeCount) var resumeCount = 0 @@ -578,14 +609,10 @@ internal open class SharedFlowImpl( } } } - // Compute new buffer size -> how many values we now actually have after resume + /** Represents how many values are stored in the buffer after processing the emitters. + * The value may be larger than [bufferCapacity] if [nCollectors] was 0, + * but the `minOf(replay)` will clamp this value to a valid one, since `replay <= bufferCapacity`. */ val newBufferSize1 = (newBufferEndIndex - head).toInt() - // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity, - // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by - // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be - // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1) - // expression, which coerces values that are too big anyway. - if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1)) // adjustment for synchronous case with cancelled emitter (NO_VALUE) @@ -593,6 +620,10 @@ internal open class SharedFlowImpl( newBufferEndIndex++ newReplayIndex++ } + /** If [nCollectors] was 0, there are no active collector slots, + * and the invariant dictates that [minCollectorIndex] should be [bufferEndIndex] (`replayIndex + bufferSize`). + * Ensure this directly. */ + if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex // Update buffer state updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex) // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now @@ -619,6 +650,7 @@ internal open class SharedFlowImpl( bufferSize = (newBufferEndIndex - newHead).toInt() queueSize = (newQueueEndIndex - newBufferEndIndex).toInt() // check our key invariants (just in case) + assert { bufferSize <= bufferCapacity } assert { bufferSize >= 0 } assert { queueSize >= 0 } assert { replayIndex <= this.head + bufferSize } @@ -713,7 +745,7 @@ internal open class SharedFlowImpl( override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseSharedFlow(context, capacity, onBufferOverflow) - + private class Emitter( @JvmField val flow: SharedFlowImpl<*>, @JvmField var index: Long, diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index 981b9e5e94..ad63971728 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -312,7 +312,7 @@ private class StateFlowSlot : AbstractSharedFlowSlot>() { @OptIn(ExperimentalForInheritanceCoroutinesApi::class) private class StateFlowImpl( initialState: Any // T | NULL -) : AbstractSharedFlow(), MutableStateFlow, CancellableFlow, FusibleFlow { +) : AbstractSharedFlow, StateFlowSlot>(), MutableStateFlow, CancellableFlow, FusibleFlow { private val _state = atomic(initialState) // T | NULL private var sequence = 0 // serializes updates, value update is in process when sequence is odd diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt index 6831ad7d72..d18e7bfe5f 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt @@ -10,19 +10,66 @@ import kotlin.jvm.* @JvmField internal val EMPTY_RESUMES = arrayOfNulls?>(0) +/** + * A slot allocated to a collector when it subscribes to a shared flow and freed when the collector unsubscribes. + */ internal abstract class AbstractSharedFlowSlot { + /** + * Try marking this slot as allocated for the given [flow]. Only call this under the [flow]'s lock. + * + * Returns `false` if the slot is already allocated to some other collector. + */ abstract fun allocateLocked(flow: F): Boolean - abstract fun freeLocked(flow: F): Array?> // returns continuations to resume after lock + + /** + * Mark this slot as available for reuse. Only call this under the [flow]'s lock. + * + * Returns an array of continuations that need to be resumed after the lock is released. + * These continuations represent suspended emitters that were waiting for the slowest collector to move on + * so that the next value can be placed into the buffer. + */ + abstract fun freeLocked(flow: F): Array?> } -internal abstract class AbstractSharedFlow> : SynchronizedObject() { - protected var slots: Array? = null // allocated when needed +/** + * A common data structure for `StateFlow` and `SharedFlow`. + */ +internal abstract class AbstractSharedFlow> : SynchronizedObject() { + /** + * Array of slots for collectors of the shared flow. + * + * `null` by default, created on demand. + * Each cell is also `null` by default, and the specific slot object is [created][createSlot] on demand. + * The whole array being `null` or a cell being `null` is equivalent to the cell not being + * [*allocated*][AbstractSharedFlowSlot.allocateLocked]--not to be confused with memory allocation, this means + * that a specific collector inhabits the slot. + */ + protected var slots: Array? = null private set - protected var nCollectors = 0 // number of allocated (!free) slots + + /** + * The number of [*allocated*][AbstractSharedFlowSlot.allocateLocked] slots in [slots]. + */ + protected var nCollectors = 0 private set - private var nextIndex = 0 // oracle for the next free slot index - private var _subscriptionCount: SubscriptionCountStateFlow? = null // init on first need + /** + * A good starting index for looking for the next non-*allocated* slot in [slots]. + * + * It is not guaranteed that this slot will not be *allocated*, nor is it guaranteed that it will be the first + * non-*allocated* slot. + * This is just a heuristic to have a better guess in common scenarios. + */ + private var nextIndex = 0 + + /** + * The backing field for [subscriptionCount]. + * + * Will not be initialized until [subscriptionCount] is accessed for the first time. + */ + private var _subscriptionCount: SubscriptionCountStateFlow? = null + + /** A `StateFlow` representing [nCollectors], potentially with some delay. A user-visible API. */ val subscriptionCount: StateFlow get() = synchronized(this) { // allocate under lock in sync with nCollectors variable @@ -31,11 +78,13 @@ internal abstract class AbstractSharedFlow> : Sync } } + /** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */ protected abstract fun createSlot(): S + /** Equivalent to [arrayOfNulls]. */ protected abstract fun createSlotArray(size: Int): Array - @Suppress("UNCHECKED_CAST") + /** Register a new collector and return its newly allocated slot. A slot may be [created][createSlot] or reused. */ protected fun allocateSlot(): S { // Actually create slot under lock val subscriptionCount: SubscriptionCountStateFlow? @@ -54,7 +103,8 @@ internal abstract class AbstractSharedFlow> : Sync slot = slots[index] ?: createSlot().also { slots[index] = it } index++ if (index >= slots.size) index = 0 - if ((slot as AbstractSharedFlowSlot).allocateLocked(this)) break // break when found and allocated free slot + @Suppress("UNCHECKED_CAST") + if (slot.allocateLocked(this as This)) break // break when found and allocated free slot } nextIndex = index nCollectors++ @@ -66,7 +116,7 @@ internal abstract class AbstractSharedFlow> : Sync return slot } - @Suppress("UNCHECKED_CAST") + /** Deregisters a collector and marks its slot as available for reuse. */ protected fun freeSlot(slot: S) { // Release slot under lock val subscriptionCount: SubscriptionCountStateFlow? @@ -75,7 +125,8 @@ internal abstract class AbstractSharedFlow> : Sync subscriptionCount = _subscriptionCount // retrieve under lock if initialized // Reset next index oracle if we have no more active collectors for more predictable behavior next time if (nCollectors == 0) nextIndex = 0 - (slot as AbstractSharedFlowSlot).freeLocked(this) + @Suppress("UNCHECKED_CAST") + slot.freeLocked(this as This) } /* * Resume suspended coroutines. diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt index 1f7c22d9c0..e8b347dbd9 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlin.random.* import kotlin.test.* +import kotlin.time.Duration.Companion.milliseconds /** * This test suite contains some basic tests for [SharedFlow]. There are some scenarios here written @@ -820,6 +821,37 @@ class SharedFlowTest : TestBase() { fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest { testSubscriptionByFirstSuspensionInCollect(MutableSharedFlow()) { emit(it) } } + + /** Tests that cancelling a subscriber and an emitter simultaneously + * does not break the invariants of a [SharedFlow] with no buffer. */ + @Test + fun testCancellingSubscriberAndEmitterWithNoBuffer() = runTest { + val mutableSharedFlow = MutableSharedFlow() + repeat(10) { + launch { + val valueObtained = CompletableDeferred() + val j1 = launch { + mutableSharedFlow.collect { + valueObtained.complete(Unit) + } + } + val j2 = + launch { + while (isActive) { + mutableSharedFlow.emit(Unit) + } + } + val successfullyAwaited = withTimeoutOrNull(100.milliseconds) { + valueObtained.await() + } + if (successfullyAwaited == null) { + throw AssertionError("The collector failed to obtain a value from a non-empty shared flow") + } + j1.cancel() + j2.cancel() + }.join() + } + } } /**