Parallel Loops

TanayLabUtilities.ParallelLoops.parallel_loop_wo_rng Function
parallel_loop_wo_rng(
    body::Function,
    indices::AbstractVector{<:Integer};
    name::AbstractString = ".loop",
    policy::Symbol = :greedy,
    progress::Maybe{Progress} = nothing,
)::Nothing

Run the body in parallel, passing it the iteration index. The policy is passed to @threads if it is one of :greedy (the default), :dynamic, or :static. If it is :serial, the loop is not run in parallel (useful for debugging).

If this is nested (and the containing loop is parallel), policy is ignored and the loop is executed serially. This allows functions to be parallel if invoked from the main thread, but become serial if invoked from inside an already parallel loop.
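A minimal sketch of this behavior (assuming TanayLabUtilities is loaded; the results matrix is purely illustrative):

```julia
using TanayLabUtilities

results = zeros(Int, 8, 4)

function process_block(block_index::Integer)::Nothing
    # This inner loop requests :dynamic, but when it is reached from inside
    # the outer parallel loop, the policy is ignored and it runs serially.
    parallel_loop_wo_rng(1:4; policy = :dynamic) do inner_index
        results[block_index, inner_index] = block_index * inner_index
        return nothing
    end
    return nothing
end

# Invoked from the main thread, only the outer loop actually runs in
# parallel; process_block would also be parallel if called on its own.
parallel_loop_wo_rng(1:8; policy = :dynamic) do block_index
    process_block(block_index)
    return nothing
end
```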

The name is used for flame_timed for the whole loop. If the loop is parallel, the whole loop is a line in the serial flame file, and each iteration is a line in the parallel flame file. The serial file therefore gives a view of the elapsed time of the top-level loops, to identify what actually matters, and the parallel file shows the internal breakdown of the computations in each loop. It is sadly impossible to include threaded tasks in the same flamegraph in a meaningful way (at least not without extending the flamegraph format and visualization).

If progress is specified, it is updated for each iteration.

Note

The code inside the loop body should not use any random number generation. Use parallel_loop_with_rng if random number generation is needed.

using Test
using TanayLabUtilities

size = 10

for policy in (:serial, :greedy, :dynamic, :static)
    results = zeros(Int, size)
    parallel_loop_wo_rng(1:size; policy) do index
        results[index] = index
        return nothing
    end
    @test results == collect(1:size)
end

println("OK")

# output

OK

TanayLabUtilities.ParallelLoops.parallel_loop_with_rng Function
parallel_loop_with_rng(
    body::Function,
    indices::AbstractVector{<:Integer};
    name::AbstractString = ".loop",
    policy::Symbol = :greedy,
    progress::Maybe{Progress} = nothing,
    seed::Maybe{Integer} = nothing,
    rng::Maybe{AbstractRNG} = nothing
)::Nothing

Run the body in parallel, passing it the iteration index and a separate rng that is seeded to a reproducible state regardless of the allocation of tasks to threads. A copy of this rng is given to each iteration, after being reset to seed + index for reproducibility. If no seed is specified, one is sampled from rng before the loop starts. If rng isn't given, the default_rng() is used (and reset for each iteration); in this case passing it to the body is redundant, but is still done for consistency.
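The seeding scheme can be sketched in plain Julia (a conceptual illustration of the seed + index reset, not the actual implementation):

```julia
using Random

# Each iteration gets its own copy of the rng, reset to seed + index, so the
# value computed for each index is the same no matter which thread runs the
# iteration, or in which order the iterations run.
base_rng = MersenneTwister(1)
seed = 123456  # if not specified, one would be sampled from base_rng first

first_pass = zeros(10)
for index in 1:10
    iteration_rng = copy(base_rng)
    Random.seed!(iteration_rng, seed + index)
    first_pass[index] = rand(iteration_rng)
end

second_pass = zeros(10)
for index in shuffle(1:10)  # any iteration order gives identical results
    iteration_rng = copy(base_rng)
    Random.seed!(iteration_rng, seed + index)
    second_pass[index] = rand(iteration_rng)
end

@assert first_pass == second_pass
```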

The policy is passed to @threads if it is one of :greedy (the default), :dynamic, or :static. If it is :serial, the loop is not run in parallel (useful for debugging).

If this is nested (and the containing loop is parallel), policy is ignored and the loop is executed serially. This allows functions to be parallel if invoked from the main thread, but become serial if invoked from inside an already parallel loop.

The name is used for flame_timed for the whole loop. If the loop is parallel, the whole loop is a line in the serial flame file, and each iteration is a line in the parallel flame file. The serial file therefore gives a view of the elapsed time of the top-level loops, to identify what actually matters, and the parallel file shows the internal breakdown of the computations in each loop. It is sadly impossible to include threaded tasks in the same flamegraph in a meaningful way (at least not without extending the flamegraph format and visualization).

If progress is specified, it is updated for each iteration.

Note

Yes, the TaskLocalRNG is supposed to do this, but it actually depends on the way tasks are allocated to threads. The implementation here gives the same results regardless of the thread scheduling policy. Sigh.

using Test
using Random
using TanayLabUtilities

size = 10

function collect_rng(rng::AbstractRNG)::Vector{Float64}
    results = zeros(Float64, size)
    parallel_loop_with_rng(1:size; rng) do index, rng
        results[index] = rand(rng)
    end
    @test results[1] != results[2]
    return results
end

@test collect_rng(MersenneTwister(1)) == collect_rng(MersenneTwister(1))

function collect_default_rng()::Vector{Float64}
    results = zeros(Float64, size)
    parallel_loop_with_rng(1:size; seed = 123456, policy = :dynamic) do index, _
        results[index] = rand()
    end
    @test results[1] != results[2]
    return results
end

@test collect_default_rng() == collect_default_rng()

println("OK")

# output

OK
