From f4add3e4c1a7a468b684ac31b8fff6e88db82f82 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 27 May 2026 14:58:58 -0400 Subject: [PATCH 1/4] Configurable calcite transpose rule in PROJECT_REWRITE phase --- .../api/config/OptimizerConfigOptions.java | 49 +++++++++++++++++++ .../optimize/program/FlinkBatchProgram.scala | 37 +++++++++++++- .../optimize/program/FlinkStreamProgram.scala | 39 ++++++++++++++- .../plan/rules/FlinkBatchRuleSets.scala | 14 +++++- .../plan/rules/FlinkStreamRuleSets.scala | 14 +++++- 5 files changed, 147 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index a02915f1002939..6c43aa0f196c5d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -400,6 +400,23 @@ public class OptimizerConfigOptions { .withDescription( "Strategy for optimizing the delta-join. Only AUTO, FORCE or NONE can be set. Default it AUTO."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption + TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE = + key("table.optimizer.project-filter-transpose-rule") + .enumType(ProjectFilterTransposeRule.class) + .defaultValue(ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE) + .withDescription( + "Selects which Calcite ProjectFilterTransposeRule variant the optimizer uses " + + "when pushing a Project past a Filter. " + + "PROJECT_FILTER_TRANSPOSE (default) splits Project expressions so only " + + "the columns referenced by the Filter remain below it. " + + "PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS preserves each top-level " + + "Project expression as a whole when pushing past the Filter. " + + "PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS treats the Project " + + "as atomic and only pushes it when the entire Project can move below " + + "the Filter - Required for nested projection pushdown + filters."); + /** Strategy for handling non-deterministic updates. */ @PublicEvolving public enum NonDeterministicUpdateStrategy { @@ -496,4 +513,36 @@ public InlineElement getDescription() { return description; } } + + /** + * Variant of Calcite's ProjectFilterTransposeRule used when pushing a Project past a Filter. + */ + @PublicEvolving + public enum ProjectFilterTransposeRule implements DescribedEnum { + PROJECT_FILTER_TRANSPOSE( + text( + "Default variant. Pushes a Project past a Filter, splitting expressions " + + "so that only the columns referenced by the Filter remain below it.")), + PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS( + text( + "Pushes the Project past the Filter without splitting expressions. " + + "Each top-level Project expression is preserved as a whole and " + + "either pushed below the Filter or kept above it.")), + PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS( + text( + "Pushes the Project past the Filter only when the Project as a whole can be " + + "moved below the Filter; the Project is treated as atomic and is " + + "never split.")); + + private final InlineElement description; + + ProjectFilterTransposeRule(InlineElement description) { + this.description = description; + } + + @Override + public InlineElement getDescription() { + return description; + } + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala index cc46f3aeedc562..a03a13f0183a05 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala @@ -22,7 +22,12 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.rules.FlinkBatchRuleSets +import org.apache.calcite.plan.RelOptRule import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.rules.CoreRules +import org.apache.calcite.tools.{RuleSet, RuleSets} + +import scala.collection.JavaConverters._ /** Defines a sequence of programs to optimize flink batch table plan. */ object FlinkBatchProgram { @@ -45,6 +50,7 @@ object FlinkBatchProgram { def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = { val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]() + val projectFilterTransposeRule = pickProjectFilterTransposeRule(tableConfig) chainedProgram.addLast( // rewrite sub-queries to joins @@ -240,16 +246,26 @@ object FlinkBatchProgram { ) // window rewrite + // PROJECT_RULES carries CoreRules.PROJECT_FILTER_TRANSPOSE as the safe default for Volcano; + // here in HEP we swap it for the configured variant. chainedProgram.addLast( PROJECT_REWRITE, FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) - .add(FlinkBatchRuleSets.PROJECT_RULES) + .add( + substituteRule( + FlinkBatchRuleSets.PROJECT_RULES, + CoreRules.PROJECT_FILTER_TRANSPOSE, + projectFilterTransposeRule)) .build() ) // optimize the logical plan + // Note: the project-filter-transpose variant is intentionally NOT appended to the Volcano + // LOGICAL phase. The HEP PROJECT_REWRITE phase above already pushes the transpose down, and + // adding WHOLE_EXPRESSIONS here oscillates with FlinkFilterProjectTransposeRule in + // FILTER_RULES (no bloat protection), expanding the MEMO indefinitely. chainedProgram.addLast( LOGICAL, FlinkVolcanoProgramBuilder.newBuilder @@ -298,4 +314,23 @@ object FlinkBatchProgram { chainedProgram } + + private def pickProjectFilterTransposeRule(tableConfig: ReadableConfig): RelOptRule = { + tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE) match { + case OptimizerConfigOptions.ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE => + CoreRules.PROJECT_FILTER_TRANSPOSE + case OptimizerConfigOptions.ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS => + CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS + case OptimizerConfigOptions.ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS => + CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS + } + } + + private def substituteRule(base: RuleSet, oldRule: RelOptRule, newRule: RelOptRule): RuleSet = { + if (oldRule eq newRule) { + base + } else { + RuleSets.ofList((base.asScala.filterNot(_ eq oldRule) ++ Seq(newRule)).asJava) + } + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala index 8ec32c242fa48a..2947ce7fea65d3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala @@ -24,7 +24,12 @@ import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets import org.apache.flink.table.planner.plan.rules.logical.EventTimeTemporalJoinRewriteRule import org.apache.flink.table.planner.plan.rules.physical.stream.{FlinkDuplicateChangesTraitInitProgram, FlinkMarkChangelogNormalizeProgram} +import org.apache.calcite.plan.RelOptRule import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.rules.CoreRules +import org.apache.calcite.tools.{RuleSet, RuleSets} + +import scala.collection.JavaConverters._ /** Defines a sequence of programs to optimize for stream table plan. */ object FlinkStreamProgram { @@ -45,6 +50,7 @@ object FlinkStreamProgram { def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[StreamOptimizeContext] = { val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]() + val projectFilterTransposeRule = pickProjectFilterTransposeRule(tableConfig) // rewrite sub-queries to joins chainedProgram.addLast( @@ -248,17 +254,29 @@ object FlinkStreamProgram { .build() ) + // TODO this pushing directly into rule set after the fact feels janky and not ideal + // find better way to do this // project rewrite + // PROJECT_RULES carries CoreRules.PROJECT_FILTER_TRANSPOSE as the safe default for Volcano; + // here in HEP we swap it for the configured variant. chainedProgram.addLast( PROJECT_REWRITE, FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) - .add(FlinkStreamRuleSets.PROJECT_RULES) + .add( + substituteRule( + FlinkStreamRuleSets.PROJECT_RULES, + CoreRules.PROJECT_FILTER_TRANSPOSE, + projectFilterTransposeRule)) .build() ) // optimize the logical plan + // Note: the project-filter-transpose variant is intentionally NOT appended to the Volcano + // LOGICAL phase. The HEP PROJECT_REWRITE phase above already pushes the transpose down, and + // adding WHOLE_EXPRESSIONS here oscillates with FlinkFilterProjectTransposeRule in + // FILTER_RULES (no bloat protection), expanding the MEMO indefinitely. chainedProgram.addLast( LOGICAL, FlinkVolcanoProgramBuilder.newBuilder @@ -360,4 +378,23 @@ object FlinkStreamProgram { chainedProgram } + + private def pickProjectFilterTransposeRule(tableConfig: ReadableConfig): RelOptRule = { + tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE) match { + case OptimizerConfigOptions.ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE => + CoreRules.PROJECT_FILTER_TRANSPOSE + case OptimizerConfigOptions.ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS => + CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS + case OptimizerConfigOptions.ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS => + CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS + } + } + + private def substituteRule(base: RuleSet, oldRule: RelOptRule, newRule: RelOptRule): RuleSet = { + if (oldRule eq newRule) { + base + } else { + RuleSets.ofList((base.asScala.filterNot(_ eq oldRule) ++ Seq(newRule)).asJava) + } + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 7ff8a16a70e4a6..2eea27b4cd4d24 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -204,9 +204,19 @@ object FlinkBatchRuleSets { CoreRules.AGGREGATE_VALUES ) - /** RuleSet about project */ + /** + * RuleSet about project. + * + *

Note: [[CoreRules.PROJECT_FILTER_TRANSPOSE]] is the safe default kept here so it propagates + * into [[LOGICAL_OPT_RULES]] for the Volcano LOGICAL phase. In the HEP `PROJECT_REWRITE` phase, + * [[FlinkBatchProgram.buildProgram]] substitutes the configured variant from + * [[OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE]]. The aggressive + * `WHOLE_EXPRESSIONS` variant must NOT reach Volcano — it oscillates with + * [[FlinkFilterProjectTransposeRule]] when bloat protection is bypassed. See + * PROJECT_FILTER_TRANSPOSE_HANG.md. + */ val PROJECT_RULES: RuleSet = RuleSets.ofList( - // push a projection past a filter + // push a projection past a filter (safe default; HEP swaps in the configured variant) CoreRules.PROJECT_FILTER_TRANSPOSE, // push a projection to the children of a non semi/anti join // push all expressions to handle the time indicator correctly diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 05de12a6950a98..86acba2156ad4f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -202,9 +202,19 @@ object FlinkStreamRuleSets { CoreRules.AGGREGATE_VALUES ) - /** RuleSet about project */ + /** + * RuleSet about project. + * + *

Note: [[CoreRules.PROJECT_FILTER_TRANSPOSE]] is the safe default kept here so it propagates + * into [[LOGICAL_OPT_RULES]] for the Volcano LOGICAL phase. In the HEP `PROJECT_REWRITE` phase, + * [[FlinkStreamProgram.buildProgram]] substitutes the configured variant from + * [[OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE]]. The aggressive + * `WHOLE_EXPRESSIONS` variant must NOT reach Volcano — it oscillates with + * [[FlinkFilterProjectTransposeRule]] when bloat protection is bypassed. See + * PROJECT_FILTER_TRANSPOSE_HANG.md. + */ val PROJECT_RULES: RuleSet = RuleSets.ofList( - // push a projection past a filter + // push a projection past a filter (safe default; HEP swaps in the configured variant) CoreRules.PROJECT_FILTER_TRANSPOSE, // push a projection to the children of a non semi/anti join // push all expressions to handle the time indicator correctly From 8c7e20b4fd56f14ed12154a37fdcc9a558d52f3c Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 27 May 2026 15:11:04 -0400 Subject: [PATCH 2/4] Fixed comments --- .../plan/optimize/program/FlinkStreamProgram.scala | 10 ++-------- .../table/planner/plan/rules/FlinkBatchRuleSets.scala | 11 ----------- .../planner/plan/rules/FlinkStreamRuleSets.scala | 11 ----------- 3 files changed, 2 insertions(+), 30 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala index 2947ce7fea65d3..0079414e535fdb 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala @@ -254,11 +254,7 @@ object FlinkStreamProgram { .build() ) - // TODO this pushing directly into rule set after the fact feels janky and not ideal - // find better way to do this - // project rewrite - // PROJECT_RULES carries CoreRules.PROJECT_FILTER_TRANSPOSE as the safe default for Volcano; - // here in HEP we swap it for the configured variant. + // In PROJECT_REWRITE we utilize user desired transposition method chainedProgram.addLast( PROJECT_REWRITE, FlinkHepRuleSetProgramBuilder.newBuilder @@ -274,9 +270,7 @@ object FlinkStreamProgram { // optimize the logical plan // Note: the project-filter-transpose variant is intentionally NOT appended to the Volcano - // LOGICAL phase. The HEP PROJECT_REWRITE phase above already pushes the transpose down, and - // adding WHOLE_EXPRESSIONS here oscillates with FlinkFilterProjectTransposeRule in - // FILTER_RULES (no bloat protection), expanding the MEMO indefinitely. + // LOGICAL phase. Prevents infinite oscillation with optimization from PROJECT_REWRITE chainedProgram.addLast( LOGICAL, FlinkVolcanoProgramBuilder.newBuilder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 2eea27b4cd4d24..6edaa79a089be3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -204,17 +204,6 @@ object FlinkBatchRuleSets { CoreRules.AGGREGATE_VALUES ) - /** - * RuleSet about project. - * - *

Note: [[CoreRules.PROJECT_FILTER_TRANSPOSE]] is the safe default kept here so it propagates - * into [[LOGICAL_OPT_RULES]] for the Volcano LOGICAL phase. In the HEP `PROJECT_REWRITE` phase, - * [[FlinkBatchProgram.buildProgram]] substitutes the configured variant from - * [[OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE]]. The aggressive - * `WHOLE_EXPRESSIONS` variant must NOT reach Volcano — it oscillates with - * [[FlinkFilterProjectTransposeRule]] when bloat protection is bypassed. See - * PROJECT_FILTER_TRANSPOSE_HANG.md. - */ val PROJECT_RULES: RuleSet = RuleSets.ofList( // push a projection past a filter (safe default; HEP swaps in the configured variant) CoreRules.PROJECT_FILTER_TRANSPOSE, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 86acba2156ad4f..5d970257aa68c3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -202,17 +202,6 @@ object FlinkStreamRuleSets { CoreRules.AGGREGATE_VALUES ) - /** - * RuleSet about project. - * - *

Note: [[CoreRules.PROJECT_FILTER_TRANSPOSE]] is the safe default kept here so it propagates - * into [[LOGICAL_OPT_RULES]] for the Volcano LOGICAL phase. In the HEP `PROJECT_REWRITE` phase, - * [[FlinkStreamProgram.buildProgram]] substitutes the configured variant from - * [[OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE]]. The aggressive - * `WHOLE_EXPRESSIONS` variant must NOT reach Volcano — it oscillates with - * [[FlinkFilterProjectTransposeRule]] when bloat protection is bypassed. See - * PROJECT_FILTER_TRANSPOSE_HANG.md. - */ val PROJECT_RULES: RuleSet = RuleSets.ofList( // push a projection past a filter (safe default; HEP swaps in the configured variant) CoreRules.PROJECT_FILTER_TRANSPOSE, From c40c0e834913a9908b3bb273b93efca3a6883842 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 27 May 2026 15:13:25 -0400 Subject: [PATCH 3/4] Fixed comments pt2 --- .../planner/plan/optimize/program/FlinkBatchProgram.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala index a03a13f0183a05..794ae4e4d7f587 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala @@ -245,9 +245,7 @@ object FlinkBatchProgram { .build() ) - // window rewrite - // PROJECT_RULES carries CoreRules.PROJECT_FILTER_TRANSPOSE as the safe default for Volcano; - // here in HEP we swap it for the configured variant. + // In PROJECT_REWRITE we utilize user desired transposition method chainedProgram.addLast( PROJECT_REWRITE, FlinkHepRuleSetProgramBuilder.newBuilder @@ -263,9 +261,7 @@ object FlinkBatchProgram { // optimize the logical plan // Note: the project-filter-transpose variant is intentionally NOT appended to the Volcano - // LOGICAL phase. The HEP PROJECT_REWRITE phase above already pushes the transpose down, and - // adding WHOLE_EXPRESSIONS here oscillates with FlinkFilterProjectTransposeRule in - // FILTER_RULES (no bloat protection), expanding the MEMO indefinitely. + // LOGICAL phase. Prevents infinite oscillation with optimization from PROJECT_REWRITE chainedProgram.addLast( LOGICAL, FlinkVolcanoProgramBuilder.newBuilder From 13c93373be692f5a8f456980d140397268f7ac15 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 27 May 2026 15:22:55 -0400 Subject: [PATCH 4/4] config desc update --- .../table/api/config/OptimizerConfigOptions.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index 6c43aa0f196c5d..45c9f965bb5e63 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -407,15 +407,9 @@ public class OptimizerConfigOptions { .enumType(ProjectFilterTransposeRule.class) .defaultValue(ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE) .withDescription( - "Selects which Calcite ProjectFilterTransposeRule variant the optimizer uses " - + "when pushing a Project past a Filter. " - + "PROJECT_FILTER_TRANSPOSE (default) splits Project expressions so only " - + "the columns referenced by the Filter remain below it. " - + "PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS preserves each top-level " - + "Project expression as a whole when pushing past the Filter. " - + "PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS treats the Project " - + "as atomic and only pushes it when the entire Project can move below " - + "the Filter - Required for nested projection pushdown + filters."); + "Selects which Calcite ProjectFilterTransposeRule variant the optimizer " + + "uses when pushing a Project past a Filter in the " + + "PROJECT_REWRITE phase."); /** Strategy for handling non-deterministic updates. */ @PublicEvolving @@ -527,7 +521,8 @@ public enum ProjectFilterTransposeRule implements DescribedEnum { text( "Pushes the Project past the Filter without splitting expressions. " + "Each top-level Project expression is preserved as a whole and " - + "either pushed below the Filter or kept above it.")), + + "either pushed below the Filter or kept above it." + + "NOTE: Required for proper nested projection + filtering")), PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS( text( "Pushes the Project past the Filter only when the Project as a whole can be "