Merged
36 changes: 19 additions & 17 deletions docs/design/2026_05_18_partial_6d_enable_storage_envelope.md
@@ -97,25 +97,27 @@
into `store.WithEncryption` and `WithStorageEnvelopeGate` in
6D-6c-2.

- **6D-6c-2** (shipped, PR #826) — main.go production wiring:
`encryption.NewCipher(keystore)` + a single `encryption.StateCache`
threaded via `WithStateCache` into every per-shard `Applier`, with
`cache.ActiveStorageKeyID` / `cache.StorageEnvelopeActive` passed
into `store.WithEncryption` + `store.WithStorageEnvelopeGate` for
each shard's PebbleStore. Pulled forward the deterministic-nonce
core of Stage 7 (keystore hydration, local_epoch bump,
DeterministicNonceFactory) — see
[`2026_05_25_partial_6d6c2_production_storage_envelope_wiring.md`](2026_05_25_partial_6d6c2_production_storage_envelope_wiring.md).
- **6D-6c-3a** — main.go CapabilityFanout closure bound to the live
Raft membership view (`engine.Configuration` route snapshot across
all groups + a `DialFunc` over a dedicated `kv.GRPCConnCache`),
wired into the EnableStorageEnvelope server via
`WithEncryptionAdminCapabilityFanout` gated on encryption mutators.
See [`2026_05_25_partial_6d6c3a_capability_fanout_wiring.md`](2026_05_25_partial_6d6c3a_capability_fanout_wiring.md).

## Open milestones

- **6D-6c-2** — main.go production wiring: build
`encryption.NewCipher(keystore)` and construct a single
`encryption.StateCache` at startup (parallel to the shared
`*Keystore`). Thread the cache via `WithStateCache` into every
per-shard `Applier` inside `buildShardGroups`, and pass
`cache.ActiveStorageKeyID` / `cache.StorageEnvelopeActive`
(NOT the per-shard `Applier` delegates) into
`store.WithEncryption` + `store.WithStorageEnvelopeGate` for
each shard's PebbleStore. Reading via the StateCache directly
ensures every shard's storage layer sees the post-apply state
regardless of which shard's leader accepted the encryption
proposal.
- **6D-6c-3** — main.go CapabilityFanout closure bound to the
live Raft membership view (etcd engine route snapshot + admin
client DialFunc), and end-to-end integration test exercising
a single-node cluster Bootstrap → EnableStorageEnvelope →
Put → read-back-via-envelope.
- **6D-6c-3b** — end-to-end integration test exercising a single-node
cluster Bootstrap → EnableStorageEnvelope → Put →
read-back-via-envelope, on top of the 6D-6c-3a fan-out wiring.

## 0. Why this doc exists

117 changes: 117 additions & 0 deletions docs/design/2026_05_25_partial_6d6c3a_capability_fanout_wiring.md
@@ -0,0 +1,117 @@
# Stage 6D-6c-3a — capability fan-out closure wiring

| Field | Value |
|---|---|
| Status | partial |
| Date | 2026-05-25 |
| Parent designs | [`2026_05_18_partial_6d_enable_storage_envelope.md`](2026_05_18_partial_6d_enable_storage_envelope.md) (6D-6c-3 milestone; §4 capability fan-out), [`2026_04_29_partial_data_at_rest_encryption.md`](2026_04_29_partial_data_at_rest_encryption.md) (§7.1 rollout) |
| Builds on | 6D-3 (`internal/admin.CapabilityFanout` helper), 6D-6a (`adapter.WithEncryptionAdminCapabilityFanout` option) |

**Lifecycle:** the §1 fan-out closure wiring shipped in this PR; the
6D-6c-3b end-to-end test (Bootstrap → EnableStorageEnvelope → Put →
read-back) remains open. Flips to `*_implemented_*` when 3b lands.

## 0. Why this doc exists

The `EnableStorageEnvelope` cutover RPC (6D-6a) runs the §4 capability
fan-out before proposing the cutover — but only when a
`CapabilityFanoutFn` closure is wired via
`WithEncryptionAdminCapabilityFanout`. `main.go` does **not** wire it
today, so `s.capabilityFanout == nil` and the cutover RPC refuses with
the §4 "fan-out not wired" `FailedPrecondition`. 6D-6c-3a builds and
wires that closure; 6D-6c-3b adds the end-to-end test on top.

The fan-out helper (`admin.CapabilityFanout(ctx, routes, dial,
timeout)`) and the RPC option already exist and are tested. This slice
is pure wiring of those existing pieces into `main.go` — no new RPC, no
wire-format change.
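
The refusal path described above can be sketched as follows. This is a minimal illustration, not the adapter's code: `server`, `fanoutFn`, `withCapabilityFanout`, `errFanoutNotWired`, and `cutoverOutcome` are hypothetical stand-ins, and the real `adapter.CapabilityFanoutFn` returns a richer result than a bool.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errFanoutNotWired stands in for the FailedPrecondition status the real
// server returns; the name and message are hypothetical.
var errFanoutNotWired = errors.New("failed precondition: capability fan-out not wired")

// fanoutFn is a simplified stand-in for adapter.CapabilityFanoutFn.
type fanoutFn func(ctx context.Context) (ok bool, err error)

type server struct {
	capabilityFanout fanoutFn
}

// option mirrors the functional-option style of the server constructor.
type option func(*server)

func withCapabilityFanout(fn fanoutFn) option {
	return func(s *server) { s.capabilityFanout = fn }
}

func newServer(opts ...option) *server {
	s := &server{}
	for _, apply := range opts {
		apply(s)
	}
	return s
}

// enableStorageEnvelope refuses unless the fan-out is wired, succeeds,
// and reports every member as capable.
func (s *server) enableStorageEnvelope(ctx context.Context) error {
	if s.capabilityFanout == nil {
		return errFanoutNotWired
	}
	ok, err := s.capabilityFanout(ctx)
	if err != nil {
		return fmt.Errorf("capability fan-out: %w", err)
	}
	if !ok {
		return errors.New("failed precondition: capability check failed")
	}
	return nil // the real RPC would propose the cutover at this point
}

// cutoverOutcome renders the result of one cutover attempt as text.
func cutoverOutcome(s *server) string {
	if err := s.enableStorageEnvelope(context.Background()); err != nil {
		return "refused: " + err.Error()
	}
	return "proposed"
}

// alwaysCapable and neverCapable are canned fan-out closures for the demo.
func alwaysCapable(context.Context) (bool, error) { return true, nil }
func neverCapable(context.Context) (bool, error)  { return false, nil }

func main() {
	fmt.Println(cutoverOutcome(newServer()))
	fmt.Println(cutoverOutcome(newServer(withCapabilityFanout(neverCapable))))
	fmt.Println(cutoverOutcome(newServer(withCapabilityFanout(alwaysCapable))))
}
```

Leaving the option unset is the only way to reach the "not wired" refusal, which is why the wiring in this slice is a precondition for any successful cutover.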

## 1. Scope (6D-6c-3a)

- `buildCapabilityFanoutFn(runtimes, connCache, timeout)
adapter.CapabilityFanoutFn` in a new `main_encryption_fanout.go`:
- **Snapshot builder** — for every runtime, call
`rt.engine.Configuration(ctx)` and map each `raftengine.Server`
to an `admin.RouteMember{FullNodeID: etcd.DeriveNodeID(srv.ID),
Address: srv.Address}`, splitting on `srv.Suffrage`
(`SuffrageLearner` → Learners, else → Voters, so empty/unannotated
suffrage counts as a voter per raftengine's convention) into one
`admin.RouteGroup{GroupID: rt.spec.id}`. The §4.1 contract is
"every (voter ∪ learner) of **every** Raft group", so the
snapshot spans **all** runtimes, not just the cutover RPC's group.
- **DialFunc** — `connCache.ConnFor(addr)` → `pb.NewEncryptionAdminClient(conn)`,
with a **no-op cleanup** (the cache owns the conn lifecycle and
reuses it across probes; closing per-probe would defeat pooling).
- The closure runs `admin.CapabilityFanout(ctx, snapshot, dial,
timeout)`; a snapshot-build error (any `engine.Configuration`
failure) is returned as the closure error so the cutover RPC
fails closed (§4 maps it to a refusal).
- A **dedicated** `kv.GRPCConnCache` for the fan-out, created in
`startRaftServers` and closed via the cleanup stack. Not the
admin-forward cache: that one is gated on `--adminEnabled`, but the
cutover fan-out must dial whenever encryption mutators are enabled,
independent of the admin HTTP surface. `kv.GRPCConnCache.ConnFor`
already uses the shared `internalutil.GRPCDialOptions()` so the
fan-out dials with the same transport posture as every other
intra-cluster gRPC client.
- Wire the closure into `registerEncryptionAdminServer` via
`WithEncryptionAdminCapabilityFanout`, gated on the same
`enableMutators` boolean that gates the Proposer/LeaderView (the
fan-out is only meaningful when the cutover mutator is reachable).
- Fan-out timeout: a `const` in `main.go` (start at 5s — generous for
a small cluster GetCapability round-trip; the helper bounds the
whole fan-out by it regardless of member count).
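
To illustrate what "bounds the whole fan-out" means, here is a hedged sketch of one deadline shared by all probes. It does not reproduce `admin.CapabilityFanout`; `fanOut`, `probeFn`, `verdict`, `simulatedProbe`, and the addresses are assumptions made for the example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// verdict is a reduced stand-in for the per-member fan-out result.
type verdict struct {
	Address   string
	Reachable bool
}

// probeFn is a hypothetical probe; it must return once ctx is done.
type probeFn func(ctx context.Context, address string) error

// fanOut probes every address concurrently under ONE shared deadline, so the
// total wait is bounded by timeout no matter how many members there are.
func fanOut(parent context.Context, addrs []string, probe probeFn, timeout time.Duration) []verdict {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	verdicts := make([]verdict, len(addrs))
	var wg sync.WaitGroup
	for i, addr := range addrs {
		wg.Add(1)
		go func(i int, addr string) {
			defer wg.Done()
			// Each goroutine writes only its own slot, so no lock is needed.
			verdicts[i] = verdict{Address: addr, Reachable: probe(ctx, addr) == nil}
		}(i, addr)
	}
	wg.Wait()
	return verdicts
}

// simulatedProbe answers after a per-address delay or fails at the deadline.
func simulatedProbe(delays map[string]time.Duration) probeFn {
	return func(ctx context.Context, address string) error {
		select {
		case <-time.After(delays[address]):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// runDemo probes one fast and one stalled peer with a 200ms budget.
func runDemo() ([]verdict, time.Duration) {
	delays := map[string]time.Duration{
		"fast:50051": time.Millisecond,
		"slow:50051": 10 * time.Second,
	}
	start := time.Now()
	verdicts := fanOut(context.Background(), []string{"fast:50051", "slow:50051"},
		simulatedProbe(delays), 200*time.Millisecond)
	return verdicts, time.Since(start)
}

func main() {
	verdicts, elapsed := runDemo()
	fmt.Println(verdicts, elapsed < time.Second)
}
```

The stalled peer costs the operator roughly the timeout, not its own ten-second delay, and shows up as `Reachable: false` instead of blocking the RPC.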

### Out of scope (6D-6c-3b)

The end-to-end integration test (single-node cluster: Bootstrap →
EnableStorageEnvelope → Put → read-back-via-envelope) lands in 3b on
top of this wiring.

## 2. Fail-closed posture

- **Snapshot build error** (`engine.Configuration` fails on any
group) → closure returns the error → cutover RPC refuses. Never
propose a cutover against a membership view we could not fully
enumerate.
- **Unreachable / not-capable member** → handled inside
`CapabilityFanout` (verdict `Reachable=false` / `EncryptionCapable=false`
→ `OK=false`) → cutover refuses. No partial success (§4.3).
- **Closure not wired** (encryption mutators disabled) → unchanged
existing behavior: `s.capabilityFanout == nil` → cutover refuses
with the §4 "not wired" `FailedPrecondition`.
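
The no-partial-success rule can be sketched as a simple aggregation. `memberVerdict` and `fanoutOK` are hypothetical names for illustration; the field names follow the `Reachable` / `EncryptionCapable` verdicts mentioned above, and treating an empty verdict list as a refusal is an assumption of this sketch, not something the design states.

```go
package main

import "fmt"

// memberVerdict is a reduced stand-in for the per-member fan-out verdict.
type memberVerdict struct {
	FullNodeID        uint64
	Reachable         bool
	EncryptionCapable bool
}

// fanoutOK reports whether the cutover may proceed and which nodes block it.
// A single unreachable or not-capable member is enough to refuse.
func fanoutOK(verdicts []memberVerdict) (ok bool, blockers []uint64) {
	for _, v := range verdicts {
		if !v.Reachable || !v.EncryptionCapable {
			blockers = append(blockers, v.FullNodeID)
		}
	}
	// Assumption: with nothing probed there is nothing to vouch for the
	// cluster, so an empty list is treated as a refusal too.
	return len(verdicts) > 0 && len(blockers) == 0, blockers
}

func main() {
	verdicts := []memberVerdict{
		{FullNodeID: 1, Reachable: true, EncryptionCapable: true},
		{FullNodeID: 2, Reachable: true, EncryptionCapable: false},
		{FullNodeID: 3, Reachable: false},
	}
	ok, blockers := fanoutOK(verdicts)
	fmt.Println(ok, blockers) // prints "false [2 3]"
}
```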

## 3. Why a dedicated conn cache (not the admin-forward one)

The admin-forward `connCache` is constructed only when `--adminEnabled`
(it backs the follower→leader admin write forwarder). The cutover
capability fan-out is orthogonal: it must dial every member's
`EncryptionAdmin` endpoint whenever the operator has enabled
encryption mutators, regardless of whether the admin HTTP API is
served. Coupling the two would make the fan-out silently inert on a
`--encryption-enabled` cluster that left `--adminEnabled` off. A
separate cache keeps the lifecycles independent and is cheap (one
idle `*grpc.ClientConn` per peer, closed on shutdown).
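
A rough sketch of the lifecycle argument, assuming a cache that hands out one connection per address and closes them all on shutdown. `connCache` and `fakeConn` are illustrative stand-ins and do not reproduce `kv.GRPCConnCache` or `*grpc.ClientConn`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeConn stands in for a client connection in this sketch.
type fakeConn struct {
	addr   string
	closed bool
}

func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// connCache hands out one connection per address. The zero value is usable.
type connCache struct {
	mu    sync.Mutex
	conns map[string]*fakeConn
	dials int // number of connections actually created
}

// ConnFor returns the cached connection for addr, creating it on first use.
func (c *connCache) ConnFor(addr string) (*fakeConn, error) {
	if addr == "" {
		return nil, errors.New("conn cache: empty address")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if conn, ok := c.conns[addr]; ok {
		return conn, nil
	}
	if c.conns == nil {
		c.conns = make(map[string]*fakeConn)
	}
	conn := &fakeConn{addr: addr}
	c.conns[addr] = conn
	c.dials++
	return conn, nil
}

// Close closes every cached connection and empties the cache.
func (c *connCache) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	var errs []error
	for _, conn := range c.conns {
		errs = append(errs, conn.Close())
	}
	c.conns = nil
	return errors.Join(errs...)
}

func main() {
	var adminForward *connCache // stays nil when the admin surface is off
	fanout := &connCache{}      // built whenever encryption mutators are on

	a, _ := fanout.ConnFor("10.0.0.2:50051")
	b, _ := fanout.ConnFor("10.0.0.2:50051")
	fmt.Println(a == b, fanout.dials, adminForward == nil)

	_ = fanout.Close()
	fmt.Println(a.closed)
}
```

Because the fan-out cache is constructed independently, the nil admin-forward cache has no effect on whether probes can dial.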

## 4. Self-review checklist (for the implementation PR)

- **Data loss / consistency** — read-only control-plane wiring; issues
no writes and changes no apply path. The cutover it gates is
unchanged (6D-6a).
- **Concurrency** — `CapabilityFanout` is already concurrent + timeout
bounded; the snapshot builder runs per-RPC, single-shot; the conn
cache is already concurrency-safe.
- **Performance** — fan-out runs once per cutover RPC (a rare operator
action), not on the data path.
- **Test coverage** — unit tests for the snapshot builder (Server →
RouteMember mapping, voter/learner split, multi-group, Configuration
error → closure error) and the DialFunc (cache reuse, nil-conn
guard). The full e2e is 3b.
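
The snapshot-builder cases in the test-coverage item could be exercised with a stub along these lines. All types here (`server`, `stubEngine`, `groupSource`, `routeGroup`, `buildSnapshot`) are simplified stand-ins invented for the sketch; in particular the stub's `Configuration` omits the `context.Context` parameter the real engine method takes, and members are tracked by address only.

```go
package main

import (
	"errors"
	"fmt"
)

// server is a reduced stand-in for a Raft configuration entry.
type server struct {
	ID      string
	Address string
	Learner bool
}

// configReader is the narrow surface the snapshot builder needs.
type configReader interface {
	Configuration() ([]server, error)
}

// stubEngine returns canned servers or a canned error.
type stubEngine struct {
	servers []server
	err     error
}

func (s stubEngine) Configuration() ([]server, error) { return s.servers, s.err }

type groupSource struct {
	groupID uint64
	engine  configReader
}

type routeGroup struct {
	GroupID  uint64
	Voters   []string
	Learners []string
}

var errConfigUnavailable = errors.New("configuration unavailable")

// buildSnapshot maps every group, aborting on the first Configuration error
// so the caller never sees a partially enumerated membership view.
func buildSnapshot(sources []groupSource) ([]routeGroup, error) {
	groups := make([]routeGroup, 0, len(sources))
	for _, src := range sources {
		servers, err := src.engine.Configuration()
		if err != nil {
			return nil, fmt.Errorf("configuration for group %d: %w", src.groupID, err)
		}
		group := routeGroup{GroupID: src.groupID}
		for _, srv := range servers {
			if srv.Learner {
				group.Learners = append(group.Learners, srv.Address)
			} else {
				group.Voters = append(group.Voters, srv.Address)
			}
		}
		groups = append(groups, group)
	}
	return groups, nil
}

func main() {
	healthy := groupSource{groupID: 1, engine: stubEngine{servers: []server{
		{ID: "n1", Address: "10.0.0.1:50051"},
		{ID: "n2", Address: "10.0.0.2:50051", Learner: true},
	}}}
	broken := groupSource{groupID: 2, engine: stubEngine{err: errConfigUnavailable}}

	groups, err := buildSnapshot([]groupSource{healthy})
	fmt.Println(groups, err)

	groups, err = buildSnapshot([]groupSource{healthy, broken})
	fmt.Println(len(groups), errors.Is(err, errConfigUnavailable))
}
```

The second call shows the fail-closed case: one failing group discards the healthy group's result as well.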

## 5. Open questions

1. Fan-out timeout value — 5s proposed; revisit if large clusters need
more headroom (the helper bounds the whole fan-out, not per-probe).
2. Should learners that are mid-snapshot-catchup be probed? Yes — the
§4.1 contract is unconditional (voter ∪ learner); an unreachable
learner is a hard refusal, matching the parent design §8.
3 changes: 2 additions & 1 deletion main.go
@@ -1460,6 +1460,7 @@ func startRaftServers(
// linter does not complain.
const extraOptsCap = 2
enableMutators := encryptionMutatorsEnabled()
+encryptionCapabilityFanout := buildEncryptionCapabilityFanout(ctx, eg, runtimes, enableMutators)
for _, rt := range runtimes {
baseOpts := internalutil.GRPCServerOptions()
opts := make([]grpc.ServerOption, 0, len(baseOpts)+extraOptsCap)
@@ -1500,7 +1501,7 @@ func startRaftServers(
// per-shard loop. Each shard's own engine is the
// Proposer + LeaderView so the mutator proposes through
// the correct Raft group.
-registerEncryptionAdminServer(gs, etcdraftengine.DeriveNodeID(*raftId), *encryptionSidecarPath, enableMutators, rt.engine)
+registerEncryptionAdminServer(gs, etcdraftengine.DeriveNodeID(*raftId), *encryptionSidecarPath, enableMutators, rt.engine, encryptionCapabilityFanout)
registerAdminForwardServer(gs, forwardDeps, forwardLogger)
rt.registerGRPC(gs)
internalraftadmin.RegisterOperationalServices(ctx, gs, rt.engine, []string{"RawKV"})
9 changes: 8 additions & 1 deletion main_encryption_admin.go
@@ -128,7 +128,7 @@ type encryptionAdminEngine interface {
// (the §7.1 cutover refuses with ErrCapabilityCheckFailed);
// when set, capability probing reads the §5.1 keys.json and
// reports encryption_capable=true.
-func registerEncryptionAdminServer(gs *grpc.Server, fullNodeID uint64, sidecarPath string, enableMutators bool, engine encryptionAdminEngine) {
+func registerEncryptionAdminServer(gs *grpc.Server, fullNodeID uint64, sidecarPath string, enableMutators bool, engine encryptionAdminEngine, capabilityFanout adapter.CapabilityFanoutFn) {
opts := []adapter.EncryptionAdminServerOption{
adapter.WithEncryptionAdminFullNodeID(fullNodeID),
}
@@ -140,6 +140,13 @@ func registerEncryptionAdminServer(gs *grpc.Server, fullNodeID uint64, sidecarPa
adapter.WithEncryptionAdminProposer(engine),
adapter.WithEncryptionAdminLeaderView(engine),
)
+// The §4 capability fan-out is only meaningful when the
+// cutover mutator is reachable (same enableMutators gate as
+// the Proposer/LeaderView). Without it the cutover RPC
+// refuses with the §4 "fan-out not wired" FailedPrecondition.
+if capabilityFanout != nil {
+opts = append(opts, adapter.WithEncryptionAdminCapabilityFanout(capabilityFanout))
+}
}
srv := adapter.NewEncryptionAdminServer(opts...)
if err := srv.Validate(); err != nil {
4 changes: 2 additions & 2 deletions main_encryption_admin_test.go
@@ -135,7 +135,7 @@ func callBootstrapAgainstNewServer(t *testing.T, enableMutators bool, engine enc
t.Helper()
listener := bufconn.Listen(1024 * 1024)
gs := grpc.NewServer()
-registerEncryptionAdminServer(gs, 1, "", enableMutators, engine)
+registerEncryptionAdminServer(gs, 1, "", enableMutators, engine, nil)
go func() { _ = gs.Serve(listener) }()
t.Cleanup(gs.Stop)
conn, err := grpc.NewClient(
@@ -161,7 +161,7 @@ func callBootstrapAgainstNewServer(t *testing.T, enableMutators bool, engine enc

func TestRegisterEncryptionAdminServer_Registers(t *testing.T) {
gs := grpc.NewServer()
-registerEncryptionAdminServer(gs, 1, "", false, nil)
+registerEncryptionAdminServer(gs, 1, "", false, nil, nil)
info := gs.GetServiceInfo()
if _, ok := info["EncryptionAdmin"]; !ok {
var registered []string
150 changes: 150 additions & 0 deletions main_encryption_fanout.go
@@ -0,0 +1,150 @@
package main

import (
	"context"
	"time"

	"github.com/bootjp/elastickv/adapter"
	"github.com/bootjp/elastickv/internal/admin"
	"github.com/bootjp/elastickv/internal/raftengine"
	etcdraftengine "github.com/bootjp/elastickv/internal/raftengine/etcd"
	"github.com/bootjp/elastickv/kv"
	pb "github.com/bootjp/elastickv/proto"
	"github.com/cockroachdb/errors"
	"golang.org/x/sync/errgroup"
)

// capabilityFanoutTimeout bounds the whole §4 GetCapability fan-out the
// EnableStorageEnvelope cutover runs before proposing. The helper
// caps the entire fan-out (not per-probe) by this value regardless of
// member count, so a slow/unreachable peer cannot stall the operator's
// cutover RPC indefinitely — it surfaces as a Reachable=false verdict
// and the cutover fails closed. 5s is generous for a small-cluster
// GetCapability round-trip.
const capabilityFanoutTimeout = 5 * time.Second

// buildEncryptionCapabilityFanout builds the §4 capability fan-out
// closure shared across every shard's EncryptionAdmin server, or nil
// when encryption mutators are disabled (the cutover RPC is then
// unreachable, so the fan-out would never run). It owns a dedicated
// kv.GRPCConnCache — NOT the --adminEnabled-gated admin-forward cache —
// so the fan-out dials whenever encryption mutators are enabled,
// independent of the admin HTTP surface; the cache is drained on
// context cancellation via eg.
func buildEncryptionCapabilityFanout(ctx context.Context, eg *errgroup.Group, runtimes []*raftGroupRuntime, enableMutators bool) adapter.CapabilityFanoutFn {
	if !enableMutators {
		return nil
	}
	fanoutConnCache := &kv.GRPCConnCache{}
	eg.Go(func() error {
		<-ctx.Done()
		if err := fanoutConnCache.Close(); err != nil {
			return errors.Wrap(err, "close encryption capability fan-out gRPC connection cache")
		}
		return nil
	})
	return buildCapabilityFanoutFn(runtimes, fanoutConnCache, capabilityFanoutTimeout)
}

// buildCapabilityFanoutFn assembles the adapter.CapabilityFanoutFn the
// EnableStorageEnvelope server invokes for its §4 pre-flight check. The
// returned closure snapshots the live membership of every Raft group
// (the §4.1 "voter ∪ learner of every group" contract) and fans
// GetCapability out to each unique node.
//
// dial reuses connCache so repeated cutover attempts share one
// *grpc.ClientConn per peer; the cleanup closure is a no-op because the
// cache owns the connection lifecycle (closed on shutdown by the
// caller, not per-probe). A snapshot-build error fails the closure so
// the cutover refuses rather than proposing against a membership view
// it could not fully enumerate (fail-closed, §4.3 no-partial-success).
func buildCapabilityFanoutFn(runtimes []*raftGroupRuntime, connCache *kv.GRPCConnCache, timeout time.Duration) adapter.CapabilityFanoutFn {
	dial := func(_ context.Context, address string) (pb.EncryptionAdminClient, func(), error) {
		conn, err := connCache.ConnFor(address)
		if err != nil {
			return nil, nil, errors.Wrapf(err, "capability fan-out: dial %s", address)
		}
		return pb.NewEncryptionAdminClient(conn), func() {}, nil
	}
Comment on lines +62 to +68
medium

The dial closure ignores the context.Context parameter passed by the fan-out helper. In accordance with project standards, context.Context should be used for managing deadlines and cancellation. The context should be propagated to the connection cache to ensure the fan-out timeout is strictly respected and to avoid goroutine leaks if a dial hangs.

Suggested change
-	dial := func(_ context.Context, address string) (pb.EncryptionAdminClient, func(), error) {
-		conn, err := connCache.ConnFor(address)
-		if err != nil {
-			return nil, nil, errors.Wrapf(err, "capability fan-out: dial %s", address)
-		}
-		return pb.NewEncryptionAdminClient(conn), func() {}, nil
-	}
+	dial := func(ctx context.Context, address string) (pb.EncryptionAdminClient, func(), error) {
+		conn, err := connCache.ConnFor(ctx, address)
+		if err != nil {
+			return nil, nil, errors.Wrapf(err, "capability fan-out: dial %s", address)
+		}
+		return pb.NewEncryptionAdminClient(conn), func() {}, nil
+	}
References
  1. When designing interfaces, use context.Context for managing deadlines and cancellation to ensure timeouts are strictly respected.

	return func(ctx context.Context) (admin.CapabilityFanoutResult, error) {
		snapshot, err := capabilityRouteSnapshot(ctx, runtimes)
		if err != nil {
			return admin.CapabilityFanoutResult{}, err
		}
		return admin.CapabilityFanout(ctx, snapshot, dial, timeout)
	}
}

// configReader is the narrow membership-view surface
// capabilityRouteSnapshot needs from a Raft engine. raftengine.Engine
// satisfies it structurally; defining it here lets the snapshot
// builder be unit-tested with a stub that returns a Configuration
// error (the fail-closed path) without standing up a full engine.
type configReader interface {
	Configuration(ctx context.Context) (raftengine.Configuration, error)
}

// groupConfigSource pairs a group id with its membership reader.
type groupConfigSource struct {
	groupID uint64
	engine  configReader
}

// capabilityRouteSnapshot builds the all-groups admin.RouteSnapshot
// from each runtime's live Raft Configuration. It reads each engine
// via snapshotEngine() (engineMu.RLock) because the fan-out closure
// runs during live EnableStorageEnvelope RPCs, concurrently with a
// possible Close() that clears the engine field — a direct field read
// would be a data race. A nil runtime/engine is skipped.
func capabilityRouteSnapshot(ctx context.Context, runtimes []*raftGroupRuntime) (admin.RouteSnapshot, error) {
	sources := make([]groupConfigSource, 0, len(runtimes))
	for _, rt := range runtimes {
		if rt == nil {
			continue
		}
		eng := rt.snapshotEngine()
		if eng == nil {
			continue
		}
		sources = append(sources, groupConfigSource{groupID: rt.spec.id, engine: eng})
	}
	return routeSnapshotFromSources(ctx, sources)
}

// routeSnapshotFromSources maps each group's Configuration into the
// admin.RouteSnapshot. Any Configuration error aborts the whole
// snapshot so the caller fails closed — the cutover must never
// proceed against a membership view it could not fully enumerate.
func routeSnapshotFromSources(ctx context.Context, sources []groupConfigSource) (admin.RouteSnapshot, error) {
	groups := make([]admin.RouteGroup, 0, len(sources))
	for _, s := range sources {
		cfg, err := s.engine.Configuration(ctx)
		if err != nil {
			return admin.RouteSnapshot{}, errors.Wrapf(err, "capability fan-out: configuration for group %d", s.groupID)
		}
		groups = append(groups, routeGroupFromConfiguration(s.groupID, cfg))
	}
	return admin.RouteSnapshot{Groups: groups}, nil
}

// routeGroupFromConfiguration maps one Raft group's live Configuration
// to an admin.RouteGroup: each server becomes a RouteMember keyed by
// etcd.DeriveNodeID(srv.ID) (the node identity the rest of the
// encryption stack uses), split into Voters / Learners on Suffrage.
// Empty suffrage counts as voter, matching raftengine's convention
// that an unannotated peer (e.g. mid-WAL-replay) is a voter.
func routeGroupFromConfiguration(groupID uint64, cfg raftengine.Configuration) admin.RouteGroup {
	group := admin.RouteGroup{GroupID: groupID}
	for _, srv := range cfg.Servers {
		member := admin.RouteMember{
			FullNodeID: etcdraftengine.DeriveNodeID(srv.ID),
			Address:    srv.Address,
		}
		if srv.Suffrage == etcdraftengine.SuffrageLearner {
			group.Learners = append(group.Learners, member)
		} else {
			group.Voters = append(group.Voters, member)
		}
	}
	return group
}