From 08aec8e1d59a36d42b79dd0aa745bbe8dd390b38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sun, 29 Oct 2023 18:33:28 +0700 Subject: [PATCH 1/9] publishWithSelector --- .idea/vcs.xml | 2 +- .../hoc081098/flowext/publishWithSelector.kt | 354 ++++++++++++++++++ 2 files changed, 355 insertions(+), 1 deletion(-) create mode 100644 src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt diff --git a/.idea/vcs.xml b/.idea/vcs.xml index aae8b7c8..4c6280eb 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -9,4 +9,4 @@ - + \ No newline at end of file diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt new file mode 100644 index 00000000..f8553e5d --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -0,0 +1,354 @@ +/* + * MIT License + * + * Copyright (c) 2021 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn +import kotlin.concurrent.Volatile +import kotlin.jvm.JvmField +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.internal.SynchronizedObject +import kotlinx.coroutines.internal.synchronized +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +@FlowExtPreview +@DslMarker +public annotation class PublishSelectorDsl + +@FlowExtPreview +@PublishSelectorDsl +public sealed interface SelectorSharedFlowScope { + @PublishSelectorDsl + public fun Flow.shared(replay: Int = 0): SharedFlow + + /** @suppress */ + @Deprecated( + level = DeprecationLevel.ERROR, + message = "This function is not supported", + replaceWith = ReplaceWith("this.shared(replay)"), + ) + public fun Flow.shareIn( + scope: CoroutineScope, + started: SharingStarted, + replay: Int = 0, + ): SharedFlow = throw UnsupportedOperationException("Not implemented, should not be called") +} + +@FlowExtPreview +private typealias SelectorFunction = suspend SelectorSharedFlowScope.(Flow) -> Flow + +@FlowExtPreview +@PublishSelectorDsl +public sealed interface SelectorScope { + @PublishSelectorDsl + public fun select(block: SelectorFunction): Flow +} + +private class SimpleSuspendLazy( + initializer: suspend () -> T, +) { + private val mutex = Mutex() + + private var _initializer: (suspend () -> T)? = initializer + + @Volatile + private var value: T? = null + + suspend fun getValue(): T = + value ?: mutex.withLock { + value ?: _initializer!!().also { + _initializer = null + value = it + } + } + + fun clear() { + _initializer = null + value = null + } +} + +@FlowExtPreview +@OptIn(DelicateCoroutinesApi::class, InternalCoroutinesApi::class) +private class DefaultSelectorScope( + @JvmField val scope: CoroutineScope, +) : SynchronizedObject(), + SelectorScope, + SelectorSharedFlowScope { + // TODO: atomic + // Initialized in freezeAndInit + // Will be set to null when close or cancel + @JvmField + @Volatile + var channels: Array>? = null + + // TODO: atomic + // Initialized in freezeAndInit + // Will be set to null when all output flows are completed. + @JvmField + @Volatile + var cachedSelectedFlows: Array>>? = null + + // TODO: atomic + @JvmField + val blocks: MutableList> = ArrayList() + + /** + * Indicate that this scope is frozen, all [select] calls after this will throw [IllegalStateException]. + */ + @Volatile + @JvmField + var isFrozen = false // TODO: atomic + + /** + * Indicate that a [select] calls is in progress, + * all [select] calls inside another [select] block will throw [IllegalStateException]. + */ + @JvmField + @Volatile + var isInSelectClause = false // TODO: atomic + + // TODO: atomic + @JvmField + @Volatile + var completedCount = 0 + + // TODO: atomic or sync? + override fun select(block: SelectorFunction): Flow = + synchronized(this) { + check(!isInSelectClause) { "select can not be called inside another select" } + check(!isFrozen) { "select only can be called inside publish, do not use SelectorScope outside publish" } + + isInSelectClause = true + + blocks += block + val index = size - 1 + + return defer { + val cached = + synchronized(this) { + // Only frozen state can reach here, + // that means we collect the output flow after frozen this scope + check(isFrozen) { "only frozen state can reach here!" } + + cachedSelectedFlows + ?.get(index) + ?: error("It looks like you are trying to collect the select{} flow outside publish, please don't do that!") + } + + @Suppress("UNCHECKED_CAST") // Always safe + cached.getValue() as Flow + }.onCompletion { onCompleteASelectedFlow(index) } + .also { isInSelectClause = false } + } + + // TODO: atomic + private fun onCompleteASelectedFlow(index: Int) { + synchronized(this@DefaultSelectorScope) { + completedCount++ + + println("onCompleteASelectedFlow: completedCount = $completedCount, size = $size (index = $index)") + + if (completedCount == size) { + cachedSelectedFlows?.forEach { it.clear() } + cachedSelectedFlows = null + println("onCompleteASelectedFlow: cancel the publish scope") + } + } + } + + override fun Flow.shared(replay: Int): SharedFlow = + kotlinXFlowShareIn( + scope = scope, + started = SharingStarted.Lazily, + replay = replay, + ) + + // TODO: synchronized? + fun freezeAndInit() = + synchronized(this) { + val channels = Array(size) { Channel() }.also { this.channels = it } + + cachedSelectedFlows = + Array(size) { index -> + val block = blocks[index] + val flow = channels[index].consumeAsFlow() + + SimpleSuspendLazy { this.block(flow) } + } + + isFrozen = true + } + + private inline val size: Int get() = blocks.size + + suspend fun send(value: T) { + println("send: $value") + for (channel in channels.orEmpty()) { + if (channel.isClosedForSend || channel.isClosedForReceive) { + continue + } + + try { + channel.send(value) + } catch (_: Throwable) { + // Swallow all exceptions + } + } + } + + // TODO: synchronized? + fun close(e: Throwable?) = + synchronized(this) { + println("close: $e") + for (channel in channels.orEmpty()) { + channel.close(e) + } + channels = null + } + + // TODO: synchronized? + fun cancel(e: CancellationException) = + synchronized(this) { + println("cancel: $e") + for (channel in channels.orEmpty()) { + channel.cancel(e) + } + channels = null + } +} + +@FlowExtPreview +public fun Flow.publish(selector: suspend SelectorScope.() -> Flow): Flow { + val source = this + + return flow { + coroutineScope { + val scope = DefaultSelectorScope(this) + + val output = selector(scope) + + // IMPORTANT: freeze and init before collect the output flow + scope.freezeAndInit() + + launch { + try { + source.collect { value -> return@collect scope.send(value) } + scope.close(null) + } catch (e: CancellationException) { + scope.cancel(e) + throw e + } catch (e: Throwable) { + scope.close(e) + throw e + } + } + + // IMPORTANT: collect the output flow after frozen the scope + emitAll(output) + } + } +} + +@OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) +public suspend fun main() { + flow { + println("Collect...") + delay(100) + emit(1) + delay(100) + emit(2) + delay(100) + emit(3) + delay(100) + emit("4") + }.onEach { println(">>> onEach: $it") } + .publish { + delay(100) + + merge( + select { flow -> + delay(1) + val sharedFlow = flow.shared() + + interval(0, 100) + .onEach { println(">>> interval: $it") } + .flatMapMerge { value -> + timer(value, 50) + .withLatestFrom(sharedFlow) + .map { it to "shared" } + }.takeUntil(sharedFlow.filter { it == 3 }) + }, + select { flow -> + flow + .filterIsInstance() + .filter { it % 2 == 0 } + .map { it to "even" } + .take(1) + }, + select { flow -> + flow + .filterIsInstance() + .filter { it % 2 != 0 } + .map { it to "odd" } + .take(1) + }, + select { flow -> + flow + .filterIsInstance() + .map { it to "string" } + .take(1) + }, + ) + } + .toList() + .also { println(it) } + .let { + check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) + } +} From a66f93cc6c1890d81ec8d5ea89d6944eaa29052e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Mon, 30 Oct 2023 01:32:41 +0700 Subject: [PATCH 2/9] atomic --- api/FlowExt.api | 24 + .../hoc081098/flowext/publishWithSelector.kt | 430 ++++++++++++++---- 2 files changed, 356 insertions(+), 98 deletions(-) diff --git a/api/FlowExt.api b/api/FlowExt.api index 8289218c..b4d2a01c 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -191,6 +191,15 @@ public final class com/hoc081098/flowext/PairwiseKt { public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } +public abstract interface annotation class com/hoc081098/flowext/PublishSelectorDsl : java/lang/annotation/Annotation { +} + +public final class com/hoc081098/flowext/PublishWithSelectorKt { + public static final fun main (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun main ([Ljava/lang/String;)V + public static final fun publish (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; +} + public final class com/hoc081098/flowext/RaceKt { public static final fun amb (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow; public static final fun amb (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -225,6 +234,21 @@ public final class com/hoc081098/flowext/ScanWithKt { public static final fun scanWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } +public abstract interface class com/hoc081098/flowext/SelectorScope { + public abstract fun select (Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; +} + +public abstract interface class com/hoc081098/flowext/SelectorSharedFlowScope { + public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow; + public abstract fun shared (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/SharedFlow; +} + +public final class com/hoc081098/flowext/SelectorSharedFlowScope$DefaultImpls { + public static fun shareIn (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow; + public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; + public static synthetic fun shared$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; +} + public final class com/hoc081098/flowext/SelectorsKt { public static final fun select (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun select (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow; diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index f8553e5d..6cfd6225 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2021 Petrus Nguyễn Thái Học + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,9 +22,10 @@ * SOFTWARE. */ +@file:Suppress("ktlint:standard:property-naming") + package com.hoc081098.flowext -import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn import kotlin.concurrent.Volatile import kotlin.jvm.JvmField import kotlinx.coroutines.CancellationException @@ -36,19 +37,25 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.toList import kotlinx.coroutines.internal.SynchronizedObject import kotlinx.coroutines.internal.synchronized @@ -113,91 +120,252 @@ private class SimpleSuspendLazy( } } +private fun simpleLazyOf(initializer: () -> T): SimpleLazy = + SimpleLazy(initializer) + +// TODO: Remove SynchronizedObject +@OptIn(InternalCoroutinesApi::class) +private class SimpleLazy( + initializer: () -> T, +) : SynchronizedObject() { + private var _initializer: (() -> T)? = initializer + + @Volatile + private var value: T? = null + + fun getValue(): T = + value ?: synchronized(this) { + value ?: _initializer!!().also { + _initializer = null + value = it + } + } + + fun getOrNull(): T? = value + + fun clear() { + _initializer = null + value = null + } +} + +private typealias SimpleSuspendLazyOfFlow = SimpleSuspendLazy> + @FlowExtPreview -@OptIn(DelicateCoroutinesApi::class, InternalCoroutinesApi::class) +private sealed interface DefaultSelectorScopeState { + data object Init : DefaultSelectorScopeState + + sealed interface NotFrozen : DefaultSelectorScopeState { + val blocks: List> + + data class InSelectClause( + override val blocks: List>, + ) : NotFrozen + + data class NotInSelectClause( + override val blocks: List>, + ) : NotFrozen + } + + data class Frozen( + val selectedFlowsAndChannels: SimpleLazy, List>>>, + val completedCount: Int, + val blocks: List>, + ) : DefaultSelectorScopeState + + data object Closed : DefaultSelectorScopeState +} + +@OptIn(FlowExtPreview::class) +private val DefaultSelectorScopeState.debug: String + get() = when (this) { + DefaultSelectorScopeState.Closed -> toString() + DefaultSelectorScopeState.Init -> toString() + is DefaultSelectorScopeState.Frozen -> { + val orNull = selectedFlowsAndChannels.getOrNull() + """ + | Frozen( + | selectedFlowsAndChannels = ${orNull?.first?.size to orNull?.second?.size}, + | completedCount = $completedCount, + | blocks = ${blocks.size} + | ) + """.trimMargin() + } + + is DefaultSelectorScopeState.NotFrozen.InSelectClause -> + """ + | NotFrozen.InSelectClause( + | blocks = ${blocks.size} + | ) + """.trimMargin() + + is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> + """ + | NotFrozen.NotInSelectClause( + | blocks = ${blocks.size} + | ) + """.trimMargin() + } + +@FlowExtPreview +@OptIn(DelicateCoroutinesApi::class) private class DefaultSelectorScope( @JvmField val scope: CoroutineScope, -) : SynchronizedObject(), +) : SelectorScope, - SelectorSharedFlowScope { - // TODO: atomic - // Initialized in freezeAndInit - // Will be set to null when close or cancel + SelectorSharedFlowScope { + // TODO: Revert to AtomicRef @JvmField - @Volatile - var channels: Array>? = null + val stateRef = MutableStateFlow>(DefaultSelectorScopeState.Init) + + init { + // TODO: Revert to AtomicRef + stateRef + .buffer(Channel.UNLIMITED) + .takeWhile { it !is DefaultSelectorScopeState.Closed } + .concatWith(flowOf(DefaultSelectorScopeState.Closed)) + .onEach { state -> println("state: ${state.debug}") } + .launchIn(scope) + } - // TODO: atomic - // Initialized in freezeAndInit - // Will be set to null when all output flows are completed. - @JvmField - @Volatile - var cachedSelectedFlows: Array>>? = null + override fun select(block: SelectorFunction): Flow { + println("call select with block: $block") - // TODO: atomic - @JvmField - val blocks: MutableList> = ArrayList() + while (true) { + val state = stateRef.value - /** - * Indicate that this scope is frozen, all [select] calls after this will throw [IllegalStateException]. - */ - @Volatile - @JvmField - var isFrozen = false // TODO: atomic + val updated = when (state) { + DefaultSelectorScopeState.Closed -> { + error("This scope is closed") + } - /** - * Indicate that a [select] calls is in progress, - * all [select] calls inside another [select] block will throw [IllegalStateException]. - */ - @JvmField - @Volatile - var isInSelectClause = false // TODO: atomic + is DefaultSelectorScopeState.Frozen -> { + error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") + } - // TODO: atomic - @JvmField - @Volatile - var completedCount = 0 + DefaultSelectorScopeState.Init -> { + // Ok, lets transition to NotFrozen.InSelectClause + DefaultSelectorScopeState.NotFrozen.InSelectClause(blocks = listOf(block)) + } + + is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + error("`select` can not be called inside another `select`") + } + + is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + // Ok, lets transition to NotFrozen.InSelectClause + DefaultSelectorScopeState.NotFrozen.InSelectClause(blocks = state.blocks + block) + } + } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + val index = updated.blocks.size - 1 + + val result = defer { + // Only frozen state can reach here, + // that means we collect the output flow after frozen this scope + val stateWhenCollecting = stateRef.value + check(stateWhenCollecting is DefaultSelectorScopeState.Frozen) { "only frozen state can reach here!" } + + @Suppress("UNCHECKED_CAST") // We know that the type is correct + stateWhenCollecting + .selectedFlowsAndChannels + .getValue() + .first[index] + .getValue() + as Flow + }.onCompletion { onCompleteSelectedFlow(index) } + + while (true) { + val state = stateRef.value + + val updated = when (state) { + is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + // Ok, lets transition to NotFrozen.NotInSelectClause + DefaultSelectorScopeState.NotFrozen.NotInSelectClause(blocks = state.blocks) + } + + is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + // Ok, state already is NotFrozen.NotInSelectClause + return result + } + + DefaultSelectorScopeState.Closed -> { + error("This scope is closed") + } + + is DefaultSelectorScopeState.Frozen -> { + error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") + } + + DefaultSelectorScopeState.Init -> { + error("Cannot be here!") + } + } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + return result + } + } + } + } + } - // TODO: atomic or sync? - override fun select(block: SelectorFunction): Flow = - synchronized(this) { - check(!isInSelectClause) { "select can not be called inside another select" } - check(!isFrozen) { "select only can be called inside publish, do not use SelectorScope outside publish" } + private fun onCompleteSelectedFlow(index: Int) { + while (true) { + val state = stateRef.value - isInSelectClause = true + val updated = when (state) { + DefaultSelectorScopeState.Init -> { + error("Cannot be here!") + } - blocks += block - val index = size - 1 + is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + error("Cannot be here!") + } - return defer { - val cached = - synchronized(this) { - // Only frozen state can reach here, - // that means we collect the output flow after frozen this scope - check(isFrozen) { "only frozen state can reach here!" } + is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + error("Cannot be here!") + } - cachedSelectedFlows - ?.get(index) - ?: error("It looks like you are trying to collect the select{} flow outside publish, please don't do that!") + is DefaultSelectorScopeState.Frozen -> { + if (state.completedCount == state.blocks.size) { + // Ok, all output flows are completed. Lets transition to DefaultSelectorScopeState.Closed + DefaultSelectorScopeState.Closed + } else { + // Ok, lets transition to DefaultSelectorScopeState.Frozen with completedCount=completedCount + 1 + state.copy(completedCount = state.completedCount + 1) } + } - @Suppress("UNCHECKED_CAST") // Always safe - cached.getValue() as Flow - }.onCompletion { onCompleteASelectedFlow(index) } - .also { isInSelectClause = false } - } + DefaultSelectorScopeState.Closed -> { + // Ok, already closed. Do nothing. + return + } + } - // TODO: atomic - private fun onCompleteASelectedFlow(index: Int) { - synchronized(this@DefaultSelectorScope) { - completedCount++ + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success - println("onCompleteASelectedFlow: completedCount = $completedCount, size = $size (index = $index)") + println( + "onCompleteSelectedFlow: completedCount = ${(updated as? DefaultSelectorScopeState.Frozen)?.completedCount}, " + + "size = ${state.blocks.size}, " + + "(index = $index)", + ) + + // Once state reaches DefaultSelectorScopeState.Closed, we can clear unused lazy + if (updated is DefaultSelectorScopeState.Closed) { + state.selectedFlowsAndChannels.run { + getOrNull()?.first?.forEach { it.clear() } + clear() + } - if (completedCount == size) { - cachedSelectedFlows?.forEach { it.clear() } - cachedSelectedFlows = null - println("onCompleteASelectedFlow: cancel the publish scope") + println("onCompleteSelectedFlow: cancel the publish scope") + } + + return } } } @@ -209,27 +377,62 @@ private class DefaultSelectorScope( replay = replay, ) - // TODO: synchronized? - fun freezeAndInit() = - synchronized(this) { - val channels = Array(size) { Channel() }.also { this.channels = it } + fun freezeAndInit() { + while (true) { + val state = stateRef.value - cachedSelectedFlows = - Array(size) { index -> - val block = blocks[index] - val flow = channels[index].consumeAsFlow() + // Transition from NotFrozen to Frozen + when (state) { + DefaultSelectorScopeState.Init -> { + error("Not implemented") + } - SimpleSuspendLazy { this.block(flow) } + is DefaultSelectorScopeState.NotFrozen -> { + // Freeze and init } - isFrozen = true - } + is DefaultSelectorScopeState.Frozen, DefaultSelectorScopeState.Closed -> { + // Already frozen or closed + return + } + } - private inline val size: Int get() = blocks.size + val blocks = state.blocks + val size = blocks.size + val updated = DefaultSelectorScopeState.Frozen( + selectedFlowsAndChannels = simpleLazyOf { + val channels = List(size) { Channel() } + + List(size) { index -> + val block = blocks[index] + val flow = channels[index].consumeAsFlow() + SimpleSuspendLazy { this.block(flow) } + } to channels + }, + completedCount = 0, + blocks = blocks, + ) + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + return + } + + // clear unused lazy + updated.selectedFlowsAndChannels.clear() + } + } suspend fun send(value: T) { println("send: $value") - for (channel in channels.orEmpty()) { + + val state = stateRef.value as? DefaultSelectorScopeState.Frozen ?: return + val channels = state + .selectedFlowsAndChannels + .getValue() + .second + + for (channel in channels) { if (channel.isClosedForSend || channel.isClosedForReceive) { continue } @@ -242,25 +445,58 @@ private class DefaultSelectorScope( } } - // TODO: synchronized? - fun close(e: Throwable?) = - synchronized(this) { - println("close: $e") - for (channel in channels.orEmpty()) { - channel.close(e) + private fun transitionToClosed(action: (Channel) -> Unit) { + while (true) { + val state = stateRef.value + + val updated = when (state) { + DefaultSelectorScopeState.Init -> { + error("Cannot be here!") + } + + is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + error("Cannot be here!") + } + + is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + error("Cannot be here!") + } + + is DefaultSelectorScopeState.Frozen -> { + // Ok, lets transition to DefaultSelectorScopeState.Closed + DefaultSelectorScopeState.Closed + } + + DefaultSelectorScopeState.Closed -> { + // Ok, already closed. Do nothing. + return + } } - channels = null - } - // TODO: synchronized? - fun cancel(e: CancellationException) = - synchronized(this) { - println("cancel: $e") - for (channel in channels.orEmpty()) { - channel.cancel(e) + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + + // Once state reaches DefaultSelectorScopeState.Closed, we can clear unused lazy and close all channels + state.selectedFlowsAndChannels.run { + getOrNull()?.first?.forEach { it.clear() } + getOrNull()?.second?.forEach(action) + clear() + } + + return } - channels = null } + } + + fun close(e: Throwable?) { + println("close: $e") + transitionToClosed { it.close(e) } + } + + fun cancel(e: CancellationException) { + println("cancel: $e") + transitionToClosed { it.cancel(e) } + } } @FlowExtPreview @@ -348,7 +584,5 @@ public suspend fun main() { } .toList() .also { println(it) } - .let { - check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) - } + .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } } From 636fdfa84e01a59f139eb87d3871bc8619122eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Mon, 30 Oct 2023 01:37:56 +0700 Subject: [PATCH 3/9] atomic --- .../hoc081098/flowext/publishWithSelector.kt | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index 6cfd6225..3cb9bc06 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -71,13 +71,16 @@ public annotation class PublishSelectorDsl @PublishSelectorDsl public sealed interface SelectorSharedFlowScope { @PublishSelectorDsl - public fun Flow.shared(replay: Int = 0): SharedFlow + public fun Flow.shareIn( + replay: Int = 0, + started: SharingStarted = SharingStarted.Lazily, + ): SharedFlow /** @suppress */ @Deprecated( level = DeprecationLevel.ERROR, message = "This function is not supported", - replaceWith = ReplaceWith("this.shared(replay)"), + replaceWith = ReplaceWith("this.shareIn(replay, started)"), ) public fun Flow.shareIn( scope: CoroutineScope, @@ -211,9 +214,8 @@ private val DefaultSelectorScopeState.debug: String @OptIn(DelicateCoroutinesApi::class) private class DefaultSelectorScope( @JvmField val scope: CoroutineScope, -) : - SelectorScope, - SelectorSharedFlowScope { +) : SelectorScope, + SelectorSharedFlowScope { // TODO: Revert to AtomicRef @JvmField val stateRef = MutableStateFlow>(DefaultSelectorScopeState.Init) @@ -370,10 +372,10 @@ private class DefaultSelectorScope( } } - override fun Flow.shared(replay: Int): SharedFlow = + override fun Flow.shareIn(replay: Int, started: SharingStarted): SharedFlow = kotlinXFlowShareIn( scope = scope, - started = SharingStarted.Lazily, + started = started, replay = replay, ) @@ -550,7 +552,7 @@ public suspend fun main() { merge( select { flow -> delay(1) - val sharedFlow = flow.shared() + val sharedFlow = flow.shareIn() interval(0, 100) .onEach { println(">>> interval: $it") } From 16f7b6bb24b5b6576e3389233aa6935ee3e4ba73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Mon, 30 Oct 2023 01:40:23 +0700 Subject: [PATCH 4/9] atomic --- .../kotlin/com/hoc081098/flowext/publishWithSelector.kt | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index 3cb9bc06..1c484c68 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -68,9 +68,13 @@ import kotlinx.coroutines.sync.withLock public annotation class PublishSelectorDsl @FlowExtPreview -@PublishSelectorDsl +@DslMarker +public annotation class PublishSelectorSharedFlowDsl + +@FlowExtPreview +@PublishSelectorSharedFlowDsl public sealed interface SelectorSharedFlowScope { - @PublishSelectorDsl + @PublishSelectorSharedFlowDsl public fun Flow.shareIn( replay: Int = 0, started: SharingStarted = SharingStarted.Lazily, From 0928da3c0b83838f109ffa48afc07ca9be6230ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Mon, 30 Oct 2023 01:40:40 +0700 Subject: [PATCH 5/9] atomic --- api/FlowExt.api | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/FlowExt.api b/api/FlowExt.api index b4d2a01c..70da4ec2 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -194,6 +194,9 @@ public final class com/hoc081098/flowext/PairwiseKt { public abstract interface annotation class com/hoc081098/flowext/PublishSelectorDsl : java/lang/annotation/Annotation { } +public abstract interface annotation class com/hoc081098/flowext/PublishSelectorSharedFlowDsl : java/lang/annotation/Annotation { +} + public final class com/hoc081098/flowext/PublishWithSelectorKt { public static final fun main (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun main ([Ljava/lang/String;)V @@ -239,14 +242,14 @@ public abstract interface class com/hoc081098/flowext/SelectorScope { } public abstract interface class com/hoc081098/flowext/SelectorSharedFlowScope { + public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/SharedFlow; public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow; - public abstract fun shared (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/SharedFlow; } public final class com/hoc081098/flowext/SelectorSharedFlowScope$DefaultImpls { public static fun shareIn (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow; + public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; - public static synthetic fun shared$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow; } public final class com/hoc081098/flowext/SelectorsKt { From 5009265c8344fd7ae3c3cc57afdef69db023b102 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sat, 18 Nov 2023 18:31:55 +0700 Subject: [PATCH 6/9] loop --- .../hoc081098/flowext/internal/AtomicRef.kt | 47 +++++++++++++++++++ .../hoc081098/flowext/publishWithSelector.kt | 28 ++--------- 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt index b4963113..b7208f6b 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt @@ -29,3 +29,50 @@ internal expect class AtomicRef(initialValue: T) { fun compareAndSet(expect: T, update: T): Boolean } + +// Copy from: https://github.com/arrow-kt/arrow/blob/6fb6a75b131f5bbb272611bf277e263ff791cb67/arrow-libs/core/arrow-atomic/src/commonMain/kotlin/arrow/atomic/Atomic.kt#L44 + +/** + * Infinite loop that reads this atomic variable and performs the specified [action] on its value. + */ +internal inline fun AtomicRef.loop(action: (T) -> Unit): Nothing { + while (true) { + action(value) + } +} + +internal fun AtomicRef.tryUpdate(function: (T) -> T): Boolean { + val cur = value + val upd = function(cur) + return compareAndSet(cur, upd) +} + +internal inline fun AtomicRef.update(function: (T) -> T) { + while (true) { + val cur = value + val upd = function(cur) + if (compareAndSet(cur, upd)) return + } +} + +/** + * Updates variable atomically using the specified [function] of its value and returns its old value. + */ +internal inline fun AtomicRef.getAndUpdate(function: (T) -> T): T { + while (true) { + val cur = value + val upd = function(cur) + if (compareAndSet(cur, upd)) return cur + } +} + +/** + * Updates variable atomically using the specified [function] of its value and returns its new value. + */ +internal inline fun AtomicRef.updateAndGet(function: (T) -> T): T { + while (true) { + val cur = value + val upd = function(cur) + if (compareAndSet(cur, upd)) return upd + } +} diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index 1c484c68..7a281303 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -26,6 +26,8 @@ package com.hoc081098.flowext +import com.hoc081098.flowext.internal.AtomicRef +import com.hoc081098.flowext.internal.loop import kotlin.concurrent.Volatile import kotlin.jvm.JvmField import kotlinx.coroutines.CancellationException @@ -37,25 +39,20 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn import kotlinx.coroutines.flow.take -import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.toList import kotlinx.coroutines.internal.SynchronizedObject import kotlinx.coroutines.internal.synchronized @@ -220,19 +217,8 @@ private class DefaultSelectorScope( @JvmField val scope: CoroutineScope, ) : SelectorScope, SelectorSharedFlowScope { - // TODO: Revert to AtomicRef @JvmField - val stateRef = MutableStateFlow>(DefaultSelectorScopeState.Init) - - init { - // TODO: Revert to AtomicRef - stateRef - .buffer(Channel.UNLIMITED) - .takeWhile { it !is DefaultSelectorScopeState.Closed } - .concatWith(flowOf(DefaultSelectorScopeState.Closed)) - .onEach { state -> println("state: ${state.debug}") } - .launchIn(scope) - } + val stateRef = AtomicRef>(DefaultSelectorScopeState.Init) override fun select(block: SelectorFunction): Flow { println("call select with block: $block") @@ -384,9 +370,7 @@ private class DefaultSelectorScope( ) fun freezeAndInit() { - while (true) { - val state = stateRef.value - + stateRef.loop { state -> // Transition from NotFrozen to Frozen when (state) { DefaultSelectorScopeState.Init -> { @@ -452,9 +436,7 @@ private class DefaultSelectorScope( } private fun transitionToClosed(action: (Channel) -> Unit) { - while (true) { - val state = stateRef.value - + stateRef.loop { state -> val updated = when (state) { DefaultSelectorScopeState.Init -> { error("Cannot be here!") From dc0d5aa84a77acd5c1afcf3ba5ee5c86ffc7b7e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sat, 18 Nov 2023 18:35:38 +0700 Subject: [PATCH 7/9] loop 2 --- .../hoc081098/flowext/publishWithSelector.kt | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index 7a281303..d4528c0b 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -223,9 +223,7 @@ private class DefaultSelectorScope( override fun select(block: SelectorFunction): Flow { println("call select with block: $block") - while (true) { - val state = stateRef.value - + stateRef.loop { state -> val updated = when (state) { DefaultSelectorScopeState.Closed -> { error("This scope is closed") @@ -269,46 +267,48 @@ private class DefaultSelectorScope( as Flow }.onCompletion { onCompleteSelectedFlow(index) } - while (true) { - val state = stateRef.value + transitionToNotInSelectClause() - val updated = when (state) { - is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { - // Ok, lets transition to NotFrozen.NotInSelectClause - DefaultSelectorScopeState.NotFrozen.NotInSelectClause(blocks = state.blocks) - } + return result + } + } + } - is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { - // Ok, state already is NotFrozen.NotInSelectClause - return result - } + private fun transitionToNotInSelectClause() { + stateRef.loop { state -> + val updated = when (state) { + is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + // Ok, lets transition to NotFrozen.NotInSelectClause + DefaultSelectorScopeState.NotFrozen.NotInSelectClause(blocks = state.blocks) + } - DefaultSelectorScopeState.Closed -> { - error("This scope is closed") - } + is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + // Ok, state already is NotFrozen.NotInSelectClause + return + } - is DefaultSelectorScopeState.Frozen -> { - error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") - } + DefaultSelectorScopeState.Closed -> { + error("This scope is closed") + } - DefaultSelectorScopeState.Init -> { - error("Cannot be here!") - } - } + is DefaultSelectorScopeState.Frozen -> { + error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") + } - if (stateRef.compareAndSet(expect = state, update = updated)) { - // CAS success - return result - } + DefaultSelectorScopeState.Init -> { + error("Cannot be here!") } } + + if (stateRef.compareAndSet(expect = state, update = updated)) { + // CAS success + return + } } } private fun onCompleteSelectedFlow(index: Int) { - while (true) { - val state = stateRef.value - + stateRef.loop { state -> val updated = when (state) { DefaultSelectorScopeState.Init -> { error("Cannot be here!") From 682a8eee2adc77da1f633f5075ced1fc36fde59e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sat, 18 Nov 2023 18:38:02 +0700 Subject: [PATCH 8/9] remove debug --- .../hoc081098/flowext/publishWithSelector.kt | 109 +++++++----------- 1 file changed, 39 insertions(+), 70 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index d4528c0b..ef0538c4 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -156,10 +156,10 @@ private class SimpleLazy( private typealias SimpleSuspendLazyOfFlow = SimpleSuspendLazy> @FlowExtPreview -private sealed interface DefaultSelectorScopeState { - data object Init : DefaultSelectorScopeState +private sealed interface SelectorScopeState { + data object Init : SelectorScopeState - sealed interface NotFrozen : DefaultSelectorScopeState { + sealed interface NotFrozen : SelectorScopeState { val blocks: List> data class InSelectClause( @@ -175,42 +175,11 @@ private sealed interface DefaultSelectorScopeState { val selectedFlowsAndChannels: SimpleLazy, List>>>, val completedCount: Int, val blocks: List>, - ) : DefaultSelectorScopeState + ) : SelectorScopeState - data object Closed : DefaultSelectorScopeState + data object Closed : SelectorScopeState } -@OptIn(FlowExtPreview::class) -private val DefaultSelectorScopeState.debug: String - get() = when (this) { - DefaultSelectorScopeState.Closed -> toString() - DefaultSelectorScopeState.Init -> toString() - is DefaultSelectorScopeState.Frozen -> { - val orNull = selectedFlowsAndChannels.getOrNull() - """ - | Frozen( - | selectedFlowsAndChannels = ${orNull?.first?.size to orNull?.second?.size}, - | completedCount = $completedCount, - | blocks = ${blocks.size} - | ) - """.trimMargin() - } - - is DefaultSelectorScopeState.NotFrozen.InSelectClause -> - """ - | NotFrozen.InSelectClause( - | blocks = ${blocks.size} - | ) - """.trimMargin() - - is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> - """ - | NotFrozen.NotInSelectClause( - | blocks = ${blocks.size} - | ) - """.trimMargin() - } - @FlowExtPreview @OptIn(DelicateCoroutinesApi::class) private class DefaultSelectorScope( @@ -218,33 +187,33 @@ private class DefaultSelectorScope( ) : SelectorScope, SelectorSharedFlowScope { @JvmField - val stateRef = AtomicRef>(DefaultSelectorScopeState.Init) + val stateRef = AtomicRef>(SelectorScopeState.Init) override fun select(block: SelectorFunction): Flow { println("call select with block: $block") stateRef.loop { state -> val updated = when (state) { - DefaultSelectorScopeState.Closed -> { + SelectorScopeState.Closed -> { error("This scope is closed") } - is DefaultSelectorScopeState.Frozen -> { + is SelectorScopeState.Frozen -> { error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") } - DefaultSelectorScopeState.Init -> { + SelectorScopeState.Init -> { // Ok, lets transition to NotFrozen.InSelectClause - DefaultSelectorScopeState.NotFrozen.InSelectClause(blocks = listOf(block)) + SelectorScopeState.NotFrozen.InSelectClause(blocks = listOf(block)) } - is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + is SelectorScopeState.NotFrozen.InSelectClause -> { error("`select` can not be called inside another `select`") } - is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + is SelectorScopeState.NotFrozen.NotInSelectClause -> { // Ok, lets transition to NotFrozen.InSelectClause - DefaultSelectorScopeState.NotFrozen.InSelectClause(blocks = state.blocks + block) + SelectorScopeState.NotFrozen.InSelectClause(blocks = state.blocks + block) } } @@ -256,7 +225,7 @@ private class DefaultSelectorScope( // Only frozen state can reach here, // that means we collect the output flow after frozen this scope val stateWhenCollecting = stateRef.value - check(stateWhenCollecting is DefaultSelectorScopeState.Frozen) { "only frozen state can reach here!" } + check(stateWhenCollecting is SelectorScopeState.Frozen) { "only frozen state can reach here!" } @Suppress("UNCHECKED_CAST") // We know that the type is correct stateWhenCollecting @@ -277,25 +246,25 @@ private class DefaultSelectorScope( private fun transitionToNotInSelectClause() { stateRef.loop { state -> val updated = when (state) { - is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + is SelectorScopeState.NotFrozen.InSelectClause -> { // Ok, lets transition to NotFrozen.NotInSelectClause - DefaultSelectorScopeState.NotFrozen.NotInSelectClause(blocks = state.blocks) + SelectorScopeState.NotFrozen.NotInSelectClause(blocks = state.blocks) } - is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + is SelectorScopeState.NotFrozen.NotInSelectClause -> { // Ok, state already is NotFrozen.NotInSelectClause return } - DefaultSelectorScopeState.Closed -> { + SelectorScopeState.Closed -> { error("This scope is closed") } - is DefaultSelectorScopeState.Frozen -> { + is SelectorScopeState.Frozen -> { error("This scope is frozen. `select` only can be called inside `publish`, do not use `SelectorScope` outside `publish`") } - DefaultSelectorScopeState.Init -> { + SelectorScopeState.Init -> { error("Cannot be here!") } } @@ -310,29 +279,29 @@ private class DefaultSelectorScope( private fun onCompleteSelectedFlow(index: Int) { stateRef.loop { state -> val updated = when (state) { - DefaultSelectorScopeState.Init -> { + SelectorScopeState.Init -> { error("Cannot be here!") } - is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + is SelectorScopeState.NotFrozen.InSelectClause -> { error("Cannot be here!") } - is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + is SelectorScopeState.NotFrozen.NotInSelectClause -> { error("Cannot be here!") } - is DefaultSelectorScopeState.Frozen -> { + is SelectorScopeState.Frozen -> { if (state.completedCount == state.blocks.size) { // Ok, all output flows are completed. Lets transition to DefaultSelectorScopeState.Closed - DefaultSelectorScopeState.Closed + SelectorScopeState.Closed } else { // Ok, lets transition to DefaultSelectorScopeState.Frozen with completedCount=completedCount + 1 state.copy(completedCount = state.completedCount + 1) } } - DefaultSelectorScopeState.Closed -> { + SelectorScopeState.Closed -> { // Ok, already closed. Do nothing. return } @@ -342,13 +311,13 @@ private class DefaultSelectorScope( // CAS success println( - "onCompleteSelectedFlow: completedCount = ${(updated as? DefaultSelectorScopeState.Frozen)?.completedCount}, " + + "onCompleteSelectedFlow: completedCount = ${(updated as? SelectorScopeState.Frozen)?.completedCount}, " + "size = ${state.blocks.size}, " + "(index = $index)", ) // Once state reaches DefaultSelectorScopeState.Closed, we can clear unused lazy - if (updated is DefaultSelectorScopeState.Closed) { + if (updated is SelectorScopeState.Closed) { state.selectedFlowsAndChannels.run { getOrNull()?.first?.forEach { it.clear() } clear() @@ -373,15 +342,15 @@ private class DefaultSelectorScope( stateRef.loop { state -> // Transition from NotFrozen to Frozen when (state) { - DefaultSelectorScopeState.Init -> { + SelectorScopeState.Init -> { error("Not implemented") } - is DefaultSelectorScopeState.NotFrozen -> { + is SelectorScopeState.NotFrozen -> { // Freeze and init } - is DefaultSelectorScopeState.Frozen, DefaultSelectorScopeState.Closed -> { + is SelectorScopeState.Frozen, SelectorScopeState.Closed -> { // Already frozen or closed return } @@ -389,7 +358,7 @@ private class DefaultSelectorScope( val blocks = state.blocks val size = blocks.size - val updated = DefaultSelectorScopeState.Frozen( + val updated = SelectorScopeState.Frozen( selectedFlowsAndChannels = simpleLazyOf { val channels = List(size) { Channel() } @@ -416,7 +385,7 @@ private class DefaultSelectorScope( suspend fun send(value: T) { println("send: $value") - val state = stateRef.value as? DefaultSelectorScopeState.Frozen ?: return + val state = stateRef.value as? SelectorScopeState.Frozen ?: return val channels = state .selectedFlowsAndChannels .getValue() @@ -438,24 +407,24 @@ private class DefaultSelectorScope( private fun transitionToClosed(action: (Channel) -> Unit) { stateRef.loop { state -> val updated = when (state) { - DefaultSelectorScopeState.Init -> { + SelectorScopeState.Init -> { error("Cannot be here!") } - is DefaultSelectorScopeState.NotFrozen.InSelectClause -> { + is SelectorScopeState.NotFrozen.InSelectClause -> { error("Cannot be here!") } - is DefaultSelectorScopeState.NotFrozen.NotInSelectClause -> { + is SelectorScopeState.NotFrozen.NotInSelectClause -> { error("Cannot be here!") } - is DefaultSelectorScopeState.Frozen -> { + is SelectorScopeState.Frozen -> { // Ok, lets transition to DefaultSelectorScopeState.Closed - DefaultSelectorScopeState.Closed + SelectorScopeState.Closed } - DefaultSelectorScopeState.Closed -> { + SelectorScopeState.Closed -> { // Ok, already closed. Do nothing. return } From bf29d751d544e5c835a414e63bec76eeb9b765c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sat, 18 Nov 2023 18:45:00 +0700 Subject: [PATCH 9/9] extract file --- .../hoc081098/flowext/internal/SimpleLazy.kt | 61 +++++++++++++++ .../flowext/internal/SimpleSuspendLazy.kt | 59 ++++++++++++++ .../hoc081098/flowext/publishWithSelector.kt | 76 +++---------------- 3 files changed, 132 insertions(+), 64 deletions(-) create mode 100644 src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt create mode 100644 src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt new file mode 100644 index 00000000..92dcdbd3 --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt @@ -0,0 +1,61 @@ +/* + * MIT License + * + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:Suppress("ktlint:standard:property-naming") + +package com.hoc081098.flowext.internal + +import kotlin.concurrent.Volatile +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.internal.SynchronizedObject +import kotlinx.coroutines.internal.synchronized + +// TODO: Remove SynchronizedObject +@OptIn(InternalCoroutinesApi::class) +internal class SimpleLazy( + initializer: () -> T, +) : SynchronizedObject() { + private var _initializer: (() -> T)? = initializer + + @Volatile + private var value: T? = null + + fun getValue(): T = + value ?: synchronized(this) { + value ?: _initializer!!().also { + _initializer = null + value = it + } + } + + fun getOrNull(): T? = value + + fun clear() { + _initializer = null + value = null + } +} + +internal fun simpleLazyOf(initializer: () -> T): SimpleLazy = + SimpleLazy(initializer) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt new file mode 100644 index 00000000..9a6ae23b --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleSuspendLazy.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2021-2023 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:Suppress("ktlint:standard:property-naming") + +package com.hoc081098.flowext.internal + +import kotlin.concurrent.Volatile +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +internal class SimpleSuspendLazy( + initializer: suspend () -> T, +) { + private val mutex = Mutex() + + @Volatile + private var _initializer: (suspend () -> T)? = initializer + + @Volatile + private var value: T? = null + + suspend fun getValue(): T = + value ?: mutex.withLock { + value ?: _initializer!!().also { + _initializer = null + value = it + } + } + + fun clear() { + _initializer = null + value = null + } +} + +internal fun simpleSuspendLazy(initializer: suspend () -> T): SimpleSuspendLazy = + SimpleSuspendLazy(initializer) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index ef0538c4..722db892 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -22,19 +22,19 @@ * SOFTWARE. */ -@file:Suppress("ktlint:standard:property-naming") - package com.hoc081098.flowext import com.hoc081098.flowext.internal.AtomicRef +import com.hoc081098.flowext.internal.SimpleLazy +import com.hoc081098.flowext.internal.SimpleSuspendLazy import com.hoc081098.flowext.internal.loop -import kotlin.concurrent.Volatile +import com.hoc081098.flowext.internal.simpleLazyOf +import com.hoc081098.flowext.internal.simpleSuspendLazy import kotlin.jvm.JvmField import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay @@ -54,11 +54,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.internal.SynchronizedObject -import kotlinx.coroutines.internal.synchronized import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock @FlowExtPreview @DslMarker @@ -100,60 +96,7 @@ public sealed interface SelectorScope { public fun select(block: SelectorFunction): Flow } -private class SimpleSuspendLazy( - initializer: suspend () -> T, -) { - private val mutex = Mutex() - - private var _initializer: (suspend () -> T)? = initializer - - @Volatile - private var value: T? = null - - suspend fun getValue(): T = - value ?: mutex.withLock { - value ?: _initializer!!().also { - _initializer = null - value = it - } - } - - fun clear() { - _initializer = null - value = null - } -} - -private fun simpleLazyOf(initializer: () -> T): SimpleLazy = - SimpleLazy(initializer) - -// TODO: Remove SynchronizedObject -@OptIn(InternalCoroutinesApi::class) -private class SimpleLazy( - initializer: () -> T, -) : SynchronizedObject() { - private var _initializer: (() -> T)? = initializer - - @Volatile - private var value: T? = null - - fun getValue(): T = - value ?: synchronized(this) { - value ?: _initializer!!().also { - _initializer = null - value = it - } - } - - fun getOrNull(): T? = value - - fun clear() { - _initializer = null - value = null - } -} - -private typealias SimpleSuspendLazyOfFlow = SimpleSuspendLazy> +private typealias SimpleSuspendLazyOfAnyFlow = SimpleSuspendLazy> @FlowExtPreview private sealed interface SelectorScopeState { @@ -172,7 +115,12 @@ private sealed interface SelectorScopeState { } data class Frozen( - val selectedFlowsAndChannels: SimpleLazy, List>>>, + val selectedFlowsAndChannels: SimpleLazy< + Pair< + List, + List>, + >, + >, val completedCount: Int, val blocks: List>, ) : SelectorScopeState @@ -365,7 +313,7 @@ private class DefaultSelectorScope( List(size) { index -> val block = blocks[index] val flow = channels[index].consumeAsFlow() - SimpleSuspendLazy { this.block(flow) } + simpleSuspendLazy { this.block(flow) } } to channels }, completedCount = 0,