-
-
Notifications
You must be signed in to change notification settings - Fork 27
Publish with selector #189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #189 +/- ##
===========================================
- Coverage 87.91% 68.43% -19.49%
===========================================
Files 37 37
Lines 811 1020 +209
Branches 110 154 +44
===========================================
- Hits 713 698 -15
- Misses 53 273 +220
- Partials 45 49 +4 ☔ View full report in Codecov by Sentry. |
…blishWithSelector
…blishWithSelector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a "publish with selector" functionality that allows for advanced flow transformation using a selector-based pattern. The implementation enables multiple flows to be derived from a single source flow with shared subscription and selective processing.
- Implements
publishfunction with selector scope for creating multiple derived flows from a single source - Adds comprehensive state management for selector scopes with thread-safe operations
- Provides specialized DSL annotations and interfaces for type-safe selector operations
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| publishWithSelector.kt | Main implementation with publish function, selector scopes, state management, and example usage |
| SimpleSuspendLazy.kt | Thread-safe lazy initialization for suspend functions with cleanup capabilities |
| SimpleLazy.kt | Thread-safe lazy initialization for regular functions with cleanup capabilities |
| AtomicRef.kt | Atomic reference utilities with loop and update operations |
| FlowExt.api | API definitions for the new publish selector functionality |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| val stateRef = AtomicRef<SelectorScopeState<T>>(SelectorScopeState.Init) | ||
|
|
||
| override fun <R> select(block: SelectorFunction<T, R>): Flow<R> { | ||
| println("call select with block: $block") |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("call select with block: $block") |
| println( | ||
| "onCompleteSelectedFlow: completedCount = ${(updated as? SelectorScopeState.Frozen)?.completedCount}, " + | ||
| "size = ${state.blocks.size}, " + | ||
| "(index = $index)", | ||
| ) |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug print statements should be removed from production code. Consider using a proper logging framework or removing these debug outputs.
| clear() | ||
| } | ||
|
|
||
| println("onCompleteSelectedFlow: cancel the publish scope") |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("onCompleteSelectedFlow: cancel the publish scope") |
| } | ||
|
|
||
| suspend fun send(value: T) { | ||
| println("send: $value") |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("send: $value") |
| } | ||
|
|
||
| fun close(e: Throwable?) { | ||
| println("close: $e") |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("close: $e") |
| println("cancel: $e") | ||
| transitionToClosed { it.cancel(e) } | ||
| } | ||
| } |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("cancel: $e") | |
| transitionToClosed { it.cancel(e) } | |
| } | |
| } | |
| transitionToClosed { it.cancel(e) } | |
| } | |
| } | |
| } |
| @OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) | ||
| public suspend fun main() { | ||
| flow<Any?> { | ||
| println("Collect...") | ||
| delay(100) | ||
| emit(1) | ||
| delay(100) | ||
| emit(2) | ||
| delay(100) | ||
| emit(3) | ||
| delay(100) | ||
| emit("4") | ||
| }.onEach { println(">>> onEach: $it") } | ||
| .publish { | ||
| delay(100) | ||
|
|
||
| merge( | ||
| select { flow -> | ||
| delay(1) | ||
| val sharedFlow = flow.shareIn() | ||
|
|
||
| interval(0, 100) | ||
| .onEach { println(">>> interval: $it") } | ||
| .flatMapMerge { value -> | ||
| timer(value, 50) | ||
| .withLatestFrom(sharedFlow) | ||
| .map { it to "shared" } | ||
| }.takeUntil(sharedFlow.filter { it == 3 }) | ||
| }, | ||
| select { flow -> | ||
| flow | ||
| .filterIsInstance<Int>() | ||
| .filter { it % 2 == 0 } | ||
| .map { it to "even" } | ||
| .take(1) | ||
| }, | ||
| select { flow -> | ||
| flow | ||
| .filterIsInstance<Int>() | ||
| .filter { it % 2 != 0 } | ||
| .map { it to "odd" } | ||
| .take(1) | ||
| }, | ||
| select { flow -> | ||
| flow | ||
| .filterIsInstance<String>() | ||
| .map { it to "string" } | ||
| .take(1) | ||
| }, | ||
| ) | ||
| } | ||
| .toList() | ||
| .also { println(it) } | ||
| .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } | ||
| } |
Copilot
AI
Aug 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main function appears to be a test or example and should not be included in production library code. Consider moving this to a test file or removing it entirely.
| @OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) | |
| public suspend fun main() { | |
| flow<Any?> { | |
| println("Collect...") | |
| delay(100) | |
| emit(1) | |
| delay(100) | |
| emit(2) | |
| delay(100) | |
| emit(3) | |
| delay(100) | |
| emit("4") | |
| }.onEach { println(">>> onEach: $it") } | |
| .publish { | |
| delay(100) | |
| merge( | |
| select { flow -> | |
| delay(1) | |
| val sharedFlow = flow.shareIn() | |
| interval(0, 100) | |
| .onEach { println(">>> interval: $it") } | |
| .flatMapMerge { value -> | |
| timer(value, 50) | |
| .withLatestFrom(sharedFlow) | |
| .map { it to "shared" } | |
| }.takeUntil(sharedFlow.filter { it == 3 }) | |
| }, | |
| select { flow -> | |
| flow | |
| .filterIsInstance<Int>() | |
| .filter { it % 2 == 0 } | |
| .map { it to "even" } | |
| .take(1) | |
| }, | |
| select { flow -> | |
| flow | |
| .filterIsInstance<Int>() | |
| .filter { it % 2 != 0 } | |
| .map { it to "odd" } | |
| .take(1) | |
| }, | |
| select { flow -> | |
| flow | |
| .filterIsInstance<String>() | |
| .map { it to "string" } | |
| .take(1) | |
| }, | |
| ) | |
| } | |
| .toList() | |
| .also { println(it) } | |
| .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } | |
| } |
|
@copilot review PR |
|
@codex review PR AND GIVE FEEDBACKS |
Review Summary
TestingNo tests were run. |
Status
READY/IN DEVELOPMENT/HOLD
Breaking Changes
YES | NO
Description
Type of Change
Requirements