builder: refactor job runner and use a shared semaphore across build jobs

Switching to a shared semaphore allows multi-build operations (compiler tests, package tests, etc.) to use the expected degree of parallelism efficiently.

While refactoring the job runner, the time complexity was also reduced from O(n^2) to O(n+m) (where n is the number of jobs, and m is the number of dependencies).
Этот коммит содержится в:
Nia Waldvogel 2021-12-24 13:19:39 -05:00 коммит произвёл Nia
родитель bb08a25edc
коммит e594dbc133
6 изменённых файлов: 151 добавлений и 115 удалений

Просмотреть файл

@ -489,7 +489,7 @@ func Build(pkgName, outpath string, config *compileopts.Config, action func(Buil
outext := filepath.Ext(outpath)
if outext == ".o" || outext == ".bc" || outext == ".ll" {
// Run jobs to produce the LLVM module.
err := runJobs(programJob, config.Options.Parallelism)
err := runJobs(programJob, config.Options.Semaphore)
if err != nil {
return err
}
@ -751,7 +751,7 @@ func Build(pkgName, outpath string, config *compileopts.Config, action func(Buil
// Run all jobs to compile and link the program.
// Do this now (instead of after elf-to-hex and similar conversions) as it
// is simpler and cannot be parallelized.
err = runJobs(linkJob, config.Options.Parallelism)
err = runJobs(linkJob, config.Options.Semaphore)
if err != nil {
return err
}

Просмотреть файл

@ -4,8 +4,12 @@ package builder
// parallel while taking care of dependencies.
import (
"container/heap"
"errors"
"fmt"
"runtime"
"sort"
"strings"
"time"
)
@ -29,7 +33,6 @@ type compileJob struct {
dependencies []*compileJob
result string // result (path)
run func(*compileJob) (err error)
state jobState
err error // error if finished
duration time.Duration // how long it took to run this job (only set after finishing)
}
@ -44,41 +47,20 @@ func dummyCompileJob(result string) *compileJob {
}
}
// readyToRun returns whether this job is ready to run: it is itself not yet
// started and all dependencies are finished.
func (job *compileJob) readyToRun() bool {
if job.state != jobStateQueued {
// Already running or finished, so shouldn't be run again.
return false
}
// Check dependencies.
for _, dep := range job.dependencies {
if dep.state != jobStateFinished {
// A dependency is not finished, so this job has to wait until it
// is.
return false
}
}
// All conditions are satisfied.
return true
}
// runJobs runs the indicated job and all its dependencies. For every job, all
// the dependencies are run first. It returns the error of the first job that
// fails.
// It runs all jobs in the order of the dependencies slice, depth-first.
// Therefore, if some jobs are preferred to run before others, they should be
// ordered as such in the job dependencies.
func runJobs(job *compileJob, parallelism int) error {
if parallelism == 0 {
// Have a default, if the parallelism isn't set. This is useful for
func runJobs(job *compileJob, sema chan struct{}) error {
if sema == nil {
// Have a default, if the semaphore isn't set. This is useful for
// tests.
parallelism = runtime.NumCPU()
sema = make(chan struct{}, runtime.NumCPU())
}
if parallelism < 1 {
return fmt.Errorf("-p flag must be at least 1, provided -p=%d", parallelism)
if cap(sema) == 0 {
return errors.New("cannot 0 jobs at a time")
}
// Create a slice of jobs to run, where all dependencies are run in order.
@ -97,64 +79,91 @@ func runJobs(job *compileJob, parallelism int) error {
}
addJobs(job)
// Create channels to communicate with the workers.
doneChan := make(chan *compileJob)
workerChan := make(chan *compileJob)
defer close(workerChan)
// Start a number of workers.
for i := 0; i < parallelism; i++ {
if jobRunnerDebug {
fmt.Println("## starting worker", i)
waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
dependents := make(map[*compileJob][]*compileJob, len(jobs))
jidx := make(map[*compileJob]int)
var ready intHeap
for i, job := range jobs {
jidx[job] = i
if len(job.dependencies) == 0 {
// This job is ready to run.
ready.Push(i)
continue
}
// Construct a map for dependencies which the job is currently waiting on.
waitDeps := make(map[*compileJob]struct{})
waiting[job] = waitDeps
// Add the job to the dependents list of each dependency.
for _, dep := range job.dependencies {
dependents[dep] = append(dependents[dep], job)
waitDeps[dep] = struct{}{}
}
go jobWorker(workerChan, doneChan)
}
// Create a channel to accept notifications of completion.
doneChan := make(chan *compileJob)
// Send each job in the jobs slice to a worker, taking care of job
// dependencies.
numRunningJobs := 0
var totalTime time.Duration
start := time.Now()
for {
// If there are free workers, try starting a new job (if one is
// available). If it succeeds, try again to fill the entire worker pool.
if numRunningJobs < parallelism {
jobToRun := nextJob(jobs)
if jobToRun != nil {
// Start job.
for len(ready.IntSlice) > 0 || numRunningJobs != 0 {
var completed *compileJob
if len(ready.IntSlice) > 0 {
select {
case sema <- struct{}{}:
// Start a job.
job := jobs[heap.Pop(&ready).(int)]
if jobRunnerDebug {
fmt.Println("## start: ", jobToRun.description)
fmt.Println("## start: ", job.description)
}
jobToRun.state = jobStateRunning
workerChan <- jobToRun
go runJob(job, doneChan)
numRunningJobs++
continue
case completed = <-doneChan:
// A job completed.
}
} else {
// Wait for a job to complete.
completed = <-doneChan
}
// When there are no jobs running, all jobs in the jobs slice must have
// been finished. Therefore, the work is done.
if numRunningJobs == 0 {
break
}
// Wait until a job is finished.
job := <-doneChan
job.state = jobStateFinished
numRunningJobs--
totalTime += job.duration
<-sema
if jobRunnerDebug {
fmt.Println("## finished:", job.description, "(time "+job.duration.String()+")")
}
if job.err != nil {
// Wait for running jobs to finish.
if completed.err != nil {
// Wait for any current jobs to finish.
for numRunningJobs != 0 {
<-doneChan
numRunningJobs--
}
// Return error of first failing job.
return job.err
// The build failed.
return completed.err
}
// Update total run time.
totalTime += completed.duration
// Update dependent jobs.
for _, j := range dependents[completed] {
wait := waiting[j]
delete(wait, completed)
if len(wait) == 0 {
// This job is now ready to run.
ready.Push(jidx[j])
delete(waiting, j)
}
}
}
if len(waiting) != 0 {
// There is a dependency cycle preventing some jobs from running.
return errDependencyCycle{waiting}
}
// Some statistics, if debugging.
@ -171,29 +180,50 @@ func runJobs(job *compileJob, parallelism int) error {
return nil
}
// nextJob returns the first ready-to-run job.
// This is an implementation detail of runJobs.
func nextJob(jobs []*compileJob) *compileJob {
for _, job := range jobs {
if job.readyToRun() {
return job
}
}
return nil
type errDependencyCycle struct {
waiting map[*compileJob]map[*compileJob]struct{}
}
// jobWorker is the goroutine that runs received jobs.
// This is an implementation detail of runJobs.
func jobWorker(workerChan, doneChan chan *compileJob) {
for job := range workerChan {
start := time.Now()
if job.run != nil {
err := job.run(job)
if err != nil {
job.err = err
}
func (err errDependencyCycle) Error() string {
waits := make([]string, 0, len(err.waiting))
for j, wait := range err.waiting {
deps := make([]string, 0, len(wait))
for dep := range wait {
deps = append(deps, dep.description)
}
job.duration = time.Since(start)
doneChan <- job
sort.Strings(deps)
waits = append(waits, fmt.Sprintf("\t%s is waiting for [%s]",
j.description, strings.Join(deps, ", "),
))
}
sort.Strings(waits)
return "deadlock:\n" + strings.Join(waits, "\n")
}
type intHeap struct {
sort.IntSlice
}
func (h *intHeap) Push(x interface{}) {
h.IntSlice = append(h.IntSlice, x.(int))
}
func (h *intHeap) Pop() interface{} {
x := h.IntSlice[len(h.IntSlice)-1]
h.IntSlice = h.IntSlice[:len(h.IntSlice)-1]
return x
}
// runJob runs a compile job and notifies doneChan of completion.
func runJob(job *compileJob, doneChan chan *compileJob) {
start := time.Now()
if job.run != nil {
err := job.run(job)
if err != nil {
job.err = err
}
}
job.duration = time.Since(start)
doneChan <- job
}

Просмотреть файл

@ -43,7 +43,7 @@ func (l *Library) Load(config *compileopts.Config, tmpdir string) (dir string, e
return "", err
}
defer unlock()
err = runJobs(job, config.Options.Parallelism)
err = runJobs(job, config.Options.Semaphore)
return filepath.Dir(job.result), err
}

Просмотреть файл

@ -32,7 +32,7 @@ type Options struct {
DumpSSA bool
VerifyIR bool
PrintCommands func(cmd string, args ...string)
Parallelism int // -p flag
Semaphore chan struct{} // -p flag controls cap
Debug bool
PrintSizes string
PrintAllocs *regexp.Regexp // regexp string

Просмотреть файл

@ -1237,7 +1237,7 @@ func main() {
PrintIR: *printIR,
DumpSSA: *dumpSSA,
VerifyIR: *verifyIR,
Parallelism: *parallelism,
Semaphore: make(chan struct{}, *parallelism),
Debug: !*nodebug,
PrintSizes: *printSize,
PrintStacks: *printStacks,

Просмотреть файл

@ -53,7 +53,6 @@ func TestCompiler(t *testing.T) {
"testing.go",
"zeroalloc.go",
}
_, minor, err := goenv.GetGorootVersion(goenv.Get("GOROOT"))
if err != nil {
t.Fatal("could not read version from GOROOT:", err)
@ -62,16 +61,18 @@ func TestCompiler(t *testing.T) {
tests = append(tests, "go1.17.go")
}
sema := make(chan struct{}, runtime.NumCPU())
if *testTarget != "" {
// This makes it possible to run one specific test (instead of all),
// which is especially useful to quickly check whether some changes
// affect a particular target architecture.
runPlatTests(optionsFromTarget(*testTarget), tests, t)
runPlatTests(optionsFromTarget(*testTarget, sema), tests, t)
return
}
t.Run("Host", func(t *testing.T) {
runPlatTests(optionsFromTarget(""), tests, t)
runPlatTests(optionsFromTarget("", sema), tests, t)
})
// Test a few build options.
@ -82,10 +83,11 @@ func TestCompiler(t *testing.T) {
t.Run("opt=1", func(t *testing.T) {
t.Parallel()
runTestWithConfig("stdlib.go", t, compileopts.Options{
GOOS: goenv.Get("GOOS"),
GOARCH: goenv.Get("GOARCH"),
GOARM: goenv.Get("GOARM"),
Opt: "1",
GOOS: goenv.Get("GOOS"),
GOARCH: goenv.Get("GOARCH"),
GOARM: goenv.Get("GOARM"),
Opt: "1",
Semaphore: sema,
}, nil, nil)
})
@ -94,10 +96,11 @@ func TestCompiler(t *testing.T) {
t.Run("opt=0", func(t *testing.T) {
t.Parallel()
runTestWithConfig("print.go", t, compileopts.Options{
GOOS: goenv.Get("GOOS"),
GOARCH: goenv.Get("GOARCH"),
GOARM: goenv.Get("GOARM"),
Opt: "0",
GOOS: goenv.Get("GOOS"),
GOARCH: goenv.Get("GOARCH"),
GOARM: goenv.Get("GOARM"),
Opt: "0",
Semaphore: sema,
}, nil, nil)
})
@ -112,6 +115,7 @@ func TestCompiler(t *testing.T) {
"someGlobal": "foobar",
},
},
Semaphore: sema,
}, nil, nil)
})
})
@ -123,28 +127,28 @@ func TestCompiler(t *testing.T) {
}
t.Run("EmulatedCortexM3", func(t *testing.T) {
runPlatTests(optionsFromTarget("cortex-m-qemu"), tests, t)
runPlatTests(optionsFromTarget("cortex-m-qemu", sema), tests, t)
})
t.Run("EmulatedRISCV", func(t *testing.T) {
runPlatTests(optionsFromTarget("riscv-qemu"), tests, t)
runPlatTests(optionsFromTarget("riscv-qemu", sema), tests, t)
})
if runtime.GOOS == "linux" {
t.Run("X86Linux", func(t *testing.T) {
runPlatTests(optionsFromOSARCH("linux/386"), tests, t)
runPlatTests(optionsFromOSARCH("linux/386", sema), tests, t)
})
t.Run("ARMLinux", func(t *testing.T) {
runPlatTests(optionsFromOSARCH("linux/arm/6"), tests, t)
runPlatTests(optionsFromOSARCH("linux/arm/6", sema), tests, t)
})
t.Run("ARM64Linux", func(t *testing.T) {
runPlatTests(optionsFromOSARCH("linux/arm64"), tests, t)
runPlatTests(optionsFromOSARCH("linux/arm64", sema), tests, t)
})
t.Run("WebAssembly", func(t *testing.T) {
runPlatTests(optionsFromTarget("wasm"), tests, t)
runPlatTests(optionsFromTarget("wasm", sema), tests, t)
})
t.Run("WASI", func(t *testing.T) {
runPlatTests(optionsFromTarget("wasi"), tests, t)
runPlatTests(optionsFromTarget("wasi", sema), tests, t)
})
}
}
@ -185,24 +189,26 @@ func runPlatTests(options compileopts.Options, tests []string, t *testing.T) {
}
}
func optionsFromTarget(target string) compileopts.Options {
func optionsFromTarget(target string, sema chan struct{}) compileopts.Options {
return compileopts.Options{
// GOOS/GOARCH are only used if target == ""
GOOS: goenv.Get("GOOS"),
GOARCH: goenv.Get("GOARCH"),
GOARM: goenv.Get("GOARM"),
Target: target,
GOOS: goenv.Get("GOOS"),
GOARCH: goenv.Get("GOARCH"),
GOARM: goenv.Get("GOARM"),
Target: target,
Semaphore: sema,
}
}
// optionsFromOSARCH returns a set of options based on the "osarch" string. This
// string is in the form of "os/arch/subarch", with the subarch only sometimes
// being necessary. Examples are "darwin/amd64" or "linux/arm/7".
func optionsFromOSARCH(osarch string) compileopts.Options {
func optionsFromOSARCH(osarch string, sema chan struct{}) compileopts.Options {
parts := strings.Split(osarch, "/")
options := compileopts.Options{
GOOS: parts[0],
GOARCH: parts[1],
GOOS: parts[0],
GOARCH: parts[1],
Semaphore: sema,
}
if options.GOARCH == "arm" {
options.GOARM = parts[2]