[FLINK-39401] Extend raw format to support line-delimiter option#27897
featzhang wants to merge 1 commit into apache:master
@flinkbot run azure

@featzhang Thanks for this contribution. The commit message in the PR should include the Jira ticket id.
if (lineDelimiter == null || valueBytes == null) {
    return valueBytes;
}
byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
Thanks for the Jira note on split(-1) — makes sense for preserving intentional empty middle segments.
One thing I noticed though: the serializer appends the delimiter to every row, so a message it produces always ends with the delimiter. When that message is read back with the same config, split(-1) will emit a trailing empty row. So the two halves of the feature are subtly incompatible when used together on the same table.
When I added a round-trip test:
- Serialize "hello" → produces "hello\n"
- Deserialize "hello\n" → expected 1 row, got 2
It would be good to note this limitation in the PR.
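The mismatch can be reproduced outside Flink. The sketch below is not the PR's code: `RoundTripSketch` and its two methods are hypothetical stand-ins that only mirror the behavior described above (append the delimiter on write, `split(..., -1)` on read), with UTF-8 assumed as the charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical stand-in for the two schemas; only the delimiter handling is modeled.
final class RoundTripSketch {

    // Mirrors the serializer behavior described in the review: the delimiter follows every row.
    static byte[] serialize(String value, String delimiter) {
        return (value + delimiter).getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors the deserializer line under review: limit -1 keeps trailing empty strings.
    static String[] deserialize(byte[] message, String delimiter) {
        String decoded = new String(message, StandardCharsets.UTF_8);
        return decoded.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        String[] rows = deserialize(serialize("hello", "\n"), "\n");
        System.out.println(rows.length); // prints 2: "hello" plus a trailing empty row
    }
}
```

One possible way out, if the trailing empty row is considered unwanted, would be to drop a single trailing empty segment on read while still keeping empty middle segments; whether that fits the intended semantics is for the author to decide.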
Set<ConfigOption<?>> options = new HashSet<>();
options.add(RawFormatOptions.ENDIANNESS);
options.add(RawFormatOptions.CHARSET);
options.add(RawFormatOptions.LINE_DELIMITER);
raw.line-delimiter is not added to docs/content/docs/connectors/table/formats/raw.md or the Chinese equivalent.
Charset charset = Charset.forName(charsetName);
String decoded = new String(message, charset);
String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
nit: Pattern.quote(lineDelimiter) is recompiled on every message — since the delimiter is fixed, could this be pre-compiled as a Pattern field in the constructor?
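A minimal sketch of what the pre-compiled variant might look like. `PrecompiledSplitSketch` and its `split` method are hypothetical names used for illustration, not classes from the PR; only the field layout matters here.

```java
import java.nio.charset.Charset;
import java.util.regex.Pattern;

// Hypothetical illustration: compile the quoted delimiter once, reuse it for every message.
final class PrecompiledSplitSketch {
    private final Charset charset;
    private final Pattern delimiterPattern; // null when no delimiter is configured

    PrecompiledSplitSketch(String charsetName, String lineDelimiter) {
        this.charset = Charset.forName(charsetName);
        this.delimiterPattern =
                lineDelimiter == null ? null : Pattern.compile(Pattern.quote(lineDelimiter));
    }

    String[] split(byte[] message) {
        String decoded = new String(message, charset);
        if (delimiterPattern == null) {
            return new String[] {decoded}; // no delimiter: the whole message is one row
        }
        // Same semantics as String.split(regex, -1), without recompiling the regex.
        return delimiterPattern.split(decoded, -1);
    }
}
```

`java.util.regex.Pattern` implements `Serializable`, so it can be held as a regular field in a serializable schema. `Charset` does not, so in the real schema the charset would presumably stay a name and be resolved lazily or in a transient field.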
if (lineDelimiter == null || valueBytes == null) {
    return valueBytes;
}
byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
nit: delimiterBytes is allocated on every row — since both lineDelimiter and charsetName are fixed at construction time, this could be computed once and stored as a field.
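Roughly, the suggestion amounts to the following. `DelimiterAppendSketch` and `appendDelimiter` are hypothetical names for illustration; the null checks follow the snippet under review.

```java
import java.nio.charset.Charset;
import java.util.Arrays;

// Hypothetical illustration: encode the delimiter once in the constructor.
final class DelimiterAppendSketch {
    private final byte[] delimiterBytes; // null when no delimiter is configured

    DelimiterAppendSketch(String charsetName, String lineDelimiter) {
        this.delimiterBytes =
                lineDelimiter == null
                        ? null
                        : lineDelimiter.getBytes(Charset.forName(charsetName));
    }

    byte[] appendDelimiter(byte[] valueBytes) {
        if (delimiterBytes == null || valueBytes == null) {
            return valueBytes; // same early return as the code under review
        }
        // Copy the value into a larger array, then place the cached delimiter at the end.
        byte[] out = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, out, valueBytes.length, delimiterBytes.length);
        return out;
    }
}
```

A `byte[]` field is serializable, so caching it does not change how the schema is shipped to tasks; the per-row cost drops to a single array copy.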
8205f0c to 3161013
@flinkbot run azure
Thanks for the thorough review @spuru9 and @rmetzger! Here's what was addressed in this update:
Summary
This PR extends the raw format to support a new optional raw.line-delimiter config option.

When raw.line-delimiter is set: the message is decoded using raw.charset, split by the delimiter (String.split(Pattern.quote(delimiter), -1)), and one RowData is emitted per segment via deserialize(byte[], Collector<T>).

When raw.line-delimiter is not set, all existing behavior is preserved exactly (backward compatible).

Example SQL

Changes
- RawFormatOptions: LINE_DELIMITER ConfigOption<String> with no default value
- RawFormatFactory: optionalOptions()
- RawFormatDeserializationSchema: deserialize(byte[], Collector) to split by delimiter when set; add lineDelimiter field to equals/hashCode
- RawFormatSerializationSchema: lineDelimiter field to equals/hashCode
- RawFormatFactoryTest: testLineDelimiterOption()
- RawFormatLineDelimiterTest

Test Plan
- RawFormatLineDelimiterTest (9 tests):
  - \n delimiter → 3 rows
  - || → 3 rows
  - \n delimiter → correct splitting
  - \n → appends \n
  - || → appends ||
- RawFormatFactoryTest.testLineDelimiterOption(): verifies factory produces schemas with correct delimiter