Skip to content

IGNITE-14777 window functions support#12096

Open
oleg-zinovev wants to merge 11 commits intoapache:masterfrom
oleg-zinovev:IGNITE-14777
Open

IGNITE-14777 window functions support#12096
oleg-zinovev wants to merge 11 commits intoapache:masterfrom
oleg-zinovev:IGNITE-14777

Conversation

@oleg-zinovev
Copy link
Copy Markdown

@oleg-zinovev oleg-zinovev commented May 26, 2025

Thank you for submitting the pull request to the Apache Ignite.

In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:

The Contribution Checklist

  • There is a single JIRA ticket related to the pull request.
  • The web-link to the pull request is attached to the JIRA ticket.
  • The JIRA ticket has the Patch Available state.
  • The pull request body describes changes that have been made.
    The description explains WHAT and WHY was made instead of HOW.
  • The pull request title is treated as the final commit message.
    The following pattern must be used: IGNITE-XXXX Change summary where XXXX - number of JIRA issue.
  • A reviewer has been mentioned through the JIRA comments
    (see the Maintainers list)
  • The pull request has been checked by the Teamcity Bot and
    the green visa attached to the JIRA ticket (see TC.Bot: Check PR)

Notes

If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com #ignite channel.

Description of Changes:

Added support for planning and executing window functions.
Added special window functions: row_number, rank, dense_rank, percent_rank, cume_dist, ntile, nth_value, first_value, last_value, lag, lead.
Provides two modes of window function execution:

  • With partition buffering
  • Without buffering, accessing only the current and previous row (only for row_number, rank, dense_rank, and standard aggregates with the window specification ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

Supports specifying integer offsets in either variant (ROWS / RANGE) and time interval offsets for RANGE.

Window Planning:

During query planning, windows are split into separate rels for each group of aggregation functions. Each logical window rel includes a collation that is required for correctly partitioning rows and defining frames when computing the window.
Splitting is done using Calcite’s standard rule, which groups function calls based on the window specification (according to the OVER clause).
After that, constants used in the window are projected to support referencing them in the current implementation of Ignite aggregates. (If constants are used in FOLLOWING/PRECEDING, they are directly substituted into the offset, which helps reduce the number of frame boundary searches.)
An additional planning phase was introduced specifically for window planning. (I couldn't find a suitable existing place for the new rules, so I followed the approach used in Apache Drill.)

Separate Change:

During development, when attempting to upgrade to Calcite 1.39, it was discovered that IgniteTypeFactory#leastRestrictive does not take into account the nullability of the resulting type when merging FLOAT and DOUBLE.

P.S. I'll be appreciate to any feedback

@oleg-zinovev oleg-zinovev changed the title IGNITE-14777: row_number() with window functions unsupported IGNITE-14777 row_number() with window functions unsupported May 26, 2025
@oleg-zinovev oleg-zinovev changed the title IGNITE-14777 row_number() with window functions unsupported IGNITE-14777 window functions support May 26, 2025
@alex-plekhanov
Copy link
Copy Markdown
Contributor

@oleg-zinovev, I've partially reviewed your PR. Review not completed yet, but I have some comments to publish. Also there are a lot of codestyle violations. Please read the https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines article about Ignite codestyle. Most of the problems can be detected automatically (for example using command: mvn clean install -DskipTests -Dmaven.javadoc.skip=true -Pcheckstyle -T 1C), but there are also some violated rules that currently not checked by maven, such as:

  • Comments should be started with an upper-case letter and ended with point
  • Braces should not be used for if one-line statements
  • No empty lines allowed after class declaration
  • Comments like /** */ (two spaces inside) are not allowed, use one space
  • Abbreviation should be used for field/variables like 'value', 'group', 'buffer', ..., see full list here: https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules

* - project removing constants
*/
@Value.Enclosing
public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a goal of AccumulatorsFactory in/out adapters to provide rows suitable for accumulator. Let's use this logic instead of creating addional rel ops.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of aggregates in Ignite relies on the indices of values in the row instead of evaluating expressions in AggregateCall#rexList. In this case, constants must be present in the row before it is processed by the aggregate.

If we want to eliminate a separate rule that implements this using projections, I can suggest one of the following options:

  • Redesign AccumulatorsFactory so that it uses AggregateCall#rexList instead of AggregateCall#argList when computing the aggregate, and replace RexInputRef for window constants with RexLiteral in the WindowConverterRule.

  • Add projection of constants when computing window functions so that the row passed to accumulators contains window constants in the required positions. However, in this case, the implementation of WindowNode will include logic that already exists in ProjectionNode.

  • Implement a separate factory for aggregates (or add new methods in the current one) that works directly with RexWinAggCall. In this case, AccumulatorsFactory (or the new factory for window functions) will need to handle a set of RexNode operands, but this will not affect the current logic for regular AggregateCall. Also, this would allow abandoning the transformation of RexWinAggCall into AggregateCall and, thus, avoid copying groups in WindowConverterRule. However, in this scenario, I might need to change the visibility of classes in the package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.

Let me know if you'd like to propose a more robust solution.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.

Copy link
Copy Markdown
Author

@oleg-zinovev oleg-zinovev May 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alex-plekhanov
Hi,

Could you please take a look at commit e9f9a37?

I removed projection execution as a separate step during query planning (ProjectWindowConstantsRule). Projections are now applied directly during window function evaluation.

However, I have a couple of concerns:

  • The number of projections will now depend on how many window functions are invoked

  • Re-scanning a window partition will require recomputing the projections

I think I can address the first issue by refactoring WindowFunctionFactory so that a single projection is shared across all functions within a group.

But I’m not sure what the best approach is for the second one, other than introducing a cache of already projected rows—which would increase memory usage during execution.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P.S. Take a look at commit 818771a also

partition = partitionFactory.get();
}
else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) {
partition.drainTo(rowFactory, outBuf);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • This operation can block the thread for a long time.
  • Large amount of rows can be stored in outBuf, in worth case there will be 2x input rows count (in partition and in outBuf)

Consider pushing directly to downstream. Drain (and push) only requested amount and postpone next pushes until next request.

doPush();
}

if (partition.add(row)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange API (true to drain). Maybe rework it somehow?

doPush();
}
else
nodeMemoryTracker.onRowAdded(row);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be false positive memory limit exceed error. Tracker need to be reset not only on rewindInternal, but also when partition is fully drained.
Test required for new node (see MemoryQuotasIntegrationTest)

Supplier<WindowPartition<Row>> partitionFactory,
RowHandler.RowFactory<Row> rowFactory
) {
super(ctx, rowType, DFLT_ROW_OVERHEAD);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory tracker row overhead depends at least on count of aggregates (maybe also kinds of aggregates)

register(SqlStdOperatorTable.BIT_XOR);

// Window specific operations
register(SqlStdOperatorTable.ROW_NUMBER);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Smoke test for each operand required in StdSqlOperatorsTest

@oleg-zinovev

This comment was marked as outdated.

@oleg-zinovev

This comment was marked as outdated.

else {
Row offsetRow = frame.get(idx);
Object val = get(0, offsetRow);
if (val == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the description of LAG/LEAD function (and other database behavior), we have to return a value even if it is NULL. A default value returns only in the case when a row does not exist.

statement ok
CREATE TABLE t_lag_lead(id INTEGER, val INTEGER);

statement ok
INSERT INTO t_lag_lead VALUES (1, 10), (2, NULL), (3, 30);

query IIII
SELECT id, val,
    LAG(val, 1, 999) OVER (ORDER BY id),
    LEAD(val, 1, 999) OVER (ORDER BY id)
FROM t_lag_lead
ORDER BY id;
----
1	10	999	NULL
2	NULL	10	30
3	30	NULL	999

Copy link
Copy Markdown
Author

@oleg-zinovev oleg-zinovev May 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vldpyatkov

Hi.
Fixed except the following:

If the third argument of lag/lead is non-nullable, Calcite changes the return type of the function to non-nullable (org.apache.calcite.sql.fun.SqlLeadLagAggFunction#transformType).

Because of this, your example returns 0 instead of null.

Not entirely sure what the best way to handle this.

public static RelCollation mergeCollations(RelCollation collation0, RelCollation collation1) {
ImmutableBitSet keys = ImmutableBitSet.of(collation0.getKeys());
List<RelFieldCollation> fields = U.arrayList(collation0.getFieldCollations());
for (RelFieldCollation it : collation1.getFieldCollations())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's crucial to maintain ORDER BY direction and nulls ordering, also it's crucial to maintain ORDER BY columns order. Here, if PARTITION BY collation and ORDER BY collation is intersected, PARTITION BY direction (default) and columns order are applied instead of ORDER BY direction and columns order.
For example:

SELECT row_number() OVER (PARTITION BY id ORDER BY id DESC) FROM tbl

Inserts sort node with ASC-nulls-first direction

SELECT row_number() OVER (PARTITION BY id, value ORDER BY value, id DESC) FROM tbl

Inserts sort node with 0, 1 columns order.
Let's add these cases to tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Braces should be used for multi-line for statement

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alex-plekhanov

Hi,
According to the docs (https://sql-academy.org/en/guide/sorting-in-windows-functions), for OVER expressions ORDER BY clause only affects the row order within each partition defined by PARTITION BY.

Therefore, sorting by a field that appears in both ORDER BY and PARTITION BY is redundant. Within a single partition, all rows share the same value of that field, so the sort operation receives identical values as input.

I’ll add the required tests to verify the behavior and compare it across different database management systems.


result = convert(result, inTraits);

// add fields added by current group.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Comments start with upper case.

RelDataTypeFactory.Builder builder = typeFactory.builder();

for (int i = 0; i < inputFieldCnt; i++) {
// add fields from original input, passed through window rel
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Comments start with an uppercase letter and end with a point.

builder.add(windowFields.get(i));
}
for (int i = inputFieldCnt; i < newInputFields.size(); i++) {
// add constants from new input
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Comments start with an uppercase letter and end with a point.

builder.add(newInputFields.get(i));
}
for (int i = inputFieldCnt; i < windowFields.size(); i++) {
// add fields, provided by window
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Comments start with an uppercase letter and end with a point.

return Commons.compile(CastFunction.class, Expressions.toString(F.asList(decl), "\n", false));
}
public class AccumulatorsFactory<Row> extends AccumulatorsFactoryBase<Row> implements Supplier<List<AccumulatorWrapper<Row>>> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Redundant line.

);

/** */
Supplier<WindowPartition<Row>> windowPartitionFactory(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExpressionFactory is mostly for RexNode to something convertion. windowPartitionFactory is not directly related to RexNode, so, maybe it's better to use partition factory constructor instead of new ExpressionFactory method.


/** Factory to create {@link WindowPartitionBase} factory from {@link Window.Group}. */
public final class WindowPartitionFactory<Row> implements Supplier<WindowPartition<Row>> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Redundant line.

public final class WindowPartitionFactory<Row> implements Supplier<WindowPartition<Row>> {

/** */
private final Supplier<WindowPartition<Row>> supplier;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional supplier is redundant, WindowPartitionFactory - is already a Supplier.

Comment on lines +45 to +55
List<RelDataType> aggTypes = Commons.transform(calls, AggregateCall::getType);
RowHandler.RowFactory<Row> aggRowFactory = ctx.rowHandler().factory(Commons.typeFactory(), aggTypes);

Comparator<Row> peerCmp;
if (group.isRows)
// peer comparator in meaningless in rows frame.
peerCmp = null;
else
peerCmp = ctx.expressionFactory().comparator(group.collation());

WindowFunctionFactory<Row> accFactory = new WindowFunctionFactory<>(ctx, group, calls, inputRowType);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like some (or all) of these variables can be calculated only once (in constructor), but not for each partition (in supplier.get).

@alex-plekhanov
Copy link
Copy Markdown
Contributor

@oleg-zinovev, sorry for delay with review.
I have reviewed planning/serialization part and have published some comments, but I'm still in progress with exec/window package. I hope to finish review next week
Also, please merge the master branch and resolve conflicts (but please, do not rebase with force push, it's hard to analize changes after rebase).

@Override public Object call(Row row, int rowIdx, int peerIdx, WindowFunctionFrame<Row> frame) {
int startIdx = frame.getFrameStart(row, rowIdx, peerIdx);
Row firstRow = frame.get(startIdx);
return get(0, firstRow);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can tack a value outside of the window if the window is empty. While in this case NULL is expected.

statement ok
CREATE TABLE t_first_value(id INTEGER, val INTEGER);

statement ok
INSERT INTO t_first_value VALUES (1, 10), (2, 20), (3, 30);

query II
SELECT id,
    FIRST_VALUE(val) OVER (
        ORDER BY id
        ROWS BETWEEN 4 PRECEDING AND 2 PRECEDING
    )
FROM t_first_value
ORDER BY id;
----
1	NULL
2	NULL
3	10

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@oleg-zinovev

This comment was marked as outdated.


/** Base implementation of window partition. */
abstract class WindowPartitionBase<Row> implements WindowPartition<Row> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Redundant line

}

/** Creates row with window function results. */
protected final Row createResultRow(RowHandler.RowFactory<Row> rowFactory, Row source, Object... results) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Abbreviation should be used for source

}

/** Compares two rows and return true if current row peer not equal to the previous row peer. */
protected final boolean isNewPeer(Row current, @Nullable Row previous) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Abbreviation should be used for current


/** Partition of rows in window function calculation. */
public interface WindowPartition<Row> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Redundant line


/** */
public final class WindowFunctions {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Redundant line

Comment on lines +217 to +223
for (Object[] newParam : newParams) {
for (Object[] inheritedParam : AbstractExecutionTest.parameters()) {
Object[] both = Stream.concat(Arrays.stream(inheritedParam), Arrays.stream(newParam))
.toArray(Object[]::new);
extraParams.add(both);
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        for (Object[] newParam : newParams) {
            for (Object[] inheritedParam : parameters())
                extraParams.add(F.concat(inheritedParam, newParam));
        }

Comment on lines +284 to +286
for (int j = 0; j < testRes.get(i).size(); j++) {
assertEquals(testRes.get(i).get(j), row[3 + j]);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codestyle: Redundant braces for one-line for loop
3 -> inputRowType.getFieldCount()


IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType inputRowType = TypeUtils.createRowType(tf, int.class, int.class, int.class);
Class<?>[] outFields = new Class<?>[3 + testGrp.aggCalls.size()];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 -> inputRowType.getFieldCount()?

Comment on lines +91 to +98
F.asList(
F.asList(1),
F.asList(2),
F.asList(1),
F.asList(2),
F.asList(3),
F.asList(1)
),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the idea of specifying expected results as parameters. Maybe it's better to describe cases inside executeWindow method? For example, declare method:

    private void checkWindow(Window.Group grp, boolean streaming, Object[][] expRes) {

Move content of executeWindow to this method, and call it from executeWIndow like:

        checkWindow(rowNumber(), true, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
        checkWindow(rowNumber(), false, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
        checkWindow(countRows(RexWindowBounds.UNBOUNDED_PRECEDING, RexWindowBounds.CURRENT_ROW), false,
            new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
        ...

* - project removing constants
*/
@Value.Enclosing
public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.

# Conflicts:
#	modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
#	modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@oleg-zinovev
Copy link
Copy Markdown
Author

@alex-plekhanov
Just in case, I’m duplicating the link to the comment on ProjectWindowConstantsRule:

#12096 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants