feat(encryption): Stage 6D-6c-3a - wire capability fan-out closure into EnableStorageEnvelope #830
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| # Stage 6D-6c-3a — capability fan-out closure wiring | ||
|
|
||
| | Field | Value | | ||
| |---|---| | ||
| | Status | partial | | ||
| | Date | 2026-05-25 | | ||
| | Parent designs | [`2026_05_18_partial_6d_enable_storage_envelope.md`](2026_05_18_partial_6d_enable_storage_envelope.md) (6D-6c-3 milestone; §4 capability fan-out), [`2026_04_29_partial_data_at_rest_encryption.md`](2026_04_29_partial_data_at_rest_encryption.md) (§7.1 rollout) | | ||
| | Builds on | 6D-3 (`internal/admin.CapabilityFanout` helper), 6D-6a (`adapter.WithEncryptionAdminCapabilityFanout` option) | | ||
|
|
||
| **Lifecycle:** the §1 fan-out closure wiring ships in this PR; the | ||
| 6D-6c-3b end-to-end test (Bootstrap → EnableStorageEnvelope → Put → | ||
| read-back) remains open. This doc flips to `*_implemented_*` when 3b lands. | ||
|
|
||
| ## 0. Why this doc exists | ||
|
|
||
| The `EnableStorageEnvelope` cutover RPC (6D-6a) runs the §4 capability | ||
| fan-out before proposing the cutover — but only when a | ||
| `CapabilityFanoutFn` closure is wired via | ||
| `WithEncryptionAdminCapabilityFanout`. `main.go` does **not** wire it | ||
| today, so `s.capabilityFanout == nil` and the cutover RPC refuses with | ||
| the §4 "fan-out not wired" `FailedPrecondition`. 6D-6c-3a builds and | ||
| wires that closure; 6D-6c-3b adds the end-to-end test on top. | ||
|
|
||
| The fan-out helper (`admin.CapabilityFanout(ctx, routes, dial, | ||
| timeout)`) and the RPC option already exist and are tested. This slice | ||
| is pure wiring of those existing pieces into `main.go` — no new RPC, no | ||
| wire-format change. | ||
|
|
||
| ## 1. Scope (6D-6c-3a) | ||
|
|
||
| - `buildCapabilityFanoutFn(runtimes, connCache, timeout) | ||
| adapter.CapabilityFanoutFn` in a new `main_encryption_fanout.go`: | ||
| - **Snapshot builder** — for every runtime, call | ||
| `rt.engine.Configuration(ctx)` and map each `raftengine.Server` | ||
| to an `admin.RouteMember{FullNodeID: etcd.DeriveNodeID(srv.ID), | ||
| Address: srv.Address}`, splitting on `srv.Suffrage` | ||
| (`SuffrageLearner` → Learners, else → Voters, so empty/unannotated | ||
| suffrage counts as a voter per raftengine's convention) into one | ||
| `admin.RouteGroup{GroupID: rt.spec.id}`. The §4.1 contract is | ||
| "every (voter ∪ learner) of **every** Raft group", so the | ||
| snapshot spans **all** runtimes, not just the cutover RPC's group. | ||
| - **DialFunc** — `connCache.ConnFor(addr)` → `pb.NewEncryptionAdminClient(conn)`, | ||
| with a **no-op cleanup** (the cache owns the conn lifecycle and | ||
| reuses it across probes; closing per-probe would defeat pooling). | ||
| - The closure runs `admin.CapabilityFanout(ctx, snapshot, dial, | ||
| timeout)`; a snapshot-build error (any `engine.Configuration` | ||
| failure) is returned as the closure error so the cutover RPC | ||
| fails closed (§4 maps it to a refusal). | ||
| - A **dedicated** `kv.GRPCConnCache` for the fan-out, created in | ||
| `startRaftServers` and closed via the cleanup stack. Not the | ||
| admin-forward cache: that one is gated on `--adminEnabled`, but the | ||
| cutover fan-out must dial whenever encryption mutators are enabled, | ||
| independent of the admin HTTP surface. `kv.GRPCConnCache.ConnFor` | ||
| already uses the shared `internalutil.GRPCDialOptions()` so the | ||
| fan-out dials with the same transport posture as every other | ||
| intra-cluster gRPC client. | ||
| - Wire the closure into `registerEncryptionAdminServer` via | ||
| `WithEncryptionAdminCapabilityFanout`, gated on the same | ||
| `enableMutators` boolean that gates the Proposer/LeaderView (the | ||
| fan-out is only meaningful when the cutover mutator is reachable). | ||
| - Fan-out timeout: a `const` in `main.go` (start at 5s — generous for | ||
| a small-cluster `GetCapability` round-trip; the helper bounds the | ||
| whole fan-out by it regardless of member count). | ||
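The voter/learner split in the snapshot builder can be sketched in isolation. The types below are simplified stand-ins for `raftengine.Server`, `admin.RouteMember`, and `admin.RouteGroup`, and `deriveNodeID` is a placeholder that uses an FNV-1a hash purely for illustration; the real `DeriveNodeID` derivation is not shown in this document.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Simplified stand-ins for the raftengine and admin types.
type server struct {
	ID       string
	Address  string
	Suffrage string
}

type routeMember struct {
	FullNodeID uint64
	Address    string
}

type routeGroup struct {
	GroupID  uint64
	Voters   []routeMember
	Learners []routeMember
}

const suffrageLearner = "learner" // assumed value, for illustration only

// deriveNodeID is a placeholder for the real node ID derivation.
func deriveNodeID(id string) uint64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(id)) // hash.Hash.Write never returns an error
	return h.Sum64()
}

// routeGroupFromServers puts learners in Learners and everything else,
// including servers with empty suffrage, in Voters.
func routeGroupFromServers(groupID uint64, servers []server) routeGroup {
	group := routeGroup{GroupID: groupID}
	for _, srv := range servers {
		member := routeMember{FullNodeID: deriveNodeID(srv.ID), Address: srv.Address}
		if srv.Suffrage == suffrageLearner {
			group.Learners = append(group.Learners, member)
		} else {
			group.Voters = append(group.Voters, member)
		}
	}
	return group
}

func main() {
	group := routeGroupFromServers(7, []server{
		{ID: "n1", Address: "10.0.0.1:50051", Suffrage: "voter"},
		{ID: "n2", Address: "10.0.0.2:50051", Suffrage: ""},
		{ID: "n3", Address: "10.0.0.3:50051", Suffrage: suffrageLearner},
	})
	fmt.Printf("voters=%d learners=%d\n", len(group.Voters), len(group.Learners))
}
```

Defaulting to voter means an unannotated member is still probed as a voter rather than silently dropped from the snapshot.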
|
|
||
| ### Out of scope (6D-6c-3b) | ||
|
|
||
| The end-to-end integration test (single-node cluster: Bootstrap → | ||
| EnableStorageEnvelope → Put → read-back-via-envelope) lands in 3b on | ||
| top of this wiring. | ||
|
|
||
| ## 2. Fail-closed posture | ||
|
|
||
| - **Snapshot build error** (`engine.Configuration` fails on any | ||
| group) → closure returns the error → cutover RPC refuses. Never | ||
| propose a cutover against a membership view we could not fully | ||
| enumerate. | ||
| - **Unreachable / not-capable member** → handled inside | ||
| `CapabilityFanout` (verdict `Reachable=false` / `EncryptionCapable=false` | ||
| → `OK=false`) → cutover refuses. No partial success (§4.3). | ||
| - **Closure not wired** (encryption mutators disabled) → unchanged | ||
| existing behavior: `s.capabilityFanout == nil` → cutover refuses | ||
| with the §4 "not wired" `FailedPrecondition`. | ||
|
|
||
| ## 3. Why a dedicated conn cache (not the admin-forward one) | ||
|
|
||
| The admin-forward `connCache` is constructed only when `--adminEnabled` | ||
| (it backs the follower→leader admin write forwarder). The cutover | ||
| capability fan-out is orthogonal: it must dial every member's | ||
| `EncryptionAdmin` endpoint whenever the operator has enabled | ||
| encryption mutators, regardless of whether the admin HTTP API is | ||
| served. Coupling the two would make the fan-out silently inert on a | ||
| `--encryption-enabled` cluster that left `--adminEnabled` off. A | ||
| separate cache keeps the lifecycles independent and is cheap (one | ||
| idle `*grpc.ClientConn` per peer, closed on shutdown). | ||
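The "cache owns the lifecycle" arrangement can be sketched without gRPC. The `connCache` and `conn` types below are hypothetical stand-ins for `kv.GRPCConnCache` and `*grpc.ClientConn`, and the cache's behavior here (one entry per address, everything closed together) is an assumption based on the description above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a stand-in for a pooled client connection.
type conn struct {
	addr   string
	closed bool
}

// connCache hands out one shared conn per address; its zero value is usable.
type connCache struct {
	mu    sync.Mutex
	conns map[string]*conn
	dials int
}

func (c *connCache) ConnFor(addr string) (*conn, error) {
	if addr == "" {
		return nil, errors.New("empty address")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if existing, ok := c.conns[addr]; ok {
		return existing, nil
	}
	if c.conns == nil {
		c.conns = make(map[string]*conn)
	}
	cn := &conn{addr: addr}
	c.conns[addr] = cn
	c.dials++
	return cn, nil
}

// Close tears down every pooled conn; only the cache's owner calls it.
func (c *connCache) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, cn := range c.conns {
		cn.closed = true
	}
	c.conns = nil
}

// newDial returns a dial function whose cleanup is a no-op, because
// closing per probe would defeat the pooling.
func newDial(cache *connCache) func(addr string) (*conn, func(), error) {
	return func(addr string) (*conn, func(), error) {
		cn, err := cache.ConnFor(addr)
		if err != nil {
			return nil, nil, fmt.Errorf("capability fan-out: dial %s: %w", addr, err)
		}
		return cn, func() {}, nil
	}
}

func main() {
	cache := &connCache{}
	dial := newDial(cache)
	first, cleanup, _ := dial("10.0.0.1:50051")
	cleanup()
	second, _, _ := dial("10.0.0.1:50051")
	fmt.Println(first == second, cache.dials, first.closed)
	cache.Close()
	fmt.Println(first.closed)
}
```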
|
|
||
| ## 4. Self-review checklist (for the implementation PR) | ||
|
|
||
| - **Data loss / consistency** — read-only control-plane wiring; issues | ||
| no writes and changes no apply path. The cutover it gates is | ||
| unchanged (6D-6a). | ||
| - **Concurrency** — `CapabilityFanout` is already concurrent + timeout | ||
| bounded; the snapshot builder runs per-RPC, single-shot; the conn | ||
| cache is already concurrency-safe. | ||
| - **Performance** — fan-out runs once per cutover RPC (a rare operator | ||
| action), not on the data path. | ||
| - **Test coverage** — unit tests for the snapshot builder (Server → | ||
| RouteMember mapping, voter/learner split, multi-group, Configuration | ||
| error → closure error) and the DialFunc (cache reuse, nil-conn | ||
| guard). The full e2e is 3b. | ||
|
|
||
| ## 5. Open questions | ||
|
|
||
| 1. Fan-out timeout value — 5s proposed; revisit if large clusters need | ||
| more headroom (the helper bounds the whole fan-out, not per-probe). | ||
| 2. Should learners that are mid-snapshot-catchup be probed? Yes — the | ||
| §4.1 contract is unconditional (voter ∪ learner); an unreachable | ||
| learner is a hard refusal, matching the parent design §8. | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,150 @@ | ||||||||||||||||||||||||||||||
| package main | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| "github.com/bootjp/elastickv/adapter" | ||||||||||||||||||||||||||||||
| "github.com/bootjp/elastickv/internal/admin" | ||||||||||||||||||||||||||||||
| "github.com/bootjp/elastickv/internal/raftengine" | ||||||||||||||||||||||||||||||
| etcdraftengine "github.com/bootjp/elastickv/internal/raftengine/etcd" | ||||||||||||||||||||||||||||||
| "github.com/bootjp/elastickv/kv" | ||||||||||||||||||||||||||||||
| pb "github.com/bootjp/elastickv/proto" | ||||||||||||||||||||||||||||||
| "github.com/cockroachdb/errors" | ||||||||||||||||||||||||||||||
| "golang.org/x/sync/errgroup" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // capabilityFanoutTimeout bounds the whole §4 GetCapability fan-out the | ||||||||||||||||||||||||||||||
| // EnableStorageEnvelope cutover runs before proposing. The helper | ||||||||||||||||||||||||||||||
| // caps the entire fan-out (not per-probe) by this value regardless of | ||||||||||||||||||||||||||||||
| // member count, so a slow/unreachable peer cannot stall the operator's | ||||||||||||||||||||||||||||||
| // cutover RPC indefinitely — it surfaces as a Reachable=false verdict | ||||||||||||||||||||||||||||||
| // and the cutover fails closed. 5s is generous for a small-cluster | ||||||||||||||||||||||||||||||
| // GetCapability round-trip. | ||||||||||||||||||||||||||||||
| const capabilityFanoutTimeout = 5 * time.Second | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // buildEncryptionCapabilityFanout builds the §4 capability fan-out | ||||||||||||||||||||||||||||||
| // closure shared across every shard's EncryptionAdmin server, or nil | ||||||||||||||||||||||||||||||
| // when encryption mutators are disabled (the cutover RPC is then | ||||||||||||||||||||||||||||||
| // unreachable, so the fan-out would never run). It owns a dedicated | ||||||||||||||||||||||||||||||
| // kv.GRPCConnCache — NOT the --adminEnabled-gated admin-forward cache — | ||||||||||||||||||||||||||||||
| // so the fan-out dials whenever encryption mutators are enabled, | ||||||||||||||||||||||||||||||
| // independent of the admin HTTP surface; the cache is drained on | ||||||||||||||||||||||||||||||
| // context cancellation via eg. | ||||||||||||||||||||||||||||||
| func buildEncryptionCapabilityFanout(ctx context.Context, eg *errgroup.Group, runtimes []*raftGroupRuntime, enableMutators bool) adapter.CapabilityFanoutFn { | ||||||||||||||||||||||||||||||
| if !enableMutators { | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| fanoutConnCache := &kv.GRPCConnCache{} | ||||||||||||||||||||||||||||||
| eg.Go(func() error { | ||||||||||||||||||||||||||||||
| <-ctx.Done() | ||||||||||||||||||||||||||||||
| if err := fanoutConnCache.Close(); err != nil { | ||||||||||||||||||||||||||||||
| return errors.Wrap(err, "close encryption capability fan-out gRPC connection cache") | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
| return buildCapabilityFanoutFn(runtimes, fanoutConnCache, capabilityFanoutTimeout) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // buildCapabilityFanoutFn assembles the adapter.CapabilityFanoutFn the | ||||||||||||||||||||||||||||||
| // EnableStorageEnvelope server invokes for its §4 pre-flight check. The | ||||||||||||||||||||||||||||||
| // returned closure snapshots the live membership of every Raft group | ||||||||||||||||||||||||||||||
| // (the §4.1 "voter ∪ learner of every group" contract) and fans | ||||||||||||||||||||||||||||||
| // GetCapability out to each unique node. | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // dial reuses connCache so repeated cutover attempts share one | ||||||||||||||||||||||||||||||
| // *grpc.ClientConn per peer; the cleanup closure is a no-op because the | ||||||||||||||||||||||||||||||
| // cache owns the connection lifecycle (closed on shutdown by the | ||||||||||||||||||||||||||||||
| // caller, not per-probe). A snapshot-build error fails the closure so | ||||||||||||||||||||||||||||||
| // the cutover refuses rather than proposing against a membership view | ||||||||||||||||||||||||||||||
| // it could not fully enumerate (fail-closed, §4.3 no-partial-success). | ||||||||||||||||||||||||||||||
| func buildCapabilityFanoutFn(runtimes []*raftGroupRuntime, connCache *kv.GRPCConnCache, timeout time.Duration) adapter.CapabilityFanoutFn { | ||||||||||||||||||||||||||||||
| dial := func(_ context.Context, address string) (pb.EncryptionAdminClient, func(), error) { | ||||||||||||||||||||||||||||||
| conn, err := connCache.ConnFor(address) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return nil, nil, errors.Wrapf(err, "capability fan-out: dial %s", address) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return pb.NewEncryptionAdminClient(conn), func() {}, nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| return func(ctx context.Context) (admin.CapabilityFanoutResult, error) { | ||||||||||||||||||||||||||||||
| snapshot, err := capabilityRouteSnapshot(ctx, runtimes) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return admin.CapabilityFanoutResult{}, err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return admin.CapabilityFanout(ctx, snapshot, dial, timeout) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // configReader is the narrow membership-view surface | ||||||||||||||||||||||||||||||
| // capabilityRouteSnapshot needs from a Raft engine. raftengine.Engine | ||||||||||||||||||||||||||||||
| // satisfies it structurally; defining it here lets the snapshot | ||||||||||||||||||||||||||||||
| // builder be unit-tested with a stub that returns a Configuration | ||||||||||||||||||||||||||||||
| // error (the fail-closed path) without standing up a full engine. | ||||||||||||||||||||||||||||||
| type configReader interface { | ||||||||||||||||||||||||||||||
| Configuration(ctx context.Context) (raftengine.Configuration, error) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // groupConfigSource pairs a group id with its membership reader. | ||||||||||||||||||||||||||||||
| type groupConfigSource struct { | ||||||||||||||||||||||||||||||
| groupID uint64 | ||||||||||||||||||||||||||||||
| engine configReader | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // capabilityRouteSnapshot builds the all-groups admin.RouteSnapshot | ||||||||||||||||||||||||||||||
| // from each runtime's live Raft Configuration. It reads each engine | ||||||||||||||||||||||||||||||
| // via snapshotEngine() (engineMu.RLock) because the fan-out closure | ||||||||||||||||||||||||||||||
| // runs during live EnableStorageEnvelope RPCs, concurrently with a | ||||||||||||||||||||||||||||||
| // possible Close() that clears the engine field — a direct field read | ||||||||||||||||||||||||||||||
| // would be a data race. A nil runtime/engine is skipped. | ||||||||||||||||||||||||||||||
| func capabilityRouteSnapshot(ctx context.Context, runtimes []*raftGroupRuntime) (admin.RouteSnapshot, error) { | ||||||||||||||||||||||||||||||
| sources := make([]groupConfigSource, 0, len(runtimes)) | ||||||||||||||||||||||||||||||
| for _, rt := range runtimes { | ||||||||||||||||||||||||||||||
| if rt == nil { | ||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| eng := rt.snapshotEngine() | ||||||||||||||||||||||||||||||
| if eng == nil { | ||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| sources = append(sources, groupConfigSource{groupID: rt.spec.id, engine: eng}) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return routeSnapshotFromSources(ctx, sources) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // routeSnapshotFromSources maps each group's Configuration into the | ||||||||||||||||||||||||||||||
| // admin.RouteSnapshot. Any Configuration error aborts the whole | ||||||||||||||||||||||||||||||
| // snapshot so the caller fails closed — the cutover must never | ||||||||||||||||||||||||||||||
| // proceed against a membership view it could not fully enumerate. | ||||||||||||||||||||||||||||||
| func routeSnapshotFromSources(ctx context.Context, sources []groupConfigSource) (admin.RouteSnapshot, error) { | ||||||||||||||||||||||||||||||
| groups := make([]admin.RouteGroup, 0, len(sources)) | ||||||||||||||||||||||||||||||
| for _, s := range sources { | ||||||||||||||||||||||||||||||
| cfg, err := s.engine.Configuration(ctx) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return admin.RouteSnapshot{}, errors.Wrapf(err, "capability fan-out: configuration for group %d", s.groupID) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| groups = append(groups, routeGroupFromConfiguration(s.groupID, cfg)) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return admin.RouteSnapshot{Groups: groups}, nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // routeGroupFromConfiguration maps one Raft group's live Configuration | ||||||||||||||||||||||||||||||
| // to an admin.RouteGroup: each server becomes a RouteMember keyed by | ||||||||||||||||||||||||||||||
| // etcd.DeriveNodeID(srv.ID) (the node identity the rest of the | ||||||||||||||||||||||||||||||
| // encryption stack uses), split into Voters / Learners on Suffrage. | ||||||||||||||||||||||||||||||
| // Empty suffrage counts as voter, matching raftengine's convention | ||||||||||||||||||||||||||||||
| // that an unannotated peer (e.g. mid-WAL-replay) is a voter. | ||||||||||||||||||||||||||||||
| func routeGroupFromConfiguration(groupID uint64, cfg raftengine.Configuration) admin.RouteGroup { | ||||||||||||||||||||||||||||||
| group := admin.RouteGroup{GroupID: groupID} | ||||||||||||||||||||||||||||||
| for _, srv := range cfg.Servers { | ||||||||||||||||||||||||||||||
| member := admin.RouteMember{ | ||||||||||||||||||||||||||||||
| FullNodeID: etcdraftengine.DeriveNodeID(srv.ID), | ||||||||||||||||||||||||||||||
| Address: srv.Address, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| if srv.Suffrage == etcdraftengine.SuffrageLearner { | ||||||||||||||||||||||||||||||
| group.Learners = append(group.Learners, member) | ||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||
| group.Voters = append(group.Voters, member) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return group | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||