From a4f345774744929526820b4fc9333783a7facbed Mon Sep 17 00:00:00 2001 From: Jaden Weiss Date: Fri, 29 May 2020 23:46:58 -0400 Subject: [PATCH] runtime: make channels work in interrupts --- src/examples/systick/systick.go | 17 +++++---- src/internal/task/queue.go | 28 +++++++++++++- src/runtime/arch_cortexm.go | 4 ++ src/runtime/arch_tinygoriscv.go | 8 ++++ src/runtime/chan.go | 43 +++++++++++++++++++++ src/runtime/gc_conservative.go | 30 ++++++++++++++- src/runtime/gc_extalloc.go | 51 ++++++++++++++++++++----- src/runtime/interrupt/interrupt_none.go | 25 ++++++++++++ src/runtime/runtime_atsamd21.go | 31 +++++++++++---- src/runtime/scheduler.go | 15 ++++---- src/runtime/scheduler_any.go | 3 ++ src/runtime/scheduler_none.go | 2 + src/runtime/wait_other.go | 8 ++++ 13 files changed, 232 insertions(+), 33 deletions(-) create mode 100644 src/runtime/interrupt/interrupt_none.go create mode 100644 src/runtime/wait_other.go diff --git a/src/examples/systick/systick.go b/src/examples/systick/systick.go index b678c3fa..f03fd316 100644 --- a/src/examples/systick/systick.go +++ b/src/examples/systick/systick.go @@ -5,6 +5,8 @@ import ( "machine" ) +var timerCh = make(chan struct{}, 1) + func main() { machine.LED.Configure(machine.PinConfig{Mode: machine.PinOutput}) @@ -12,17 +14,18 @@ func main() { arm.SetupSystemTimer(machine.CPUFrequency() / 10) for { + machine.LED.Low() + <-timerCh + machine.LED.High() + <-timerCh } } -var led_state bool - //export SysTick_Handler func timer_isr() { - if led_state { - machine.LED.Low() - } else { - machine.LED.High() + select { + case timerCh <- struct{}{}: + default: + // The consumer is running behind. } - led_state = !led_state } diff --git a/src/internal/task/queue.go b/src/internal/task/queue.go index 4b0a22bf..5a00d95a 100644 --- a/src/internal/task/queue.go +++ b/src/internal/task/queue.go @@ -1,5 +1,7 @@ package task +import "runtime/interrupt" + const asserts = false // Queue is a FIFO container of tasks. @@ -10,7 +12,9 @@ type Queue struct { // Push a task onto the queue. func (q *Queue) Push(t *Task) { + i := interrupt.Disable() if asserts && t.Next != nil { + interrupt.Restore(i) panic("runtime: pushing a task to a queue with a non-nil Next pointer") } if q.tail != nil { @@ -21,12 +25,15 @@ func (q *Queue) Push(t *Task) { if q.head == nil { q.head = t } + interrupt.Restore(i) } // Pop a task off of the queue. func (q *Queue) Pop() *Task { + i := interrupt.Disable() t := q.head if t == nil { + interrupt.Restore(i) return nil } q.head = t.Next @@ -34,11 +41,13 @@ func (q *Queue) Pop() *Task { q.tail = nil } t.Next = nil + interrupt.Restore(i) return t } // Append pops the contents of another queue and pushes them onto the end of this queue. func (q *Queue) Append(other *Queue) { + i := interrupt.Disable() if q.head == nil { q.head = other.head } else { @@ -46,6 +55,15 @@ func (q *Queue) Append(other *Queue) { } q.tail = other.tail other.head, other.tail = nil, nil + interrupt.Restore(i) +} + +// Empty checks if the queue is empty. +func (q *Queue) Empty() bool { + i := interrupt.Disable() + empty := q.head == nil + interrupt.Restore(i) + return empty } // Stack is a LIFO container of tasks. @@ -57,19 +75,24 @@ type Stack struct { // Push a task onto the stack. func (s *Stack) Push(t *Task) { + i := interrupt.Disable() if asserts && t.Next != nil { + interrupt.Restore(i) panic("runtime: pushing a task to a stack with a non-nil Next pointer") } s.top, t.Next = t, s.top + interrupt.Restore(i) } // Pop a task off of the stack. func (s *Stack) Pop() *Task { + i := interrupt.Disable() t := s.top if t != nil { s.top = t.Next t.Next = nil } + interrupt.Restore(i) return t } @@ -89,10 +112,13 @@ func (t *Task) tail() *Task { // Queue moves the contents of the stack into a queue. // Elements can be popped from the queue in the same order that they would be popped from the stack. func (s *Stack) Queue() Queue { + i := interrupt.Disable() head := s.top s.top = nil - return Queue{ + q := Queue{ head: head, tail: head.tail(), } + interrupt.Restore(i) + return q } diff --git a/src/runtime/arch_cortexm.go b/src/runtime/arch_cortexm.go index 3359eb66..1ccdd96c 100644 --- a/src/runtime/arch_cortexm.go +++ b/src/runtime/arch_cortexm.go @@ -102,3 +102,7 @@ func procPin() { func procUnpin() { arm.EnableInterrupts(procPinnedMask) } + +func waitForEvents() { + arm.Asm("wfe") +} diff --git a/src/runtime/arch_tinygoriscv.go b/src/runtime/arch_tinygoriscv.go index 98fbed70..7aa34b50 100644 --- a/src/runtime/arch_tinygoriscv.go +++ b/src/runtime/arch_tinygoriscv.go @@ -90,3 +90,11 @@ func procPin() { func procUnpin() { riscv.EnableInterrupts(procPinnedMask) } + +func waitForEvents() { + mask := riscv.DisableInterrupts() + if !runqueue.Empty() { + riscv.Asm("wfi") + } + riscv.EnableInterrupts(mask) +} diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 37ad154a..c2da4f05 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -25,6 +25,7 @@ package runtime import ( "internal/task" + "runtime/interrupt" "unsafe" ) @@ -308,13 +309,17 @@ func (ch *channel) trySend(value unsafe.Pointer) bool { return false } + i := interrupt.Disable() + switch ch.state { case chanStateEmpty, chanStateBuf: // try to dump the value directly into the buffer if ch.push(value) { ch.state = chanStateBuf + interrupt.Restore(i) return true } + interrupt.Restore(i) return false case chanStateRecv: // unblock reciever @@ -328,16 +333,21 @@ func (ch *channel) trySend(value unsafe.Pointer) bool { ch.state = chanStateEmpty } + interrupt.Restore(i) return true case chanStateSend: // something else is already waiting to send + interrupt.Restore(i) return false case chanStateClosed: + interrupt.Restore(i) runtimePanic("send on closed channel") default: + interrupt.Restore(i) runtimePanic("invalid channel state") } + interrupt.Restore(i) return false } @@ -351,6 +361,8 @@ func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) { return false, false } + i := interrupt.Disable() + switch ch.state { case chanStateBuf, chanStateSend: // try to pop the value directly from the buffer @@ -373,6 +385,7 @@ func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) { ch.state = chanStateEmpty } + interrupt.Restore(i) return true, true } else if ch.blocked != nil { // unblock next sender if applicable @@ -386,19 +399,24 @@ func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) { ch.state = chanStateEmpty } + interrupt.Restore(i) return true, true } + interrupt.Restore(i) return false, false case chanStateRecv, chanStateEmpty: // something else is already waiting to recieve + interrupt.Restore(i) return false, false case chanStateClosed: if ch.pop(value) { + interrupt.Restore(i) return true, true } // channel closed - nothing to recieve memzero(value, ch.elementSize) + interrupt.Restore(i) return true, false default: runtimePanic("invalid channel state") @@ -447,14 +465,18 @@ type chanSelectState struct { // This operation will block unless a value is immediately available. // May panic if the channel is closed. func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) { + i := interrupt.Disable() + if ch.trySend(value) { // value immediately sent chanDebug(ch) + interrupt.Restore(i) return } if ch == nil { // A nil channel blocks forever. Do not schedule this goroutine again. + interrupt.Restore(i) deadlock() } @@ -468,6 +490,7 @@ func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList } ch.blocked = blockedlist chanDebug(ch) + interrupt.Restore(i) task.Pause() sender.Ptr = nil } @@ -477,14 +500,18 @@ func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList // The recieved value is copied into the value pointer. // Returns the comma-ok value. func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool { + i := interrupt.Disable() + if rx, ok := ch.tryRecv(value); rx { // value immediately available chanDebug(ch) + interrupt.Restore(i) return ok } if ch == nil { // A nil channel blocks forever. Do not schedule this goroutine again. + interrupt.Restore(i) deadlock() } @@ -498,6 +525,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList } ch.blocked = blockedlist chanDebug(ch) + interrupt.Restore(i) task.Pause() ok := receiver.Data == 1 receiver.Ptr, receiver.Data = nil, 0 @@ -511,15 +539,18 @@ func chanClose(ch *channel) { // Not allowed by the language spec. runtimePanic("close of nil channel") } + i := interrupt.Disable() switch ch.state { case chanStateClosed: // Not allowed by the language spec. + interrupt.Restore(i) runtimePanic("close of closed channel") case chanStateSend: // This panic should ideally on the sending side, not in this goroutine. // But when a goroutine tries to send while the channel is being closed, // that is clearly invalid: the send should have been completed already // before the close. + interrupt.Restore(i) runtimePanic("close channel during send") case chanStateRecv: // unblock all receivers with the zero value @@ -531,6 +562,7 @@ func chanClose(ch *channel) { // Easy case. No available sender or receiver. } ch.state = chanStateClosed + interrupt.Restore(i) chanDebug(ch) } @@ -541,8 +573,11 @@ func chanClose(ch *channel) { // TODO: do this in a round-robin fashion (as specified in the Go spec) instead // of picking the first one that can proceed. func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) { + istate := interrupt.Disable() + if selected, ok := tryChanSelect(recvbuf, states); selected != ^uintptr(0) { // one channel was immediately ready + interrupt.Restore(istate) return selected, ok } @@ -570,6 +605,7 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelB case chanStateRecv: // already in correct state default: + interrupt.Restore(istate) runtimePanic("invalid channel state") } } else { @@ -582,6 +618,7 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelB case chanStateBuf: // already in correct state default: + interrupt.Restore(istate) runtimePanic("invalid channel state") } } @@ -594,6 +631,7 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelB t.Data = 1 // wait for one case to fire + interrupt.Restore(istate) task.Pause() // figure out which one fired and return the ok value @@ -602,22 +640,27 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelB // tryChanSelect is like chanSelect, but it does a non-blocking select operation. func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) { + istate := interrupt.Disable() + // See whether we can receive from one of the channels. for i, state := range states { if state.value == nil { // A receive operation. if rx, ok := state.ch.tryRecv(recvbuf); rx { chanDebug(state.ch) + interrupt.Restore(istate) return uintptr(i), ok } } else { // A send operation: state.value is not nil. if state.ch.trySend(state.value) { chanDebug(state.ch) + interrupt.Restore(istate) return uintptr(i), true } } } + interrupt.Restore(istate) return ^uintptr(0), false } diff --git a/src/runtime/gc_conservative.go b/src/runtime/gc_conservative.go index 18594b62..fd0d3150 100644 --- a/src/runtime/gc_conservative.go +++ b/src/runtime/gc_conservative.go @@ -29,6 +29,8 @@ package runtime // Moss. import ( + "internal/task" + "runtime/interrupt" "unsafe" ) @@ -292,10 +294,36 @@ func GC() { } // Mark phase: mark all reachable objects, recursively. - markGlobals() markStack() + markGlobals() + + // Channel operations in interrupts may move task pointers around while we are marking. + // Therefore we need to scan the runqueue seperately. + var markedTaskQueue task.Queue +runqueueScan: + for !runqueue.Empty() { + // Pop the next task off of the runqueue. + t := runqueue.Pop() + + // Mark the task if it has not already been marked. + markRoot(uintptr(unsafe.Pointer(&runqueue)), uintptr(unsafe.Pointer(t))) + + // Push the task onto our temporary queue. + markedTaskQueue.Push(t) + } + finishMark() + // Restore the runqueue. + i := interrupt.Disable() + if !runqueue.Empty() { + // Something new came in while finishing the mark. + interrupt.Restore(i) + goto runqueueScan + } + runqueue = markedTaskQueue + interrupt.Restore(i) + // Sweep phase: free all non-marked objects and unmark marked objects for // the next collection cycle. sweep() diff --git a/src/runtime/gc_extalloc.go b/src/runtime/gc_extalloc.go index 296ac7a0..7272d34c 100644 --- a/src/runtime/gc_extalloc.go +++ b/src/runtime/gc_extalloc.go @@ -2,7 +2,11 @@ package runtime -import "unsafe" +import ( + "internal/task" + "runtime/interrupt" + "unsafe" +) // This garbage collector implementation allows TinyGo to use an external memory allocator. // It appends a header to the end of every allocation which the garbage collector uses for tracking purposes. @@ -417,9 +421,9 @@ func (t *memTreap) destroy() { // This is used to detect if the collector is invoking itself or trying to allocate memory. var gcrunning bool -// activeMem is a treap used to store marked allocations which have already been scanned. +// activeMem is a queue used to store marked allocations which have already been scanned. // This is only used when the garbage collector is running. -var activeMem memTreap +var activeMem memScanQueue func GC() { if gcDebug { @@ -449,10 +453,27 @@ func GC() { // These can be quickly compared against to eliminate most false positives. firstPtr, lastPtr = allocations.minAddr(), allocations.maxAddr() - // Start by scanning all of the global variables and the stack. - markGlobals() + // Start by scanning the stack. markStack() + // Scan all globals. + markGlobals() + + // Channel operations in interrupts may move task pointers around while we are marking. + // Therefore we need to scan the runqueue seperately. + var markedTaskQueue task.Queue +runqueueScan: + for !runqueue.Empty() { + // Pop the next task off of the runqueue. + t := runqueue.Pop() + + // Mark the task if it has not already been marked. + markRoot(uintptr(unsafe.Pointer(&runqueue)), uintptr(unsafe.Pointer(t))) + + // Push the task onto our temporary queue. + markedTaskQueue.Push(t) + } + // Scan all referenced allocations, building a new treap with marked allocations. // The marking process deletes the allocations from the old allocations treap, so they are only queued once. for !scanQueue.empty() { @@ -462,19 +483,29 @@ func GC() { // Scan and mark all nodes that this references. n.scan() - // Insert this node into the new treap. - activeMem.insert(n) + // Insert this node into the active memory queue. + activeMem.push(n) } + i := interrupt.Disable() + if !runqueue.Empty() { + // Something new came in while finishing the mark. + interrupt.Restore(i) + goto runqueueScan + } + runqueue = markedTaskQueue + interrupt.Restore(i) + // The allocations treap now only contains unreferenced nodes. Destroy them all. allocations.destroy() if gcAsserts && !allocations.empty() { runtimePanic("failed to fully destroy allocations") } - // Replace the allocations treap with the new treap. - allocations = activeMem - activeMem = memTreap{} + // Treapify the active memory queue. + for !activeMem.empty() { + allocations.insert(activeMem.pop()) + } if gcDebug { println("GC finished") diff --git a/src/runtime/interrupt/interrupt_none.go b/src/runtime/interrupt/interrupt_none.go new file mode 100644 index 00000000..89109304 --- /dev/null +++ b/src/runtime/interrupt/interrupt_none.go @@ -0,0 +1,25 @@ +// +build !baremetal + +package interrupt + +// State represents the previous global interrupt state. +type State uintptr + +// Disable disables all interrupts and returns the previous interrupt state. It +// can be used in a critical section like this: +// +// state := interrupt.Disable() +// // critical section +// interrupt.Restore(state) +// +// Critical sections can be nested. Make sure to call Restore in the same order +// as you called Disable (this happens naturally with the pattern above). +func Disable() (state State) { + return 0 +} + +// Restore restores interrupts to what they were before. Give the previous state +// returned by Disable as a parameter. If interrupts were disabled before +// calling Disable, this will not re-enable interrupts, allowing for nested +// cricital sections. +func Restore(state State) {} diff --git a/src/runtime/runtime_atsamd21.go b/src/runtime/runtime_atsamd21.go index 1055db0c..1d84a4a9 100644 --- a/src/runtime/runtime_atsamd21.go +++ b/src/runtime/runtime_atsamd21.go @@ -216,14 +216,14 @@ func initRTC() { sam.RTC_MODE0.CTRL.SetBits(sam.RTC_MODE0_CTRL_ENABLE) waitForSync() - intr := interrupt.New(sam.IRQ_RTC, func(intr interrupt.Interrupt) { + rtcInterrupt := interrupt.New(sam.IRQ_RTC, func(intr interrupt.Interrupt) { // disable IRQ for CMP0 compare sam.RTC_MODE0.INTFLAG.Set(sam.RTC_MODE0_INTENSET_CMP0) timerWakeup.Set(1) }) - intr.SetPriority(0xc0) - intr.Enable() + rtcInterrupt.SetPriority(0xc0) + rtcInterrupt.Enable() } func waitForSync() { @@ -261,7 +261,10 @@ func sleepTicks(d timeUnit) { for d != 0 { ticks() // update timestamp ticks := uint32(d) - timerSleep(ticks) + if !timerSleep(ticks) { + // Bail out early to handle a non-time interrupt. + return + } d -= timeUnit(ticks) } } @@ -280,7 +283,9 @@ func ticks() timeUnit { } // ticks are in microseconds -func timerSleep(ticks uint32) { +// Returns true if the timer completed. +// Returns false if another interrupt occured which requires an early return to scheduler. +func timerSleep(ticks uint32) bool { timerWakeup.Set(0) if ticks < 7 { // Due to around 6 clock ticks delay waiting for the register value to @@ -302,8 +307,20 @@ func timerSleep(ticks uint32) { // enable IRQ for CMP0 compare sam.RTC_MODE0.INTENSET.SetBits(sam.RTC_MODE0_INTENSET_CMP0) - for timerWakeup.Get() == 0 { - arm.Asm("wfi") +wait: + arm.Asm("wfe") + if timerWakeup.Get() != 0 { + return true + } + if hasScheduler { + // The interurpt may have awoken a goroutine, so bail out early. + // Disable IRQ for CMP0 compare. + sam.RTC_MODE0.INTENCLR.SetBits(sam.RTC_MODE0_INTENSET_CMP0) + return false + } else { + // This is running without a scheduler. + // The application expects this to sleep the whole time. + goto wait } } diff --git a/src/runtime/scheduler.go b/src/runtime/scheduler.go index a8c9f16b..42cc0fef 100644 --- a/src/runtime/scheduler.go +++ b/src/runtime/scheduler.go @@ -20,6 +20,8 @@ import ( const schedulerDebug = false +var schedulerDone bool + // Queues used by the scheduler. var ( runqueue task.Queue @@ -114,7 +116,7 @@ func addSleepTask(t *task.Task, duration timeUnit) { func scheduler() { // Main scheduler loop. var now timeUnit - for { + for !schedulerDone { scheduleLog("") scheduleLog(" schedule") if sleepQueue != nil { @@ -135,12 +137,11 @@ func scheduler() { t := runqueue.Pop() if t == nil { if sleepQueue == nil { - // No more tasks to execute. - // It would be nice if we could detect deadlocks here, because - // there might still be functions waiting on each other in a - // deadlock. - scheduleLog(" no tasks left!") - return + if asyncScheduler { + return + } + waitForEvents() + continue } timeLeft := timeUnit(sleepQueue.Data) - (now - sleepQueueBaseTime) if schedulerDebug { diff --git a/src/runtime/scheduler_any.go b/src/runtime/scheduler_any.go index fb7ca6e9..57fc1624 100644 --- a/src/runtime/scheduler_any.go +++ b/src/runtime/scheduler_any.go @@ -19,6 +19,9 @@ func run() { initAll() postinit() callMain() + schedulerDone = true }() scheduler() } + +const hasScheduler = true diff --git a/src/runtime/scheduler_none.go b/src/runtime/scheduler_none.go index e40615fe..ec83003f 100644 --- a/src/runtime/scheduler_none.go +++ b/src/runtime/scheduler_none.go @@ -21,3 +21,5 @@ func run() { postinit() callMain() } + +const hasScheduler = false diff --git a/src/runtime/wait_other.go b/src/runtime/wait_other.go new file mode 100644 index 00000000..20f418d3 --- /dev/null +++ b/src/runtime/wait_other.go @@ -0,0 +1,8 @@ +// +build !tinygo.riscv +// +build !cortexm + +package runtime + +func waitForEvents() { + runtimePanic("deadlocked: no event source") +}