Contributor

murfel commented Oct 16, 2025

No description provided.

Contributor Author

murfel commented Oct 16, 2025

It performs unexpectedly badly. Could you all take a look for any silly mistakes?

The simplest benchmark, sending X elements only (not receiving anything), is already pretty sad:

ChannelBenchmark.sendUnlimited                1000  avgt   10  0.008 ±  0.001  ms/op
ChannelBenchmark.sendUnlimited               10000  avgt   10  0.080 ±  0.001  ms/op
ChannelBenchmark.sendUnlimited              100000  avgt   10  0.800 ±  0.005  ms/op

Full output for the first three counts (4KB, 40KB, 400KB) of Ints (just FYI, no reason to look at this, since the snippet above is already bad enough)

Benchmark                                  (count)  Mode  Cnt  Score    Error  Units
ChannelBenchmark.manySendersManyReceivers     1000  avgt   10  0.119 ±  0.001  ms/op
ChannelBenchmark.manySendersManyReceivers    10000  avgt   10  0.900 ±  0.006  ms/op
ChannelBenchmark.manySendersManyReceivers   100000  avgt   10  9.244 ±  0.418  ms/op
ChannelBenchmark.manySendersOneReceiver       1000  avgt   10  0.108 ±  0.001  ms/op
ChannelBenchmark.manySendersOneReceiver      10000  avgt   10  0.713 ±  0.042  ms/op
ChannelBenchmark.manySendersOneReceiver     100000  avgt   10  7.010 ±  0.115  ms/op
ChannelBenchmark.oneSenderManyReceivers       1000  avgt   10  0.138 ±  0.001  ms/op
ChannelBenchmark.oneSenderManyReceivers      10000  avgt   10  0.923 ±  0.003  ms/op
ChannelBenchmark.oneSenderManyReceivers     100000  avgt   10  8.411 ±  0.035  ms/op
ChannelBenchmark.sendConflated                1000  avgt   10  0.020 ±  0.001  ms/op
ChannelBenchmark.sendConflated               10000  avgt   10  0.187 ±  0.007  ms/op
ChannelBenchmark.sendConflated              100000  avgt   10  1.834 ±  0.013  ms/op
ChannelBenchmark.sendReceiveConflated         1000  avgt   10  0.039 ±  0.001  ms/op
ChannelBenchmark.sendReceiveConflated        10000  avgt   10  0.236 ±  0.009  ms/op
ChannelBenchmark.sendReceiveConflated       100000  avgt   10  1.906 ±  0.019  ms/op
ChannelBenchmark.sendReceiveRendezvous        1000  avgt   10  0.103 ±  0.001  ms/op
ChannelBenchmark.sendReceiveRendezvous       10000  avgt   10  0.866 ±  0.021  ms/op
ChannelBenchmark.sendReceiveRendezvous      100000  avgt   10  8.270 ±  0.071  ms/op
ChannelBenchmark.sendReceiveUnlimited         1000  avgt   10  0.077 ±  0.002  ms/op
ChannelBenchmark.sendReceiveUnlimited        10000  avgt   10  0.419 ±  0.005  ms/op
ChannelBenchmark.sendReceiveUnlimited       100000  avgt   10  3.443 ±  0.061  ms/op
ChannelBenchmark.sendUnlimited                1000  avgt   10  0.008 ±  0.001  ms/op
ChannelBenchmark.sendUnlimited               10000  avgt   10  0.080 ±  0.001  ms/op
ChannelBenchmark.sendUnlimited              100000  avgt   10  0.800 ±  0.005  ms/op

murfel marked this pull request as draft on October 16, 2025, 15:59
}

private suspend fun send(count: Int, channel: Channel<Int>) = coroutineScope {
list.take(count).forEach { channel.send(it) }
Collaborator

list.take(count) copies count elements to a new list, allocating a lot of new memory. I'd expect it to make a noticeable contribution to the runtime.
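
A minimal sketch of the difference, using only the standard library. The function names are hypothetical, and a plain `sink` lambda stands in for `channel.send`, so this illustrates only the allocation, not the benchmark itself. Note that the indexed variant assumes `count <= list.size`, whereas `take` silently truncates.

```kotlin
// Hypothetical helpers; `sink` stands in for `channel.send`.

// Copies the first `count` elements into a new list before iterating.
fun feedWithTake(list: List<Int>, count: Int, sink: (Int) -> Unit) {
    list.take(count).forEach(sink)
}

// Reads the same elements by index, without an intermediate list.
// Assumes count <= list.size.
fun feedByIndex(list: List<Int>, count: Int, sink: (Int) -> Unit) {
    repeat(count) { sink(list[it]) }
}

fun main() {
    val list = List(10) { it }
    val viaTake = mutableListOf<Int>()
    val viaIndex = mutableListOf<Int>()
    feedWithTake(list, 4) { viaTake.add(it) }
    feedByIndex(list, 4) { viaIndex.add(it) }
    // Both variants deliver the same elements; only the allocation differs.
    println(viaTake == viaIndex)
}
```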

@fzhinkin
Contributor

Just for the record, we discussed benchmarks with @murfel offline and she'll rework them.

Contributor Author

murfel commented Nov 4, 2025

Ran the following on a freshly restarted MacBook, with no apps open other than the terminal and the system monitor:
java -jar benchmarks.jar ".ChannelBenchmark." -p count=1000,100000000 -p prefill=0,1000000,100000000

# Run complete. Total time: 00:37:21

Benchmark                                    (count)  (prefill)  Mode  Cnt     Score      Error  Units
ChannelBenchmark.manySendersManyReceivers       1000          0  avgt   10     0.113 ±    0.001  ms/op
ChannelBenchmark.manySendersManyReceivers       1000    1000000  avgt   10     0.118 ±    0.007  ms/op
ChannelBenchmark.manySendersManyReceivers       1000  100000000  avgt   10     0.357 ±    0.055  ms/op
ChannelBenchmark.manySendersManyReceivers  100000000          0  avgt   10  8796.152 ±  543.292  ms/op
ChannelBenchmark.manySendersManyReceivers  100000000    1000000  avgt   10  8683.527 ±  254.436  ms/op
ChannelBenchmark.manySendersManyReceivers  100000000  100000000  avgt   10  9434.746 ±  310.576  ms/op
ChannelBenchmark.manySendersOneReceiver         1000          0  avgt   10     0.084 ±    0.002  ms/op
ChannelBenchmark.manySendersOneReceiver         1000    1000000  avgt   10     0.068 ±    0.001  ms/op
ChannelBenchmark.manySendersOneReceiver         1000  100000000  avgt   10     0.327 ±    0.043  ms/op
ChannelBenchmark.manySendersOneReceiver    100000000          0  avgt   10  6759.587 ± 1126.828  ms/op
ChannelBenchmark.manySendersOneReceiver    100000000    1000000  avgt   10  6730.408 ±  112.128  ms/op
ChannelBenchmark.manySendersOneReceiver    100000000  100000000  avgt   10  6222.171 ±  256.355  ms/op
ChannelBenchmark.oneSenderManyReceivers         1000          0  avgt   10     0.119 ±    0.003  ms/op
ChannelBenchmark.oneSenderManyReceivers         1000    1000000  avgt   10     0.121 ±    0.003  ms/op
ChannelBenchmark.oneSenderManyReceivers         1000  100000000  avgt   10     0.353 ±    0.065  ms/op
ChannelBenchmark.oneSenderManyReceivers    100000000          0  avgt   10  8785.786 ±  567.569  ms/op
ChannelBenchmark.oneSenderManyReceivers    100000000    1000000  avgt   10  8698.243 ±  517.566  ms/op
ChannelBenchmark.oneSenderManyReceivers    100000000  100000000  avgt   10  8594.145 ±  416.015  ms/op
ChannelBenchmark.sendConflated                  1000        N/A  avgt   10     0.017 ±    0.001  ms/op
ChannelBenchmark.sendConflated             100000000        N/A  avgt   10  1504.701 ±   27.829  ms/op
ChannelBenchmark.sendReceiveConflated           1000        N/A  avgt   10     0.037 ±    0.001  ms/op
ChannelBenchmark.sendReceiveConflated      100000000        N/A  avgt   10  1722.869 ±   85.603  ms/op
ChannelBenchmark.sendReceiveRendezvous          1000        N/A  avgt   10     0.122 ±    0.018  ms/op
ChannelBenchmark.sendReceiveRendezvous     100000000        N/A  avgt   10  7300.491 ±  107.318  ms/op
ChannelBenchmark.sendReceiveUnlimited           1000          0  avgt   10     0.057 ±    0.002  ms/op
ChannelBenchmark.sendReceiveUnlimited           1000    1000000  avgt   10     0.056 ±    0.003  ms/op
ChannelBenchmark.sendReceiveUnlimited           1000  100000000  avgt   10     0.314 ±    0.038  ms/op
ChannelBenchmark.sendReceiveUnlimited      100000000          0  avgt   10  3645.250 ±  658.235  ms/op
ChannelBenchmark.sendReceiveUnlimited      100000000    1000000  avgt   10  3192.487 ±  372.223  ms/op
ChannelBenchmark.sendReceiveUnlimited      100000000  100000000  avgt   10  3965.029 ±  386.913  ms/op
ChannelBenchmark.sendUnlimited                  1000        N/A  avgt   10     0.006 ±    0.001  ms/op
ChannelBenchmark.sendUnlimited             100000000        N/A  avgt   10  1157.710 ±  248.811  ms/op

murfel marked this pull request as ready for review on November 4, 2025, 13:24
murfel requested a review from dkhalanskyjb on November 4, 2025, 13:24
murfel changed the title from "[Draft] Add channel benchmarks" to "Add channel benchmarks" on Nov 4, 2025
Contributor Author

murfel commented Nov 4, 2025

Quick normalisation with ChatGPT

Produce the same table, but divide the Score column [and the Error column] by the count column and change the units to ns/op/element (https://chatgpt.com/share/e/690a0281-e828-800b-8895-144ecc4e07f3)

(Will do a proper Notebook for a JSON benchmark output after we agree on the benchmark correctness. Forgot to save this one as JSON and it takes 40 min to re-run.)
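
For reference, the normalisation is a single unit conversion, sketched below. The helper name is hypothetical; the inputs are rows from the ms/op table above.

```kotlin
// Hypothetical helper: converts a JMH score in ms/op, measured for a batch of
// `count` elements, into nanoseconds per element (1 ms = 1_000_000 ns).
fun msPerOpToNsPerElement(scoreMs: Double, count: Long): Double =
    scoreMs * 1_000_000.0 / count

fun main() {
    // manySendersManyReceivers, count = 100000000, prefill = 0: 8796.152 ms/op
    println("%.3f".format(msPerOpToNsPerElement(8796.152, 100_000_000L)))
    // sendUnlimited, count = 1000: 0.006 ms/op
    println("%.3f".format(msPerOpToNsPerElement(0.006, 1_000L)))
}
```

The same function applies to the Error column, since dividing by a constant scales the error bound by the same factor.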

# Run complete. Total time: 00:37:21

Benchmark                                    (count)  (prefill)  Mode  Cnt     Score          Error        Units
ChannelBenchmark.manySendersManyReceivers       1000          0  avgt   10     113.000 ±     1.000  ns/op/element
ChannelBenchmark.manySendersManyReceivers       1000    1000000  avgt   10     118.000 ±     7.000  ns/op/element
ChannelBenchmark.manySendersManyReceivers       1000  100000000  avgt   10     357.000 ±    55.000  ns/op/element
ChannelBenchmark.manySendersManyReceivers  100000000          0  avgt   10      87.962 ±     5.433  ns/op/element
ChannelBenchmark.manySendersManyReceivers  100000000    1000000  avgt   10      86.835 ±     2.544  ns/op/element
ChannelBenchmark.manySendersManyReceivers  100000000  100000000  avgt   10      94.347 ±     3.106  ns/op/element
ChannelBenchmark.manySendersOneReceiver         1000          0  avgt   10      84.000 ±     2.000  ns/op/element
ChannelBenchmark.manySendersOneReceiver         1000    1000000  avgt   10      68.000 ±     1.000  ns/op/element
ChannelBenchmark.manySendersOneReceiver         1000  100000000  avgt   10     327.000 ±    43.000  ns/op/element
ChannelBenchmark.manySendersOneReceiver    100000000          0  avgt   10      67.596 ±    11.268  ns/op/element
ChannelBenchmark.manySendersOneReceiver    100000000    1000000  avgt   10      67.304 ±     1.121  ns/op/element
ChannelBenchmark.manySendersOneReceiver    100000000  100000000  avgt   10      62.222 ±     2.564  ns/op/element
ChannelBenchmark.oneSenderManyReceivers         1000          0  avgt   10     119.000 ±     3.000  ns/op/element
ChannelBenchmark.oneSenderManyReceivers         1000    1000000  avgt   10     121.000 ±     3.000  ns/op/element
ChannelBenchmark.oneSenderManyReceivers         1000  100000000  avgt   10     353.000 ±    65.000  ns/op/element
ChannelBenchmark.oneSenderManyReceivers    100000000          0  avgt   10      87.858 ±     5.676  ns/op/element
ChannelBenchmark.oneSenderManyReceivers    100000000    1000000  avgt   10      86.982 ±     5.176  ns/op/element
ChannelBenchmark.oneSenderManyReceivers    100000000  100000000  avgt   10      85.941 ±     4.160  ns/op/element
ChannelBenchmark.sendConflated                  1000        N/A  avgt   10      17.000 ±     1.000  ns/op/element
ChannelBenchmark.sendConflated             100000000        N/A  avgt   10      15.047 ±     0.278  ns/op/element
ChannelBenchmark.sendReceiveConflated           1000        N/A  avgt   10      37.000 ±     1.000  ns/op/element
ChannelBenchmark.sendReceiveConflated      100000000        N/A  avgt   10      17.229 ±     0.856  ns/op/element
ChannelBenchmark.sendReceiveRendezvous          1000        N/A  avgt   10     122.000 ±    18.000  ns/op/element
ChannelBenchmark.sendReceiveRendezvous     100000000        N/A  avgt   10      73.005 ±     1.073  ns/op/element
ChannelBenchmark.sendReceiveUnlimited           1000          0  avgt   10      57.000 ±     2.000  ns/op/element
ChannelBenchmark.sendReceiveUnlimited           1000    1000000  avgt   10      56.000 ±     3.000  ns/op/element
ChannelBenchmark.sendReceiveUnlimited           1000  100000000  avgt   10     314.000 ±    38.000  ns/op/element
ChannelBenchmark.sendReceiveUnlimited      100000000          0  avgt   10      36.453 ±     6.582  ns/op/element
ChannelBenchmark.sendReceiveUnlimited      100000000    1000000  avgt   10      31.925 ±     3.722  ns/op/element
ChannelBenchmark.sendReceiveUnlimited      100000000  100000000  avgt   10      39.651 ±     3.869  ns/op/element
ChannelBenchmark.sendUnlimited                  1000        N/A  avgt   10       6.000 ±     1.000  ns/op/element
ChannelBenchmark.sendUnlimited             100000000        N/A  avgt   10      11.577 ±     2.488  ns/op/element

repeat(maxCount) { add(it) }
}

@Setup(Level.Invocation)
Contributor

Why does it have to be done before every benchmark function invocation rather than once per trial or iteration?

JFTR, https://github.com/openjdk/jmh/blob/2a316030b509aa9874dd6ab04e21962ac92cd634/jmh-core/src/main/java/org/openjdk/jmh/annotations/Level.java#L85

Contributor Author

It's a tradeoff. After each invocation, there could be a few extra items in the channel, which can accumulate over iterations. I can rewrite runSendReceive to leave the channel with the same number of elements as it came in with, but then it will slightly affect the benchmark. Possibly negligibly, since it's only up to 4 items each time...

Comment on lines 131 to 136
if (receiveAll) {
    channel.forEach { }
} else {
    repeat(countPerReceiverAtLeast) {
        channel.receive()
    }
Contributor

It makes sense to send received values into a blackhole (i.e. consume them).
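
A rough sketch of what consuming the received values could look like. The `drainInto` helper and the `DrainSketch` class are hypothetical and are not the code from this PR; the sketch assumes JMH's `Blackhole.consume` and an unlimited channel that is closed after being filled, so that iteration terminates.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Level
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations.Setup
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.infra.Blackhole

// Hypothetical helper: receives until the channel is closed and drained,
// hands every element to `sink`, and returns how many elements were received.
suspend fun drainInto(channel: ReceiveChannel<Int>, sink: (Int) -> Unit): Int {
    var received = 0
    for (element in channel) {
        sink(element)
        received++
    }
    return received
}

// Hypothetical benchmark class, not the one under review.
@State(Scope.Benchmark)
open class DrainSketch {
    private lateinit var channel: Channel<Int>

    @Setup(Level.Invocation)
    fun fill() {
        channel = Channel<Int>(Channel.UNLIMITED)
        repeat(1_000) { channel.trySend(it) }
        channel.close() // buffered elements stay receivable; the loop then ends
    }

    @Benchmark
    fun drain(blackhole: Blackhole): Int = runBlocking {
        // Passing each value to the blackhole keeps the JIT from treating
        // the receive results as dead code.
        drainInto(channel) { blackhole.consume(it) }
    }
}
```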

Contributor Author

It does, but shall we save our instructions on that, since it works for now?

Contributor

Sorry, I didn't get it. What are you trying to save on?

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
@Fork(1)
open class ChannelBenchmark {
Contributor

Could you please elaborate on what exactly you're trying to measure using these benchmarks?
Right now, it looks like "time required to create a new channel, send N messages into it (and, optionally, receive them), and then close the channel". However, I thought that the initial idea was to measure the latency of sending (and receiving) a single message into the channel.

Contributor Author

> measure the latency of sending (and receiving) a single message into the channel

I do measure that, indirectly. Are you suggesting literally sending and receiving only one message per benchmark? Is that reliable?

Contributor

Direct measurements are always better than indirect. If the goal is to measure send/recv timing, let's measure it.

What makes you think it will be unreliable?

Contributor Author

@murfel Nov 6, 2025

> Direct measurements are always better than indirect.

Not always. See below. It also depends on how you define "better".

> If the goal is to measure send/recv timing, let's measure it.

Again, I am measuring that. My way of measuring is a valid way of measuring. Having to assert that makes me feel dismissed.

We can explore other ways to measure, for sure.

> What makes you think it will be unreliable?

  1. Overhead of the measuring setup could be greater than the effect measured.
  2. A simplified setup might not capture typical usage.
  3. Does not average over data structure amortization (e.g. our sent element could be the element which triggers the channel's internal data structure doubling / allocation) (or, on the contrary, the constant from amortization could be noticeable and we do in fact want to measure it)
  4. Does not average over GC

What setup did you have in mind, something like this?

@Benchmark
fun sendReceiveUnlimitedPrefilledSequential(wrapper: UnlimitedChannelWrapper, blackhole: Blackhole) =
    runBlocking {
        wrapper.channel.send(42)
        blackhole.consume(wrapper.channel.receive())
    }
Benchmark                                                (count)  (prefill)  Mode  Cnt   Score    Error  Units
ChannelBenchmark.sendReceiveUnlimitedPrefilledSequential         0          0  avgt   10  53.959 ±  0.168  ns/op
ChannelBenchmark.sendReceiveUnlimitedPrefilledSequential         0    1000000  avgt   10  60.069 ±  1.345  ns/op
ChannelBenchmark.sendReceiveUnlimitedPrefilledSequential         0  100000000  avgt   10  71.457 ± 13.101  ns/op

Or this? (no suspension, trySend/tryReceive)

@Benchmark
fun sendReceiveUnlimitedPrefilledSequentialNoSuspension(wrapper: UnlimitedChannelWrapper, blackhole: Blackhole) {
    wrapper.channel.trySend(42)
    blackhole.consume(wrapper.channel.tryReceive().getOrThrow())
}
Benchmark                                                  (count)  (prefill)  Mode  Cnt   Score   Error  Units
ChannelBenchmark.sendReceiveUnlimitedPrefilledNoSuspension        0          0  avgt   10  10.619 ± 0.270  ns/op
ChannelBenchmark.sendReceiveUnlimitedPrefilledNoSuspension        0    1000000  avgt   10  10.859 ± 0.330  ns/op
ChannelBenchmark.sendReceiveUnlimitedPrefilledNoSuspension        0  100000000  avgt   10  17.163 ± 1.523  ns/op

Collaborator

The internal structure of the channel may be worth taking into account. For a prefilled channel with 32+ elements (32 is the default channel segment size), we can expect send and receive not to interact with one another at all, that is, the duration of send followed by receive should be roughly the sum of durations of send and receive, invoked independently. I imagine wrapper.channel.send(42) and blackhole.consume(wrapper.channel.receive()) could stay in different benchmarks without affecting the results too much.

For an empty channel, we could also try racing send and receive.

Using runBlocking in a benchmark that's only doing send doesn't seem optimal to me; I can imagine the run time getting dominated by the runBlocking machinery. I don't know what the proper way of doing this in JMH is, but I'd try a scheme like this:

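import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.LockSupport

internal class BenchmarkSynchronization {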
    private val state = AtomicInteger(0)
    private val benchmarkThread = Thread.currentThread()
    private val threadDoingWork = AtomicReference<Thread?>()
    
    fun awaitThreadAssignment(): Thread {
        assert(Thread.currentThread() === benchmarkThread)
        while (true) {
            val thread = threadDoingWork.get()
            if (thread != null) return thread
            LockSupport.parkNanos(Long.MAX_VALUE)
        }
    }
    
    fun awaitStartSignal() {
        threadDoingWork.set(Thread.currentThread())
        LockSupport.unpark(benchmarkThread)
        while (state.get() == 0) {
            LockSupport.parkNanos(Long.MAX_VALUE)
        }
    }
    
    fun signalFinish() {
        state.set(2)
        LockSupport.unpark(benchmarkThread)
    }

    fun runBenchmark(thread: Thread) {
        state.set(1)
        LockSupport.unpark(thread)
        while (state.get() != 2) {
            LockSupport.parkNanos(Long.MAX_VALUE)
        }
    }
}

(I haven't actually tested this code.) The scheme would then be:

// preparation
val synchronization = BenchmarkSynchronization()
GlobalScope.launch {
    synchronization.awaitStartSignal()
    try {
       // actual benchmark code here
    } finally {
        synchronization.signalFinish()
    }
}
val threadDoingWork = synchronization.awaitThreadAssignment()

// the @Benchmark itself
wrapper.synchronization.runBenchmark(wrapper.threadDoingWork)

@fzhinkin , is there a standard mechanism that encapsulates this?

Contributor Author

> For an empty channel, we could also try racing send and receive.

?

Collaborator

Running them in parallel.

Contributor

> Direct measurements are always better than indirect.

> Not always. See below. Also depends on how you define "better".

I mean, if our goal is to measure the latency of a certain operation and we have the facilities to do so, then it's better to do it directly (to an extent; a benchmark's results are averages anyway). By doing so, we can ensure that all unnecessary setup and teardown code (like creating a channel) won't skew the results.

On the other hand, if the goal is to measure end-to-end latency, like "time to create a channel and send 100k messages over it", then sure, the current approach works for that (moreover, I don't see how to measure it otherwise).

> If the goal is to measure send/recv timing, let's measure it.

> Again, I am measuring that. My way of measuring is a valid way of measuring. Having to assert that makes me feel dismissed.

See the comment above. I was under the impression that the typical use case for a channel is to be used indirectly (within a flow, for example), so for channels as they are, we decided to measure the latency of a single operation to see how it will be affected by potential changes in the implementation.

I'm not saying that the way you're measuring it is invalid, but if there are facilities to measure the latency of a single operation (well, the send-receive pair of operations), I'm voting for using them (unless there is evidence that such a measurement is impossible or makes no sense).

> We can explore other ways to measure, for sure.

> What makes you think it will be unreliable?
> Overhead of the measuring setup could be greater than the effect measured.

Setup (and teardown) actions performed before (after) the whole run (or an individual iteration) should not affect the measurements, as they are performed outside of the measurement scope; they affect the measurements only when performed for each benchmark function invocation.

> Yet simplified setup might not capture a typical usage.

I'm not sure if sending 400MB of data is typical usage either. ;)

> Does not average over data structure amortization (e.g. our sent element could be the element which triggers the channel's internal data structure doubling / allocation) (or, on the contrary, the constant from amortization could be noticeable and we do in fact want to measure it)

The benchmark function is continuously invoked over a configured period of time (you set it to 1 second).
If we reuse the same channel in each invocation, the results will average over data structure amortization.

> Does not average over GC

It's easier to focus on the memory footprint, as it is something we control directly (how many bytes we're allocating when performing an operation), rather than on GC pauses (they are subject to various factors).

> What setup did you have in mind, something like this?

Both approaches look sane (assuming the wrapper is not recreated for every benchmark call) and we can do both.

Contributor

@dkhalanskyjb, it feels like I didn't get you, but nevertheless: JMH provides some facilities for running benchmark methods concurrently and synchronizing their execution:
https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_15_Asymmetric.java
https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_17_SyncIterations.java

As for runBlocking, it would be nice to have a kx-benchmarks maintainer here who would solve the problem of benchmarking suspend APIs for us. Oh, wait... 😄

Comment on lines 118 to 121
require(senders > 0 && receivers > 0)
// Can be used with more than num cores but needs thinking it through,
// e.g., what would it measure?
require(senders + receivers <= cores)
Contributor

Do we really need to include it in the measurements? :)

Comment on lines 103 to 105
Channel<Int>(capacity).also {
sendManyItems(count, it)
}
Collaborator

Minor, style.
From https://kotlinlang.org/docs/scope-functions.html#also:

> When you see also in code, you can read it as "and also do the following with the object."

In my opinion, sending many items to a channel is the main idea, not just something that you "also" do here, so I'd opt into either a form with a variable, like

val channel = Channel<Int>(capacity)
repeat(count) {
    channel.send(list[it])
}

or use let:

Channel<Int>(capacity).let {
    sendManyItems(count, it)
}

Of the two, I prefer the first one.

Comment on lines 95 to 104
private suspend fun sendManyItems(count: Int, channel: Channel<Int>) {
    repeat(count) {
        // NB: it is `send`, not `trySend`, on purpose, since we are testing the `send` performance here.
        channel.send(list[it])
    }
}

private suspend fun runSend(count: Int, capacity: Int) {
    Channel<Int>(capacity).also {
        sendManyItems(count, it)
Collaborator

Minor, style: these two functions don't pass the https://wiki.haskell.org/Fairbairn_threshold for me, so I'd just inline them. Then, even the NB wouldn't be necessary, as it would be clear from the benchmark name that we are testing send.

Contributor Author

kept runSend

val receiveAll = channel.isEmpty
// send almost `count` items, up to `senders - 1` items will not be sent (negligible)
val countPerSender = count / senders
// for prefilled channel only: up to `receivers - 1` items of the sent items will not be received (negligible)
Collaborator

I don't understand this.

In total, there will be countPerSender * senders elements sent in while this function is running, so there will be wrapper.prefill + countPerSender * senders elements ultimately sent to the channel. Every receiver will receive floor(countPerSender * senders / receivers) elements, that is, in total, floor(countPerSender * senders / receivers) * receivers will leave the channel, which can leave wrapper.prefill + receivers - 1 elements inside it.

For big enough values of prefill and a small enough count, none of the items sent in runSendReceive will be received.
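
The arithmetic above can be checked with a small model. This is only a sketch: the `Counts` type and `countsAfterRun` function are hypothetical, and it models the branch where each receiver performs a fixed number of `receive` calls (the prefilled case), not the `receiveAll` branch.

```kotlin
// Hypothetical model of the bookkeeping described above; names are illustrative.
data class Counts(val sent: Int, val received: Int, val leftInChannel: Int)

fun countsAfterRun(count: Int, senders: Int, receivers: Int, prefill: Int): Counts {
    require(senders > 0 && receivers > 0)
    val countPerSender = count / senders         // integer division rounds down
    val sent = countPerSender * senders          // sent while the function runs
    val countPerReceiver = sent / receivers      // floor(sent / receivers)
    val received = countPerReceiver * receivers  // elements that leave the channel
    return Counts(sent, received, prefill + sent - received)
}

fun main() {
    // prints Counts(sent=999, received=996, leftInChannel=53)
    println(countsAfterRun(count = 1000, senders = 3, receivers = 4, prefill = 50))
}
```

Here the channel ends up with `prefill + 3` elements, within the `prefill + receivers - 1` bound. Because the channel is FIFO, whenever `prefill` is at least `received`, every element that was received came from the prefill.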

Contributor Author

You are correct in all your statements, and they do match my understanding and what is written*. But what's the problem with that? Don't send fewer elements than you have receivers.

Nit: the value of prefill doesn't matter, since it doesn't participate in calculations.

*In my comment I disregard the prefill, since we don't touch it. Fixed the wording.

Collaborator

The problem is that we will leave much more than receivers - 1 of the items sent from inside the function in the channel.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun main() {
    val channel = Channel<Int>(100)
    // prefill
    repeat(50) {
        channel.send(it)
    }
    // fill
    repeat(32) {
        channel.send(100 + it)
    }
    // receive
    repeat(30) {
        println(channel.receive())
    }
}

Here, we prefill the channel, then send some items. Of the items sent in the // fill block, not even one will be received. To say "2 of the sent items will not be received" is not true either way: we can either say that 52 of the sent items won't be received (if we count the prefilled ones among those that were sent) or that 32 items won't be (if we don't).

Some ways to say what I think you mean here:

  • "There will be at most receivers - 1 fewer calls to receive than calls to send"
  • "There is a receive call for every send call, except at most receivers - 1 of them"
  • "The benchmark will leave the channel with at most receivers - 1 elements more than there were initially"

Contributor Author

Do you mean "replace items with the count of items", the same comment you left below? Fixed. Yes, I was careless with wording that.

// Can be used with more than num cores but needs thinking it through,
// e.g., what would it measure?
require(senders + receivers <= cores)
// if the channel is prefilled, do not receive the prefilled items
Collaborator

The prefilled items will be received, as the channel is FIFO, so the way I'd explain the logic I see here is that we only want to receive as many items as were sent, which, in the case of a non-prefilled channel, means all the items.
