Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 101 additions & 65 deletions src/internal/task/task_threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ type state struct {
// is needed to be able to scan the stack.
stackTop uintptr

// Lowest address of the stack.
// This is populated when the thread is stopped by the GC.
stackBottom uintptr

// Next task in the activeTasks queue.
QueueNext *Task

// Semaphore to pause/resume the thread atomically.
pauseSem Semaphore

// Semaphore used for stack scanning.
// We can't reuse pauseSem here since the thread might have been paused for
// other reasons (for example, because it was waiting on a channel).
gcSem Semaphore
}

// Goroutine counter, starting at 0 for the main goroutine.
Expand Down Expand Up @@ -96,6 +95,9 @@ func (t *Task) Resume() {
t.state.pauseSem.Post()
}

// otherGoroutines is the total number of live goroutines minus one.
var otherGoroutines uint32

// Start a new OS thread.
func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
t := &Task{}
Expand All @@ -115,6 +117,7 @@ func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
}
t.state.QueueNext = activeTasks
activeTasks = t
otherGoroutines++
activeTaskLock.Unlock()
}

Expand All @@ -135,6 +138,7 @@ func taskExited(t *Task) {
break
}
}
otherGoroutines--
activeTaskLock.Unlock()

// Sanity check.
Expand All @@ -143,9 +147,42 @@ func taskExited(t *Task) {
}
}

// Futex to wait on until all tasks have finished scanning the stack.
// This is basically a sync.WaitGroup.
var scanDoneFutex Futex
// scanWaitGroup is used to wait until all threads have finished the current state transition.
var scanWaitGroup waitGroup

// waitGroup is a small counter-based barrier built on a single Futex.
// The counter is set up front by initWaitGroup, decremented by done, and
// wait blocks until it reaches zero.
type waitGroup struct {
// f stores the number of done calls that are still outstanding.
f Futex
}

// initWaitGroup returns a waitGroup whose counter is preset to n, meaning
// that n calls to done are expected before wait returns.
func initWaitGroup(n uint32) waitGroup {
	wg := waitGroup{}
	wg.f.Store(n)
	return wg
}

// done decrements the counter by one. When the counter reaches zero, every
// thread blocked on the futex is woken up.
func (wg *waitGroup) done() {
	// Adding ^uint32(0) (all bits set) wraps around, which is the unsigned
	// way of subtracting one. The comparison uses the value returned by Add.
	remaining := wg.f.Add(^uint32(0))
	if remaining != 0 {
		return
	}
	wg.f.WakeAll()
}

// wait returns once the counter has been observed to be zero.
func (wg *waitGroup) wait() {
	// Re-read the counter after every wakeup and pass the observed value to
	// the futex wait.
	// NOTE(review): presumably Futex.Wait only sleeps while the stored value
	// still equals its argument -- confirm against the Futex implementation.
	for remaining := wg.f.Load(); remaining != 0; remaining = wg.f.Load() {
		wg.f.Wait(remaining)
	}
}

// gcState is used to track and notify threads when the GC is stopping/resuming.
var gcState Futex

// Values stored in gcState.
const (
// gcStateResumed is the zero value. GCResumeWorld stores it before waking
// the stopped threads.
gcStateResumed = iota
// gcStateStopped is stored by GCStopWorldAndScan before it signals the
// other threads; signalled threads wait while gcState holds this value.
gcStateStopped
)

// GC scan phase. Because we need to stop the world while scanning, this kinda
// needs to be done in the tasks package.
Expand All @@ -155,100 +192,99 @@ var scanDoneFutex Futex
func GCStopWorldAndScan() {
current := Current()

// Don't allow new goroutines to be started while pausing/resuming threads
// in the stop-the-world phase.
activeTaskLock.Lock()
// NOTE: This does not need to be atomic.
if gcState.Load() == gcStateResumed {
// Don't allow new goroutines to be started while pausing/resuming threads
// in the stop-the-world phase.
activeTaskLock.Lock()

// Pause all other threads.
numOtherThreads := uint32(0)
for t := activeTasks; t != nil; t = t.state.QueueNext {
if t != current {
numOtherThreads++
tinygo_task_send_gc_signal(t.state.thread)
}
}
// Wait for threads to finish resuming.
scanWaitGroup.wait()

// Store the number of threads to wait for in the futex.
// This is the equivalent of doing an initial wg.Add(numOtherThreads).
scanDoneFutex.Store(numOtherThreads)
// Change the gc state to stopped.
// NOTE: This does not need to be atomic.
gcState.Store(gcStateStopped)

// Scan the current stack, and all current registers.
scanCurrentStack()
// Set the number of threads to wait for.
scanWaitGroup = initWaitGroup(otherGoroutines)

// Wake each paused thread for the first time so it will scan the stack.
for t := activeTasks; t != nil; t = t.state.QueueNext {
if t != current {
t.state.gcSem.Post()
// Pause all other threads.
for t := activeTasks; t != nil; t = t.state.QueueNext {
if t != current {
tinygo_task_send_gc_signal(t.state.thread)
}
}

// Wait for the threads to finish stopping.
scanWaitGroup.wait()
}

// Wait until all threads have finished scanning their stack.
// This is the equivalent of wg.Wait()
for {
val := scanDoneFutex.Load()
if val == 0 {
break
// Scan other thread stacks.
for t := activeTasks; t != nil; t = t.state.QueueNext {
if t != current {
markRoots(t.state.stackBottom, t.state.stackTop)
}
scanDoneFutex.Wait(val)
}

// Scan the current stack, and all current registers.
scanCurrentStack()

// Scan all globals (implemented in the runtime).
gcScanGlobals()
}

// After the GC is done scanning, resume all other threads.
//
// This must only be called after a GCStopWorldAndScan call.
func GCResumeWorld() {
current := Current()

// Wake each paused thread for the second time, so they will resume normal
// operation.
for t := activeTasks; t != nil; t = t.state.QueueNext {
if t != current {
t.state.gcSem.Post()
}
// NOTE: This does not need to be atomic.
if gcState.Load() == gcStateResumed {
// This is already resumed.
return
}

// Set the wait group to track resume progress.
scanWaitGroup = initWaitGroup(otherGoroutines)

// Set the state to resumed.
gcState.Store(gcStateResumed)

// Wake all of the stopped threads.
gcState.WakeAll()

// Allow goroutines to start and exit again.
activeTaskLock.Unlock()
}

//go:linkname markRoots runtime.markRoots
func markRoots(start, end uintptr)

// Scan globals, implemented in the runtime package.
func gcScanGlobals()

var stackScanLock PMutex

//export tinygo_task_gc_pause
func tingyo_task_gc_pause(sig int32) {
// Wait until we get the signal to start scanning the stack.
Current().state.gcSem.Wait()

// Scan the thread stack.
// Only scan a single thread stack at a time, because the GC marking phase
// doesn't support parallelism.
// TODO: it may be possible to call markRoots directly (without saving
// registers) since we are in a signal handler that already saved a bunch of
// registers. This is an optimization left for a future time.
stackScanLock.Lock()
scanCurrentStack()
stackScanLock.Unlock()
// Write the entry stack pointer to the state.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor spelling nit.

Current().state.stackBottom = uintptr(stacksave())

// Notify the GC that we are stopped.
scanWaitGroup.done()

// Equivalent of wg.Done(): subtract one from the futex and if the result is
// 0 (meaning we were the last in the waitgroup), wake the waiting thread.
n := uint32(1)
if scanDoneFutex.Add(-n) == 0 {
scanDoneFutex.Wake()
// Wait for the GC to resume.
for gcState.Load() == gcStateStopped {
gcState.Wait(gcStateStopped)
}

// Wait until we get the signal we can resume normally (after the mark phase
// has finished).
Current().state.gcSem.Wait()
// Notify the GC that we have resumed.
scanWaitGroup.done()
}

//go:export tinygo_scanCurrentStack
func scanCurrentStack()

//go:linkname stacksave runtime.stacksave
func stacksave() unsafe.Pointer

// Return the highest address of the current stack.
func StackTop() uintptr {
return Current().state.stackTop
Expand Down
26 changes: 2 additions & 24 deletions src/runtime/gc_boehm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ var zeroSizedAlloc uint8

var gcLock task.PMutex

// Normally false, set to true during a GC scan when all other threads get
// paused.
var needsResumeWorld bool

func initHeap() {
libgc_init()

Expand All @@ -48,20 +44,8 @@ func gcInit()

//export tinygo_runtime_bdwgc_callback
func gcCallback() {
if hasParallelism && needsResumeWorld {
// Should never happen, check for it anyway.
runtimePanic("gc: world already stopped")
}

// Mark globals and all stacks, and stop the world if we're using threading.
gcMarkReachable()

// If we use a scheduler with parallelism (the threads scheduler for
// example), we need to call gcResumeWorld() after scanning has finished.
if hasParallelism {
// Note that we need to resume the world after finishing the GC call.
needsResumeWorld = true
}
}

func markRoots(start, end uintptr) {
Expand All @@ -87,7 +71,6 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer {
}

gcLock.Lock()
needsResumeWorld = false
var ptr unsafe.Pointer
if layout == gclayout.NoPtrs.AsPtr() {
// This object is entirely pointer free, for example make([]int, ...).
Expand All @@ -104,9 +87,7 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer {
// Memory returned from libgc_malloc has already been zeroed, so nothing
// to do here.
}
if needsResumeWorld {
gcResumeWorld()
}
gcResumeWorld()
gcLock.Unlock()
if ptr == nil {
runtimePanic("gc: out of memory")
Expand All @@ -121,11 +102,8 @@ func free(ptr unsafe.Pointer) {

func GC() {
gcLock.Lock()
needsResumeWorld = false
libgc_gcollect()
if needsResumeWorld {
gcResumeWorld()
}
gcResumeWorld()
gcLock.Unlock()
}

Expand Down
Loading