Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 59 additions & 27 deletions kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import kotlin.jvm.*
* get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
* can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
* using the `extraBufferCapacity` parameter.
*
*
* A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
* the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
* than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
Expand Down Expand Up @@ -292,11 +292,23 @@ public fun <T> MutableSharedFlow(
// ------------------------------------ Implementation ------------------------------------

internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
/**
* The first index in the buffer that was not yet consumed by this collector,
* or -1 if the slot is not allocated to a collector.
*/
@JvmField
var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
var index = -1L

/**
* The current continuation of the collector.
*
* This field is used by the collector each time when it can not acquire the next value and suspends while
* waiting for the next one.
* `null` means that either the collector is not suspended but is processing some value,
* or that this slot is not allocated to a collector.
*/
@JvmField
var cont: Continuation<Unit>? = null // collector waiting for new value
var cont: Continuation<Unit>? = null

override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
if (index >= 0) return false // not free
Expand All @@ -318,7 +330,7 @@ internal open class SharedFlowImpl<T>(
private val replay: Int,
private val bufferCapacity: Int,
private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
) : AbstractSharedFlow<SharedFlowImpl<*>, SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
/*
Logical structure of the buffer

Expand Down Expand Up @@ -350,7 +362,7 @@ internal open class SharedFlowImpl<T>(

// Stored state
private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
private var replayIndex = 0L // minimal index from which new collector gets values
private var replayIndex = 0L // minimal index from which a new collector gets values
private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
private var bufferSize = 0 // number of buffered values
private var queueSize = 0 // number of queued emitters
Expand Down Expand Up @@ -532,36 +544,55 @@ internal open class SharedFlowImpl<T>(
return index
}

// Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
/**
* Handle a collector having receiving the next value from the buffer or disappearing.
*
* As a result of both, the *index* of the collector--that is, the index of the next value in the buffer this
* collector intends to consume--changes, so the internal invariants need updating.
*
* In addition, it may turn out that either
* - This collector was the slowest one, and now that it has processed a value in the buffer,
* the buffer has one more free slot to accommodate more values, so new emitters can be resumed.
* - This collector was the last one, and now that it's gone, emitters don't need to distribute values among
* collectors anymore and can all continue executing immediately.
*
* In both cases, an array of continuations of the correct number of waiting emitters is returned
* so that they can be resumed.
*/
internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
assert { oldIndex >= minCollectorIndex }
if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
// start computing new minimal index of active collectors
// start computing the new minimal index of active collectors
val head = head
var newMinCollectorIndex = head + bufferSize
var newMinCollectorIndex = head + bufferSize // = this.bufferEndIndex
// take into account a special case of sync shared flow that can go past 1st queued emitter
if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
forEachSlotLocked { slot ->
@Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you get rid of this now irrelevant suppress in all of the codebase (1 other usage)

if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
if (slot.index in 0..<newMinCollectorIndex) newMinCollectorIndex = slot.index
}
assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
// Compute new buffer size if we drop items we no longer need and no emitter is resumed:
// We must keep all the items from newMinIndex to the end of buffer
var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
/* The new minimal resumer index has increased, so new emitters can be resumed,
* with their values now moving to the buffer. */
// Initially, `bufferEndIndex`, but later, grown for every resumed emitter.
var newBufferEndIndex = bufferEndIndex
// Represents how many emitters we can awaken at most.
val maxResumeCount = if (nCollectors > 0) {
// If we have collectors we can resume up to maxResumeCount waiting emitters
// a) queueSize -> that's how many waiting emitters we have
// b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity
// Represents how many elements would be in the buffer if we didn't resume any emitters.
val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
/** a) We can't awaken more than [queueSize] emitters, as only that many have registered.
b) If we awaken more than `bufferCapacity - newBufferSize0` emitters, we'll exceed [bufferCapacity]. */
minOf(queueSize, bufferCapacity - newBufferSize0)
} else {
// If we don't have collectors anymore we must resume all waiting emitters
queueSize // that's how many waiting emitters we have (at most)
// If we don't have collectors anymore, we must resume all waiting emitters and drop the buffer.
/** [queueSize] is the number of emitters that registered (there may, in fact, be fewer than that). */
queueSize
}
/* Walk through at most `maxResumeCount` emitters, moving their values to the buffer,
* zeroing out the cells the emitters were occupying, and storing their continuations in an array that
* will be returned from this function and processed once the lock gets released. */
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val newQueueEndIndex = newBufferEndIndex + queueSize
val newQueueEndIndex = newBufferEndIndex + queueSize // = this.queueEndIndex
if (maxResumeCount > 0) { // collect emitters to resume if we have them
resumes = arrayOfNulls(maxResumeCount)
var resumeCount = 0
Expand All @@ -578,21 +609,21 @@ internal open class SharedFlowImpl<T>(
}
}
}
// Compute new buffer size -> how many values we now actually have after resume
/** Represents how many values are stored in the buffer after processing the emitters.
* The value may be larger than [bufferCapacity] if [nCollectors] was 0,
* but the `minOf(replay)` will clamp this value to a valid one, since `replay <= bufferCapacity`. */
val newBufferSize1 = (newBufferEndIndex - head).toInt()
// Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
// and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
// forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
// too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
// expression, which coerces values that are too big anyway.
if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
// Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
// adjustment for synchronous case with cancelled emitter (NO_VALUE)
if (bufferCapacity == 0 && newReplayIndex < newQueueEndIndex && buffer!!.getBufferAt(newReplayIndex) == NO_VALUE) {
newBufferEndIndex++
newReplayIndex++
}
/** If [nCollectors] was 0, there are no active collector slots,
* and the invariant dictates that [minCollectorIndex] should be [bufferEndIndex] (`replayIndex + bufferSize`).
* Ensure this directly. */
if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
// Update buffer state
updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
// just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
Expand All @@ -619,6 +650,7 @@ internal open class SharedFlowImpl<T>(
bufferSize = (newBufferEndIndex - newHead).toInt()
queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
// check our key invariants (just in case)
assert { bufferSize <= bufferCapacity }
assert { bufferSize >= 0 }
assert { queueSize >= 0 }
assert { replayIndex <= this.head + bufferSize }
Expand Down Expand Up @@ -713,7 +745,7 @@ internal open class SharedFlowImpl<T>(

override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseSharedFlow(context, capacity, onBufferOverflow)

private class Emitter(
@JvmField val flow: SharedFlowImpl<*>,
@JvmField var index: Long,
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class StateFlowImpl<T>(
initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
) : AbstractSharedFlow<StateFlowImpl<*>, StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialState) // T | NULL
private var sequence = 0 // serializes updates, value update is in process when sequence is odd

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,66 @@ import kotlin.jvm.*
@JvmField
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First batch of comments. Feel free to address now or wait for the rest to come.


/**
* A slot allocated to a collector when it subscribes to a shared flow and freed when the collector unsubscribes.
*/
internal abstract class AbstractSharedFlowSlot<F> {
/**
* Try marking this slot as allocated for the given [flow]. Only call this under the [flow]'s lock.
*
* Returns `false` if the slot is already allocated to some other collector.
*/
abstract fun allocateLocked(flow: F): Boolean
abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> // returns continuations to resume after lock

/**
* Mark this slot as available for reuse. Only call this under the [flow]'s lock.
*
* Returns an array of continuations that need to be resumed after the lock is released.
* These continuations represent suspended emitters that were waiting for the slowest collector to move on
* so that the next value can be placed into the buffer.
*/
abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
}

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
protected var slots: Array<S?>? = null // allocated when needed
/**
* A common data structure for `StateFlow` and `SharedFlow`.
*/
internal abstract class AbstractSharedFlow<This, S : AbstractSharedFlowSlot<This>> : SynchronizedObject() {
Copy link
Contributor

@LouisCAD LouisCAD Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is This a new Kotlin feature to refer to the type of the instance's subclass?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only a generic parameter name. Could as well be named T. That said, I was also startled when I saw This. Possibly rename into something less misleading.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, you can read it as T, I just decided to use a more accurate name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just love Github's time travelling machine :)

/**
* Array of slots for collectors of the shared flow.
*
* `null` by default, created on demand.
* Each cell is also `null` by default, and the specific slot object is [created][createSlot] on demand.
* The whole array being `null` or a cell being `null` is equivalent to the cell not being
* [*allocated*][AbstractSharedFlowSlot.allocateLocked]--not to be confused with memory allocation, this means
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allocate/free slot was misleading naming, but at least it was consistent.

The current snapshot of all comments uses: register, inhabit, allocate to mean allocate.

Consider picking one term to describe comments. Possibly even replace it in code as well, so there's no need for a lengthy explanation.

Here's some options:
inhabit / free
register / deregister (a bit tricky because you can both register a collector and a slot)
inhabited slot / registered slot / occupied slot / assigned slot

(Another reason for this is because "allocate" used in its true sense at least twice in this file, on L81 and L87)

* that a specific collector inhabits the slot.
*/
protected var slots: Array<S?>? = null
private set
protected var nCollectors = 0 // number of allocated (!free) slots

/**
* The number of [*allocated*][AbstractSharedFlowSlot.allocateLocked] slots in [slots].
*/
protected var nCollectors = 0
private set
private var nextIndex = 0 // oracle for the next free slot index
private var _subscriptionCount: SubscriptionCountStateFlow? = null // init on first need

/**
* A good starting index for looking for the next non-*allocated* slot in [slots].
*
* It is not guaranteed that this slot will not be *allocated*, nor is it guaranteed that it will be the first
* non-*allocated* slot.
* This is just a heuristic to have a better guess in common scenarios.
*/
private var nextIndex = 0
Comment on lines +56 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment says that nextIndex is a "good starting index" / "heuristics" / "guess" but doesn't say how this heuristic is used and what this even means. It does provide two options of what it is definitely not, which is useful, but just would be nice to define the object.

The next collector will be assigned to the first free slot at nextIndex or on the cyclical right of nextIndex.


/**
* The backing field for [subscriptionCount].
*
* Will not be initialized until [subscriptionCount] is accessed for the first time.
*/
private var _subscriptionCount: SubscriptionCountStateFlow? = null

/** A `StateFlow` representing [nCollectors], potentially with some delay. A user-visible API. */
val subscriptionCount: StateFlow<Int>
get() = synchronized(this) {
// allocate under lock in sync with nCollectors variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allocate -> initialize

Expand All @@ -31,11 +78,13 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
}
}

/** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the function's meaning but I can't parse "implementation-representation"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid "allocate" since it's implementation defined where the implementer gets their slots from. There may not be any allocations. Also, I'm not sure if it is common to refer to memory allocations in the JVM world?

protected abstract fun createSlot(): S

/** Equivalent to [arrayOfNulls]. */
protected abstract fun createSlotArray(size: Int): Array<S?>

@Suppress("UNCHECKED_CAST")
/** Register a new collector and return its newly allocated slot. A slot may be [created][createSlot] or reused. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"may be either"

protected fun allocateSlot(): S {
// Actually create slot under lock
val subscriptionCount: SubscriptionCountStateFlow?
Expand All @@ -54,7 +103,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
slot = slots[index] ?: createSlot().also { slots[index] = it }
index++
if (index >= slots.size) index = 0
if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
@Suppress("UNCHECKED_CAST")
if (slot.allocateLocked(this as This)) break // break when found and allocated free slot
}
nextIndex = index
nCollectors++
Expand All @@ -66,7 +116,7 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
return slot
}

@Suppress("UNCHECKED_CAST")
/** Deregisters a collector and marks its slot as available for reuse. */
protected fun freeSlot(slot: S) {
// Release slot under lock
val subscriptionCount: SubscriptionCountStateFlow?
Expand All @@ -75,7 +125,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
// Reset next index oracle if we have no more active collectors for more predictable behavior next time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you removed the other mention/introduction of the term oracle, so now this comment is less clear

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The word oracle was not the best choice, but it was used in the same sense as "hint" in C++ map.emplace_hint. No action, just saying.

if (nCollectors == 0) nextIndex = 0
(slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
@Suppress("UNCHECKED_CAST")
slot.freeLocked(this as This)
}
/*
* Resume suspended coroutines.
Expand Down
32 changes: 32 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.random.*
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds

/**
* This test suite contains some basic tests for [SharedFlow]. There are some scenarios here written
Expand Down Expand Up @@ -820,6 +821,37 @@ class SharedFlowTest : TestBase() {
fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest {
testSubscriptionByFirstSuspensionInCollect(MutableSharedFlow()) { emit(it) }
}

/** Tests that cancelling a subscriber and an emitter simultaneously
* does not break the invariants of a [SharedFlow] with no buffer. */
@Test
fun testCancellingSubscriberAndEmitterWithNoBuffer() = runTest {
val mutableSharedFlow = MutableSharedFlow<Unit>()
repeat(10) {
launch {
val valueObtained = CompletableDeferred<Unit>()
val j1 = launch {
mutableSharedFlow.collect {
valueObtained.complete(Unit)
}
}
val j2 =
launch {
while (isActive) {
mutableSharedFlow.emit(Unit)
}
}
val successfullyAwaited = withTimeoutOrNull(100.milliseconds) {
valueObtained.await()
}
if (successfullyAwaited == null) {
throw AssertionError("The collector failed to obtain a value from a non-empty shared flow")
}
j1.cancel()
j2.cancel()
}.join()
}
}
}

/**
Expand Down