diff --git a/docs/content.zh/docs/connectors/table/formats/raw.md b/docs/content.zh/docs/connectors/table/formats/raw.md index ebe756c56e8ae..69b4acfa1a0f4 100644 --- a/docs/content.zh/docs/connectors/table/formats/raw.md +++ b/docs/content.zh/docs/connectors/table/formats/raw.md @@ -105,6 +105,17 @@ Format 参数 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅 字节序。 + +
raw.line-delimiter
+ 可选 + (无) + String + 指定行分隔符,用于在反序列化时将一条消息拆分为多行。设置后,每条消息将使用 + 'raw.charset' 解码,并按此分隔符切分,每段输出一条数据行。在序列化时,分隔符字节会被 + 追加到每条序列化值的末尾。常用值为 '\n'(换行符)或 '||'。 +
注意:当序列化与反序列化使用相同的分隔符配置时,两者具有 + round-trip 兼容性:序列化器追加的末尾分隔符会在反序列化时自动被去除。 + diff --git a/docs/content/docs/connectors/table/formats/raw.md b/docs/content/docs/connectors/table/formats/raw.md index 9596eb07fcd76..1efdd2b40ee4a 100644 --- a/docs/content/docs/connectors/table/formats/raw.md +++ b/docs/content/docs/connectors/table/formats/raw.md @@ -105,6 +105,19 @@ Format Options Specify the endianness to encode the bytes of numeric value. Valid values are 'big-endian' and 'little-endian'. See more details of endianness. + +
raw.line-delimiter
+ optional + (none) + String + Specify the line delimiter for splitting incoming messages into multiple rows during + deserialization. When set, each incoming message is decoded using 'raw.charset' and then split + by this delimiter; one row is emitted per segment. During serialization, the delimiter bytes + are appended to each serialized value. Common values are '\n' (newline) or '||'. +
Note: When the same delimiter is configured for both serialization and + deserialization, the two are round-trip compatible: a trailing delimiter appended by the + serializer is automatically stripped during deserialization. + diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java index 94d1cb30a8e78..650bdb4d4e0b4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java @@ -29,12 +29,16 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.DeserializationException; +import org.apache.flink.util.Collector; + +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Objects; +import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements DeserializationSchema producedTypeInfo, String charsetName, boolean isBigEndian) { + this(deserializedType, producedTypeInfo, charsetName, isBigEndian, null); + } + + public RawFormatDeserializationSchema( + LogicalType deserializedType, + TypeInformation producedTypeInfo, + String charsetName, + boolean isBigEndian, + @Nullable String lineDelimiter) { this.deserializedType = checkNotNull(deserializedType); this.producedTypeInfo = checkNotNull(producedTypeInfo); this.converter = createConverter(deserializedType, charsetName, isBigEndian); this.validator = createDataLengthValidator(deserializedType); this.charsetName = charsetName; this.isBigEndian = isBigEndian; + this.lineDelimiter = lineDelimiter; + this.lineDelimiterPattern = + lineDelimiter != null ? Pattern.compile(Pattern.quote(lineDelimiter)) : null; } @Override @@ -92,6 +116,41 @@ public RowData deserialize(byte[] message) throws IOException { return rowData; } + @Override + public void deserialize(byte[] message, Collector out) throws IOException { + if (lineDelimiter == null) { + // no delimiter: default single-record behavior + RowData row = deserialize(message); + if (row != null) { + out.collect(row); + } + return; + } + + if (message == null) { + return; + } + + Charset charset = Charset.forName(charsetName); + String decoded = new String(message, charset); + // Use pre-compiled pattern. Split with -1 to keep intentional empty middle segments, + // but strip the single trailing empty string produced when the message ends with the + // delimiter (e.g. a serializer that appends one delimiter per row). + String[] parts = lineDelimiterPattern.split(decoded, -1); + int count = parts.length; + if (count > 0 && parts[count - 1].isEmpty()) { + count--; + } + for (int i = 0; i < count; i++) { + byte[] partBytes = parts[i].getBytes(charset); + validator.validate(partBytes); + Object field = converter.convert(partBytes); + GenericRowData rowData = new GenericRowData(1); + rowData.setField(0, field); + out.collect(rowData); + } + } + @Override public boolean isEndOfStream(RowData nextElement) { return false; @@ -114,12 +173,14 @@ public boolean equals(Object o) { return producedTypeInfo.equals(that.producedTypeInfo) && deserializedType.equals(that.deserializedType) && charsetName.equals(that.charsetName) - && isBigEndian == that.isBigEndian; + && isBigEndian == that.isBigEndian + && Objects.equals(lineDelimiter, that.lineDelimiter); } @Override public int hashCode() { - return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian); + return Objects.hash( + producedTypeInfo, deserializedType, charsetName, isBigEndian, lineDelimiter); } // ------------------------------------------------------------------------ diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java index 5c9dfe3cb1983..05b52cf2c94e4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java @@ -45,6 +45,7 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -72,6 +73,7 @@ public Set> optionalOptions() { Set> options = new HashSet<>(); options.add(RawFormatOptions.ENDIANNESS); options.add(RawFormatOptions.CHARSET); + options.add(RawFormatOptions.LINE_DELIMITER); return options; } @@ -81,6 +83,8 @@ public DecodingFormat> createDecodingFormat( FactoryUtil.validateFactoryOptions(this, formatOptions); final String charsetName = validateAndGetCharsetName(formatOptions); final boolean isBigEndian = isBigEndian(formatOptions); + final Optional lineDelimiter = + formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER); return new DecodingFormat>() { @Override @@ -91,7 +95,11 @@ public DeserializationSchema createRuntimeDecoder( final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); return new RawFormatDeserializationSchema( - fieldType, producedTypeInfo, charsetName, isBigEndian); + fieldType, + producedTypeInfo, + charsetName, + isBigEndian, + lineDelimiter.orElse(null)); } @Override @@ -107,6 +115,8 @@ public EncodingFormat> createEncodingFormat( FactoryUtil.validateFactoryOptions(this, formatOptions); final String charsetName = validateAndGetCharsetName(formatOptions); final boolean isBigEndian = isBigEndian(formatOptions); + final Optional lineDelimiter = + formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER); return new EncodingFormat>() { @Override @@ -114,7 +124,8 @@ public SerializationSchema createRuntimeEncoder( DynamicTableSink.Context context, DataType consumedDataType) { final RowType physicalRowType = (RowType) consumedDataType.getLogicalType(); final LogicalType fieldType = validateAndExtractSingleField(physicalRowType); - return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian); + return new RawFormatSerializationSchema( + fieldType, charsetName, isBigEndian, lineDelimiter.orElse(null)); } @Override diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java index aa88d6765ea2a..7fa674b7f98e8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java @@ -43,5 +43,14 @@ public class RawFormatOptions { .defaultValue(StandardCharsets.UTF_8.displayName()) .withDescription("Defines the string charset."); + public static final ConfigOption LINE_DELIMITER = + ConfigOptions.key("line-delimiter") + .stringType() + .noDefaultValue() + .withDescription( + "Optional line delimiter. Supports Java escape sequences (e.g. '\\n', '\\r\\n'). " + + "When set, deserialization splits each message by this delimiter and emits " + + "one RowData per part. Serialization appends the delimiter after each row's value."); + private RawFormatOptions() {} } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java index 839cc66abf13a..2f16dee5caa21 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java @@ -27,10 +27,13 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RawType; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Objects; /** Serialization schema that serializes an {@link RowData} object into raw (byte based) value. */ @@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements SerializationSchema' as column type."); } + @Test + void testLineDelimiterOption() { + final Map tableOptions = + getModifiedOptions( + options -> { + options.put("raw.line-delimiter", "\n"); + }); + + // test deserialization schema contains line delimiter + final RawFormatDeserializationSchema expectedDeser = + new RawFormatDeserializationSchema( + ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE), "UTF-8", true, "\n"); + DeserializationSchema actualDeser = + createDeserializationSchema(SCHEMA, tableOptions); + assertThat(actualDeser).isEqualTo(expectedDeser); + + // test serialization schema contains line delimiter + final RawFormatSerializationSchema expectedSer = + new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), "UTF-8", true, "\n"); + SerializationSchema actualSer = createSerializationSchema(SCHEMA, tableOptions); + assertThat(actualSer).isEqualTo(expectedSer); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java new file mode 100644 index 0000000000000..041d4c8c19ba6 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats.raw; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.raw.RawFormatDeserializationSchema; +import org.apache.flink.formats.raw.RawFormatSerializationSchema; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.UserCodeClassLoader; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link RawFormatDeserializationSchema} and {@link RawFormatSerializationSchema} with + * the {@code raw.line-delimiter} option. + */ +class RawFormatLineDelimiterTest { + + private static final VarCharType STRING_TYPE = VarCharType.STRING_TYPE; + + // ----------------------------------------------------------------------- + // Deserialization tests + // ----------------------------------------------------------------------- + + @Test + void testDeserializeWithoutDelimiter_singleRow() throws Exception { + RawFormatDeserializationSchema schema = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, null); + openDeser(schema); + + List rows = collectRows(schema, "hello".getBytes("UTF-8")); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("hello"); + } + + @Test + void testDeserializeWithNewlineDelimiter_multipleRows() throws Exception { + RawFormatDeserializationSchema schema = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n"); + openDeser(schema); + + byte[] message = "line1\nline2\nline3".getBytes("UTF-8"); + List rows = collectRows(schema, message); + assertThat(rows).hasSize(3); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("line1"); + assertThat(rows.get(1).getString(0).toString()).isEqualTo("line2"); + assertThat(rows.get(2).getString(0).toString()).isEqualTo("line3"); + } + + @Test + void testDeserializeWithCustomMultiCharDelimiter() throws Exception { + RawFormatDeserializationSchema schema = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "||"); + openDeser(schema); + + byte[] message = "record1||record2||record3".getBytes("UTF-8"); + List rows = collectRows(schema, message); + assertThat(rows).hasSize(3); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("record1"); + assertThat(rows.get(1).getString(0).toString()).isEqualTo("record2"); + assertThat(rows.get(2).getString(0).toString()).isEqualTo("record3"); + } + + @Test + void testDeserializeWithNullMessage_noOutput() throws Exception { + RawFormatDeserializationSchema schema = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n"); + openDeser(schema); + + List rows = collectRows(schema, null); + assertThat(rows).isEmpty(); + } + + @Test + void testDeserializeWithGbkCharset() throws Exception { + Charset gbk = Charset.forName("GBK"); + String original = "你好\n世界"; + byte[] message = original.getBytes(gbk); + + RawFormatDeserializationSchema schema = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "GBK", true, "\n"); + openDeser(schema); + + List rows = collectRows(schema, message); + assertThat(rows).hasSize(2); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("你好"); + assertThat(rows.get(1).getString(0).toString()).isEqualTo("世界"); + } + + // ----------------------------------------------------------------------- + // Serialization tests + // ----------------------------------------------------------------------- + + @Test + void testSerializeWithoutDelimiter_noAppend() throws Exception { + RawFormatSerializationSchema schema = + new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, null); + openSer(schema); + + RowData row = buildStringRow("hello"); + byte[] result = schema.serialize(row); + assertThat(result).isEqualTo("hello".getBytes("UTF-8")); + } + + @Test + void testSerializeWithNewlineDelimiter_appendsDelimiter() throws Exception { + RawFormatSerializationSchema schema = + new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "\n"); + openSer(schema); + + RowData row = buildStringRow("hello"); + byte[] result = schema.serialize(row); + assertThat(result).isEqualTo("hello\n".getBytes("UTF-8")); + } + + @Test + void testSerializeWithCustomDelimiter_appendsDelimiter() throws Exception { + RawFormatSerializationSchema schema = + new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "||"); + openSer(schema); + + RowData row = buildStringRow("record1"); + byte[] result = schema.serialize(row); + assertThat(result).isEqualTo("record1||".getBytes("UTF-8")); + } + + @Test + void testSerializeNullRow_returnsNull() throws Exception { + RawFormatSerializationSchema schema = + new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "\n"); + openSer(schema); + + GenericRowData nullRow = new GenericRowData(1); + nullRow.setField(0, null); + byte[] result = schema.serialize(nullRow); + assertThat(result).isNull(); + } + + @Test + void testDeserializeTrailingDelimiter_noExtraRow() throws Exception { + // Verify that a message ending with the delimiter does not produce a trailing empty row. + // This ensures round-trip compatibility: serialize("hello") -> "hello\n" -> + // deserialize -> ["hello"] (1 row, not 2). + RawFormatDeserializationSchema schema = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n"); + openDeser(schema); + + // Message already ends with the delimiter (as produced by the serializer) + byte[] message = "hello\n".getBytes("UTF-8"); + List rows = collectRows(schema, message); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("hello"); + } + + @Test + void testRoundTrip_serializeThenDeserialize() throws Exception { + // Verify that rows written by the serializer can be read back correctly by the + // deserializer when both share the same delimiter configuration. + RawFormatSerializationSchema ser = + new RawFormatSerializationSchema(STRING_TYPE, "UTF-8", true, "\n"); + openSer(ser); + + RawFormatDeserializationSchema deser = + new RawFormatDeserializationSchema( + STRING_TYPE, TypeInformation.of(RowData.class), "UTF-8", true, "\n"); + openDeser(deser); + + // Serialize a single row -> "hello\n" + byte[] serialized = ser.serialize(buildStringRow("hello")); + + // Deserialize "hello\n" -> should yield exactly 1 row + List rows = collectRows(deser, serialized); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("hello"); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private void openDeser(RawFormatDeserializationSchema schema) throws Exception { + schema.open( + new DeserializationSchema.InitializationContext() { + @Override + public MetricGroup getMetricGroup() { + throw new UnsupportedOperationException(); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + throw new UnsupportedOperationException(); + } + }); + } + + private void openSer(RawFormatSerializationSchema schema) throws Exception { + schema.open( + new SerializationSchema.InitializationContext() { + @Override + public MetricGroup getMetricGroup() { + throw new UnsupportedOperationException(); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + throw new UnsupportedOperationException(); + } + }); + } + + private List collectRows(RawFormatDeserializationSchema schema, byte[] message) + throws Exception { + List rows = new ArrayList<>(); + schema.deserialize( + message, + new Collector() { + @Override + public void collect(RowData record) { + rows.add(record); + } + + @Override + public void close() {} + }); + return rows; + } + + private RowData buildStringRow(String value) { + GenericRowData row = new GenericRowData(1); + row.setField(0, StringData.fromString(value)); + return row; + } +}