[INLONG-12125][SDK] Enhance Transform SDK protobuf processing and SQL alias parsing#12126
Merged
Merged
Conversation
aloyszhang
approved these changes
May 27, 2026
vernedeng
approved these changes
May 27, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #12125
Motivation
This PR introduces several enhancements to the Transform SDK for protobuf data processing and SQL alias parsing:
Enhance buildFieldValue() to produce values directly compatible with Flink's RowData type system:
STRING → wrap with BinaryStringData; handle existing BinaryStringData instances
INT/LONG/FLOAT/DOUBLE/BOOLEAN → validate actual types and convert from String if necessary using NumberUtils
BYTE_STRING → handle both ByteString and byte[] input; fallback to ISO_8859_1 encoding for other types
MESSAGE → delegate to buildStructData()
Other types → return as-is
2. Improve PbSourceData node value caching with IdentityHashMap
Replace HashMap with IdentityHashMap for nodeValueCache and mapNodeCache to use reference equality (==) instead of equals() when keying by DynamicMessage instances. This avoids expensive deep-equality comparisons and ensures correct identity-based lookups for protobuf messages.
Fix null handling in PbSourceData.findFieldNode() and findChildField()
Change return value from empty string "" to null when a field cannot be found, making the behavior consistent across the codebase and preventing downstream logic from treating a missing field as an empty string.
Fix DefaultSinkData.getField() to return null for missing fields
Change getOrDefault(fieldName, "") to get(fieldName) so that missing fields return null instead of "". This allows downstream encoders (e.g. RowDataSinkEncoder) to correctly distinguish between "field not set" and "field set to empty string".
Add null filtering in array/map field processing
In getField() array processing: skip null items via if (itemValue != null) before adding to result list
In buildMapData(): skip map entries where key or value is null via if (keyValue != null && valueValue != null)
Enhance findNodeValueByCache() with more precise cache-hit handling
When a path-level cache hit is found at the last node position, apply the full node-type resolution logic (array index extraction, map key lookup) rather than returning the raw cached value directly. This ensures correct results for paths like field(0) or map_field(key) when the parent list/map was cached from a prior access.
Fix SQL alias parsing for special characters (backtick handling)
In TransformProcessor.initTransformSql(), strip surrounding backtick characters from alias names returned by exprItem.getAlias().getName(). JSqlParser's getName() preserves backtick quotes in the returned string, causing a mismatch with the sink field list. This fix allows aliases containing special characters (e.g., *, $) to be properly matched when wrapped in backticks in the SQL.
Modifications
Protobuf data → Flink RowData → Iceberg pipeline requires strict type compatibility (e.g., BinaryStringData for STRING fields)
Identity-based caching avoids O(n) deep-equality for DynamicMessage keys
Consistent null semantics prevent silent data corruption when fields are missing
Aliases containing $ and * characters (common in protobuf map key paths) must be parseable in SQL
Verifying this change
(Please pick either of the following options)
This change is a trivial rework/code cleanup without any test coverage.
This change is already covered by existing tests, such as:
(please describe tests)
This change added tests and can be verified as follows:
(example:)
Documentation