[flink] Support batch read for tables without datalake enabled #3133
Open
binary-signal wants to merge 1 commit into apache:main
Enable Flink batch execution mode for Fluss tables that do not have table.datalake.enabled set to true. Previously, batch queries on non-lake tables threw UnsupportedOperationException. Now, log-only tables produce LogSplits with stopping offsets, and primary-key tables produce HybridSnapshotLogSplits with bounded log tails.

Closes apache#40

Co-authored-by: binary-signal <binary-signal@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Signed-off-by: Evan <binary-signal@users.noreply.github.com>
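The core idea of the change can be sketched in plain Java. Note that all names below (`BoundedSplit`, `planBatchSplits`, `recordsToRead`) are illustrative stand-ins, not the actual Fluss API: at planning time the enumerator captures each bucket's current latest log offset and bakes it into the split as a stopping offset, so the batch read is bounded even while writers keep appending.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only -- not the actual Fluss classes. Shows how a batch
// enumerator can bound reads on tables without a data lake: take the latest
// log offset per bucket at planning time and use it as the stopping offset.
public class BoundedSplitsSketch {

    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    // Minimal stand-in for LogSplit / HybridSnapshotLogSplit: for a
    // primary-key table the reader consumes a snapshot first, then a
    // log tail bounded by stoppingOffset.
    record BoundedSplit(int bucket, boolean readSnapshotFirst, long stoppingOffset) {}

    static List<BoundedSplit> planBatchSplits(
            Map<Integer, Long> latestOffsetPerBucket, boolean primaryKeyTable) {
        List<BoundedSplit> splits = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : latestOffsetPerBucket.entrySet()) {
            // "latest" semantics: stop where the log ends right now, so the
            // batch query terminates even under concurrent writes.
            splits.add(new BoundedSplit(e.getKey(), primaryKeyTable, e.getValue()));
        }
        return splits;
    }

    // Reader side: only offsets strictly below the stopping offset are read.
    static long recordsToRead(long startOffset, long stoppingOffset) {
        if (stoppingOffset == NO_STOPPING_OFFSET) {
            throw new IllegalStateException("batch split must be bounded");
        }
        return Math.max(0, stoppingOffset - startOffset);
    }
}
```

A split planned when bucket 0's log ends at offset 100 will read exactly offsets [start, 100), regardless of how much is appended afterwards.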
Purpose
Linked issue: closes #40, partially addresses #1876
Support batch read for Fluss tables without requiring `table.datalake.enabled = true`. Previously, Flink batch execution mode was only supported for tables with data lake enabled or for point queries on primary keys. This change enables full table scans in batch mode for both log-only and primary-key tables without data lake, matching the capability already available in the Spark connector.

Brief change log
- FlinkTableSource: Removed the `isDataLakeEnabled` guard clause in `getScanRuntimeProvider()` that blocked batch mode for non-lake tables. The `modificationScanType` check for UPDATE/DELETE statements on primary keys is preserved.
- FlinkSourceEnumerator: Added a non-lake batch startup path in `startInBatchMode()`. New methods `initBoundedNonPartitionedSplits()`, `initBoundedPartitionedSplits()`, `getBoundedLogSplits()`, and `getBoundedSnapshotAndLogSplits()` generate splits with stopping offsets (via `OffsetsInitializer.latest()`) for bounded reads.
- HybridSnapshotLogSplit: Added a `logStoppingOffset` field so primary-key table batch reads can bound the log tail after snapshot reading. Added new constructors while maintaining backward compatibility with existing callers.
- HybridSnapshotLogSplitState: Updated `toSourceSplit()` to preserve the stopping offset through checkpoint/restore cycles.
- SourceSplitSerializer: Bumped the serialization version from `VERSION_0` to `VERSION_1` to include the new `logStoppingOffset` field. Deserialization handles both versions: `VERSION_0` defaults `logStoppingOffset` to `NO_STOPPING_OFFSET` for backward compatibility.
- FlinkSourceSplitReader: Extended `subscribeLog()` to extract and register stopping offsets from `HybridSnapshotLogSplit` in addition to `LogSplit`, so the reader correctly stops at the bounded offset for both split types.

Tests
- SourceSplitSerializerTest: Added `testHybridSnapshotLogSplitSerdeWithStoppingOffset` (parameterized, partitioned/non-partitioned) and `testHybridSnapshotLogSplitV0BackwardCompat` (manually crafted VERSION_0 bytes).
- SourceSplitStateTest: Added `testHybridSnapshotLogSplitStateWithStoppingOffset`, which verifies the stopping offset is preserved through `setRecordsToSkip()` and `setNextOffset()` state transitions.
- FlinkSourceEnumeratorTest: Added `testBatchModeNonLakeLogTable` and `testBatchModeNonLakePkTable`, which verify that the enumerator produces `LogSplit`/`HybridSnapshotLogSplit` with valid stopping offsets in batch mode without a lake.
- FlinkTableSourceBatchITCase: Updated 3 existing tests that asserted the removed `UnsupportedOperationException`; they now verify that batch queries succeed:
  - `testScanSingleRowFilterException` → `testScanWithPartialPrimaryKeyFilter`
  - `testCountPushDownForPkTable`: grouped COUNT now works via batch scan
  - `testCountPushDownForLogTable`: grouped COUNT now works via batch scan

All tests in `fluss-flink-common` pass (0 failures).

Alignment with #40
This PR fully addresses Case 1 from issue #40:
- Log-only tables: `LogSplit` with stopping offsets, exactly as proposed.
- Primary-key tables: `HybridSnapshotLogSplit`, matching the "merge SSTs and change logs" approach.

Case 2 (lake-enabled table where the lake snapshot is not yet available) is not addressed in this PR. Currently, `startInBatchMode()` still throws if `generateHybridLakeFlussSplits()` returns null for lake-enabled tables. A follow-up could fall back to the non-lake bounded path when no lake snapshot exists yet, rather than forcing users to wait for the first tiering cycle.

This PR also partially addresses #1876 (Support full scan in batch mode for PrimaryKey Table):
- Full batch scans of primary-key tables now work via `HybridSnapshotLogSplit` with stopping offsets.
- The snapshot phase still uses `KvSnapshotBatchScanner`, which downloads snapshot files. Subtask 1 requires a new server-side RocksDB scan API (tracked in [Umbrella] FIP-17: Support Full KV Scan for Primary Key Tables #1600 / PR [KV] Support Full KV Scan #2809).

Follow-up
- Primary-key batch reads still use the `BatchScanner` approach, which works correctly but requires transferring snapshot files. The server-side scan would stream results directly, avoiding the file transfer overhead.

API and Format
`SourceSplitSerializer` version bumped from 0 to 1. The new version appends `logStoppingOffset` (long) to `HybridSnapshotLogSplit` serialization. Deserialization is backward compatible: version 0 data is read correctly with `logStoppingOffset` defaulting to `NO_STOPPING_OFFSET`.

Documentation
Batch queries on non-lake tables no longer throw `UnsupportedOperationException`.
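The backward-compatible version bump described under "API and Format" can be sketched as follows. The field layout and method names here are illustrative, not the exact Fluss wire format (the real `SourceSplitSerializer` writes more fields): `VERSION_1` appends the extra `logStoppingOffset`, and the deserializer reads it only when the payload's version says it is present.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch of version-tolerant split (de)serialization. VERSION_1
// appends logStoppingOffset; VERSION_0 payloads lack it, so reads of old
// checkpoint data default the field to NO_STOPPING_OFFSET.
public class VersionedSerdeSketch {
    static final int VERSION_0 = 0;
    static final int VERSION_1 = 1;
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    static byte[] serializeV1(long logStartingOffset, long logStoppingOffset) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(VERSION_1);
            out.writeLong(logStartingOffset);
            out.writeLong(logStoppingOffset); // field added in VERSION_1
        }
        return bos.toByteArray();
    }

    // What a pre-upgrade writer would have produced (no stopping offset).
    static byte[] serializeV0(long logStartingOffset) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(VERSION_0);
            out.writeLong(logStartingOffset);
        }
        return bos.toByteArray();
    }

    /** Returns {logStartingOffset, logStoppingOffset}. */
    static long[] deserialize(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int version = in.readInt();
            long starting = in.readLong();
            // Old checkpoints carry no stopping offset; treat them as unbounded.
            long stopping = version >= VERSION_1 ? in.readLong() : NO_STOPPING_OFFSET;
            return new long[] {starting, stopping};
        }
    }
}
```

Keying the read on the stored version (rather than the serializer's current version) is what lets a restored job consume checkpoints taken before the upgrade.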