From 9e8ef028cd39db48a276a00e90a525574d52c2f8 Mon Sep 17 00:00:00 2001 From: Matthieu Gomez Date: Sat, 30 May 2026 14:24:14 +0200 Subject: [PATCH 1/4] Thread the adjoint gather! with gated per-thread accumulators gather! (A'u, the adjoint half of every LSMR iteration) was single-threaded while scatter! (A) was threaded, making it the serial bottleneck on many-core machines for large N. Thread it with preallocated per-thread accumulators: each thread reduces a contiguous row chunk into a private length-fe.n buffer, then the cache-resident buffers are summed into fecoef. The buffers and row ranges are built once in the linear-map constructor and reused across all LSMR iterations. Gated per fixed effect: threading is used only when the nt buffers fit in cache (nt * fe.n * sizeof(T) <= 8MB) and N is large enough; otherwise the serial gather is used. (The merge/fill memory traffic would otherwise dominate, and a GPU-style bucketization loses to the serial sequential-read gather on the CPU.) High- cardinality fixed effects (e.g. individual ids) therefore fall back to serial with no regression. Benchmarks (8 threads): 1.14-1.26x on the full demean (solve_residuals!) for low/moderate cardinality; 1.00x and bit-identical for high cardinality. Threaded and serial residuals agree to machine epsilon. FixedEffects test suite passes. The GPU backends are unaffected (they keep the generic adjoint mul!). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/FixedEffects.jl | 1 + src/SolverCPU.jl | 100 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 92 insertions(+), 9 deletions(-) diff --git a/src/FixedEffects.jl b/src/FixedEffects.jl index ffda210..7852041 100644 --- a/src/FixedEffects.jl +++ b/src/FixedEffects.jl @@ -7,6 +7,7 @@ module FixedEffects ############################################################################## using Base: @propagate_inbounds +using Base.Threads: @threads, nthreads using LinearAlgebra: LinearAlgebra, Adjoint, mul!, rmul!, norm, axpy! using PrecompileTools: @setup_workload, @compile_workload using StatsBase: AbstractWeights, UnitWeights, Weights, uweights diff --git a/src/SolverCPU.jl b/src/SolverCPU.jl index 26f3856..cc8c243 100644 --- a/src/SolverCPU.jl +++ b/src/SolverCPU.jl @@ -1,38 +1,120 @@ ############################################################################## -## +## ## Implement AbstractFixedEffectLinearMap ## ############################################################################## +# Strategy for the adjoint gather (A'u) of one fixed effect, picked once at construction +# and dispatched on by `gather!` below: a serial scatter-add, or a threaded reduction +# through per-thread accumulators. +struct SerialGather end + +struct ThreadedGather{V<:AbstractVector} + buffers::Vector{V} # one length-fe.n accumulator per thread + ranges::Vector{UnitRange{Int}} # contiguous row chunks (shared across fixed effects) +end + mutable struct FixedEffectLinearMapCPU{T} <: AbstractFixedEffectLinearMap{T} fes::Vector{<:FixedEffect} scales::Vector{<:AbstractVector} caches::Vector{<:AbstractVector} + gathers::Vector{Union{SerialGather, ThreadedGather{Vector{T}}}} +end + +# Toggle to force the serial baseline (e.g. for benchmarking); threading is on by default. +const _USE_THREADED_GATHER = Ref(true) +# Threading the gather pays off only when the nt per-thread accumulators of length fe.n fit +# in cache; beyond that the fill/merge memory traffic dominates and serial is faster. +const _GATHER_BUFFER_BUDGET = 8 * 1024 * 1024 # bytes +const _GATHER_MIN_ROWS = 100_000 # below this, threading overhead isn't worth it + +function _row_chunks(n::Int, k::Int) + base, rem = divrem(n, k) + out = Vector{UnitRange{Int}}(undef, k) + s = 1 + for t in 1:k + len = base + (t <= rem ? 1 : 0) + out[t] = s:(s + len - 1) + s += len + end + return out +end + +# Per fixed effect, thread the gather only if the accumulators fit in cache and N is large. +function _gather_strategy(::Type{T}, fe::FixedEffect, N::Int, nt::Int, + ranges::Vector{UnitRange{Int}}) where {T} + if _USE_THREADED_GATHER[] && nt > 1 && N >= _GATHER_MIN_ROWS && + nt * fe.n * sizeof(T) <= _GATHER_BUFFER_BUDGET + return ThreadedGather([zeros(T, fe.n) for _ in 1:nt], ranges) + else + return SerialGather() + end end function FixedEffectLinearMapCPU{T}(fes::Vector{<:FixedEffect}) where {T} scales = [zeros(T, fe.n) for fe in fes] caches = [zeros(T, length(fes[1].interaction)) for fe in fes] - return FixedEffectLinearMapCPU{T}(fes, scales, caches) + N = length(fes[1].refs) + nt = nthreads() + ranges = _row_chunks(N, nt) + G = Union{SerialGather, ThreadedGather{Vector{T}}} + gathers = G[_gather_strategy(T, fe, N, nt, ranges) for fe in fes] + return FixedEffectLinearMapCPU{T}(fes, scales, caches, gathers) end -# multithreaded gather seems to be slower -function gather!(fecoef::AbstractVector, refs::AbstractVector, α::Number, - y::AbstractVector, cache::AbstractVector) - # no @simd: multiple i may write to the same fecoef[refs[i]] - @fastmath @inbounds for i in eachindex(y) - fecoef[refs[i]] += α * y[i] * cache[i] +# The one place the gather arithmetic lives: scatter-add a row range into `out`, +# out[refs[i]] += α * y[i] * cache[i]. No @simd: distinct i may write the same out[refs[i]]. +function _gather!(out::AbstractVector, refs::AbstractVector, α::Number, + y::AbstractVector, cache::AbstractVector, range) + @fastmath @inbounds for i in range + out[refs[i]] += α * y[i] * cache[i] + end + return out +end + +# Serial: one pass over all rows straight into fecoef (which already holds β * old). +gather!(fecoef::AbstractVector, refs::AbstractVector, α::Number, + y::AbstractVector, cache::AbstractVector, ::SerialGather) = + _gather!(fecoef, refs, α, y, cache, eachindex(y)) + +# Threaded: each thread reduces its row chunk into a private (cache-resident) buffer, +# then the buffers are summed into fecoef. +function gather!(fecoef::AbstractVector, refs::AbstractVector, α::Number, + y::AbstractVector, cache::AbstractVector, g::ThreadedGather) + @threads for t in eachindex(g.buffers) + buf = g.buffers[t] + fill!(buf, zero(eltype(buf))) + _gather!(buf, refs, α, y, cache, g.ranges[t]) end + @inbounds for buf in g.buffers + @simd for k in eachindex(fecoef) + fecoef[k] += buf[k] + end + end + return fecoef end -function scatter!(y::AbstractVector, α::Number, fecoef::AbstractVector, +function scatter!(y::AbstractVector, α::Number, fecoef::AbstractVector, refs::AbstractVector, cache::AbstractVector) @spawn_for_chunks 100_000 for i in eachindex(y) @inbounds y[i] += α * fecoef[refs[i]] * cache[i] end end +# CPU adjoint (A'u). Overrides the generic AbstractFixedEffectLinearMap method to use each +# fixed effect's gather strategy (chosen at construction). GPU backends keep the generic one. +function LinearAlgebra.mul!(fecoefs::FixedEffectCoefficients, + Cfem::Adjoint{T, FixedEffectLinearMapCPU{T}}, + y::AbstractVector, α::Number, β::Number) where {T} + fem = adjoint(Cfem) + rmul!(fecoefs, β) + @inbounds for i in eachindex(fem.fes) + gather!(fecoefs.x[i], fem.fes[i].refs, α, y, fem.caches[i], fem.gathers[i]) + end + return fecoefs +end + ############################################################################## ## From c6f61638f09c9499e13e837f6a0dacbf75ca112e Mon Sep 17 00:00:00 2001 From: Matthieu Gomez Date: Sun, 31 May 2026 01:12:30 +0200 Subject: [PATCH 2/4] Unify CPU/GPU gather behind a per-fixed-effect plan type Previously the CPU used a backend-specific adjoint mul! override (the gather plan lived in a `gathers` field), while Metal fused the plan into a `MetalGatherCache` and chose bin-vs-atomic with a runtime branch. Give all three backends one shape: - Gather-plan types in core: SerialGather/ThreadedGather (CPU), AtomicGather/BucketGather (GPU). Chosen once at construction, dispatched on by gather!. - Each linear map carries parallel `scales`, `caches` (plain preconditioner vectors), and `gathers` (the plan). scale!(scale,...) and cache!(cache,...) stay symmetric on plain vectors on every backend. - One generic mul!: forward passes caches[i] to scatter!; adjoint passes caches[i] and gathers[i] to gather!. No backend-specific mul! override. - Metal: split MetalGatherCache into plain caches + Atomic/Bucket gathers; the _has_bin_cache runtime branch becomes gather! dispatch. - CUDA: gains a gathers field of AtomicGather and the matching gather! method. CPU and Metal verified here (FixedEffects test suite incl. GPU parity passes; CPU demean speedup unchanged and bit-accurate). CUDA updated to match but not run (no NVIDIA device available). Co-Authored-By: Claude Opus 4.8 (1M context) --- ext/CUDAExt.jl | 12 +-- ext/MetalExt.jl | 128 +++++++++++++++------------- src/AbstractFixedEffectLinearMap.jl | 19 ++++- src/SolverCPU.jl | 23 ----- 4 files changed, 94 insertions(+), 88 deletions(-) diff --git a/ext/CUDAExt.jl b/ext/CUDAExt.jl index bc81304..c398d49 100644 --- a/ext/CUDAExt.jl +++ b/ext/CUDAExt.jl @@ -1,6 +1,6 @@ module CUDAExt using FixedEffects, CUDA -using FixedEffects: FixedEffectCoefficients, AbstractWeights, UnitWeights, LinearAlgebra, Adjoint, mul!, rmul!, lsmr!, AbstractFixedEffectLinearMap, copy_internal! +using FixedEffects: FixedEffectCoefficients, AbstractWeights, UnitWeights, LinearAlgebra, Adjoint, mul!, rmul!, lsmr!, AbstractFixedEffectLinearMap, copy_internal!, AtomicGather CUDA.allowscalar(false) ############################################################################## @@ -36,19 +36,21 @@ mutable struct FixedEffectLinearMapCUDA{T} <: AbstractFixedEffectLinearMap{T} fes::Vector{<:FixedEffect} scales::Vector{<:AbstractVector} caches::Vector{<:AbstractVector} + gathers::Vector{AtomicGather} end function FixedEffectLinearMapCUDA{T}(fes::Vector{<:FixedEffect}) where {T} fes = [_cu(T, fe) for fe in fes] scales = [CUDA.zeros(T, fe.n) for fe in fes] caches = [CUDA.zeros(T, length(fes[1].interaction)) for fe in fes] - return FixedEffectLinearMapCUDA{T}(fes, scales, caches) + gathers = [AtomicGather() for fe in fes] + return FixedEffectLinearMapCUDA{T}(fes, scales, caches, gathers) end -function FixedEffects.gather!(fecoef::CuVector, refs::CuVector, α::Number, y::CuVector, cache::CuVector) +function FixedEffects.gather!(fecoef::CuVector, refs::CuVector, α::Number, y::CuVector, cache::CuVector, ::AtomicGather) nthreads = 256 - nblocks = cld(length(y), nthreads) - @cuda threads=nthreads blocks=nblocks gather_kernel!(fecoef, refs, α, y, cache) + nblocks = cld(length(y), nthreads) + @cuda threads=nthreads blocks=nblocks gather_kernel!(fecoef, refs, α, y, cache) end function gather_kernel!(fecoef, refs, α, y, cache) diff --git a/ext/MetalExt.jl b/ext/MetalExt.jl index b87d940..070d0bc 100644 --- a/ext/MetalExt.jl +++ b/ext/MetalExt.jl @@ -1,6 +1,6 @@ module MetalExt using FixedEffects, Metal -using FixedEffects: FixedEffectCoefficients, AbstractWeights, UnitWeights, LinearAlgebra, Adjoint, mul!, rmul!, lsmr!, AbstractFixedEffectLinearMap, copy_internal! +using FixedEffects: FixedEffectCoefficients, AbstractWeights, UnitWeights, LinearAlgebra, Adjoint, mul!, rmul!, lsmr!, AbstractFixedEffectLinearMap, copy_internal!, AtomicGather, BucketGather Metal.allowscalar(false) ############################################################################## @@ -33,71 +33,80 @@ _mtl(T::Type, w::AbstractVector) = MtlVector{T}(convert(Vector{T}, w)) mutable struct FixedEffectLinearMapMetal{T} <: AbstractFixedEffectLinearMap{T} fes::Vector{<:FixedEffect} - scales::Vector{<:AbstractVector} - caches::Vector + scales::Vector{MtlVector{T}} + caches::Vector{MtlVector{T}} + gathers::Vector{Union{AtomicGather, BucketGather}} end -function bucketize_refs(refs::Vector, n::Int) +function _metal_threadgroup_width() + width = Int(device().maxThreadsPerThreadgroup.width) + return prevpow(2, width) +end + +function bucketize_refs(refs::AbstractVector{<:Integer}, n::Int) # count the number of obs per group - counts = zeros(Int, n) - @inbounds for r in refs - counts[r] += 1 - end + counts = zeros(Int, n) + @inbounds for r in refs + counts[r] += 1 + end # offsets is vcat(1, cumsum(counts)) - offsets_mtl = Metal.@sync Metal.zeros(Int, n + 1; storage = Metal.SharedStorage) - offsets = unsafe_wrap(Array{Int}, offsets_mtl, size(offsets_mtl)) - offsets[1] = 1 - @inbounds for k in 1:n - offsets[k+1] = offsets[k] + counts[k] - end + offsets = Vector{Int}(undef, n + 1) + offsets[1] = 1 + @inbounds for k in 1:n + offsets[k+1] = offsets[k] + counts[k] + end - perm_mtl = Metal.@sync Metal.zeros(Int, length(refs); storage = Metal.SharedStorage) - perm = unsafe_wrap(Array{Int}, perm_mtl, size(perm_mtl)) - next = offsets[1:n] - @inbounds for i in eachindex(refs) - r = refs[i] - p = next[r] - perm[p] = i - next[r] = p + 1 - end - return perm_mtl, offsets_mtl + perm = Vector{Int}(undef, length(refs)) + next = offsets[1:n] + @inbounds for i in eachindex(refs) + r = refs[i] + p = next[r] + perm[p] = i + next[r] = p + 1 + end + return MtlVector{Int}(perm), MtlVector{Int}(offsets) end function FixedEffectLinearMapMetal{T}(fes::Vector{<:FixedEffect}) where {T} fes2 = [_mtl(T, fe) for fe in fes] scales = [Metal.zeros(T, fe.n) for fe in fes] - caches = [Any[Metal.zeros(T, length(fe.refs)), Metal.zeros(Int, 1), Metal.zeros(Int, 1)] for fe in fes] + caches = [Metal.zeros(T, length(fe.refs)) for fe in fes] + G = Union{AtomicGather, BucketGather} + gathers = Vector{G}(undef, length(fes)) Threads.@threads for i in 1:length(fes) refs = fes[i].refs n = fes[i].n - if n < min(100_000, div(length(refs), 16)) - out = bucketize_refs(refs, n) - caches[i][2] = out[1] - caches[i][3] = out[2] + # bucketize (one threadgroup per group) for low cardinality; else atomic adds + if n < min(100_000, div(length(refs), 16)) + perm, offsets = bucketize_refs(refs, n) + gathers[i] = BucketGather(perm, offsets) + else + gathers[i] = AtomicGather() end end - return FixedEffectLinearMapMetal{T}(fes2, scales, caches) + return FixedEffectLinearMapMetal{T}(fes2, scales, caches, gathers) end -function FixedEffects.gather!(fecoef::MtlVector, refs::MtlVector, α::Number, y::MtlVector, cache::Vector) +function FixedEffects.gather!(fecoef::MtlVector, refs::MtlVector, α::Number, y::MtlVector, cache::MtlVector, g::BucketGather) n = length(fecoef) - nthreads = Int(device().maxThreadsPerThreadgroup.width) - if n < min(100_000, div(length(refs), 16)) - Metal.@sync @metal threads=nthreads groups=n gather_kernel_bin!(fecoef, refs, α, y, cache[1], cache[2], cache[3], Val(nthreads)) - else - nblocks = cld(length(y), nthreads) - Metal.@sync @metal threads=nthreads groups=nblocks gather_kernel!(fecoef, refs, α, y, cache[1]) - end + nthreads = _metal_threadgroup_width() + Metal.@sync @metal threads=nthreads groups=n gather_kernel_bin!(fecoef, refs, α, y, cache, g.perm, g.offsets, Val(nthreads)) +end + +function FixedEffects.gather!(fecoef::MtlVector, refs::MtlVector, α::Number, y::MtlVector, cache::MtlVector, ::AtomicGather) + nthreads = _metal_threadgroup_width() + nblocks = cld(length(y), nthreads) + Metal.@sync @metal threads=nthreads groups=nblocks gather_kernel!(fecoef, refs, α, y, cache) end function gather_kernel_bin!(fecoef, refs, α, y, cache, perm, offsets, ::Val{NT}) where {NT} - k = Int(threadgroup_position_in_grid().x) # 1..K (Julia-style indexing) :contentReference[oaicite:2]{index=2} - tid = Int(thread_position_in_threadgroup().x) # 1..nthreads :contentReference[oaicite:3]{index=3} - nt = Int(threads_per_threadgroup().x) # nthreads :contentReference[oaicite:4]{index=4} + k = Int(threadgroup_position_in_grid().x) + tid = Int(thread_position_in_threadgroup().x) + nt = Int(threads_per_threadgroup().x) # threadgroup scratch T = eltype(fecoef) - shared = Metal.MtlThreadGroupArray(T, NT) # threadgroup-local array :contentReference[oaicite:5]{index=5} + shared = Metal.MtlThreadGroupArray(T, NT) start = @inbounds offsets[k] stop = @inbounds offsets[k+1] - 1 @@ -113,7 +122,7 @@ function gather_kernel_bin!(fecoef, refs, α, y, cache, perm, offsets, ::Val{NT} end @inbounds shared[tid] = acc - Metal.threadgroup_barrier(Metal.MemoryFlagThreadGroup) # sync + tg fence :contentReference[oaicite:6]{index=6} + Metal.threadgroup_barrier(Metal.MemoryFlagThreadGroup) # tree reduction in shared memory offset = nt ÷ 2 @@ -141,10 +150,10 @@ function gather_kernel!(fecoef, refs, α, y, cache) return nothing end -function FixedEffects.scatter!(y::MtlVector, α::Number, fecoef::MtlVector, refs::MtlVector, cache::Vector) - nthreads = Int(device().maxThreadsPerThreadgroup.width) +function FixedEffects.scatter!(y::MtlVector, α::Number, fecoef::MtlVector, refs::MtlVector, cache::MtlVector) + nthreads = _metal_threadgroup_width() nblocks = cld(length(y), nthreads) - Metal.@sync @metal threads=nthreads groups=nblocks scatter_kernel!(y, α, fecoef, refs, cache[1]) + Metal.@sync @metal threads=nthreads groups=nblocks scatter_kernel!(y, α, fecoef, refs, cache) end function scatter_kernel!(y, α, fecoef, refs, cache) @@ -172,19 +181,22 @@ mutable struct FixedEffectSolverMetal{T} <: FixedEffects.AbstractFixedEffectSolv v::FixedEffectCoefficients{<: AbstractVector{T}} h::FixedEffectCoefficients{<: AbstractVector{T}} hbar::FixedEffectCoefficients{<: AbstractVector{T}} + tmp::Vector{T} fes::Vector{<:FixedEffect} end function FixedEffects.AbstractFixedEffectSolver{T}(fes::Vector{<:FixedEffect}, weights::AbstractWeights, ::Type{Val{:Metal}}) where {T} + T === Float32 || throw(ArgumentError("The Metal backend supports Float32 solves only; pass double_precision=false or use method=:cpu for Float64.")) m = FixedEffectLinearMapMetal{T}(fes) - b = Metal.zeros(T, length(weights); storage = Metal.SharedStorage) - r = Metal.zeros(T, length(weights); storage = Metal.SharedStorage) + b = Metal.zeros(T, length(weights)) + r = Metal.zeros(T, length(weights)) x = FixedEffectCoefficients([Metal.zeros(T, fe.n) for fe in fes]) v = FixedEffectCoefficients([Metal.zeros(T, fe.n) for fe in fes]) h = FixedEffectCoefficients([Metal.zeros(T, fe.n) for fe in fes]) hbar = FixedEffectCoefficients([Metal.zeros(T, fe.n) for fe in fes]) - feM = FixedEffectSolverMetal{T}(m, Metal.zeros(T, length(weights)), b, r, x, v, h, hbar, fes) + tmp = zeros(T, length(weights)) + feM = FixedEffectSolverMetal{T}(m, Metal.zeros(T, length(weights)), b, r, x, v, h, hbar, tmp, fes) FixedEffects.update_weights!(feM, weights) end @@ -201,7 +213,7 @@ function FixedEffects.update_weights!(feM::FixedEffectSolverMetal{T}, weights::A end function scale!(scale::MtlVector, refs::MtlVector, interaction::MtlVector, weights::MtlVector) - nthreads = Int(device().maxThreadsPerThreadgroup.width) + nthreads = _metal_threadgroup_width() nblocks = cld(length(refs), nthreads) fill!(scale, 0) Metal.@sync @metal threads=nthreads groups=nblocks scale_kernel!(scale, refs, interaction, weights) @@ -224,10 +236,10 @@ function inv_kernel!(scale, T) return nothing end -function cache!(cache, refs::MtlVector, interaction::MtlVector, weights::MtlVector, scale::MtlVector) - nthreads = Int(device().maxThreadsPerThreadgroup.width) - nblocks = cld(length(cache[1]), nthreads) - Metal.@sync @metal threads=nthreads groups=nblocks cache!_kernel!(cache[1], refs, interaction, weights, scale) +function cache!(cache::MtlVector, refs::MtlVector, interaction::MtlVector, weights::MtlVector, scale::MtlVector) + nthreads = _metal_threadgroup_width() + nblocks = cld(length(cache), nthreads) + Metal.@sync @metal threads=nthreads groups=nblocks cache!_kernel!(cache, refs, interaction, weights, scale) end function cache!_kernel!(cache, refs, interaction, weights, scale) @@ -240,14 +252,14 @@ end function FixedEffects.copy_internal!(feM::FixedEffectSolverMetal{T}, field::Symbol, r::AbstractVector) where {T} synchronize() - feM_r = unsafe_wrap(Array{T}, getfield(feM, field), size(getfield(feM, field))) - copyto!(feM_r, r) + copyto!(feM.tmp, r) + copyto!(getfield(feM, field), feM.tmp) end function FixedEffects.copy_internal!(r::AbstractVector, feM::FixedEffectSolverMetal{T}, field::Symbol) where {T} synchronize() - feM_r = unsafe_wrap(Array{T}, getfield(feM, field), size(getfield(feM, field))) - copyto!(r, feM_r) + copyto!(feM.tmp, getfield(feM, field)) + copyto!(r, feM.tmp) end diff --git a/src/AbstractFixedEffectLinearMap.jl b/src/AbstractFixedEffectLinearMap.jl index e634c8c..e3ac757 100644 --- a/src/AbstractFixedEffectLinearMap.jl +++ b/src/AbstractFixedEffectLinearMap.jl @@ -15,6 +15,21 @@ abstract type AbstractFixedEffectLinearMap{T} end +# Per-fixed-effect plan for the adjoint gather (A'u). Chosen once at construction and +# dispatched on by each backend's `gather!`. The forward map (A = scatter) needs only the +# preconditioner `caches[i]` and is shared; the gather plan lives in `gathers[i]`. +# CPU uses Serial/Threaded; the GPU backends use Atomic/Bucket. +struct SerialGather end +struct ThreadedGather{V<:AbstractVector} + buffers::Vector{V} # one length-fe.n accumulator per thread + ranges::Vector{UnitRange{Int}} # contiguous row chunks +end +struct AtomicGather end +struct BucketGather{V<:AbstractVector} + perm::V # observation indices sorted by group + offsets::V # CSR offsets into perm (length ngroups + 1) +end + Base.adjoint(fem::AbstractFixedEffectLinearMap) = Adjoint(fem) function Base.size(fem::AbstractFixedEffectLinearMap, dim::Integer) @@ -28,8 +43,8 @@ function LinearAlgebra.mul!(fecoefs::FixedEffectCoefficients, y::AbstractVector, α::Number, β::Number) where {T} fem = adjoint(Cfem) rmul!(fecoefs, β) - for (fecoef, fe, cache) in zip(fecoefs.x, fem.fes, fem.caches) - gather!(fecoef, fe.refs, α, y, cache) + for (fecoef, fe, cache, gather) in zip(fecoefs.x, fem.fes, fem.caches, fem.gathers) + gather!(fecoef, fe.refs, α, y, cache, gather) end return fecoefs end diff --git a/src/SolverCPU.jl b/src/SolverCPU.jl index cc8c243..bb0d63a 100644 --- a/src/SolverCPU.jl +++ b/src/SolverCPU.jl @@ -4,16 +4,6 @@ ## ############################################################################## -# Strategy for the adjoint gather (A'u) of one fixed effect, picked once at construction -# and dispatched on by `gather!` below: a serial scatter-add, or a threaded reduction -# through per-thread accumulators. -struct SerialGather end - -struct ThreadedGather{V<:AbstractVector} - buffers::Vector{V} # one length-fe.n accumulator per thread - ranges::Vector{UnitRange{Int}} # contiguous row chunks (shared across fixed effects) -end - mutable struct FixedEffectLinearMapCPU{T} <: AbstractFixedEffectLinearMap{T} fes::Vector{<:FixedEffect} scales::Vector{<:AbstractVector} @@ -102,19 +92,6 @@ function scatter!(y::AbstractVector, α::Number, fecoef::AbstractVector, end end -# CPU adjoint (A'u). Overrides the generic AbstractFixedEffectLinearMap method to use each -# fixed effect's gather strategy (chosen at construction). GPU backends keep the generic one. -function LinearAlgebra.mul!(fecoefs::FixedEffectCoefficients, - Cfem::Adjoint{T, FixedEffectLinearMapCPU{T}}, - y::AbstractVector, α::Number, β::Number) where {T} - fem = adjoint(Cfem) - rmul!(fecoefs, β) - @inbounds for i in eachindex(fem.fes) - gather!(fecoefs.x[i], fem.fes[i].refs, α, y, fem.caches[i], fem.gathers[i]) - end - return fecoefs -end - ############################################################################## ## From d302f00f371b1389d6a33ea437195cabdb003fc6 Mon Sep 17 00:00:00 2001 From: Matthieu Gomez Date: Sun, 31 May 2026 01:18:24 +0200 Subject: [PATCH 3/4] Standardize LSMR convergence/iteration reporting; add a GPU parity test - Report LSMR iterations consistently (ch.mvps) in solve_residuals! and solve_coefficients!, with a cold start and the maxiter check at the end of the loop, so maxiter=1 no longer reports convergence after zero iterations. - Validate maxiter >= 0 and warn when a solve does not converge within maxiter. - Document that GPU backends default to Float32 (looser tolerance, less accurate). - Add a CPU/GPU parity test for solve_residuals!. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/AbstractFixedEffectSolver.jl | 30 +++++++++------- src/utils/lsmr.jl | 5 +-- test/solve.jl | 61 ++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 14 deletions(-) diff --git a/src/AbstractFixedEffectSolver.jl b/src/AbstractFixedEffectSolver.jl index 32951dd..8e09180 100644 --- a/src/AbstractFixedEffectSolver.jl +++ b/src/AbstractFixedEffectSolver.jl @@ -17,9 +17,9 @@ Returns ``y_i - X_i'\\beta`` where ``\\beta = argmin_{b} \\sum_i y_i - X_i'b``, * `fes`: A `Vector{<:FixedEffect}` * `w`: A vector of weights, i.e. `AbstractWeights` * `method` : A symbol between :cpu (default), :CUDA, or :Metal -* `double_precision::Bool`: Should the demeaning operation use Float64 rather than Float32? Default to method == : cpu. +* `double_precision::Bool`: Should the demeaning operation use Float64 rather than Float32? Default to method == :cpu. GPU backends use Float32 by default; Float32 solves use a looser default tolerance and can be less accurate than CPU Float64 solves. * `tol` : Tolerance. Default to 1e-8 if `double_precision = true`, 1e-6 otherwise. -* `maxiter` : Maximum number of iterations +* `maxiter` : Maximum number of LSMR iterations ### Returns * `res` : Residual of the least square problem @@ -47,18 +47,22 @@ end function solve_residuals!(r::AbstractVector{<:Real}, feM::AbstractFixedEffectSolver{T}; tol::Real = sqrt(eps(T)), maxiter::Integer = 100_000) where {T} + maxiter >= 0 || throw(ArgumentError("maxiter must be non-negative")) # One cannot copy view of Vector (r) on GPU, so first collect the vector copy_internal!(feM, :r, r) if !(feM.weights isa UnitWeights) feM.r .*= sqrt.(feM.weights) end copyto!(feM.b, feM.r) - mul!(feM.x, feM.m', feM.b, 1, 0) - iter, converged = 1, true - if length(feM.x.x) > 1 - x, ch = lsmr!(feM.x, feM.m, feM.b, feM.v, feM.h, feM.hbar; atol = tol, btol = tol, maxiter = maxiter - 1) - iter, converged = ch.mvps + 1, ch.isconverged - end + fill!(feM.x, zero(T)) + iter, converged = 0, true + if length(feM.x.x) == 1 + mul!(feM.x, feM.m', feM.b, 1, 0) + else + _, ch = lsmr!(feM.x, feM.m, feM.b, feM.v, feM.h, feM.hbar; atol = tol, btol = tol, maxiter = maxiter) + iter, converged = ch.mvps, ch.isconverged + end + converged || @warn "solve_residuals! did not converge within maxiter LSMR iterations; returned values may be inaccurate." iterations=iter maxiter tol mul!(feM.r, feM.m, feM.x, -1, 1) if !(feM.weights isa UnitWeights) feM.r ./= sqrt.(feM.weights) @@ -111,9 +115,9 @@ Returns ``\\beta = argmin_{b} \\sum_i w_i(y_i - X_i'b)`` where `X` denotes the m * `fes`: A `Vector{<:FixedEffect}` * `w`: A vector of weights, i.e. `AbstractWeights` * `method` : A symbol between :cpu (default), :CUDA, or :Metal -* `double_precision::Bool`: Should the demeaning operation use Float64 rather than Float32? Default to method == :cpu. +* `double_precision::Bool`: Should the demeaning operation use Float64 rather than Float32? Default to method == :cpu. GPU backends use Float32 by default; Float32 solves use a looser default tolerance and can be less accurate than CPU Float64 solves. * `tol` : Tolerance. Default to 1e-8 if `double_precision = true`, 1e-6 otherwise. -* `maxiter` : Maximum number of iterations +* `maxiter` : Maximum number of LSMR iterations ### Returns @@ -145,16 +149,18 @@ function solve_coefficients!(y::AbstractVector{<: Number}, fes::AbstractVector{< end function solve_coefficients!(r::AbstractVector, feM::AbstractFixedEffectSolver{T}; tol::Real = sqrt(eps(T)), maxiter::Integer = 100_000) where {T} + maxiter >= 0 || throw(ArgumentError("maxiter must be non-negative")) # One cannot copy view of Vector (r) on GPU, so first collect the vector copy_internal!(feM, :b, r) if !(feM.weights isa UnitWeights) feM.b .*= sqrt.(feM.weights) end fill!(feM.x, zero(T)) - x, ch = lsmr!(feM.x, feM.m, feM.b, feM.v, feM.h, feM.hbar; atol = tol, btol = tol, maxiter = maxiter) + _, ch = lsmr!(feM.x, feM.m, feM.b, feM.v, feM.h, feM.hbar; atol = tol, btol = tol, maxiter = maxiter) + ch.isconverged || @warn "solve_coefficients! did not converge within maxiter LSMR iterations; returned values may be inaccurate." iterations=ch.mvps maxiter tol for (x, scale) in zip(feM.x.x, feM.m.scales) x .*= scale end x = Vector{eltype(r)}[collect(x) for x in feM.x.x] - full(normalize!(x, feM.m.fes), feM.m.fes), div(ch.mvps, 2), ch.isconverged + full(normalize!(x, feM.m.fes), feM.m.fes), ch.mvps, ch.isconverged end diff --git a/src/utils/lsmr.jl b/src/utils/lsmr.jl index e10c337..7b003c3 100644 --- a/src/utils/lsmr.jl +++ b/src/utils/lsmr.jl @@ -48,6 +48,7 @@ function lsmr!(x, A, b, v, h, hbar; maxiter::Integer = max(size(A,1), size(A,2)), λ::Number = 0) # Sanity-checking + maxiter >= 0 || throw(ArgumentError("maxiter must be non-negative")) m = size(A, 1) n = size(A, 2) length(x) == n || error("x has length $(length(x)) but should have length $n") @@ -205,7 +206,6 @@ function lsmr!(x, A, b, v, h, hbar; # the parameters atol, btol, conlim to 0.) # The effect is equivalent to the normAl tests using # atol = eps, btol = eps, conlim = 1/eps. - if iter >= maxiter istop = 7; break end if 1 + test3 <= 1 istop = 6; break end if 1 + test2 <= 1 istop = 5; break end if 1 + t1 <= 1 istop = 4; break end @@ -213,7 +213,9 @@ function lsmr!(x, A, b, v, h, hbar; if test3 <= ctol istop = 3; break end if test2 <= atol istop = 2; break end if test1 <= rtol istop = 1; break end + if iter >= maxiter istop = 7; break end end + istop == 0 && (istop = 7) end converged = istop ∉ (3, 6, 7) tol = (atol, btol, ctol) @@ -221,4 +223,3 @@ function lsmr!(x, A, b, v, h, hbar; return x, ch end - diff --git a/test/solve.jl b/test/solve.jl index 5df735e..19c2d07 100644 --- a/test/solve.jl +++ b/test/solve.jl @@ -15,6 +15,18 @@ r_ols = [-0.2015993617092453, 0.2015993617092464, -0.2015993617092463, 0.2015 (r, iter, conv) = solve_residuals!(deepcopy(x), fes) @test r ≈ r_ols +@testset "maxiter semantics" begin + (r0, iter0, conv0) = @test_logs (:warn, r"solve_residuals!") solve_residuals!(deepcopy(x), fes; maxiter = 0) + @test iter0 == 0 + @test !conv0 + (r1, iter1, conv1) = @test_logs (:warn, r"solve_residuals!") solve_residuals!(deepcopy(x), fes; maxiter = 1) + @test iter1 == 1 + (coefs1, coef_iter1, coef_conv1) = @test_logs (:warn, r"solve_coefficients!") solve_coefficients!(deepcopy(x), fes; maxiter = 1) + @test coef_iter1 == 1 + @test_throws ArgumentError solve_residuals!(deepcopy(x), fes; maxiter = -1) + @test_throws ArgumentError solve_coefficients!(deepcopy(x), fes; maxiter = -1) +end + # PooledArrays (r, iter, conv) = solve_residuals!(deepcopy(x), [FixedEffect(PooledArray(p1)), FixedEffect(PooledArray(p2))]) @test r ≈ r_ols @@ -37,6 +49,55 @@ for method in method_s @test Float32.(r) ≈ Float32.(r_ols) end +function _residual_from_coefs(y, coefs) + out = copy(y) + for coef in coefs + out .-= coef + end + return out +end + +@testset "GPU parity" begin + n_gpu = 2048 + p1_gpu = mod1.(1:n_gpu, 32) + p2_gpu = mod1.((1:n_gpu) .* 7, 41) + x_gpu = sin.((1:n_gpu) ./ 3) .+ cos.((1:n_gpu) ./ 11) + weights_gpu = Weights(1 .+ mod.(1:n_gpu, 5) ./ 10) + interaction_gpu = 0.5 .+ mod.(1:n_gpu, 11) ./ 13 + fes_gpu = [FixedEffect(p1_gpu), FixedEffect(p2_gpu)] + fes_interact_gpu = [FixedEffect(p1_gpu, interaction = interaction_gpu), FixedEffect(p2_gpu)] + fes_bin_gpu = [FixedEffect(p1_gpu)] + atol_gpu = 1e-3 + rtol_gpu = 1e-3 + + for method in filter(!=(:cpu), method_s) + cpu_r = solve_residuals!(deepcopy(x_gpu), fes_gpu; double_precision = false)[1] + gpu_r = solve_residuals!(deepcopy(x_gpu), fes_gpu; method = method, double_precision = false)[1] + @test gpu_r ≈ cpu_r atol=atol_gpu rtol=rtol_gpu + + cpu_weighted_r = solve_residuals!(deepcopy(x_gpu), fes_gpu, weights_gpu; double_precision = false)[1] + gpu_weighted_r = solve_residuals!(deepcopy(x_gpu), fes_gpu, weights_gpu; method = method, double_precision = false)[1] + @test gpu_weighted_r ≈ cpu_weighted_r atol=atol_gpu rtol=rtol_gpu + + cpu_interact_r = solve_residuals!(deepcopy(x_gpu), fes_interact_gpu, weights_gpu; double_precision = false)[1] + gpu_interact_r = solve_residuals!(deepcopy(x_gpu), fes_interact_gpu, weights_gpu; method = method, double_precision = false)[1] + @test gpu_interact_r ≈ cpu_interact_r atol=atol_gpu rtol=rtol_gpu + + cpu_coefs = solve_coefficients!(deepcopy(x_gpu), fes_gpu, weights_gpu; double_precision = false)[1] + gpu_coefs = solve_coefficients!(deepcopy(x_gpu), fes_gpu, weights_gpu; method = method, double_precision = false)[1] + @test _residual_from_coefs(x_gpu, gpu_coefs) ≈ _residual_from_coefs(x_gpu, cpu_coefs) atol=atol_gpu rtol=rtol_gpu + + cpu_bin_r = solve_residuals!(deepcopy(x_gpu), fes_bin_gpu, weights_gpu; double_precision = false)[1] + gpu_bin_r = solve_residuals!(deepcopy(x_gpu), fes_bin_gpu, weights_gpu; method = method, double_precision = false)[1] + @test gpu_bin_r ≈ cpu_bin_r atol=atol_gpu rtol=rtol_gpu + end + + if Metal.functional() + @test_throws ArgumentError solve_residuals!(deepcopy(x), fes; method = :Metal, double_precision = true) + @test_throws ArgumentError solve_coefficients!(deepcopy(x), fes; method = :Metal, double_precision = true) + end +end + fe = FixedEffect([1, 2]) @test_throws "FixedEffects must have the same length as y" ỹ = solve_residuals!(ones(100), [fe]) From beafa677f5a3070895a36144a534e7b175a36fd3 Mon Sep 17 00:00:00 2001 From: Matthieu Gomez Date: Sun, 31 May 2026 01:24:34 +0200 Subject: [PATCH 4/4] Bump version to 3.2.0 Co-Authored-By: Claude Opus 4.8 (1M context) --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 7358743..a052baf 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "FixedEffects" uuid = "c8885935-8500-56a7-9867-7708b20db0eb" -version = "3.1.0" +version = "3.2.0" [deps] GroupedArrays = "6407cd72-fade-4a84-8a1e-56e431fc1533"