diff --git a/docs/content.zh/docs/connectors/table/formats/raw.md b/docs/content.zh/docs/connectors/table/formats/raw.md
index ebe756c56e8ae..69b4acfa1a0f4 100644
--- a/docs/content.zh/docs/connectors/table/formats/raw.md
+++ b/docs/content.zh/docs/connectors/table/formats/raw.md
@@ -105,6 +105,17 @@ Format 参数
指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。
更多细节可查阅 字节序。 |
+
+ raw.line-delimiter |
+ 可选 |
+ (无) |
+ String |
+ 指定行分隔符,用于在反序列化时将一条消息拆分为多行。设置后,每条消息将使用
+ 'raw.charset' 解码,并按此分隔符切分,每段输出一条数据行。在序列化时,分隔符字节会被
+ 追加到每条序列化值的末尾。常用值为 '\n'(换行符)或 '||'。
+ 注意:当序列化与反序列化使用相同的分隔符配置时,两者具有
+ round-trip 兼容性:序列化器追加的末尾分隔符会在反序列化时自动被去除。 |
+
diff --git a/docs/content/docs/connectors/table/formats/raw.md b/docs/content/docs/connectors/table/formats/raw.md
index 9596eb07fcd76..1efdd2b40ee4a 100644
--- a/docs/content/docs/connectors/table/formats/raw.md
+++ b/docs/content/docs/connectors/table/formats/raw.md
@@ -105,6 +105,19 @@ Format Options
Specify the endianness to encode the bytes of numeric value. Valid values are 'big-endian' and 'little-endian'.
See more details of endianness. |
+
+ raw.line-delimiter |
+ optional |
+ (none) |
+ String |
+ Specify the line delimiter for splitting incoming messages into multiple rows during
+ deserialization. When set, each incoming message is decoded using 'raw.charset' and then split
+ by this delimiter; one row is emitted per segment. During serialization, the delimiter bytes
+ are appended to each serialized value. Common values are '\n' (newline) or '||'.
+ Note: When the same delimiter is configured for both serialization and
+ deserialization, the two are round-trip compatible: a trailing delimiter appended by the
+ serializer is automatically stripped during deserialization. |
+
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
index 94d1cb30a8e78..650bdb4d4e0b4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
@@ -29,12 +29,16 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.DeserializationException;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
+import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements DeserializationSchema producedTypeInfo,
String charsetName,
boolean isBigEndian) {
+ this(deserializedType, producedTypeInfo, charsetName, isBigEndian, null);
+ }
+
+ public RawFormatDeserializationSchema(
+ LogicalType deserializedType,
+ TypeInformation producedTypeInfo,
+ String charsetName,
+ boolean isBigEndian,
+ @Nullable String lineDelimiter) {
this.deserializedType = checkNotNull(deserializedType);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.converter = createConverter(deserializedType, charsetName, isBigEndian);
this.validator = createDataLengthValidator(deserializedType);
this.charsetName = charsetName;
this.isBigEndian = isBigEndian;
+ this.lineDelimiter = lineDelimiter;
+ this.lineDelimiterPattern =
+ lineDelimiter != null ? Pattern.compile(Pattern.quote(lineDelimiter)) : null;
}
@Override
@@ -92,6 +116,41 @@ public RowData deserialize(byte[] message) throws IOException {
return rowData;
}
+ @Override
+ public void deserialize(byte[] message, Collector out) throws IOException {
+ if (lineDelimiter == null) {
+ // no delimiter: default single-record behavior
+ RowData row = deserialize(message);
+ if (row != null) {
+ out.collect(row);
+ }
+ return;
+ }
+
+ if (message == null) {
+ return;
+ }
+
+ Charset charset = Charset.forName(charsetName);
+ String decoded = new String(message, charset);
+ // Use pre-compiled pattern. Split with -1 to keep intentional empty middle segments,
+ // but strip the single trailing empty string produced when the message ends with the
+ // delimiter (e.g. a serializer that appends one delimiter per row).
+ String[] parts = lineDelimiterPattern.split(decoded, -1);
+ int count = parts.length;
+ if (count > 0 && parts[count - 1].isEmpty()) {
+ count--;
+ }
+ for (int i = 0; i < count; i++) {
+ byte[] partBytes = parts[i].getBytes(charset);
+ validator.validate(partBytes);
+ Object field = converter.convert(partBytes);
+ GenericRowData rowData = new GenericRowData(1);
+ rowData.setField(0, field);
+ out.collect(rowData);
+ }
+ }
+
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
@@ -114,12 +173,14 @@ public boolean equals(Object o) {
return producedTypeInfo.equals(that.producedTypeInfo)
&& deserializedType.equals(that.deserializedType)
&& charsetName.equals(that.charsetName)
- && isBigEndian == that.isBigEndian;
+ && isBigEndian == that.isBigEndian
+ && Objects.equals(lineDelimiter, that.lineDelimiter);
}
@Override
public int hashCode() {
- return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian);
+ return Objects.hash(
+ producedTypeInfo, deserializedType, charsetName, isBigEndian, lineDelimiter);
}
// ------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
index 5c9dfe3cb1983..05b52cf2c94e4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
@@ -45,6 +45,7 @@
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -72,6 +73,7 @@ public Set> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(RawFormatOptions.ENDIANNESS);
options.add(RawFormatOptions.CHARSET);
+ options.add(RawFormatOptions.LINE_DELIMITER);
return options;
}
@@ -81,6 +83,8 @@ public DecodingFormat> createDecodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
+ final Optional<String> lineDelimiter =
+ formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
@@ -91,7 +95,11 @@ public DeserializationSchema createRuntimeDecoder(
final TypeInformation producedTypeInfo =
context.createTypeInformation(producedDataType);
return new RawFormatDeserializationSchema(
- fieldType, producedTypeInfo, charsetName, isBigEndian);
+ fieldType,
+ producedTypeInfo,
+ charsetName,
+ isBigEndian,
+ lineDelimiter.orElse(null));
}
@Override
@@ -107,6 +115,8 @@ public EncodingFormat> createEncodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
+ final Optional<String> lineDelimiter =
+ formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -114,7 +124,8 @@ public SerializationSchema createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
final RowType physicalRowType = (RowType) consumedDataType.getLogicalType();
final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
- return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian);
+ return new RawFormatSerializationSchema(
+ fieldType, charsetName, isBigEndian, lineDelimiter.orElse(null));
}
@Override
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
index aa88d6765ea2a..7fa674b7f98e8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
@@ -43,5 +43,14 @@ public class RawFormatOptions {
.defaultValue(StandardCharsets.UTF_8.displayName())
.withDescription("Defines the string charset.");
+ public static final ConfigOption<String> LINE_DELIMITER =
+ ConfigOptions.key("line-delimiter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional line delimiter. Supports Java escape sequences (e.g. '\\n', '\\r\\n'). "
+ + "When set, deserialization splits each message by this delimiter and emits "
+ + "one RowData per part. Serialization appends the delimiter after each row's value.");
+
private RawFormatOptions() {}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
index 839cc66abf13a..2f16dee5caa21 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
@@ -27,10 +27,13 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RawType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Objects;
/** Serialization schema that serializes an {@link RowData} object into raw (byte based) value. */
@@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements SerializationSchema' as column type.");
}
+ @Test
+ void testLineDelimiterOption() {
+ final Map tableOptions =
+ getModifiedOptions(
+ options -> {
+ options.put("raw.line-delimiter", "\n");
+ });
+
+ // test deserialization schema contains line delimiter
+ final RawFormatDeserializationSchema expectedDeser =
+ new RawFormatDeserializationSchema(
+ ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE), "UTF-8", true, "\n");
+ DeserializationSchema actualDeser =
+ createDeserializationSchema(SCHEMA, tableOptions);
+ assertThat(actualDeser).isEqualTo(expectedDeser);
+
+ // test serialization schema contains line delimiter
+ final RawFormatSerializationSchema expectedSer =
+ new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), "UTF-8", true, "\n");
+ SerializationSchema actualSer = createSerializationSchema(SCHEMA, tableOptions);
+ assertThat(actualSer).isEqualTo(expectedSer);
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
new file mode 100644
index 0000000000000..041d4c8c19ba6
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.raw.RawFormatDeserializationSchema;
+import org.apache.flink.formats.raw.RawFormatSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link RawFormatDeserializationSchema} and {@link RawFormatSerializationSchema} with
+ * the {@code raw.line-delimiter} option.
+ */
+class RawFormatLineDelimiterTest {
+
+ private static final VarCharType STRING_TYPE = VarCharType.STRING_TYPE;
+
+ // -----------------------------------------------------------------------
+ // Deserialization tests
+ // -----------------------------------------------------------------------
+
+ @Test
+ void testDeserializeWithoutDelimiter_singleRow() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, null);
+ openDeser(schema);
+
+ List rows = collectRows(schema, "hello".getBytes("UTF-8"));
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getString(0).toString()).isEqualTo("hello");
+ }
+
+ @Test
+ void testDeserializeWithNewlineDelimiter_multipleRows() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n");
+ openDeser(schema);
+
+ byte[] message = "line1\nline2\nline3".getBytes("UTF-8");
+ List rows = collectRows(schema, message);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getString(0).toString()).isEqualTo("line1");
+ assertThat(rows.get(1).getString(0).toString()).isEqualTo("line2");
+ assertThat(rows.get(2).getString(0).toString()).isEqualTo("line3");
+ }
+
+ @Test
+ void testDeserializeWithCustomMultiCharDelimiter() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "||");
+ openDeser(schema);
+
+ byte[] message = "record1||record2||record3".getBytes("UTF-8");
+ List rows = collectRows(schema, message);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getString(0).toString()).isEqualTo("record1");
+ assertThat(rows.get(1).getString(0).toString()).isEqualTo("record2");
+ assertThat(rows.get(2).getString(0).toString()).isEqualTo("record3");
+ }
+
+ @Test
+ void testDeserializeWithNullMessage_noOutput() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n");
+ openDeser(schema);
+
+ List rows = collectRows(schema, null);
+ assertThat(rows).isEmpty();
+ }
+
+ @Test
+ void testDeserializeWithGbkCharset() throws Exception {
+ Charset gbk = Charset.forName("GBK");
+ String original = "你好\n世界";
+ byte[] message = original.getBytes(gbk);
+
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "GBK", true, "\n");
+ openDeser(schema);
+
+ List rows = collectRows(schema, message);
+ assertThat(rows).hasSize(2);
+ assertThat(rows.get(0).getString(0).toString()).isEqualTo("你好");
+ assertThat(rows.get(1).getString(0).toString()).isEqualTo("世界");
+ }
+
+ // -----------------------------------------------------------------------
+ // Serialization tests
+ // -----------------------------------------------------------------------
+
+ @Test
+ void testSerializeWithoutDelimiter_noAppend() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, null);
+ openSer(schema);
+
+ RowData row = buildStringRow("hello");
+ byte[] result = schema.serialize(row);
+ assertThat(result).isEqualTo("hello".getBytes("UTF-8"));
+ }
+
+ @Test
+ void testSerializeWithNewlineDelimiter_appendsDelimiter() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "\n");
+ openSer(schema);
+
+ RowData row = buildStringRow("hello");
+ byte[] result = schema.serialize(row);
+ assertThat(result).isEqualTo("hello\n".getBytes("UTF-8"));
+ }
+
+ @Test
+ void testSerializeWithCustomDelimiter_appendsDelimiter() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "||");
+ openSer(schema);
+
+ RowData row = buildStringRow("record1");
+ byte[] result = schema.serialize(row);
+ assertThat(result).isEqualTo("record1||".getBytes("UTF-8"));
+ }
+
+ @Test
+ void testSerializeNullRow_returnsNull() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "\n");
+ openSer(schema);
+
+ GenericRowData nullRow = new GenericRowData(1);
+ nullRow.setField(0, null);
+ byte[] result = schema.serialize(nullRow);
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testDeserializeTrailingDelimiter_noExtraRow() throws Exception {
+ // Verify that a message ending with the delimiter does not produce a trailing empty row.
+ // This ensures round-trip compatibility: serialize("hello") -> "hello\n" ->
+ // deserialize -> ["hello"] (1 row, not 2).
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n");
+ openDeser(schema);
+
+ // Message already ends with the delimiter (as produced by the serializer)
+ byte[] message = "hello\n".getBytes("UTF-8");
+ List rows = collectRows(schema, message);
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getString(0).toString()).isEqualTo("hello");
+ }
+
+ @Test
+ void testRoundTrip_serializeThenDeserialize() throws Exception {
+ // Verify that rows written by the serializer can be read back correctly by the
+ // deserializer when both share the same delimiter configuration.
+ RawFormatSerializationSchema ser =
+ new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "\n");
+ openSer(ser);
+
+ RawFormatDeserializationSchema deser =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n");
+ openDeser(deser);
+
+ // Serialize a single row -> "hello\n"
+ byte[] serialized = ser.serialize(buildStringRow("hello"));
+
+ // Deserialize "hello\n" -> should yield exactly 1 row
+ List rows = collectRows(deser, serialized);
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getString(0).toString()).isEqualTo("hello");
+ }
+
+ // -----------------------------------------------------------------------
+ // Helpers
+ // -----------------------------------------------------------------------
+
+ private void openDeser(RawFormatDeserializationSchema schema) throws Exception {
+ schema.open(
+ new DeserializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+
+ private void openSer(RawFormatSerializationSchema schema) throws Exception {
+ schema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+
+ private List collectRows(RawFormatDeserializationSchema schema, byte[] message)
+ throws Exception {
+ List rows = new ArrayList<>();
+ schema.deserialize(
+ message,
+ new Collector() {
+ @Override
+ public void collect(RowData record) {
+ rows.add(record);
+ }
+
+ @Override
+ public void close() {}
+ });
+ return rows;
+ }
+
+ private RowData buildStringRow(String value) {
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, StringData.fromString(value));
+ return row;
+ }
+}