diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb
index 73341a6b..d097f507 100644
--- a/ruby/lib/ci/queue/configuration.rb
+++ b/ruby/lib/ci/queue/configuration.rb
@@ -6,6 +6,7 @@ class Configuration
       attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
       attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
       attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
+      attr_writer :heartbeat_max_test_duration
       attr_accessor :lazy_load, :lazy_load_stream_batch_size
       attr_writer :lazy_load_streaming_timeout
       attr_accessor :lazy_load_test_helpers
@@ -57,7 +58,7 @@ def initialize(
         grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
         max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
         queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
-        export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
+        export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil, heartbeat_max_test_duration: nil,
         lazy_load: false, lazy_load_stream_batch_size: nil, lazy_load_streaming_timeout: nil,
         lazy_load_test_helpers: nil, skip_stale_tests: false)
         @build_id = build_id
@@ -86,6 +87,7 @@ def initialize(
         @warnings_file = warnings_file
         @debug_log = debug_log
         @max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
+        @heartbeat_max_test_duration = heartbeat_max_test_duration
         @lazy_load = lazy_load
         @lazy_load_stream_batch_size = lazy_load_stream_batch_size || 5_000
         @lazy_load_streaming_timeout = lazy_load_streaming_timeout
@@ -153,6 +155,13 @@ def inactive_workers_timeout
         @inactive_workers_timeout || timeout
       end
 
+      # Longest time, in seconds, the heartbeat keeps ticking for a single test.
+      # Falls back to `timeout * 10` when `max_missed_heartbeat_seconds` is set;
+      # otherwise returns nil, which the heartbeat loop treats as "no cap".
+      def heartbeat_max_test_duration
+        @heartbeat_max_test_duration || (timeout * 10 if max_missed_heartbeat_seconds)
+      end
+
       def max_consecutive_failures=(max)
         if max
           @circuit_breakers << CircuitBreaker::MaxConsecutiveFailures.new(max_consecutive_failures: max)
diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb
index 0ead2884..1efb2866 100644
--- a/ruby/lib/ci/queue/redis/base.rb
+++ b/ruby/lib/ci/queue/redis/base.rb
@@ -63,7 +63,7 @@ def reconnect_attempts
         def with_heartbeat(id, lease: nil)
           if heartbeat_enabled?
             ensure_heartbeat_thread_alive!
-            heartbeat_state.set(:tick, id, lease)
+            heartbeat_state.set(:tick, id, lease, Process.clock_gettime(Process::CLOCK_MONOTONIC))
           end
 
           yield
@@ -386,16 +386,27 @@ def heartbeat
           Thread.current.name = "CI::Queue#heartbeat"
           Thread.current.abort_on_exception = true
 
           loop do
             command = heartbeat_state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command
 
             case command&.first
             when :tick
-              # command = [:tick, entry_id, lease_id]
+              # command = [:tick, entry_id, lease_id, started_at]
+              # started_at is the monotonic clock reading taken when with_heartbeat was called,
+              # so the elapsed calculation is not skewed by heartbeat thread startup delay.
+              # The cap is re-evaluated from the command on every wake-up rather than cached in
+              # a flag: each test carries its own started_at, so a cap reached by one test
+              # cannot carry over to the next, whether or not :reset was observed in between.
+              max_duration = config.heartbeat_max_test_duration
+              if max_duration
+                elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - command[3]
+                next if elapsed >= max_duration
+              end
+
               heartbeat_process.tick!(command[1], command[2])
             when :reset
               # Test finished, stop ticking until next test starts
               nil
             when :stop
               break
             end
diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb
index 9fa58611..2a405ff1 100644
--- a/ruby/lib/minitest/queue/runner.rb
+++ b/ruby/lib/minitest/queue/runner.rb
@@ -736,6 +736,16 @@ def parser
             queue_config.max_missed_heartbeat_seconds = time || 30
           end
 
+          help = <<~EOS
+            Maximum duration in seconds that the heartbeat will tick for a single test.
+            If a test runs longer than this, the heartbeat stops and the test entry becomes
+            eligible for reclamation by another worker.
+            Defaults to timeout * 10 when heartbeat is enabled.
+          EOS
+          opts.on("--heartbeat-max-test-duration SECONDS", Float, help) do |seconds|
+            queue_config.heartbeat_max_test_duration = seconds
+          end
+
           opts.on("-v", "--verbose", "Verbose. Show progress processing files.") do
             self.verbose = true
 
diff --git a/ruby/test/ci/queue/configuration_test.rb b/ruby/test/ci/queue/configuration_test.rb
index b39bc427..a6728577 100644
--- a/ruby/test/ci/queue/configuration_test.rb
+++ b/ruby/test/ci/queue/configuration_test.rb
@@ -200,5 +200,19 @@ def test_new_lazy_load_test_helpers_env
       assert_equal ["test/test_helper.rb", "test/support/helper.rb"], config.lazy_load_test_helper_paths
     end
 
+    def test_heartbeat_max_test_duration_defaults
+      # defaults to timeout*10 when heartbeat is enabled
+      config = Configuration.new(timeout: 5, max_missed_heartbeat_seconds: 1)
+      assert_equal 50, config.heartbeat_max_test_duration
+
+      # nil when heartbeat is disabled (no max_missed_heartbeat_seconds)
+      config = Configuration.new(timeout: 5)
+      assert_nil config.heartbeat_max_test_duration
+
+      # explicit value overrides the default
+      config = Configuration.new(timeout: 5, max_missed_heartbeat_seconds: 1, heartbeat_max_test_duration: 3)
+      assert_equal 3, config.heartbeat_max_test_duration
+    end
+
   end
 end
diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb
index 18339a55..f3139582 100644
--- a/ruby/test/ci/queue/redis_test.rb
+++ b/ruby/test/ci/queue/redis_test.rb
@@ -496,6 +496,76 @@ def test_heartbeat_only_checks_lease
     assert_nil result
   end
 
+  def test_heartbeat_max_test_duration_stops_heartbeat
+    queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 1, tests: [TEST_LIST.first], build_id: 'hb-cap')
+    queue.boot_heartbeat_process!
+
+    entry = nil
+    lease = nil
+    queue.poll do |test|
+      entry = test.queue_entry
+      lease = queue.lease_for(entry)
+
+      # Score should be updating while heartbeat ticks
+      queue.with_heartbeat(entry, lease: lease) do
+        sleep 0.5
+        score_while_ticking = @redis.zscore(queue.send(:key, 'running'), entry)
+        refute_nil score_while_ticking, "Entry should be in running set while heartbeat ticks"
+
+        # Sleep past the heartbeat cap (1s) + extra buffer
+        sleep 1.5
+
+        # After cap, score should have stopped updating.
+        # The entry should now be stale enough for reserve_lost to reclaim.
+        score_after_cap = @redis.zscore(queue.send(:key, 'running'), entry)
+        # Score should be frozen (not updated for >1s since cap at ~1s)
+        assert score_after_cap < CI::Queue.time_now.to_f - 1, "Score should be stale after heartbeat cap"
+      end
+
+      queue.acknowledge(entry)
+    end
+
+    refute_nil entry, "Test should have been reserved"
+  ensure
+    queue&.stop_heartbeat!
+  end
+
+  def test_heartbeat_cap_resets_between_tests
+    # Two slow tests; cap fires after 1s so the first one goes stale.
+    # The cap is measured from each test's own start time, so once the second
+    # test starts the heartbeat should resume ticking for it.
+    tests = TEST_LIST.first(2)
+    queue = worker(1, max_missed_heartbeat_seconds: 3, heartbeat_max_test_duration: 1, tests: tests, build_id: 'hb-reset')
+    queue.boot_heartbeat_process!
+
+    polled = []
+    queue.poll do |test|
+      entry = test.queue_entry
+      lease = queue.lease_for(entry)
+      polled << entry
+
+      queue.with_heartbeat(entry, lease: lease) do
+        if polled.size == 1
+          # Sleep past cap for first test — heartbeat stops ticking
+          sleep 2
+          score = @redis.zscore(queue.send(:key, 'running'), entry)
+          assert score < CI::Queue.time_now.to_f - 1, "First test score should be stale after cap"
+        else
+          # For second test, sleep briefly then verify score is fresh — reset worked
+          sleep 0.5
+          score = @redis.zscore(queue.send(:key, 'running'), entry)
+          assert score >= CI::Queue.time_now.to_f - 2, "Second test score should be fresh after cap reset"
+        end
+      end
+
+      queue.acknowledge(entry)
+    end
+
+    assert_equal 2, polled.size, "Both tests should have been polled"
+  ensure
+    queue&.stop_heartbeat!
+  end
+
   def test_resolve_entry_falls_back_to_resolver
     queue = worker(1, populate: false)
     queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok })
diff --git a/ruby/test/fixtures/test/consecutive_capped_tests.rb b/ruby/test/fixtures/test/consecutive_capped_tests.rb
new file mode 100644
index 00000000..ee9e8dad
--- /dev/null
+++ b/ruby/test/fixtures/test/consecutive_capped_tests.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+require 'test_helper'
+
+CI::Queue::Redis.max_sleep_time = 0.05
+
+# Fixture for test_heartbeat_cap_resets_between_tests.
+#
+# test_alpha fires the heartbeat cap (sleep 2 > cap 1s) but finishes before going stale
+# (sleep 2 < cap 1 + heartbeat 2 = 3s), so its heartbeat has stopped ticking when it ends.
+# That cap must not carry over: test_beta has to get heartbeat ticks of its own.
+#
+# test_beta sleeps in the range (heartbeat=2, heartbeat+cap=3):
+# - Cap carried over: no ticks, stale at t_B + 2s, finishes at t_B + 2.5s → STOLEN
+# - Cap per test: ticks until cap at t_B + 1s, stale at t_B + 3s, finishes at t_B + 2.5s → NOT stolen
+class ConsecutiveCappedTests < Minitest::Test
+  def test_alpha
+    sleep 2
+  end
+
+  def test_beta
+    sleep 2.5
+  end
+end
diff --git a/ruby/test/fixtures/test/two_lost_tests.rb b/ruby/test/fixtures/test/two_lost_tests.rb
new file mode 100644
index 00000000..8875e87d
--- /dev/null
+++ b/ruby/test/fixtures/test/two_lost_tests.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+require 'test_helper'
+
+CI::Queue::Redis.max_sleep_time = 0.05
+
+class TwoLostTests < Minitest::Test
+
+  def test_alpha
+    sleep 3
+  end
+
+  def test_beta
+    sleep 3
+  end
+
+end
diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb
index ea56442d..b0cc163c 100644
--- a/ruby/test/integration/minitest_redis_test.rb
+++ b/ruby/test/integration/minitest_redis_test.rb
@@ -97,6 +97,202 @@ def test_lost_test_with_heartbeat_monitor
       end
     end
 
+    def test_lost_test_with_heartbeat_max_duration
+      # Start worker 0 first so it claims the test before worker 1 starts polling.
+      # Worker 0 heartbeat caps at 0.3s → entry stale at ~t=2 → worker 1 steals at ~t=2.
+      # lost_test sleeps 3s, giving a ~1s window for the steal before the test finishes.
+      _, err = capture_subprocess_io do
+        t0 = Thread.start do
+          system(
+            { 'BUILDKITE' => '1' },
+            @exe, 'run',
+            '--queue', @redis_url,
+            '--seed', 'foobar',
+            '--build', '1',
+            '--worker', '0',
+            '--timeout', '1',
+            '--max-requeues', '1',
+            '--requeue-tolerance', '1',
+            '--heartbeat', '2',
+            '--heartbeat-max-test-duration', '0.3',
+            '-Itest',
+            'test/lost_test.rb',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        # Give worker 0 time to claim the test before worker 1 starts polling.
+        sleep 0.5
+
+        t1 = Thread.start do
+          system(
+            { 'BUILDKITE' => '1' },
+            @exe, 'run',
+            '--queue', @redis_url,
+            '--seed', 'foobar',
+            '--build', '1',
+            '--worker', '1',
+            '--timeout', '1',
+            '--max-requeues', '1',
+            '--requeue-tolerance', '1',
+            '--heartbeat', '2',
+            '--heartbeat-max-test-duration', '0.3',
+            '-Itest',
+            'test/lost_test.rb',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        [t0, t1].each(&:join)
+      end
+
+      assert_empty filter_deprecation_warnings(err)
+
+      Tempfile.open('warnings') do |warnings_file|
+        out, err = capture_subprocess_io do
+          system(
+            @exe, 'report',
+            '--queue', @redis_url,
+            '--build', '1',
+            '--timeout', '1',
+            '--warnings-file', warnings_file.path,
+            '--heartbeat',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        assert_empty filter_deprecation_warnings(err)
+        warnings = warnings_file.read.lines.map { |line| JSON.parse(line) }
+        # Worker 0's heartbeat caps at 0.3s; the entry goes stale ~2s after the last tick
+        # (before lost_test finishes at t=3). Worker 1 steals it, generating a warning.
+        assert warnings.size >= 1, "Expected at least 1 RESERVED_LOST_TEST warning, got #{warnings.size}"
+      end
+    end
+
+    def test_heartbeat_cap_doesnt_affect_fast_tests
+      # With cap enabled, fast-passing tests should complete normally with no entries
+      # going stale. The heartbeat cap should be a no-op when tests finish quickly.
+      _, err = capture_subprocess_io do
+        2.times.map do |i|
+          Thread.start do
+            system(
+              { 'BUILDKITE' => '1' },
+              @exe, 'run',
+              '--queue', @redis_url,
+              '--seed', 'foobar',
+              '--build', '1',
+              '--worker', i.to_s,
+              '--timeout', '1',
+              '--heartbeat', '5',
+              '--heartbeat-max-test-duration', '60',
+              '-Itest',
+              'test/passing_test.rb',
+              chdir: 'test/fixtures/',
+            )
+          end
+        end.each(&:join)
+      end
+
+      assert_empty filter_deprecation_warnings(err)
+
+      Tempfile.open('warnings') do |warnings_file|
+        out, err = capture_subprocess_io do
+          system(
+            @exe, 'report',
+            '--queue', @redis_url,
+            '--build', '1',
+            '--timeout', '1',
+            '--warnings-file', warnings_file.path,
+            '--heartbeat',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        assert_empty filter_deprecation_warnings(err)
+        result = normalize(out.lines[1].strip)
+        assert_equal "Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result
+        warnings = warnings_file.read.lines.map { |line| JSON.parse(line) }
+        assert_equal 0, warnings.size, "No tests should be stolen -- heartbeat cap should not have fired"
+      end
+    end
+
+    def test_heartbeat_cap_resets_between_tests
+      # Worker 0 is the sole run worker: it processes test_alpha first (cap fires at 1s,
+      # test completes at 2s — not stolen), then moves on to test_beta, whose cap is
+      # measured from its own start. A thief (worker 1) starts only after test_beta is in
+      # `running` so it cannot grab it from the queue; it can only steal if test_beta goes stale.
+      #
+      # With a per-test cap: test_beta heartbeat ticks until cap at t_B+1s, stale at t_B+3s,
+      # finishes at t_B+2.5s → NOT stolen → 0 warnings.
+      # With a cap carried over from test_alpha: no ticks for test_beta, stale at t_B+2s,
+      # finishes at t_B+2.5s → stolen by the thief → 1 warning.
+      _, err = capture_subprocess_io do
+        t0 = Thread.start do
+          system(
+            { 'BUILDKITE' => '1' },
+            @exe, 'run',
+            '--queue', @redis_url,
+            '--seed', 'foobar',
+            '--build', '1',
+            '--worker', '0',
+            '--timeout', '1',
+            '--max-requeues', '1',
+            '--requeue-tolerance', '1',
+            '--heartbeat', '2',
+            '--heartbeat-max-test-duration', '1',
+            '-Itest',
+            'test/consecutive_capped_tests.rb',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        # Wait for worker 0 to finish test_alpha (2s sleep + up to ~2s startup) and claim
+        # test_beta. Once test_beta is in `running`, the thief cannot grab it from the queue.
+        sleep 5
+
+        t1 = Thread.start do
+          system(
+            { 'BUILDKITE' => '1' },
+            @exe, 'run',
+            '--queue', @redis_url,
+            '--seed', 'foobar',
+            '--build', '1',
+            '--worker', '1',
+            '--timeout', '1',
+            '--max-requeues', '1',
+            '--requeue-tolerance', '1',
+            '--heartbeat', '2',
+            '--heartbeat-max-test-duration', '1',
+            '-Itest',
+            'test/consecutive_capped_tests.rb',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        [t0, t1].each(&:join)
+      end
+
+      assert_empty filter_deprecation_warnings(err)
+
+      Tempfile.open('warnings') do |warnings_file|
+        out, err = capture_subprocess_io do
+          system(
+            @exe, 'report',
+            '--queue', @redis_url,
+            '--build', '1',
+            '--timeout', '1',
+            '--warnings-file', warnings_file.path,
+            '--heartbeat',
+            chdir: 'test/fixtures/',
+          )
+        end
+
+        assert_empty filter_deprecation_warnings(err)
+        warnings = warnings_file.read.lines.map { |line| JSON.parse(line) }
+        assert_equal 0, warnings.size, "No tests should be stolen — heartbeat cap must reset between consecutive tests"
+      end
+    end
+
     def test_lazy_loading_streaming
       out, err = capture_subprocess_io do
         threads = 2.times.map do |i|