[flink] Support batch read for tables without datalake enabled #3133

Open
binary-signal wants to merge 1 commit into apache:main from binary-signal:fluss-flink-batch-source

Conversation

binary-signal (Contributor) commented Apr 19, 2026

Purpose

Linked issue: closes #40, partially addresses #1876

Support batch read for Fluss tables without requiring table.datalake.enabled = true. Previously, Flink batch execution mode was only supported for tables with data lake enabled or for point queries on primary keys. This change enables full table scans in batch mode for both log-only and primary-key tables without data lake, matching the capability already available in the Spark connector.

Brief change log

  • FlinkTableSource: Removed the isDataLakeEnabled guard clause in getScanRuntimeProvider() that blocked batch mode for non-lake tables. The modificationScanType check for UPDATE/DELETE statements on primary keys is preserved.
  • FlinkSourceEnumerator: Added non-lake batch startup path in startInBatchMode(). New methods initBoundedNonPartitionedSplits(), initBoundedPartitionedSplits(), getBoundedLogSplits(), and getBoundedSnapshotAndLogSplits() generate splits with stopping offsets (via OffsetsInitializer.latest()) for bounded reads.
  • HybridSnapshotLogSplit: Added logStoppingOffset field so primary-key table batch reads can bound the log tail after snapshot reading. Added new constructors while maintaining backward compatibility with existing callers.
  • HybridSnapshotLogSplitState: Updated toSourceSplit() to preserve the stopping offset through checkpoint/restore cycles.
  • SourceSplitSerializer: Bumped serialization version from VERSION_0 to VERSION_1 to include the new logStoppingOffset field. Deserialization handles both versions — VERSION_0 defaults logStoppingOffset to NO_STOPPING_OFFSET for backward compatibility.
  • FlinkSourceSplitReader: Extended subscribeLog() to extract and register stopping offsets from HybridSnapshotLogSplit in addition to LogSplit, so the reader correctly stops at the bounded offset for both split types.
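The version-bump logic in SourceSplitSerializer can be sketched with a toy serde. This is a simplified illustration of the backward-compatibility pattern described above, not the actual Fluss serializer (which serializes full split objects); class and field names here are placeholders.

```java
import java.io.*;

// Toy sketch of a versioned split serde: VERSION_1 appends a stopping
// offset, while VERSION_0 bytes deserialize with a sentinel default.
public class SplitSerdeSketch {
    static final int VERSION_0 = 0;
    static final int VERSION_1 = 1;
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    // VERSION_1 appends logStoppingOffset after the existing fields.
    static byte[] serialize(long startOffset, long logStoppingOffset) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(VERSION_1);
            out.writeLong(startOffset);
            out.writeLong(logStoppingOffset);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Handles both versions: VERSION_0 bytes carry no stopping offset,
    // so it defaults to NO_STOPPING_OFFSET.
    static long[] deserialize(byte[] bytes) {
        try {
            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes));
            int version = in.readInt();
            long startOffset = in.readLong();
            long stop =
                    version >= VERSION_1 ? in.readLong() : NO_STOPPING_OFFSET;
            return new long[] {startOffset, stop};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] v1 = deserialize(serialize(100L, 250L));
        System.out.println(v1[0] + "," + v1[1]); // prints 100,250

        // Manually crafted VERSION_0 bytes: 4-byte version (0) followed
        // by an 8-byte big-endian start offset (7), no stopping offset.
        byte[] v0Bytes = new byte[12];
        v0Bytes[11] = 7;
        long[] v0 = deserialize(v0Bytes);
        System.out.println(v0[1] == NO_STOPPING_OFFSET); // prints true
    }
}
```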

Tests

  • SourceSplitSerializerTest: Added testHybridSnapshotLogSplitSerdeWithStoppingOffset (parameterized, partitioned/non-partitioned) and testHybridSnapshotLogSplitV0BackwardCompat (manually crafted VERSION_0 bytes).
  • SourceSplitStateTest: Added testHybridSnapshotLogSplitStateWithStoppingOffset — verifies stopping offset is preserved through setRecordsToSkip() and setNextOffset() state transitions.
  • FlinkSourceEnumeratorTest: Added testBatchModeNonLakeLogTable and testBatchModeNonLakePkTable — verify that the enumerator produces LogSplit / HybridSnapshotLogSplit with valid stopping offsets in batch mode without lake.
  • FlinkTableSourceBatchITCase: Updated existing tests that asserted the removed UnsupportedOperationException — they now verify that batch queries succeed:
    • testScanSingleRowFilterException
    • testScanWithPartialPrimaryKeyFilter
    • testCountPushDownForPkTable — grouped COUNT now works via batch scan
    • testCountPushDownForLogTable — grouped COUNT now works via batch scan
  • All 348 tests in fluss-flink-common pass (0 failures).

Alignment with #40

This PR fully addresses Case 1 from issue #40:

  • Log tables: reads logs up to the latest offset using LogSplit with stopping offsets — exactly as proposed.
  • PK tables: merges KV snapshot + bounded changelog tail via HybridSnapshotLogSplit — matches the "merge SSTs and change logs" approach.
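The bounded-read semantics for both table types can be shown with a toy model. All names below are illustrative rather than Fluss API, and it assumes the stopping offset is exclusive (the latest offset captured at job start is the next offset to be written):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the bounded reads described above. A log table scans
// offsets [earliest, stopping); a PK table reads its KV snapshot first,
// then a bounded log tail [snapshotLogOffset, stopping) on top of it.
public class BoundedReadSketch {
    static List<Long> readLog(long from, long stoppingOffset) {
        List<Long> offsets = new ArrayList<>();
        // Stop before reaching the exclusive stopping offset, so the
        // batch job terminates instead of tailing the log forever.
        for (long o = from; o < stoppingOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Log table: full bounded scan from offset 0 to latest (= 10).
        System.out.println(readLog(0, 10).size()); // prints 10

        // PK table: snapshot covers offsets < 6, then the bounded log
        // tail [6, 10) is merged on top of the snapshot.
        System.out.println(readLog(6, 10)); // prints [6, 7, 8, 9]
    }
}
```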

Case 2 (lake-enabled table where lake snapshot is not yet available) is not addressed in this PR. Currently, startInBatchMode() still throws if generateHybridLakeFlussSplits() returns null for lake-enabled tables. A follow-up could fall back to the non-lake bounded path when no lake snapshot exists yet, rather than forcing users to wait for the first tiering cycle.
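The proposed Case 2 fallback could look roughly like the following sketch. The method names mirror the PR description, but the types are simplified placeholders and the fallback itself is a suggestion, not code in this PR:

```java
import java.util.List;

// Illustrative-only sketch of a Case 2 fallback in startInBatchMode():
// when a lake-enabled table has no lake snapshot yet, fall back to the
// non-lake bounded split path instead of throwing.
public class BatchStartupFallbackSketch {
    // Stands in for generateHybridLakeFlussSplits(): returns null when
    // no lake snapshot has been tiered yet.
    static List<String> generateHybridLakeFlussSplits(boolean snapshotReady) {
        return snapshotReady ? List.of("lakeSplit-0") : null;
    }

    // Stands in for the non-lake bounded paths added by this PR.
    static List<String> initBoundedNonLakeSplits() {
        return List.of("logSplit-0", "logSplit-1");
    }

    static List<String> startInBatchMode(boolean lakeSnapshotReady) {
        List<String> splits = generateHybridLakeFlussSplits(lakeSnapshotReady);
        if (splits == null) {
            // Proposed fallback: bounded non-lake read instead of
            // forcing users to wait for the first tiering cycle.
            splits = initBoundedNonLakeSplits();
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(startInBatchMode(false)); // prints [logSplit-0, logSplit-1]
        System.out.println(startInBatchMode(true));  // prints [lakeSplit-0]
    }
}
```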

This PR also partially addresses #1876 (Support full scan in batch mode for PrimaryKey Table):

  • Subtask 2 (Flink Integration: support non-limited scanning for PK tables in batch execution mode) — addressed. PK tables can now be fully scanned in Flink batch mode without lake or LIMIT, using HybridSnapshotLogSplit with stopping offsets.
  • Subtask 1 (ad-hoc full snapshot scanning client API with streaming fetch mode) — not addressed. This PR uses the existing snapshot-based KvSnapshotBatchScanner, which downloads snapshot files. Subtask 1 requires a new server-side RocksDB scan API (tracked in #1600, "[Umbrella] FIP-17: Support Full KV Scan for Primary Key Tables", and PR #2809, "[KV] Support Full KV Scan").

Follow-up

  • Case 2 fallback: When a lake-enabled table has no lake snapshot yet, fall back to the non-lake bounded read path instead of throwing.
  • Server-side KV scan: Once the full KV scan for primary-key tables under #1876 ("[Umbrella] Support full scan in batch mode for PrimaryKey Table") lands, the bounded read path for primary-key tables can be optimized to use server-side RocksDB scanning instead of downloading KV snapshot files to the client. The current implementation uses the existing snapshot-based BatchScanner approach, which works correctly but requires transferring snapshot files; the server-side scan would stream results directly, avoiding the file transfer overhead.

API and Format

  • Serialization format change: SourceSplitSerializer version bumped from 0 to 1. The new version appends logStoppingOffset (long) to HybridSnapshotLogSplit serialization. Deserialization is backward compatible — version 0 data is read correctly with logStoppingOffset defaulting to NO_STOPPING_OFFSET.
  • No public API changes.

Documentation

  • No documentation changes needed. The behavior change is that batch execution mode now works for non-lake tables, which was previously blocked with an UnsupportedOperationException.

Enable Flink batch execution mode for Fluss tables that do not have
  table.datalake.enabled set to true. Previously, batch queries on
  non-lake tables threw UnsupportedOperationException. Now, log-only
  tables produce LogSplits with stopping offsets, and primary-key tables
  produce HybridSnapshotLogSplits with bounded log tails.

  Closes apache#40

  Co-authored-by: binary-signal <binary-signal@users.noreply.github.com>
  Co-authored-by: Claude <noreply@anthropic.com>

Signed-off-by: Evan <binary-signal@users.noreply.github.com>

Development

Successfully merging this pull request may close these issues:

[Feature] Support batch read table without datalake enabled