@@ -211,6 +211,126 @@ internal expect class UndispatchedCoroutine<in T>(
211211 uCont : Continuation <T >
212212) : ScopeCoroutine<T>
213213
// Used by withContext when the context changes but the dispatcher stays the same.
internal actual class UndispatchedCoroutine<in T> actual constructor(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(
    // Add the marker element unless the supplied context already carries it.
    if (context[UndispatchedMarker] != null) context else context + UndispatchedMarker,
    uCont
) {

    /**
     * The state of [ThreadContextElement]s associated with the current undispatched coroutine.
     * It lives in a thread local because this coroutine can be used concurrently in a
     * suspend-resume race scenario.
     * See the following boiled-down example with the `withContinuationContext` body inlined:
     * ```
     * val state = saveThreadContext(ctx)
     * try {
     *     invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
     *     // COROUTINE_SUSPENDED is returned
     * } finally {
     *     thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
     *     // and it also calls saveThreadContext and clearThreadContext
     * }
     * ```
     *
     * Usage note:
     *
     * This part of the code is performance-sensitive.
     * It is a well-established pattern to wrap various activities into a system-specific undispatched
     * `withContext` for the sake of logging, MDC, tracing etc., meaning that there exist thousands of
     * undispatched coroutines.
     * Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
     * that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is
     * garbage collected, when either the corresponding thread is GC'ed or it cleans up its stale entries
     * on other TL accesses.
     * When such coroutines are promoted to the old generation, `ThreadLocalMap`s become bloated and arbitrary
     * accesses to thread locals start to consume a significant amount of CPU, because these maps are
     * open-addressed and cleaned up incrementally on each access.
     * (You can read more about this effect as "GC nepotism".)
     *
     * To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
     * - It's never accessed when we are sure there are no thread context elements
     * - It's cleaned up via [ThreadLocal.remove] as soon as the coroutine is suspended or finished.
     */
    private val threadStateToRecover = ThreadLocal<Pair<CoroutineContext, Any?>>()

    /*
     * Indicates that the coroutine has at least one thread context element associated with it
     * and that 'threadStateToRecover' is going to be set in case of dispatching in order to preserve them.
     * Better than a nullable thread-local for easier debugging.
     *
     * It is used as a performance optimization to avoid 'threadStateToRecover' initialization
     * (note: tl.get() initializes the thread local),
     * and it is prone to false-positives as it is never reset: resetting it
     * may lead to logical data races between suspension points, where the
     * coroutine is still being suspended in one thread while already being resumed
     * in another.
     */
    @Volatile
    private var threadLocalIsSet = false

    init {
        /*
         * This is a hack for a very specific case in #2930 unless #3253 is implemented.
         * 'ThreadLocalStressTest' covers this change properly.
         *
         * The scenario this change covers is the following:
         * 1) The coroutine is being started as a plain, non kotlinx.coroutines related suspend function,
         *    e.g. `suspend fun main` or, more importantly, Ktor `SuspendFunGun`, that is invoking
         *    `withContext(tlElement)` which creates `UndispatchedCoroutine`.
         * 2) It (the original continuation) is then not wrapped into `DispatchedContinuation` via `intercept()`
         *    and goes neither through `DC.run` nor through `resumeUndispatchedWith`, both of which
         *    do thread context element tracking.
         * 3) So thread locals never got a chance to be properly set up via `saveThreadContext`,
         *    but when `withContext` finishes, it attempts to recover thread locals in its `afterResume`.
         *
         * Here we detect precisely this situation and properly set up the context to recover later.
         */
        val interceptor = uCont.context[ContinuationInterceptor]
        if (interceptor !is CoroutineDispatcher) {
            /*
             * We cannot just "read" the elements as there is no such API,
             * so we update-restore them immediately and use the intermediate value
             * as the initial state, leveraging the fact that a thread context element
             * is idempotent and such situations are increasingly rare.
             */
            val initialState = updateThreadContext(context, null)
            restoreThreadContext(context, initialState)
            saveThreadContext(context, initialState)
        }
    }

    /**
     * Remembers, for the calling thread, the [context] together with the [oldValue] that has to be
     * handed back to `restoreThreadContext` later on.
     */
    fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
        // Raise the flag first: it records that the thread-local is touched at all.
        threadLocalIsSet = true
        threadStateToRecover.set(Pair(context, oldValue))
    }

    /**
     * Drops the calling thread's saved state.
     *
     * Returns `false` only when the flag is raised and nothing was stored for the calling thread;
     * returns `true` otherwise. The thread-local entry is removed in either case.
     */
    fun clearThreadContext(): Boolean {
        // The thread-local is read only when the flag is raised (short-circuit), exactly once.
        val nothingLeftToClear = threadLocalIsSet && threadStateToRecover.get() == null
        threadStateToRecover.remove()
        return !nothingLeftToClear
    }

    override fun afterCompletionUndispatched() = clearThreadLocal()

    override fun afterResume(state: Any?) {
        clearThreadLocal()
        // resume undispatched -- update context but stay on the same dispatcher
        val outcome = recoverResult(state, uCont)
        withContinuationContext(uCont, null) { uCont.resumeWith(outcome) }
    }

    /**
     * Restores the thread context saved for the calling thread (if any) and removes the entry.
     * Does not touch the thread-local at all while the flag has never been raised.
     */
    private fun clearThreadLocal() {
        if (!threadLocalIsSet) return
        val saved = threadStateToRecover.get()
        if (saved != null) {
            restoreThreadContext(saved.first, saved.second)
        }
        threadStateToRecover.remove()
    }
}
333+
// Three distinct integer state tags (0, 1, 2). Their consumer is outside this chunk.
// NOTE(review): presumably the suspend-vs-resume decision states of a dispatched coroutine — confirm against the code that follows.
214334private const val UNDECIDED = 0
215335private const val SUSPENDED = 1
216336private const val RESUMED = 2
0 commit comments