Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions app/controlplane/pkg/biz/workflowrun_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,89 @@ func (s *workflowRunIntegrationTestSuite) TestList() {
}
}

func (s *workflowRunIntegrationTestSuite) TestListExcludesSoftDeletedWorkflows() {
ctx := context.Background()

// Create a fresh workflow + run, soft-delete the workflow, and make sure
// the run is excluded from List. Regression guard for the case where
// org-scoped reads use the denormalized organization_id column on
// workflow_runs and could otherwise leak runs from deleted workflows.
wf, err := s.Workflow.Create(ctx, &biz.WorkflowCreateOpts{
Name: "to-be-deleted", OrgID: s.org2.ID, Project: "test-project",
})
s.Require().NoError(err)

_, err = s.WorkflowRun.Create(ctx, &biz.WorkflowRunCreateOpts{
WorkflowID: wf.ID.String(), ContractRevision: s.contractVersion, CASBackendID: s.casBackend.ID,
})
s.Require().NoError(err)

s.Require().NoError(s.Workflow.Delete(ctx, s.org2.ID, wf.ID.String()))

got, _, err := s.WorkflowRun.List(ctx, s.org2.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
s.Require().NoError(err)
for _, r := range got {
s.NotEqual(wf.ID, r.Workflow.ID, "run from soft-deleted workflow leaked into List")
}
}

func (s *workflowRunIntegrationTestSuite) TestListIsolatedByOrg() {
ctx := context.Background()

// org1 has runOrg1; org2 has runOrg2 + runOrg2Public (see setupWorkflowRunTestData).
// Regression guard for the org-scoping switch from the workflows edge to
// the denormalized organization_id column on workflow_runs.
got, _, err := s.WorkflowRun.List(ctx, s.org.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
s.Require().NoError(err)
gotIDs := make([]uuid.UUID, 0, len(got))
for _, r := range got {
gotIDs = append(gotIDs, r.ID)
}
s.ElementsMatch([]uuid.UUID{s.runOrg1.ID}, gotIDs, "org1 List leaked runs from another org")

got, _, err = s.WorkflowRun.List(ctx, s.org2.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
s.Require().NoError(err)
gotIDs = gotIDs[:0]
for _, r := range got {
gotIDs = append(gotIDs, r.ID)
}
s.ElementsMatch([]uuid.UUID{s.runOrg2.ID, s.runOrg2Public.ID}, gotIDs, "org2 List did not return its own runs")
}

func (s *workflowRunIntegrationTestSuite) TestListFilterByProjectIDs() {
ctx := context.Background()

// Create a second workflow in a different project in org2 and a run for it.
// Filtering by the original project's ID should exclude this run.
otherProjectWF, err := s.Workflow.Create(ctx, &biz.WorkflowCreateOpts{
Name: "wf-other-project", OrgID: s.org2.ID, Project: "other-project",
})
s.Require().NoError(err)

otherProjectRun, err := s.WorkflowRun.Create(ctx, &biz.WorkflowRunCreateOpts{
WorkflowID: otherProjectWF.ID.String(), ContractRevision: s.contractVersion, CASBackendID: s.casBackend.ID,
})
s.Require().NoError(err)

// With no project filter, all org2 runs (including the new one) are returned.
allRuns, _, err := s.WorkflowRun.List(ctx, s.org2.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
s.Require().NoError(err)
allIDs := make([]uuid.UUID, 0, len(allRuns))
for _, r := range allRuns {
allIDs = append(allIDs, r.ID)
}
s.Contains(allIDs, otherProjectRun.ID)

// Filtering by the original project's ID excludes runs from other projects.
filtered, _, err := s.WorkflowRun.List(ctx, s.org2.ID,
&biz.RunListFilters{ProjectIDs: []uuid.UUID{s.workflowOrg2.ProjectID}},
&pagination.CursorOptions{Limit: 10})
s.Require().NoError(err)
for _, r := range filtered {
s.NotEqual(otherProjectRun.ID, r.ID, "run from non-selected project leaked into List")
}
}

func (s *workflowRunIntegrationTestSuite) TestSaveAttestation() {
assert := assert.New(s.T())
ctx := context.Background()
Expand Down
32 changes: 32 additions & 0 deletions app/controlplane/pkg/data/ent/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
-- atlas:txmode none

-- Denormalize organization_id onto workflow_runs so org-scoped list/aggregate
-- queries become sargable without joining workflows.
--
-- Why a trigger?
--
-- The control plane deploys as a multi-replica Deployment with rolling
-- updates. When this migration runs (in the initContainer of a new pod),
-- old pods are still serving traffic with code that does NOT set
-- organization_id on INSERT. The moment step 6 below enforces NOT NULL,
-- every INSERT from those old pods would fail with a constraint violation
-- until the rolling update replaces them — a window of seconds to minutes
-- in which workflow run creation is broken org-wide.
--
-- The BEFORE INSERT trigger below bridges that window: whenever a writer
-- doesn't set organization_id, the trigger fills it from the parent
-- workflow via a single PK lookup (~0.1ms). New code paths set the column
-- explicitly so the trigger's IF check short-circuits; the trigger only
-- does real work for inserts coming from the old replicas. Once every
-- replica runs the new code, the trigger is dead weight — a follow-up
-- release will drop both the trigger and its function.

-- 1. Nullable add (catalog-only, instant).
ALTER TABLE "workflow_runs" ADD COLUMN "organization_id" uuid;

-- 2. FK NOT VALID (no scan, brief AccessExclusive lock).
ALTER TABLE "workflow_runs"
ADD CONSTRAINT "workflow_runs_organizations_workflowruns"
FOREIGN KEY ("organization_id") REFERENCES "organizations" ("id")
ON UPDATE NO ACTION ON DELETE CASCADE NOT VALID;

-- 3. Trigger function: fills organization_id from the parent workflow when
-- the caller didn't set it. Removed by a follow-up migration in the next
-- release once all replicas set the column explicitly.
CREATE OR REPLACE FUNCTION fill_workflow_run_organization_id() RETURNS trigger AS $$
BEGIN
IF NEW.organization_id IS NULL THEN
SELECT organization_id INTO NEW.organization_id
FROM "workflows" WHERE id = NEW.workflow_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER workflow_runs_fill_organization_id
BEFORE INSERT ON "workflow_runs"
FOR EACH ROW EXECUTE FUNCTION fill_workflow_run_organization_id();

-- 4. Batched backfill. Concurrent inserts from old replicas are protected by
-- the trigger above, so they can't introduce new NULL rows mid-loop.
-- One COMMIT per batch keeps the longest row-lock window in the millisecond
-- range and avoids one giant WAL entry.
DO $$
DECLARE
rows_done INT;
BEGIN
LOOP
WITH batch AS (
SELECT wr.id, w.organization_id
FROM "workflow_runs" wr
JOIN "workflows" w ON wr.workflow_id = w.id
WHERE wr.organization_id IS NULL
LIMIT 5000
)
UPDATE "workflow_runs" wr
SET organization_id = b.organization_id
FROM batch b
WHERE wr.id = b.id;
GET DIAGNOSTICS rows_done = ROW_COUNT;
COMMIT;
EXIT WHEN rows_done = 0;
END LOOP;
END $$;

-- 5. Validate the FK now that data is consistent. SHARE UPDATE EXCLUSIVE
-- permits concurrent DML.
ALTER TABLE "workflow_runs" VALIDATE CONSTRAINT "workflow_runs_organizations_workflowruns";

-- 6. Enforce NOT NULL. In PG 12+ this is a verify-only scan (no rewrite).
-- Safe because the trigger guarantees no concurrent NULL inserts.
ALTER TABLE "workflow_runs" ALTER COLUMN "organization_id" SET NOT NULL;

-- 7. Create the org-scoped list index without blocking writes.
CREATE INDEX CONCURRENTLY "workflowrun_organization_id_created_at"
ON "workflow_runs" ("organization_id", "created_at" DESC);
3 changes: 2 additions & 1 deletion app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:Ob1pZMVVtMju/FJnsU/Li8Fg2DjcXUxzYDuaBWk9vp0=
h1:OSH+lqOh2mE49KklM6mUMDQjrL1N2nGHdz9aERNstTM=
20230706165452_init-schema.sql h1:VvqbNFEQnCvUVyj2iDYVQQxDM0+sSXqocpt/5H64k8M=
20230710111950-cas-backend.sql h1:A8iBuSzZIEbdsv9ipBtscZQuaBp3V5/VMw7eZH6GX+g=
20230712094107-cas-backends-workflow-runs.sql h1:a5rzxpVGyd56nLRSsKrmCFc9sebg65RWzLghKHh5xvI=
Expand Down Expand Up @@ -134,3 +134,4 @@ h1:Ob1pZMVVtMju/FJnsU/Li8Fg2DjcXUxzYDuaBWk9vp0=
20260504100323.sql h1:FP8y59ZXFUsyskbIfl/1nE7vo4OJcOPuALy3pAJaStQ=
20260511202105.sql h1:Tw9OkiWm7cT4p2pNklSUGM9DzKS38uUuYjXl8BdIwnQ=
20260514150303.sql h1:0bGVXYN5rBP9Hn9x/ou8JgKotKiFbSKWGHX2dBH/wCA=
20260516210119.sql h1:rfBnXQwPnrhVYAp/OIiFPGcS+Tx1x9CAMSDPGs8HIT8=
38 changes: 28 additions & 10 deletions app/controlplane/pkg/data/ent/migrate/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ var (
{Name: "policy_violations_count", Type: field.TypeInt32, Nullable: true},
{Name: "policy_violations_suppressed", Type: field.TypeInt32, Nullable: true},
{Name: "policy_has_gates", Type: field.TypeBool, Nullable: true},
{Name: "organization_id", Type: field.TypeUUID},
{Name: "version_id", Type: field.TypeUUID},
{Name: "workflow_id", Type: field.TypeUUID},
{Name: "workflow_run_contract_version", Type: field.TypeUUID, Nullable: true},
Expand All @@ -793,20 +794,26 @@ var (
PrimaryKey: []*schema.Column{WorkflowRunsColumns[0]},
ForeignKeys: []*schema.ForeignKey{
{
Symbol: "workflow_runs_project_versions_runs",
Symbol: "workflow_runs_organizations_workflowruns",
Columns: []*schema.Column{WorkflowRunsColumns[20]},
RefColumns: []*schema.Column{OrganizationsColumns[0]},
OnDelete: schema.Cascade,
},
{
Symbol: "workflow_runs_project_versions_runs",
Columns: []*schema.Column{WorkflowRunsColumns[21]},
RefColumns: []*schema.Column{ProjectVersionsColumns[0]},
OnDelete: schema.NoAction,
},
{
Symbol: "workflow_runs_workflows_workflowruns",
Columns: []*schema.Column{WorkflowRunsColumns[21]},
Columns: []*schema.Column{WorkflowRunsColumns[22]},
RefColumns: []*schema.Column{WorkflowsColumns[0]},
OnDelete: schema.Cascade,
},
{
Symbol: "workflow_runs_workflow_contract_versions_contract_version",
Columns: []*schema.Column{WorkflowRunsColumns[22]},
Columns: []*schema.Column{WorkflowRunsColumns[23]},
RefColumns: []*schema.Column{WorkflowContractVersionsColumns[0]},
OnDelete: schema.Cascade,
},
Expand All @@ -825,7 +832,7 @@ var (
{
Name: "workflowrun_workflow_id_created_at",
Unique: false,
Columns: []*schema.Column{WorkflowRunsColumns[21], WorkflowRunsColumns[1]},
Columns: []*schema.Column{WorkflowRunsColumns[22], WorkflowRunsColumns[1]},
Annotation: &entsql.IndexAnnotation{
DescColumns: map[string]bool{
WorkflowRunsColumns[1].Name: true,
Expand All @@ -835,7 +842,17 @@ var (
{
Name: "workflowrun_workflow_id_state_created_at",
Unique: false,
Columns: []*schema.Column{WorkflowRunsColumns[21], WorkflowRunsColumns[3], WorkflowRunsColumns[1]},
Columns: []*schema.Column{WorkflowRunsColumns[22], WorkflowRunsColumns[3], WorkflowRunsColumns[1]},
Annotation: &entsql.IndexAnnotation{
DescColumns: map[string]bool{
WorkflowRunsColumns[1].Name: true,
},
},
},
{
Name: "workflowrun_organization_id_created_at",
Unique: false,
Columns: []*schema.Column{WorkflowRunsColumns[20], WorkflowRunsColumns[1]},
Annotation: &entsql.IndexAnnotation{
DescColumns: map[string]bool{
WorkflowRunsColumns[1].Name: true,
Expand All @@ -860,12 +877,12 @@ var (
{
Name: "workflowrun_workflow_id",
Unique: false,
Columns: []*schema.Column{WorkflowRunsColumns[21]},
Columns: []*schema.Column{WorkflowRunsColumns[22]},
},
{
Name: "workflowrun_version_id_workflow_id",
Unique: false,
Columns: []*schema.Column{WorkflowRunsColumns[20], WorkflowRunsColumns[21]},
Columns: []*schema.Column{WorkflowRunsColumns[21], WorkflowRunsColumns[22]},
},
{
Name: "workflowrun_policy_status",
Expand Down Expand Up @@ -1014,9 +1031,10 @@ func init() {
WorkflowsTable.ForeignKeys[3].RefTable = WorkflowRunsTable
WorkflowContractsTable.ForeignKeys[0].RefTable = OrganizationsTable
WorkflowContractVersionsTable.ForeignKeys[0].RefTable = WorkflowContractsTable
WorkflowRunsTable.ForeignKeys[0].RefTable = ProjectVersionsTable
WorkflowRunsTable.ForeignKeys[1].RefTable = WorkflowsTable
WorkflowRunsTable.ForeignKeys[2].RefTable = WorkflowContractVersionsTable
WorkflowRunsTable.ForeignKeys[0].RefTable = OrganizationsTable
WorkflowRunsTable.ForeignKeys[1].RefTable = ProjectVersionsTable
WorkflowRunsTable.ForeignKeys[2].RefTable = WorkflowsTable
WorkflowRunsTable.ForeignKeys[3].RefTable = WorkflowContractVersionsTable
ReferrerReferencesTable.ForeignKeys[0].RefTable = ReferrersTable
ReferrerReferencesTable.ForeignKeys[1].RefTable = ReferrersTable
ReferrerWorkflowsTable.ForeignKeys[0].RefTable = ReferrersTable
Expand Down
Loading
Loading