diff --git a/builder/build.go b/builder/build.go
index 46b2aae1..a2675736 100644
--- a/builder/build.go
+++ b/builder/build.go
@@ -489,7 +489,7 @@ func Build(pkgName, outpath string, config *compileopts.Config, action func(Buil
 	outext := filepath.Ext(outpath)
 	if outext == ".o" || outext == ".bc" || outext == ".ll" {
 		// Run jobs to produce the LLVM module.
-		err := runJobs(programJob, config.Options.Parallelism)
+		err := runJobs(programJob, config.Options.Semaphore)
 		if err != nil {
 			return err
 		}
@@ -751,7 +751,7 @@ func Build(pkgName, outpath string, config *compileopts.Config, action func(Buil
 	// Run all jobs to compile and link the program.
 	// Do this now (instead of after elf-to-hex and similar conversions) as it
 	// is simpler and cannot be parallelized.
-	err = runJobs(linkJob, config.Options.Parallelism)
+	err = runJobs(linkJob, config.Options.Semaphore)
 	if err != nil {
 		return err
 	}
diff --git a/builder/jobs.go b/builder/jobs.go
index 16bd214e..3d510974 100644
--- a/builder/jobs.go
+++ b/builder/jobs.go
@@ -4,8 +4,12 @@ package builder
 // parallel while taking care of dependencies.
 
 import (
+	"container/heap"
+	"errors"
 	"fmt"
 	"runtime"
+	"sort"
+	"strings"
 	"time"
 )
 
@@ -29,7 +33,6 @@ type compileJob struct {
 	dependencies []*compileJob
 	result       string // result (path)
 	run          func(*compileJob) (err error)
-	state        jobState
 	err          error         // error if finished
 	duration     time.Duration // how long it took to run this job (only set after finishing)
 }
@@ -44,41 +47,25 @@ func dummyCompileJob(result string) *compileJob {
 	}
 }
 
-// readyToRun returns whether this job is ready to run: it is itself not yet
-// started and all dependencies are finished.
-func (job *compileJob) readyToRun() bool {
-	if job.state != jobStateQueued {
-		// Already running or finished, so shouldn't be run again.
-		return false
-	}
-
-	// Check dependencies.
-	for _, dep := range job.dependencies {
-		if dep.state != jobStateFinished {
-			// A dependency is not finished, so this job has to wait until it
-			// is.
-			return false
-		}
-	}
-
-	// All conditions are satisfied.
-	return true
-}
-
 // runJobs runs the indicated job and all its dependencies. For every job, all
 // the dependencies are run first. It returns the error of the first job that
 // fails.
 // It runs all jobs in the order of the dependencies slice, depth-first.
 // Therefore, if some jobs are preferred to run before others, they should be
 // ordered as such in the job dependencies.
+//
+// The sema channel is used as a counting semaphore: a slot is taken before a
+// job is started and given back once it finishes, so cap(sema) bounds the
+// number of jobs running at once. The channel may be shared between
+// concurrent runJobs calls. If it is nil, a private one is created.
-func runJobs(job *compileJob, parallelism int) error {
-	if parallelism == 0 {
-		// Have a default, if the parallelism isn't set. This is useful for
+func runJobs(job *compileJob, sema chan struct{}) error {
+	if sema == nil {
+		// Have a default, if the semaphore isn't set. This is useful for
 		// tests.
-		parallelism = runtime.NumCPU()
+		sema = make(chan struct{}, runtime.NumCPU())
 	}
-	if parallelism < 1 {
-		return fmt.Errorf("-p flag must be at least 1, provided -p=%d", parallelism)
+	if cap(sema) == 0 {
+		return errors.New("cannot run 0 jobs at a time")
 	}
 
 	// Create a slice of jobs to run, where all dependencies are run in order.
@@ -97,64 +84,97 @@ func runJobs(job *compileJob, parallelism int) error {
 	}
 	addJobs(job)
 
-	// Create channels to communicate with the workers.
-	doneChan := make(chan *compileJob)
-	workerChan := make(chan *compileJob)
-	defer close(workerChan)
-
-	// Start a number of workers.
-	for i := 0; i < parallelism; i++ {
-		if jobRunnerDebug {
-			fmt.Println("## starting worker", i)
+	// Build the dependency graph. waiting maps each blocked job to the set of
+	// dependencies it still waits for, dependents is the reverse mapping, and
+	// jidx records each job's position in jobs so that ready jobs are started
+	// in that order (lowest index first).
+	waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
+	dependents := make(map[*compileJob][]*compileJob, len(jobs))
+	jidx := make(map[*compileJob]int)
+	var ready intHeap
+	for i, job := range jobs {
+		jidx[job] = i
+		if len(job.dependencies) == 0 {
+			// This job is ready to run.
+			heap.Push(&ready, i)
+			continue
+		}
+
+		// Construct a map for dependencies which the job is currently waiting on.
+		waitDeps := make(map[*compileJob]struct{})
+		waiting[job] = waitDeps
+
+		// Add the job to the dependents list of each dependency.
+		for _, dep := range job.dependencies {
+			dependents[dep] = append(dependents[dep], job)
+			waitDeps[dep] = struct{}{}
 		}
-		go jobWorker(workerChan, doneChan)
 	}
 
+	// Create a channel to accept notifications of completion.
+	doneChan := make(chan *compileJob)
+
 	// Send each job in the jobs slice to a worker, taking care of job
 	// dependencies.
 	numRunningJobs := 0
 	var totalTime time.Duration
 	start := time.Now()
-	for {
-		// If there are free workers, try starting a new job (if one is
-		// available). If it succeeds, try again to fill the entire worker pool.
-		if numRunningJobs < parallelism {
-			jobToRun := nextJob(jobs)
-			if jobToRun != nil {
-				// Start job.
+	for len(ready.IntSlice) > 0 || numRunningJobs != 0 {
+		var completed *compileJob
+		if len(ready.IntSlice) > 0 {
+			select {
+			case sema <- struct{}{}:
+				// Start a job.
+				job := jobs[heap.Pop(&ready).(int)]
 				if jobRunnerDebug {
-					fmt.Println("## start: ", jobToRun.description)
+					fmt.Println("## start: ", job.description)
 				}
-				jobToRun.state = jobStateRunning
-				workerChan <- jobToRun
+				go runJob(job, doneChan)
 				numRunningJobs++
 				continue
+
+			case completed = <-doneChan:
+				// A job completed.
 			}
+		} else {
+			// Wait for a job to complete.
+			completed = <-doneChan
 		}
-
-		// When there are no jobs running, all jobs in the jobs slice must have
-		// been finished. Therefore, the work is done.
-		if numRunningJobs == 0 {
-			break
-		}
-
-		// Wait until a job is finished.
-		job := <-doneChan
-		job.state = jobStateFinished
 		numRunningJobs--
-		totalTime += job.duration
+		<-sema
 		if jobRunnerDebug {
-			fmt.Println("## finished:", job.description, "(time "+job.duration.String()+")")
+			fmt.Println("## finished:", completed.description, "(time "+completed.duration.String()+")")
 		}
-		if job.err != nil {
-			// Wait for running jobs to finish.
+		if completed.err != nil {
+			// Wait for any current jobs to finish.
+			// Give back their semaphore slots as well: sema may be shared.
 			for numRunningJobs != 0 {
 				<-doneChan
 				numRunningJobs--
+				<-sema
 			}
-			// Return error of first failing job.
-			return job.err
+
+			// The build failed.
+			return completed.err
 		}
+
+		// Update total run time.
+		totalTime += completed.duration
+
+		// Update dependent jobs.
+		for _, j := range dependents[completed] {
+			wait := waiting[j]
+			delete(wait, completed)
+			if len(wait) == 0 {
+				// This job is now ready to run.
+				heap.Push(&ready, jidx[j])
+				delete(waiting, j)
+			}
+		}
+	}
+	if len(waiting) != 0 {
+		// There is a dependency cycle preventing some jobs from running.
+		return errDependencyCycle{waiting}
 	}
 
 	// Some statistics, if debugging.
@@ -171,29 +191,58 @@ func runJobs(job *compileJob, parallelism int) error {
 	return nil
 }
 
-// nextJob returns the first ready-to-run job.
-// This is an implementation detail of runJobs.
-func nextJob(jobs []*compileJob) *compileJob {
-	for _, job := range jobs {
-		if job.readyToRun() {
-			return job
-		}
-	}
-	return nil
+// errDependencyCycle is returned by runJobs when some jobs could never be
+// started because the dependencies they wait for never completed.
+type errDependencyCycle struct {
+	waiting map[*compileJob]map[*compileJob]struct{}
 }
 
-// jobWorker is the goroutine that runs received jobs.
-// This is an implementation detail of runJobs.
-func jobWorker(workerChan, doneChan chan *compileJob) {
-	for job := range workerChan {
-		start := time.Now()
-		if job.run != nil {
-			err := job.run(job)
-			if err != nil {
-				job.err = err
-			}
+// Error lists every blocked job together with the dependencies it is still
+// waiting for, sorted so that the message is deterministic.
+func (err errDependencyCycle) Error() string {
+	waits := make([]string, 0, len(err.waiting))
+	for j, wait := range err.waiting {
+		deps := make([]string, 0, len(wait))
+		for dep := range wait {
+			deps = append(deps, dep.description)
 		}
-		job.duration = time.Since(start)
-		doneChan <- job
+		sort.Strings(deps)
+
+		waits = append(waits, fmt.Sprintf("\t%s is waiting for [%s]",
+			j.description, strings.Join(deps, ", "),
+		))
 	}
+	sort.Strings(waits)
+	return "deadlock:\n" + strings.Join(waits, "\n")
+}
+
+// intHeap is a min-heap of job indices. It must be modified through
+// heap.Push and heap.Pop to keep the heap ordering intact.
+type intHeap struct {
+	sort.IntSlice
+}
+
+// Push implements heap.Interface. Use heap.Push instead of calling it directly.
+func (h *intHeap) Push(x interface{}) {
+	h.IntSlice = append(h.IntSlice, x.(int))
+}
+
+// Pop implements heap.Interface. Use heap.Pop instead of calling it directly.
+func (h *intHeap) Pop() interface{} {
+	x := h.IntSlice[len(h.IntSlice)-1]
+	h.IntSlice = h.IntSlice[:len(h.IntSlice)-1]
+	return x
+}
+
+// runJob runs a compile job and notifies doneChan of completion.
+func runJob(job *compileJob, doneChan chan *compileJob) {
+	start := time.Now()
+	if job.run != nil {
+		err := job.run(job)
+		if err != nil {
+			job.err = err
+		}
+	}
+	job.duration = time.Since(start)
+	doneChan <- job
 }
diff --git a/builder/library.go b/builder/library.go
index 64bd8ecd..a0c365ed 100644
--- a/builder/library.go
+++ b/builder/library.go
@@ -43,7 +43,7 @@ func (l *Library) Load(config *compileopts.Config, tmpdir string) (dir string, e
 		return "", err
 	}
 	defer unlock()
-	err = runJobs(job, config.Options.Parallelism)
+	err = runJobs(job, config.Options.Semaphore)
 	return filepath.Dir(job.result), err
 }
 
diff --git a/compileopts/options.go b/compileopts/options.go
index 8339f2ce..a82dfcf9 100644
--- a/compileopts/options.go
+++ b/compileopts/options.go
@@ -32,7 +32,7 @@ type Options struct {
 	DumpSSA         bool
 	VerifyIR        bool
 	PrintCommands   func(cmd string, args ...string)
-	Parallelism     int // -p flag
+	Semaphore       chan struct{} // limits concurrently running jobs; the -p flag controls its cap
 	Debug           bool
 	PrintSizes      string
 	PrintAllocs     *regexp.Regexp // regexp string
diff --git a/main.go b/main.go
index 4797c0b4..a97fd148 100644
--- a/main.go
+++ b/main.go
@@ -1237,7 +1237,7 @@ func main() {
 		PrintIR:         *printIR,
 		DumpSSA:         *dumpSSA,
 		VerifyIR:        *verifyIR,
-		Parallelism:     *parallelism,
+		Semaphore:       make(chan struct{}, *parallelism),
 		Debug:           !*nodebug,
 		PrintSizes:      *printSize,
 		PrintStacks:     *printStacks,
diff --git a/main_test.go b/main_test.go
index 1e5dfba7..e35bfbbe 100644
--- a/main_test.go
+++ b/main_test.go
@@ -53,7 +53,6 @@ func TestCompiler(t *testing.T) {
 		"testing.go",
 		"zeroalloc.go",
 	}
-
 	_, minor, err := goenv.GetGorootVersion(goenv.Get("GOROOT"))
 	if err != nil {
 		t.Fatal("could not read version from GOROOT:", err)
@@ -62,16 +61,20 @@ func TestCompiler(t *testing.T) {
 		tests = append(tests, "go1.17.go")
 	}
 
+	// Share one semaphore between all (parallel) subtests so that the total
+	// number of concurrently running compile jobs stays bounded.
+	sema := make(chan struct{}, runtime.NumCPU())
+
 	if *testTarget != "" {
 		// This makes it possible to run one specific test (instead of all),
 		// which is especially useful to quickly check whether some changes
 		// affect a particular target architecture.
-		runPlatTests(optionsFromTarget(*testTarget), tests, t)
+		runPlatTests(optionsFromTarget(*testTarget, sema), tests, t)
 		return
 	}
 
 	t.Run("Host", func(t *testing.T) {
-		runPlatTests(optionsFromTarget(""), tests, t)
+		runPlatTests(optionsFromTarget("", sema), tests, t)
 	})
 
 	// Test a few build options.
@@ -82,10 +85,11 @@ func TestCompiler(t *testing.T) {
 		t.Run("opt=1", func(t *testing.T) {
 			t.Parallel()
 			runTestWithConfig("stdlib.go", t, compileopts.Options{
-				GOOS:   goenv.Get("GOOS"),
-				GOARCH: goenv.Get("GOARCH"),
-				GOARM:  goenv.Get("GOARM"),
-				Opt:    "1",
+				GOOS:      goenv.Get("GOOS"),
+				GOARCH:    goenv.Get("GOARCH"),
+				GOARM:     goenv.Get("GOARM"),
+				Opt:       "1",
+				Semaphore: sema,
 			}, nil, nil)
 		})
 
@@ -94,10 +98,11 @@ func TestCompiler(t *testing.T) {
 		t.Run("opt=0", func(t *testing.T) {
 			t.Parallel()
 			runTestWithConfig("print.go", t, compileopts.Options{
-				GOOS:   goenv.Get("GOOS"),
-				GOARCH: goenv.Get("GOARCH"),
-				GOARM:  goenv.Get("GOARM"),
-				Opt:    "0",
+				GOOS:      goenv.Get("GOOS"),
+				GOARCH:    goenv.Get("GOARCH"),
+				GOARM:     goenv.Get("GOARM"),
+				Opt:       "0",
+				Semaphore: sema,
 			}, nil, nil)
 		})
 
@@ -112,6 +117,7 @@ func TestCompiler(t *testing.T) {
 						"someGlobal": "foobar",
 					},
 				},
+				Semaphore: sema,
 			}, nil, nil)
 		})
 	})
@@ -123,28 +129,28 @@ func TestCompiler(t *testing.T) {
 	}
 
 	t.Run("EmulatedCortexM3", func(t *testing.T) {
-		runPlatTests(optionsFromTarget("cortex-m-qemu"), tests, t)
+		runPlatTests(optionsFromTarget("cortex-m-qemu", sema), tests, t)
 	})
 
 	t.Run("EmulatedRISCV", func(t *testing.T) {
-		runPlatTests(optionsFromTarget("riscv-qemu"), tests, t)
+		runPlatTests(optionsFromTarget("riscv-qemu", sema), tests, t)
 	})
 
 	if runtime.GOOS == "linux" {
 		t.Run("X86Linux", func(t *testing.T) {
-			runPlatTests(optionsFromOSARCH("linux/386"), tests, t)
+			runPlatTests(optionsFromOSARCH("linux/386", sema), tests, t)
 		})
 		t.Run("ARMLinux", func(t *testing.T) {
-			runPlatTests(optionsFromOSARCH("linux/arm/6"), tests, t)
+			runPlatTests(optionsFromOSARCH("linux/arm/6", sema), tests, t)
 		})
 		t.Run("ARM64Linux", func(t *testing.T) {
-			runPlatTests(optionsFromOSARCH("linux/arm64"), tests, t)
+			runPlatTests(optionsFromOSARCH("linux/arm64", sema), tests, t)
 		})
 		t.Run("WebAssembly", func(t *testing.T) {
-			runPlatTests(optionsFromTarget("wasm"), tests, t)
+			runPlatTests(optionsFromTarget("wasm", sema), tests, t)
 		})
 		t.Run("WASI", func(t *testing.T) {
-			runPlatTests(optionsFromTarget("wasi"), tests, t)
+			runPlatTests(optionsFromTarget("wasi", sema), tests, t)
 		})
 	}
 }
@@ -185,24 +191,28 @@ func runPlatTests(options compileopts.Options, tests []string, t *testing.T) {
 	}
 }
 
-func optionsFromTarget(target string) compileopts.Options {
+// optionsFromTarget returns a set of options for the given target. The sema
+// channel is stored as the Semaphore option of the returned options.
+func optionsFromTarget(target string, sema chan struct{}) compileopts.Options {
 	return compileopts.Options{
 		// GOOS/GOARCH are only used if target == ""
-		GOOS:   goenv.Get("GOOS"),
-		GOARCH: goenv.Get("GOARCH"),
-		GOARM:  goenv.Get("GOARM"),
-		Target: target,
+		GOOS:      goenv.Get("GOOS"),
+		GOARCH:    goenv.Get("GOARCH"),
+		GOARM:     goenv.Get("GOARM"),
+		Target:    target,
+		Semaphore: sema,
 	}
 }
 
 // optionsFromOSARCH returns a set of options based on the "osarch" string. This
 // string is in the form of "os/arch/subarch", with the subarch only sometimes
 // being necessary. Examples are "darwin/amd64" or "linux/arm/7".
-func optionsFromOSARCH(osarch string) compileopts.Options {
+func optionsFromOSARCH(osarch string, sema chan struct{}) compileopts.Options {
 	parts := strings.Split(osarch, "/")
 	options := compileopts.Options{
-		GOOS:   parts[0],
-		GOARCH: parts[1],
+		GOOS:      parts[0],
+		GOARCH:    parts[1],
+		Semaphore: sema,
 	}
 	if options.GOARCH == "arm" {
 		options.GOARM = parts[2]