sync: fix concurrent read-lock on write-locked RWMutex

This bug can be triggered by the following series of events:
A acquires a write lock
B starts waiting for a read lock
C starts waiting for a read lock
A releases the write lock

After this, both B and C are supposed to be resumed as a read-lock is available.
However, with the previous implementation, only C would be resumed immediately.
Other goroutines could immediately acquire the read lock, but B would not be resumed until C released the read lock.
Этот коммит содержится в:
Nia Waldvogel 2021-12-18 14:55:07 -05:00 коммит произвёл Nia
родитель 9eb13884de
коммит 5e719b0d3d

Просмотреть файл

@ -5,8 +5,7 @@ import (
_ "unsafe"
)
// These mutexes assume there is only one thread of operation: no goroutines,
// interrupts or anything else.
// These mutexes assume there is only one thread of operation and cannot be accessed safely from interrupts.
type Mutex struct {
locked bool
@ -41,35 +40,132 @@ func (m *Mutex) Unlock() {
}
type RWMutex struct {
m Mutex
readers uint32
// waitingWriters are all of the tasks waiting for write locks.
waitingWriters task.Stack
// waitingReaders are all of the tasks waiting for a read lock.
waitingReaders task.Stack
// state is the current state of the RWMutex.
// Iff the mutex is completely unlocked, it contains rwMutexStateUnlocked (aka 0).
// Iff the mutex is write-locked, it contains rwMutexStateWLocked.
// While the mutex is read-locked, it contains the current number of readers.
state uint32
}
const (
rwMutexStateUnlocked = uint32(0)
rwMutexStateWLocked = ^uint32(0)
rwMutexMaxReaders = rwMutexStateWLocked - 1
)
func (rw *RWMutex) Lock() {
rw.m.Lock()
if rw.state == 0 {
// The mutex is completely unlocked.
// Lock without waiting.
rw.state = rwMutexStateWLocked
return
}
// Wait for the lock to be released.
rw.waitingWriters.Push(task.Current())
task.Pause()
}
func (rw *RWMutex) Unlock() {
rw.m.Unlock()
switch rw.state {
case rwMutexStateWLocked:
// This is correct.
case rwMutexStateUnlocked:
// The mutex is already unlocked.
panic("sync: unlock of unlocked RWMutex")
default:
// The mutex is read-locked instead of write-locked.
panic("sync: write-unlock of read-locked RWMutex")
}
switch {
case rw.maybeUnblockReaders():
// Switched over to read mode.
case rw.maybeUnblockWriter():
// Transferred to another writer.
default:
// Nothing is waiting for the lock.
rw.state = rwMutexStateUnlocked
}
}
func (rw *RWMutex) RLock() {
if rw.readers == 0 {
rw.m.Lock()
if rw.state == rwMutexStateWLocked {
// Wait for the write lock to be released.
rw.waitingReaders.Push(task.Current())
task.Pause()
return
}
rw.readers++
if rw.state == rwMutexMaxReaders {
panic("sync: too many readers on RWMutex")
}
// Increase the reader count.
rw.state++
}
func (rw *RWMutex) RUnlock() {
if rw.readers == 0 {
switch rw.state {
case rwMutexStateUnlocked:
// The mutex is already unlocked.
panic("sync: unlock of unlocked RWMutex")
case rwMutexStateWLocked:
// The mutex is write-locked instead of read-locked.
panic("sync: read-unlock of write-locked RWMutex")
}
rw.readers--
if rw.readers == 0 {
rw.m.Unlock()
rw.state--
if rw.state == rwMutexStateUnlocked {
// This was the last reader.
// Try to unblock a writer.
rw.maybeUnblockWriter()
}
}
func (rw *RWMutex) maybeUnblockReaders() bool {
var n uint32
for {
t := rw.waitingReaders.Pop()
if t == nil {
break
}
n++
scheduleTask(t)
}
if n == 0 {
return false
}
rw.state = n
return true
}
func (rw *RWMutex) maybeUnblockWriter() bool {
t := rw.waitingWriters.Pop()
if t == nil {
return false
}
rw.state = rwMutexStateWLocked
scheduleTask(t)
return true
}
type Locker interface {
Lock()
Unlock()