
[FLINK-39401] Extend raw format to support line-delimiter option #27897

Open
featzhang wants to merge 1 commit into apache:master from featzhang:feature/raw-format-line-delimiter

Conversation

@featzhang
Member

Summary

This PR extends the raw format to support a new optional raw.line-delimiter config option.

When raw.line-delimiter is set:

  • Deserialization: each incoming message is decoded to a string using raw.charset, split by the delimiter (String.split(Pattern.quote(delimiter), -1)), and one RowData is emitted per segment via deserialize(byte[], Collector<T>).
  • Serialization: the delimiter bytes are appended to the serialized value.

When raw.line-delimiter is not set, all existing behavior is preserved exactly (backward compatible).
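The splitting behavior described above can be sketched in plain Java. This is a minimal illustration, not the PR's code: the class `RawLineSplitSketch` and its `split` method are hypothetical names, and returning an empty list for a null message is an assumption based on the "null message with delimiter → 0 rows" test case.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not a class from the PR.
final class RawLineSplitSketch {

    // Decodes the message with the given charset and splits it on the literal delimiter.
    // A null delimiter models the "option not set" case: the whole message is one segment.
    static List<String> split(byte[] message, String charsetName, String delimiter) {
        List<String> segments = new ArrayList<>();
        if (message == null) {
            return segments; // assumption: a null message yields no rows
        }
        String decoded = new String(message, Charset.forName(charsetName));
        if (delimiter == null) {
            segments.add(decoded);
            return segments;
        }
        // Pattern.quote makes the delimiter literal (so "||" is not a regex alternation);
        // limit -1 keeps empty segments, including trailing ones.
        for (String part : decoded.split(Pattern.quote(delimiter), -1)) {
            segments.add(part);
        }
        return segments;
    }

    public static void main(String[] args) {
        byte[] lines = "line1\nline2\nline3".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(lines, "UTF-8", "\n"));  // [line1, line2, line3]

        byte[] pipes = "a||b||c".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(pipes, "UTF-8", "||"));  // [a, b, c]
    }
}
```

Each returned segment would correspond to one emitted row in the format itself.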

Example SQL

CREATE TABLE nginx_log (log STRING) WITH (
  'connector'          = 'kafka',
  'topic'              = 'nginx_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'             = 'raw',
  'raw.line-delimiter' = '\n'
);
-- Each Kafka message "line1\nline2\nline3" produces 3 rows
SELECT log FROM nginx_log;

Changes

| Class | Change |
| --- | --- |
| RawFormatOptions | Add LINE_DELIMITER ConfigOption<String> with no default value |
| RawFormatFactory | Read new option; pass to schema builders; register in optionalOptions() |
| RawFormatDeserializationSchema | Override deserialize(byte[], Collector) to split by delimiter when set; add lineDelimiter field to equals/hashCode |
| RawFormatSerializationSchema | Append delimiter bytes when set; add lineDelimiter field to equals/hashCode |
| RawFormatFactoryTest | Add testLineDelimiterOption() |
| RawFormatLineDelimiterTest | New test class: 9 tests covering splitting, GBK charset, null message, serialization appending, null row |

Test Plan

  • RawFormatLineDelimiterTest (9 tests):
    • No delimiter → single row (regression)
    • \n delimiter → 3 rows
    • Custom multi-char delimiter || → 3 rows
    • Null message with delimiter → 0 rows
    • GBK charset with \n delimiter → correct splitting
    • Serialization without delimiter → no change
    • Serialization with \n → appends \n
    • Serialization with || → appends ||
    • Serialization of null row → returns null
  • RawFormatFactoryTest.testLineDelimiterOption(): verifies factory produces schemas with correct delimiter
  • All 50 existing raw format tests continue to pass

@flinkbot
Collaborator

flinkbot commented Apr 5, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@featzhang featzhang changed the title [FLINK] Extend raw format to support line-delimiter option [FLINK-39401] Extend raw format to support line-delimiter option Apr 5, 2026
@featzhang
Member Author

@flinkbot run azure

@rmetzger
Contributor

rmetzger commented Apr 8, 2026

@featzhang Thanks for this contribution. The commit message in the PR should include the Jira ticket id.

if (lineDelimiter == null || valueBytes == null) {
    return valueBytes;
}
byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));

Thanks for the Jira note on split(-1) — makes sense for preserving intentional empty middle segments.
One thing I noticed though: the serializer appends the delimiter to every row, so a message it produces always ends with the delimiter. When that message is read back with the same config, split(-1) will emit a trailing empty row. So the two halves of the feature are subtly incompatible when used together on the same table.

When I added a round-trip test:
- Serialize "hello" → produces "hello\n"
- Deserialize "hello\n" → expected 1 row, got 2

It would be good to note this limitation in the PR.
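The mismatch can be reproduced outside Flink with a short sketch. The class `RoundTripMismatchSketch` and its two methods are hypothetical stand-ins for the serializer and the deserializer as originally submitted; UTF-8 is assumed as the charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical stand-ins for illustration; not the PR's schema classes.
final class RoundTripMismatchSketch {

    // Models the serializer: the value followed by the delimiter.
    static byte[] serialize(String value, String delimiter) {
        return (value + delimiter).getBytes(StandardCharsets.UTF_8);
    }

    // Models the deserializer as first submitted: literal split with limit -1.
    static String[] deserialize(byte[] message, String delimiter) {
        String decoded = new String(message, StandardCharsets.UTF_8);
        return decoded.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        String[] rows = deserialize(serialize("hello", "\n"), "\n");
        // "hello\n" splits into "hello" and a trailing empty string.
        System.out.println(rows.length); // 2
    }
}
```

The second element is the empty string after the final delimiter, which is what surfaces as the extra row.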

Set<ConfigOption<?>> options = new HashSet<>();
options.add(RawFormatOptions.ENDIANNESS);
options.add(RawFormatOptions.CHARSET);
options.add(RawFormatOptions.LINE_DELIMITER);

raw.line-delimiter is not added to docs/content/docs/connectors/table/formats/raw.md or the Chinese equivalent.


Charset charset = Charset.forName(charsetName);
String decoded = new String(message, charset);
String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);

nit: Pattern.quote(lineDelimiter) is recompiled on every message — since the delimiter is fixed, could this be pre-compiled as a Pattern field in the constructor?
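A minimal sketch of the suggestion, assuming the delimiter is known at construction time. `PrecompiledDelimiterSketch` is a hypothetical class, not the PR's deserialization schema; `java.util.regex.Pattern` implements `Serializable`, so holding it as a field should be compatible with a serializable schema.

```java
import java.util.regex.Pattern;

// Hypothetical class for illustration; not the PR's RawFormatDeserializationSchema.
final class PrecompiledDelimiterSketch {

    // Compiled once; null models the case where no delimiter is configured.
    private final Pattern lineDelimiterPattern;

    PrecompiledDelimiterSketch(String lineDelimiter) {
        this.lineDelimiterPattern =
                lineDelimiter == null ? null : Pattern.compile(Pattern.quote(lineDelimiter));
    }

    // Reuses the compiled pattern for every message instead of calling String.split.
    String[] split(String decoded) {
        if (lineDelimiterPattern == null) {
            return new String[] {decoded};
        }
        return lineDelimiterPattern.split(decoded, -1);
    }

    public static void main(String[] args) {
        PrecompiledDelimiterSketch sketch = new PrecompiledDelimiterSketch("||");
        System.out.println(sketch.split("a||b||c").length); // 3
    }
}
```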

if (lineDelimiter == null || valueBytes == null) {
    return valueBytes;
}
byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));

nit: delimiterBytes is allocated on every row — since both lineDelimiter and charsetName are fixed at construction time, this could be computed once and stored as a field.
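One way this could look, as a sketch under the assumption that the delimiter and charset never change after construction. `DelimiterAppendSketch` and its `append` method are hypothetical names, not the PR's serialization schema.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical class for illustration; not the PR's RawFormatSerializationSchema.
final class DelimiterAppendSketch {

    // Encoded once in the constructor; null models "no delimiter configured".
    private final byte[] delimiterBytes;

    DelimiterAppendSketch(String lineDelimiter, String charsetName) {
        this.delimiterBytes =
                lineDelimiter == null ? null : lineDelimiter.getBytes(Charset.forName(charsetName));
    }

    // Returns the value followed by the delimiter, or the input unchanged
    // when there is no delimiter or no value.
    byte[] append(byte[] valueBytes) {
        if (delimiterBytes == null || valueBytes == null) {
            return valueBytes;
        }
        byte[] out = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, out, valueBytes.length, delimiterBytes.length);
        return out;
    }

    public static void main(String[] args) {
        DelimiterAppendSketch sketch = new DelimiterAppendSketch("||", "UTF-8");
        byte[] out = sketch.append("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // hello||
    }
}
```

Only the output array is allocated per row; the delimiter bytes are shared across calls.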

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Apr 8, 2026
@featzhang featzhang force-pushed the feature/raw-format-line-delimiter branch from 8205f0c to 3161013 Compare April 11, 2026 16:27
@featzhang
Member Author

@flinkbot run azure

@featzhang
Member Author

Thanks for the thorough review @spuru9 and @rmetzger!

Here's what was addressed in this update:

  1. Commit message: Updated to include the Jira ticket ID: [FLINK-39401][formats].

  2. Round-trip compatibility fix (serializer/deserializer incompatibility): The deserializer now strips the single trailing empty string produced when a message ends with the delimiter. This makes serialization and deserialization fully round-trip compatible — serialize("hello") → "hello\n" → deserialize → ["hello"] (1 row, not 2). Added testDeserializeTrailingDelimiter_noExtraRow and testRoundTrip_serializeThenDeserialize tests to verify this.

  3. Pre-compiled Pattern: Pattern.quote(lineDelimiter) is now pre-compiled once in the constructor as a Pattern lineDelimiterPattern field, eliminating repeated compilation on every message.

  4. Pre-computed delimiterBytes: delimiterBytes is now computed once in the constructor and stored as a field, eliminating repeated allocation on every row serialization.

  5. Documentation: Added raw.line-delimiter option to both docs/content/docs/connectors/table/formats/raw.md and docs/content.zh/docs/connectors/table/formats/raw.md, including a note on round-trip compatibility.
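The trailing-delimiter handling from item 2 might look roughly like the sketch below. `TrailingDelimiterSketch` is a hypothetical class; the rule shown (drop exactly one trailing empty segment, and only when at least one other segment exists) is an assumption about the updated deserializer, not a copy of it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; the exact rule in the PR may differ.
final class TrailingDelimiterSketch {

    // Splits on the pre-compiled literal delimiter and drops a single trailing
    // empty segment, so a message ending with the delimiter yields no extra row.
    static List<String> split(String decoded, Pattern delimiterPattern) {
        String[] parts = delimiterPattern.split(decoded, -1);
        int end = parts.length;
        if (end > 1 && parts[end - 1].isEmpty()) {
            end--; // strip only one; empty middle segments are kept
        }
        return new ArrayList<>(Arrays.asList(parts).subList(0, end));
    }

    public static void main(String[] args) {
        Pattern newline = Pattern.compile(Pattern.quote("\n"));
        System.out.println(split("hello\n", newline).size());  // 1
        System.out.println(split("a\n\nb\n", newline).size()); // 3 (the empty middle segment is kept)
    }
}
```

Stripping only one trailing segment keeps the round trip symmetric with a serializer that appends exactly one delimiter per row.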


4 participants