Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/controlplane/cmd/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (

var orgRequirementsTracer = otelx.Tracer("chainloop-controlplane", "middleware/orgrequirements")

const validationTimeOffset = 5 * time.Minute
// Slightly larger than the background CAS backend checker cadence for default/fallback
// backends (30 min, see cmd/main.go), so the request path doesn't re-validate ahead of
// the background loop while it's still finishing a tick.
const validationTimeOffset = 35 * time.Minute

// ValidateCASBackend checks that the current organization has a valid CAS Backend configured
// If the last validation happened more than validationTimeOffset ago it will re-run the validation
Expand Down
43 changes: 42 additions & 1 deletion app/controlplane/pkg/biz/casbackend_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,35 @@ var casBackendCheckerTracer = otelx.Tracer("chainloop-controlplane", "biz/casbac
const (
// Default cadence of the periodic backend check.
// NOTE(review): the consumer of this constant is outside this excerpt — presumably
// it is the fallback used when the caller does not configure an interval; confirm.
defaultInterval = 30 * time.Minute
// Default per-backend validation timeout; NewCASBackendChecker stores it in
// CASBackendChecker.validationTimeout.
defaultValidationTimeout = 10 * time.Second
// Upper bound on how long a single tick is allowed to hold the
// distributed lock. Defends against a hung validation pinning the lock
// past one tick; the next tick will retry.
// checkBackends enforces it with context.WithTimeout right after the lock
// has been acquired.
defaultMaxTickDuration = 25 * time.Minute

// Separate keys per scope so the two checker goroutines (defaults vs all backends)
// don't block each other.
// checkBackends uses lockKeyDefaultsScope when defaultsOrFallbacks is true and
// lockKeyAllScope otherwise.
lockKeyDefaultsScope = "cas-backend-checker:defaults"
lockKeyAllScope = "cas-backend-checker:all"
)

// DistributedLock is a best-effort, cluster-wide mutex used to make sure
// background jobs that should run on a single replica at a time aren't
// duplicated across pods.
//
// The interface lives in the biz package (the consumer); the Postgres
// advisory-lock implementation is provided by the data package.
type DistributedLock interface {
// TryAcquire attempts to acquire the lock identified by key without
// blocking. If acquired is true, the caller MUST invoke release when
// done. The lock is also released automatically if the underlying
// session is lost (e.g. pod crash).
//
// When acquired is false or err is non-nil, release must not be called:
// the Postgres implementation returns a nil release func in both cases.
// A false acquired with a nil err means the lock is currently held
// elsewhere, which is not an error.
TryAcquire(ctx context.Context, key string) (acquired bool, release func(), err error)
}

// CASBackendChecker runs the background validation of CAS backends.
// Instances are built with NewCASBackendChecker.
type CASBackendChecker struct {
// Logger helper tagged with the "biz/CASBackendChecker" component.
logger *log.Helper
// Repository for CAS backends.
// NOTE(review): its usage is outside this excerpt — presumably it lists the backends to validate.
casBackendRepo CASBackendRepo
// Use case handling CAS backends.
// NOTE(review): usage not visible here — presumably performs the actual validation.
caseBackendUseCase *CASBackendUseCase
// Validation timeout for each backend check
validationTimeout time.Duration
// Cluster-wide lock taken by checkBackends before each run so that only one
// replica performs the check for a given scope at a time.
lock DistributedLock
}

type CASBackendCheckerOpts struct {
Expand All @@ -53,12 +74,13 @@ type CASBackendCheckerOpts struct {

// NewCASBackendChecker creates a new CAS backend checker that will periodically validate
// the status of CAS backends
func NewCASBackendChecker(logger log.Logger, casBackendRepo CASBackendRepo, casBackendUseCase *CASBackendUseCase) *CASBackendChecker {
func NewCASBackendChecker(logger log.Logger, casBackendRepo CASBackendRepo, casBackendUseCase *CASBackendUseCase, lock DistributedLock) *CASBackendChecker {
	// Every log line emitted by the checker carries its component name.
	helper := log.NewHelper(log.With(logger, "component", "biz/CASBackendChecker"))

	checker := new(CASBackendChecker)
	checker.logger = helper
	checker.casBackendRepo = casBackendRepo
	checker.caseBackendUseCase = casBackendUseCase
	// Per-backend validation timeout starts at the package default.
	checker.validationTimeout = defaultValidationTimeout
	// Lock used by checkBackends to avoid running the same check on several replicas.
	checker.lock = lock

	return checker
}

Expand Down Expand Up @@ -120,6 +142,25 @@ func (c *CASBackendChecker) Start(ctx context.Context, opts *CASBackendCheckerOp
// checkBackends validates all CAS backends (or just default and fallback ones based on configuration)
// using a worker pool for parallel processing with timeouts
func (c *CASBackendChecker) checkBackends(ctx context.Context, defaultsOrFallbacks bool) error {
key := lockKeyAllScope
if defaultsOrFallbacks {
key = lockKeyDefaultsScope
}
acquired, release, err := c.lock.TryAcquire(ctx, key)
if err != nil {
return fmt.Errorf("acquiring checker lock: %w", err)
}
if !acquired {
c.logger.Debugw("msg", "another replica is running the CAS backend check, skipping", "scope", key)
return nil
}
defer release()

// Cap how long we can hold the lock. If validations hang, the next tick
// retries instead of one stuck pod pinning the lock indefinitely.
ctx, cancel := context.WithTimeout(ctx, defaultMaxTickDuration)
defer cancel()

ctx, span := otelx.Start(ctx, casBackendCheckerTracer, "CASBackendChecker.checkBackends")
defer span.End()

Expand Down
17 changes: 11 additions & 6 deletions app/controlplane/pkg/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package data

import (
"context"
"database/sql"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -59,11 +60,15 @@ var ProviderSet = wire.NewSet(
NewProjectVersionRepo,
NewProjectsRepo,
NewGroupRepo,
NewPostgresLock,
)

// Data holds the database handles created by NewData and handed to the
// constructors of this package.
type Data struct {
// DB is the ent client returned by initSQLDatabase.
DB *ent.Client
// Exposed for code paths that need raw SQL features ent doesn't surface,
// e.g. session-scoped advisory locks (pg_try_advisory_lock).
// NewPostgresLock reads this handle.
// NOTE(review): presumably the same connection pool that backs DB — confirm in initSQLDatabase.
SQLDB *sql.DB
}

// Load DB schema
Expand All @@ -80,7 +85,7 @@ func NewData(c *config.DatabaseConfig, tp trace.TracerProvider, logger log.Logge
}

log := log.NewHelper(logger)
db, err := initSQLDatabase(c, tp, log)
db, sqlDB, err := initSQLDatabase(c, tp, log)
if err != nil {
log.Errorf("error initialing the DB : %v", err)
return nil, nil, fmt.Errorf("failed to initialized db: %w", err)
Expand All @@ -93,12 +98,12 @@ func NewData(c *config.DatabaseConfig, tp trace.TracerProvider, logger log.Logge
}
}

return &Data{DB: db}, cleanup, nil
return &Data{DB: db, SQLDB: sqlDB}, cleanup, nil
}

func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log.Helper) (*ent.Client, error) {
func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log.Helper) (*ent.Client, *sql.DB, error) {
if c.Driver != "pgx" {
return nil, fmt.Errorf("unsupported driver: %s", c.Driver)
return nil, nil, fmt.Errorf("unsupported driver: %s", c.Driver)
}

log.Debugf("connecting to db: driver=%s", c.Driver)
Expand All @@ -115,7 +120,7 @@ func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log
}),
)
if err != nil {
return nil, fmt.Errorf("error opening the connection, driver=%s: %w", c.Driver, err)
return nil, nil, fmt.Errorf("error opening the connection, driver=%s: %w", c.Driver, err)
}

if c.MaxOpenConns > 0 {
Expand All @@ -139,7 +144,7 @@ func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log

// NOTE: We do not run migrations automatically anymore
// Instead we leverage atlas cli to run migrations
return client, nil
return client, db, nil
}

func toTimePtr(t time.Time) *time.Time {
Expand Down
101 changes: 101 additions & 0 deletions app/controlplane/pkg/data/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//
// Copyright 2026 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package data

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
	"hash/fnv"
	"time"

	"github.com/chainloop-dev/chainloop/app/controlplane/pkg/biz"
	"github.com/go-kratos/kratos/v2/log"
)

// Cap on how long the release path may block. Defends against a stuck
// session: if pg_advisory_unlock can't return, we drop the connection
// and let Postgres release the lock on session disconnect.
const advisoryUnlockTimeout = 5 * time.Second

// PostgresLock implements biz.DistributedLock using Postgres session-level
// advisory locks (pg_try_advisory_lock / pg_advisory_unlock).
//
// Postgres is the only piece of infrastructure that's mandatory for the
// control plane — NATS is optional (used for distributed caches when
// present). Using advisory locks lets us coordinate background jobs
// across replicas without adding a new dependency (a NATS KV lease or a
// dedicated queue) that wouldn't be available in every deployment.
//
// Each lock holds a dedicated connection for its lifetime; releasing the
// lock returns the connection to the pool. If the pod crashes mid-run the
// connection drops and Postgres releases the lock automatically.
type PostgresLock struct {
// Connection pool from which TryAcquire takes one dedicated connection per lock attempt.
db *sql.DB
// Helper used to report failures on the release path, which has no error return.
log *log.Helper
}

// NewPostgresLock builds the Postgres-backed biz.DistributedLock on top of the
// raw *sql.DB exposed by Data.SQLDB, logging through the given logger.
func NewPostgresLock(d *Data, logger log.Logger) biz.DistributedLock {
	pgLock := new(PostgresLock)
	pgLock.db = d.SQLDB
	pgLock.log = log.NewHelper(logger)
	return pgLock
}

// TryAcquire attempts, without blocking, to take the session-level advisory
// lock derived from key (see hashKey).
//
// Returns:
//   - (true, release, nil) when the lock was taken. The lock lives on a
//     dedicated connection that stays checked out of the pool until release
//     is called; the caller must call release exactly once.
//   - (false, nil, nil) when another session currently holds the lock.
//   - (false, nil, err) when no connection could be obtained or the lock
//     query failed.
//
// release never blocks for longer than advisoryUnlockTimeout and does not
// depend on ctx, so it still works after the caller's context was cancelled.
func (l *PostgresLock) TryAcquire(ctx context.Context, key string) (bool, func(), error) {
	intKey := hashKey(key)

	// Advisory locks are owned by the session, so lock and unlock must run on
	// the very same connection: pin one for the lifetime of the lock.
	conn, err := l.db.Conn(ctx)
	if err != nil {
		return false, nil, fmt.Errorf("acquiring DB connection: %w", err)
	}

	var acquired bool
	if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", intKey).Scan(&acquired); err != nil {
		// Best effort: the query error is what the caller needs to see.
		_ = conn.Close()
		return false, nil, fmt.Errorf("pg_try_advisory_lock: %w", err)
	}

	if !acquired {
		// Nothing is held on this session, so it can go straight back to the pool.
		_ = conn.Close()
		return false, nil, nil
	}

	release := func() {
		// pg_advisory_unlock must run on the same session that took the lock,
		// and must run even if the caller's context was cancelled (e.g. shutdown).
		// Bounded so a stuck session can't hang the release path.
		releaseCtx, cancel := context.WithTimeout(context.Background(), advisoryUnlockTimeout)
		defer cancel()

		var unlocked bool
		if err := conn.QueryRowContext(releaseCtx, "SELECT pg_advisory_unlock($1)", intKey).Scan(&unlocked); err != nil {
			l.log.Warnw("msg", "failed to release advisory lock", "key", key, "error", err)
			// The session may still own the lock. conn.Close() would merely hand
			// it back to the pool, where it could sit idle (or be reused) while
			// keeping every other replica locked out. Reporting ErrBadConn from
			// Raw makes database/sql close the underlying connection instead, so
			// Postgres drops the lock when the session ends.
			// The returned error is always non-nil here (ErrBadConn, or
			// sql.ErrConnDone if the connection is already gone), so it carries
			// no extra information.
			_ = conn.Raw(func(any) error { return driver.ErrBadConn })
			return
		}
		if !unlocked {
			// pg_advisory_unlock reports false when this session did not hold the lock.
			l.log.Warnw("msg", "advisory lock was not held at release time", "key", key)
		}
		if err := conn.Close(); err != nil {
			l.log.Warnw("msg", "failed to return DB connection to pool", "key", key, "error", err)
		}
	}
	return true, release, nil
}

// hashKey maps a named lock to the signed 64-bit key that the Postgres
// advisory-lock functions take. The value is the 64-bit FNV-1a digest of the
// key bytes reinterpreted as int64, so it is stable across processes and
// replicas; collisions are not a practical concern for the few named locks
// used by this package.
func hashKey(key string) int64 {
	hasher := fnv.New64a()
	// Writing to a hash.Hash never fails, so both results are discarded.
	_, _ = hasher.Write([]byte(key))
	digest := hasher.Sum64()
	return int64(digest) //nolint:gosec // intentional wraparound; pg accepts any int64
}
Loading