-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Fix SharedFlow entering an invalid state #4591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,19 +10,66 @@ import kotlin.jvm.* | |
| @JvmField | ||
| internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. First batch of comments. Feel free to address now or wait for the rest to come. |
||
|
|
||
| /** | ||
| * A slot allocated to a collector when it subscribes to a shared flow and freed when the collector unsubscribes. | ||
| */ | ||
| internal abstract class AbstractSharedFlowSlot<F> { | ||
| /** | ||
| * Try marking this slot as allocated for the given [flow]. Only call this under the [flow]'s lock. | ||
| * | ||
| * Returns `false` if the slot is already allocated to some other collector. | ||
| */ | ||
| abstract fun allocateLocked(flow: F): Boolean | ||
| abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> // returns continuations to resume after lock | ||
|
|
||
| /** | ||
| * Mark this slot as available for reuse. Only call this under the [flow]'s lock. | ||
| * | ||
| * Returns an array of continuations that need to be resumed after the lock is released. | ||
| * These continuations represent suspended emitters that were waiting for the slowest collector to move on | ||
| * so that the next value can be placed into the buffer. | ||
| */ | ||
| abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> | ||
| } | ||
|
|
||
| internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() { | ||
| protected var slots: Array<S?>? = null // allocated when needed | ||
| /** | ||
| * A common data structure for `StateFlow` and `SharedFlow`. | ||
| */ | ||
| internal abstract class AbstractSharedFlow<This, S : AbstractSharedFlowSlot<This>> : SynchronizedObject() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's only a generic parameter name. Could as well be named T. That said, I was also startled when I saw
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, you can read it as
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I just love Github's time travelling machine :) |
||
| /** | ||
| * Array of slots for collectors of the shared flow. | ||
| * | ||
| * `null` by default, created on demand. | ||
| * Each cell is also `null` by default, and the specific slot object is [created][createSlot] on demand. | ||
| * The whole array being `null` or a cell being `null` is equivalent to the cell not being | ||
| * [*allocated*][AbstractSharedFlowSlot.allocateLocked]--not to be confused with memory allocation, this means | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Allocate/free slot was misleading naming, but at least it was consistent. The current snapshot of all comments uses: register, inhabit, allocate to mean allocate. Consider picking one term to describe comments. Possibly even replace it in code as well, so there's no need for a lengthy explanation. Here's some options: (Another reason for this is because "allocate" used in its true sense at least twice in this file, on L81 and L87) |
||
| * that a specific collector inhabits the slot. | ||
| */ | ||
| protected var slots: Array<S?>? = null | ||
| private set | ||
| protected var nCollectors = 0 // number of allocated (!free) slots | ||
|
|
||
| /** | ||
| * The number of [*allocated*][AbstractSharedFlowSlot.allocateLocked] slots in [slots]. | ||
| */ | ||
| protected var nCollectors = 0 | ||
| private set | ||
| private var nextIndex = 0 // oracle for the next free slot index | ||
| private var _subscriptionCount: SubscriptionCountStateFlow? = null // init on first need | ||
|
|
||
| /** | ||
| * A good starting index for looking for the next non-*allocated* slot in [slots]. | ||
| * | ||
| * It is not guaranteed that this slot will not be *allocated*, nor is it guaranteed that it will be the first | ||
| * non-*allocated* slot. | ||
| * This is just a heuristic to have a better guess in common scenarios. | ||
| */ | ||
| private var nextIndex = 0 | ||
|
Comment on lines
+56
to
+63
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment says that
|
||
|
|
||
| /** | ||
| * The backing field for [subscriptionCount]. | ||
| * | ||
| * Will not be initialized until [subscriptionCount] is accessed for the first time. | ||
| */ | ||
| private var _subscriptionCount: SubscriptionCountStateFlow? = null | ||
|
|
||
| /** A `StateFlow` representing [nCollectors], potentially with some delay. A user-visible API. */ | ||
| val subscriptionCount: StateFlow<Int> | ||
| get() = synchronized(this) { | ||
| // allocate under lock in sync with nCollectors variable | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. allocate -> initialize |
||
|
|
@@ -31,11 +78,13 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync | |
| } | ||
| } | ||
|
|
||
| /** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the function's meaning but I can't parse "implementation-representation"
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid "allocate" since it's implementation defined where the implementer gets their slots from. There may not be any allocations. Also, I'm not sure if it is common to refer to memory allocations in the JVM world? |
||
| protected abstract fun createSlot(): S | ||
|
|
||
| /** Equivalent to [arrayOfNulls]. */ | ||
| protected abstract fun createSlotArray(size: Int): Array<S?> | ||
|
|
||
| @Suppress("UNCHECKED_CAST") | ||
| /** Register a new collector and return its newly allocated slot. A slot may be [created][createSlot] or reused. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "may be either" |
||
| protected fun allocateSlot(): S { | ||
| // Actually create slot under lock | ||
| val subscriptionCount: SubscriptionCountStateFlow? | ||
|
|
@@ -54,7 +103,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync | |
| slot = slots[index] ?: createSlot().also { slots[index] = it } | ||
| index++ | ||
| if (index >= slots.size) index = 0 | ||
| if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot | ||
| @Suppress("UNCHECKED_CAST") | ||
| if (slot.allocateLocked(this as This)) break // break when found and allocated free slot | ||
| } | ||
| nextIndex = index | ||
| nCollectors++ | ||
|
|
@@ -66,7 +116,7 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync | |
| return slot | ||
| } | ||
|
|
||
| @Suppress("UNCHECKED_CAST") | ||
| /** Deregisters a collector and marks its slot as available for reuse. */ | ||
| protected fun freeSlot(slot: S) { | ||
| // Release slot under lock | ||
| val subscriptionCount: SubscriptionCountStateFlow? | ||
|
|
@@ -75,7 +125,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync | |
| subscriptionCount = _subscriptionCount // retrieve under lock if initialized | ||
| // Reset next index oracle if we have no more active collectors for more predictable behavior next time | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: you removed the other mention/introduction of the term oracle, so now this comment is less clear
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The word oracle was not the best choice, but it was used in the same sense as "hint" in C++ |
||
| if (nCollectors == 0) nextIndex = 0 | ||
| (slot as AbstractSharedFlowSlot<Any>).freeLocked(this) | ||
| @Suppress("UNCHECKED_CAST") | ||
| slot.freeLocked(this as This) | ||
| } | ||
| /* | ||
| * Resume suspended coroutines. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you get rid of this now irrelevant suppress in all of the codebase (1 other usage)