IGNITE-14777 window functions support#12096
IGNITE-14777 window functions support#12096oleg-zinovev wants to merge 11 commits intoapache:masterfrom
Conversation
9b522ec to
f04ed9f
Compare
f04ed9f to
dbf1e84
Compare
|
@oleg-zinovev, I've partially reviewed your PR. Review not completed yet, but I have some comments to publish. Also there are a lot of codestyle violations. Please read the https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines article about Ignite codestyle. Most of the problems can be detected automatically (for example using command:
|
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
It's a goal of AccumulatorsFactory in/out adapters to provide rows suitable for accumulator. Let's use this logic instead of creating addional rel ops.
There was a problem hiding this comment.
The implementation of aggregates in Ignite relies on the indices of values in the row instead of evaluating expressions in AggregateCall#rexList. In this case, constants must be present in the row before it is processed by the aggregate.
If we want to eliminate a separate rule that implements this using projections, I can suggest one of the following options:
-
Redesign AccumulatorsFactory so that it uses AggregateCall#rexList instead of AggregateCall#argList when computing the aggregate, and replace RexInputRef for window constants with RexLiteral in the WindowConverterRule.
-
Add projection of constants when computing window functions so that the row passed to accumulators contains window constants in the required positions. However, in this case, the implementation of WindowNode will include logic that already exists in ProjectionNode.
-
Implement a separate factory for aggregates (or add new methods in the current one) that works directly with RexWinAggCall. In this case, AccumulatorsFactory (or the new factory for window functions) will need to handle a set of RexNode operands, but this will not affect the current logic for regular AggregateCall. Also, this would allow abandoning the transformation of RexWinAggCall into AggregateCall and, thus, avoid copying groups in WindowConverterRule. However, in this scenario, I might need to change the visibility of classes in the package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.
Let me know if you'd like to propose a more robust solution.
There was a problem hiding this comment.
First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.
There was a problem hiding this comment.
@alex-plekhanov
Hi,
Could you please take a look at commit e9f9a37?
I removed projection execution as a separate step during query planning (ProjectWindowConstantsRule). Projections are now applied directly during window function evaluation.
However, I have a couple of concerns:
-
The number of projections will now depend on how many window functions are invoked
-
Re-scanning a window partition will require recomputing the projections
I think I can address the first issue by refactoring WindowFunctionFactory so that a single projection is shared across all functions within a group.
But I’m not sure what the best approach is for the second one, other than introducing a cache of already projected rows—which would increase memory usage during execution.
| partition = partitionFactory.get(); | ||
| } | ||
| else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { | ||
| partition.drainTo(rowFactory, outBuf); |
There was a problem hiding this comment.
- This operation can block the thread for a long time.
- Large amount of rows can be stored in outBuf, in worth case there will be 2x input rows count (in partition and in outBuf)
Consider pushing directly to downstream. Drain (and push) only requested amount and postpone next pushes until next request.
| doPush(); | ||
| } | ||
|
|
||
| if (partition.add(row)) { |
There was a problem hiding this comment.
Strange API (true to drain). Maybe rework it somehow?
| doPush(); | ||
| } | ||
| else | ||
| nodeMemoryTracker.onRowAdded(row); |
There was a problem hiding this comment.
Can be false positive memory limit exceed error. Tracker need to be reset not only on rewindInternal, but also when partition is fully drained.
Test required for new node (see MemoryQuotasIntegrationTest)
| Supplier<WindowPartition<Row>> partitionFactory, | ||
| RowHandler.RowFactory<Row> rowFactory | ||
| ) { | ||
| super(ctx, rowType, DFLT_ROW_OVERHEAD); |
There was a problem hiding this comment.
Memory tracker row overhead depends at least on count of aggregates (maybe also kinds of aggregates)
| register(SqlStdOperatorTable.BIT_XOR); | ||
|
|
||
| // Window specific operations | ||
| register(SqlStdOperatorTable.ROW_NUMBER); |
There was a problem hiding this comment.
Smoke test for each operand required in StdSqlOperatorsTest
This comment was marked as outdated.
This comment was marked as outdated.
dbf1e84 to
9f135fa
Compare
This comment was marked as outdated.
This comment was marked as outdated.
…g into window partition factory call, window exclusion validation
9f135fa to
13dd848
Compare
| else { | ||
| Row offsetRow = frame.get(idx); | ||
| Object val = get(0, offsetRow); | ||
| if (val == null) { |
There was a problem hiding this comment.
Based on the description of LAG/LEAD function (and other database behavior), we have to return a value even if it is NULL. A default value returns only in the case when a row does not exist.
statement ok
CREATE TABLE t_lag_lead(id INTEGER, val INTEGER);
statement ok
INSERT INTO t_lag_lead VALUES (1, 10), (2, NULL), (3, 30);
query IIII
SELECT id, val,
LAG(val, 1, 999) OVER (ORDER BY id),
LEAD(val, 1, 999) OVER (ORDER BY id)
FROM t_lag_lead
ORDER BY id;
----
1 10 999 NULL
2 NULL 10 30
3 30 NULL 999
There was a problem hiding this comment.
Hi.
Fixed except the following:
If the third argument of lag/lead is non-nullable, Calcite changes the return type of the function to non-nullable (org.apache.calcite.sql.fun.SqlLeadLagAggFunction#transformType).
Because of this, your example returns 0 instead of null.
Not entirely sure what the best way to handle this.
| public static RelCollation mergeCollations(RelCollation collation0, RelCollation collation1) { | ||
| ImmutableBitSet keys = ImmutableBitSet.of(collation0.getKeys()); | ||
| List<RelFieldCollation> fields = U.arrayList(collation0.getFieldCollations()); | ||
| for (RelFieldCollation it : collation1.getFieldCollations()) |
There was a problem hiding this comment.
It's crucial to maintain ORDER BY direction and nulls ordering, also it's crucial to maintain ORDER BY columns order. Here, if PARTITION BY collation and ORDER BY collation is intersected, PARTITION BY direction (default) and columns order are applied instead of ORDER BY direction and columns order.
For example:
SELECT row_number() OVER (PARTITION BY id ORDER BY id DESC) FROM tbl
Inserts sort node with ASC-nulls-first direction
SELECT row_number() OVER (PARTITION BY id, value ORDER BY value, id DESC) FROM tbl
Inserts sort node with 0, 1 columns order.
Let's add these cases to tests.
There was a problem hiding this comment.
Codestyle: Braces should be used for multi-line for statement
There was a problem hiding this comment.
Hi,
According to the docs (https://sql-academy.org/en/guide/sorting-in-windows-functions), for OVER expressions ORDER BY clause only affects the row order within each partition defined by PARTITION BY.
Therefore, sorting by a field that appears in both ORDER BY and PARTITION BY is redundant. Within a single partition, all rows share the same value of that field, so the sort operation receives identical values as input.
I’ll add the required tests to verify the behavior and compare it across different database management systems.
|
|
||
| result = convert(result, inTraits); | ||
|
|
||
| // add fields added by current group. |
There was a problem hiding this comment.
Codestyle: Comments start with upper case.
| RelDataTypeFactory.Builder builder = typeFactory.builder(); | ||
|
|
||
| for (int i = 0; i < inputFieldCnt; i++) { | ||
| // add fields from original input, passed through window rel |
There was a problem hiding this comment.
Codestyle: Comments start with an uppercase letter and end with a point.
| builder.add(windowFields.get(i)); | ||
| } | ||
| for (int i = inputFieldCnt; i < newInputFields.size(); i++) { | ||
| // add constants from new input |
There was a problem hiding this comment.
Codestyle: Comments start with an uppercase letter and end with a point.
| builder.add(newInputFields.get(i)); | ||
| } | ||
| for (int i = inputFieldCnt; i < windowFields.size(); i++) { | ||
| // add fields, provided by window |
There was a problem hiding this comment.
Codestyle: Comments start with an uppercase letter and end with a point.
| return Commons.compile(CastFunction.class, Expressions.toString(F.asList(decl), "\n", false)); | ||
| } | ||
| public class AccumulatorsFactory<Row> extends AccumulatorsFactoryBase<Row> implements Supplier<List<AccumulatorWrapper<Row>>> { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line.
| ); | ||
|
|
||
| /** */ | ||
| Supplier<WindowPartition<Row>> windowPartitionFactory( |
There was a problem hiding this comment.
ExpressionFactory is mostly for RexNode to something convertion. windowPartitionFactory is not directly related to RexNode, so, maybe it's better to use partition factory constructor instead of new ExpressionFactory method.
|
|
||
| /** Factory to create {@link WindowPartitionBase} factory from {@link Window.Group}. */ | ||
| public final class WindowPartitionFactory<Row> implements Supplier<WindowPartition<Row>> { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line.
| public final class WindowPartitionFactory<Row> implements Supplier<WindowPartition<Row>> { | ||
|
|
||
| /** */ | ||
| private final Supplier<WindowPartition<Row>> supplier; |
There was a problem hiding this comment.
Additional supplier is redundant, WindowPartitionFactory - is already a Supplier.
| List<RelDataType> aggTypes = Commons.transform(calls, AggregateCall::getType); | ||
| RowHandler.RowFactory<Row> aggRowFactory = ctx.rowHandler().factory(Commons.typeFactory(), aggTypes); | ||
|
|
||
| Comparator<Row> peerCmp; | ||
| if (group.isRows) | ||
| // peer comparator in meaningless in rows frame. | ||
| peerCmp = null; | ||
| else | ||
| peerCmp = ctx.expressionFactory().comparator(group.collation()); | ||
|
|
||
| WindowFunctionFactory<Row> accFactory = new WindowFunctionFactory<>(ctx, group, calls, inputRowType); |
There was a problem hiding this comment.
Looks like some (or all) of these variables can be calculated only once (in constructor), but not for each partition (in supplier.get).
|
@oleg-zinovev, sorry for delay with review. |
| @Override public Object call(Row row, int rowIdx, int peerIdx, WindowFunctionFrame<Row> frame) { | ||
| int startIdx = frame.getFrameStart(row, rowIdx, peerIdx); | ||
| Row firstRow = frame.get(startIdx); | ||
| return get(0, firstRow); |
There was a problem hiding this comment.
We can tack a value outside of the window if the window is empty. While in this case NULL is expected.
statement ok
CREATE TABLE t_first_value(id INTEGER, val INTEGER);
statement ok
INSERT INTO t_first_value VALUES (1, 10), (2, 20), (3, 30);
query II
SELECT id,
FIRST_VALUE(val) OVER (
ORDER BY id
ROWS BETWEEN 4 PRECEDING AND 2 PRECEDING
)
FROM t_first_value
ORDER BY id;
----
1 NULL
2 NULL
3 10
This comment was marked as outdated.
This comment was marked as outdated.
|
|
||
| /** Base implementation of window partition. */ | ||
| abstract class WindowPartitionBase<Row> implements WindowPartition<Row> { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line
| } | ||
|
|
||
| /** Creates row with window function results. */ | ||
| protected final Row createResultRow(RowHandler.RowFactory<Row> rowFactory, Row source, Object... results) { |
There was a problem hiding this comment.
Codestyle: Abbreviation should be used for source
| } | ||
|
|
||
| /** Compares two rows and return true if current row peer not equal to the previous row peer. */ | ||
| protected final boolean isNewPeer(Row current, @Nullable Row previous) { |
There was a problem hiding this comment.
Codestyle: Abbreviation should be used for current
|
|
||
| /** Partition of rows in window function calculation. */ | ||
| public interface WindowPartition<Row> { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line
|
|
||
| /** */ | ||
| public final class WindowFunctions { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line
| for (Object[] newParam : newParams) { | ||
| for (Object[] inheritedParam : AbstractExecutionTest.parameters()) { | ||
| Object[] both = Stream.concat(Arrays.stream(inheritedParam), Arrays.stream(newParam)) | ||
| .toArray(Object[]::new); | ||
| extraParams.add(both); | ||
| } | ||
| } |
There was a problem hiding this comment.
for (Object[] newParam : newParams) {
for (Object[] inheritedParam : parameters())
extraParams.add(F.concat(inheritedParam, newParam));
}
| for (int j = 0; j < testRes.get(i).size(); j++) { | ||
| assertEquals(testRes.get(i).get(j), row[3 + j]); | ||
| } |
There was a problem hiding this comment.
Codestyle: Redundant braces for one-line for loop
3 -> inputRowType.getFieldCount()
|
|
||
| IgniteTypeFactory tf = ctx.getTypeFactory(); | ||
| RelDataType inputRowType = TypeUtils.createRowType(tf, int.class, int.class, int.class); | ||
| Class<?>[] outFields = new Class<?>[3 + testGrp.aggCalls.size()]; |
There was a problem hiding this comment.
3 -> inputRowType.getFieldCount()?
| F.asList( | ||
| F.asList(1), | ||
| F.asList(2), | ||
| F.asList(1), | ||
| F.asList(2), | ||
| F.asList(3), | ||
| F.asList(1) | ||
| ), |
There was a problem hiding this comment.
I don't like the idea of specifying expected results as parameters. Maybe it's better to describe cases inside executeWindow method? For example, declare method:
private void checkWindow(Window.Group grp, boolean streaming, Object[][] expRes) {
Move content of executeWindow to this method, and call it from executeWIndow like:
checkWindow(rowNumber(), true, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
checkWindow(rowNumber(), false, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
checkWindow(countRows(RexWindowBounds.UNBOUNDED_PRECEDING, RexWindowBounds.CURRENT_ROW), false,
new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
...
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.
# Conflicts: # modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java # modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
1a3964c to
7285f91
Compare
|
@alex-plekhanov |
Thank you for submitting the pull request to the Apache Ignite.
In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:
The Contribution Checklist
The description explains WHAT and WHY was made instead of HOW.
The following pattern must be used:
IGNITE-XXXX Change summarywhereXXXX- number of JIRA issue.(see the Maintainers list)
the
green visaattached to the JIRA ticket (see TC.Bot: Check PR)Notes
If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com #ignite channel.
Description of Changes:
Added support for planning and executing window functions.
Added special window functions: row_number, rank, dense_rank, percent_rank, cume_dist, ntile, nth_value, first_value, last_value, lag, lead.
Provides two modes of window function execution:
Supports specifying integer offsets in either variant (ROWS / RANGE) and time interval offsets for RANGE.
Window Planning:
During query planning, windows are split into separate rels for each group of aggregation functions. Each logical window rel includes a collation that is required for correctly partitioning rows and defining frames when computing the window.
Splitting is done using Calcite’s standard rule, which groups function calls based on the window specification (according to the OVER clause).
After that, constants used in the window are projected to support referencing them in the current implementation of Ignite aggregates. (If constants are used in FOLLOWING/PRECEDING, they are directly substituted into the offset, which helps reduce the number of frame boundary searches.)
An additional planning phase was introduced specifically for window planning. (I couldn't find a suitable existing place for the new rules, so I followed the approach used in Apache Drill.)
Separate Change:
During development, when attempting to upgrade to Calcite 1.39, it was discovered that IgniteTypeFactory#leastRestrictive does not take into account the nullability of the resulting type when merging FLOAT and DOUBLE.
P.S. I'll be appreciate to any feedback