Skip to content

[feature request] Worker scheduler based on current load #76

@MilesCranmer

Description

@MilesCranmer

I make significant use of Julia's Distributed functionality in my SymbolicRegression.jl package. It uses asynchronous computing for a genetic algorithm: there are ~10 "populations" of individuals which can evolve independently of each other, can be merged asynchronously, and then continue evolving separately again. Thus, I can put each population into a separate @spawnat statement, then fetch and merge them into a bank of individuals when they are complete, and then @spawnat again with a mixed population.

However, the evolution time over each population is quite variable. I find that if I use a number of procs equal to my cores, oftentimes there will be many processors not being used at any given time. e.g., I have 20 populations, and 10 cores. Sometimes, @spawnat :any will be "out of phase" with the busiest workers, and keep allocating jobs to the already-busy workers, while the free workers don't receive any work. [having procs > cores would be an overkill solution that would lead to issues, such as load being larger than # of cores, too much memory usage, longer startup time, etc]

To fix this, I wrote a simple (implementation-specific) scheduler that allocates to the processor with the fewest jobs already in its queue. This boosts performance by nearly double.

So, I am wondering if @spawnat :any could be a bit more clever about which worker it puts a job onto? This is the current worker scheduler:
https://github.com/JuliaLang/julia/blob/bb5b98e72a151c41471d8cc14cacb495d647fb7f/stdlib/Distributed/src/macros.jl#L3-L13
As you can see, it goes through the list of workers one-by-one, regardless of whether they are free or busy.

This is what I am using for a scheduler, which gives me a big performance boost:

function next_worker(worker_assignment::Dict{Any, Int}, procs::Vector{Int})::Int

    # Count number of jobs on each process:
    job_counts = Dict(proc=>0 for proc in procs)
    for (key, value) in worker_assignment
        job_counts[value] += 1
    end

    # Return worker with fewest jobs:
    least_busy_worker = reduce(
        (proc1, proc2) -> job_counts[proc1] <= job_counts[proc2] ? proc1 : proc2,
        procs
    )
    return least_busy_worker
end

It simply counts the number of jobs allocated to each worker, and then allocates to the one with the fewest. (independent of the expected length of the jobs, etc.).

Could something like this be implemented more generally in the Distributed module of Julia? e.g., have a counter for each process that is incremented upon spawning, decremented upon job completion.

Thanks!
Miles

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions