Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/raw.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ Format 参数
<td>指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。
更多细节可查阅 <a href="https://zh.wikipedia.org/wiki/字节序">字节序</a>。</td>
</tr>
<tr>
<td><h5>raw.line-delimiter</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>指定行分隔符,用于在反序列化时将一条消息拆分为多行。设置后,每条消息将使用
'raw.charset' 解码,并按此分隔符切分,每段输出一条数据行。在序列化时,分隔符字节会被
追加到每条序列化值的末尾。常用值为 '\n'(换行符)或 '||'。
<br><strong>注意:</strong>当序列化与反序列化使用相同的分隔符配置时,两者具有
round-trip 兼容性:序列化器追加的末尾分隔符会在反序列化时自动被去除。</td>
</tr>
</tbody>
</table>

Expand Down
13 changes: 13 additions & 0 deletions docs/content/docs/connectors/table/formats/raw.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ Format Options
<td>Specify the endianness to encode the bytes of numeric value. Valid values are 'big-endian' and 'little-endian'.
See more details of <a href="https://en.wikipedia.org/wiki/Endianness">endianness</a>.</td>
</tr>
<tr>
<td><h5>raw.line-delimiter</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the line delimiter for splitting incoming messages into multiple rows during
deserialization. When set, each incoming message is decoded using 'raw.charset' and then split
by this delimiter; one row is emitted per segment. During serialization, the delimiter bytes
are appended to each serialized value. Common values are '\n' (newline) or '||'.
<br><strong>Note:</strong> When the same delimiter is configured for both serialization and
deserialization, the two are round-trip compatible: a trailing delimiter appended by the
serializer is automatically stripped during deserialization.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row

private final boolean isBigEndian;

@Nullable private final String lineDelimiter;

/**
* Pre-compiled pattern for splitting by {@link #lineDelimiter}, or {@code null} if no
* delimiter.
*/
@Nullable private final Pattern lineDelimiterPattern;

private final DeserializationRuntimeConverter converter;

private final DataLengthValidator validator;
Expand All @@ -64,12 +76,24 @@ public RawFormatDeserializationSchema(
TypeInformation<RowData> producedTypeInfo,
String charsetName,
boolean isBigEndian) {
this(deserializedType, producedTypeInfo, charsetName, isBigEndian, null);
}

/**
 * Creates a deserialization schema for the 'raw' format that can optionally split one message
 * into several rows.
 *
 * @param deserializedType logical type of the single produced field; must not be {@code null}
 * @param producedTypeInfo type information of the produced rows; must not be {@code null}
 * @param charsetName charset name handed to the converter and used to decode a message before
 *     it is split by {@code lineDelimiter}
 * @param isBigEndian endianness flag handed to the converter
 * @param lineDelimiter delimiter that is matched literally when splitting a message, or {@code
 *     null} to keep the one-row-per-message behavior
 * @throws IllegalArgumentException if {@code lineDelimiter} is the empty string
 */
public RawFormatDeserializationSchema(
        LogicalType deserializedType,
        TypeInformation<RowData> producedTypeInfo,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    this.deserializedType = checkNotNull(deserializedType);
    this.producedTypeInfo = checkNotNull(producedTypeInfo);
    this.converter = createConverter(deserializedType, charsetName, isBigEndian);
    this.validator = createDataLengthValidator(deserializedType);
    this.charsetName = charsetName;
    this.isBigEndian = isBigEndian;
    // An empty delimiter would compile to a pattern that matches the empty string, so every
    // single character of a message would be emitted as its own row. Fail fast instead.
    if (lineDelimiter != null && lineDelimiter.isEmpty()) {
        throw new IllegalArgumentException(
                "The line delimiter of the 'raw' format must not be an empty string.");
    }
    this.lineDelimiter = lineDelimiter;
    // Quote the delimiter so that regex meta characters such as '|' are matched literally.
    this.lineDelimiterPattern =
            lineDelimiter != null ? Pattern.compile(Pattern.quote(lineDelimiter)) : null;
}

@Override
Expand All @@ -92,6 +116,41 @@ public RowData deserialize(byte[] message) throws IOException {
return rowData;
}

/**
 * Deserializes one message and forwards the resulting row(s) to {@code out}.
 *
 * <p>Without a configured line delimiter the message is handed to {@link #deserialize(byte[])}
 * and its result is emitted unless it is {@code null}. With a delimiter, a {@code null}
 * message emits nothing; otherwise the message is decoded with the configured charset, split
 * on the literal delimiter, and each segment is validated, converted and emitted as a
 * single-field row.
 *
 * @param message raw message bytes, may be {@code null}
 * @param out collector that receives the produced rows
 * @throws IOException if thrown while deserializing the message or one of its segments
 */
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
    if (lineDelimiter != null) {
        if (message != null) {
            final Charset cs = Charset.forName(charsetName);
            // Limit -1 keeps empty segments in the middle of the message.
            final String[] segments = lineDelimiterPattern.split(new String(message, cs), -1);
            final int total = segments.length;
            // A message ending with the delimiter yields one trailing empty segment
            // (e.g. written by a serializer appending one delimiter per row); drop only that one.
            final boolean endsWithDelimiter = total > 0 && segments[total - 1].isEmpty();
            final int emitCount = endsWithDelimiter ? total - 1 : total;

            int idx = 0;
            while (idx < emitCount) {
                final byte[] segmentBytes = segments[idx++].getBytes(cs);
                validator.validate(segmentBytes);
                final Object value = converter.convert(segmentBytes);
                final GenericRowData emitted = new GenericRowData(1);
                emitted.setField(0, value);
                out.collect(emitted);
            }
        }
        return;
    }

    // No delimiter configured: fall back to the single-record path.
    final RowData single = deserialize(message);
    if (single != null) {
        out.collect(single);
    }
}

@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
Expand All @@ -114,12 +173,14 @@ public boolean equals(Object o) {
return producedTypeInfo.equals(that.producedTypeInfo)
&& deserializedType.equals(that.deserializedType)
&& charsetName.equals(that.charsetName)
&& isBigEndian == that.isBigEndian;
&& isBigEndian == that.isBigEndian
&& Objects.equals(lineDelimiter, that.lineDelimiter);
}

/**
 * Computes the hash code from the produced type information, the deserialized type, the
 * charset name, the endianness flag and the line delimiter.
 */
@Override
public int hashCode() {
return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian);
return Objects.hash(
producedTypeInfo, deserializedType, charsetName, isBigEndian, lineDelimiter);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -72,6 +73,7 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(RawFormatOptions.ENDIANNESS);
options.add(RawFormatOptions.CHARSET);
options.add(RawFormatOptions.LINE_DELIMITER);
return options;
}

Expand All @@ -81,6 +83,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
final Optional<String> lineDelimiter =
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);

return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
Expand All @@ -91,7 +95,11 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
return new RawFormatDeserializationSchema(
fieldType, producedTypeInfo, charsetName, isBigEndian);
fieldType,
producedTypeInfo,
charsetName,
isBigEndian,
lineDelimiter.orElse(null));
}

@Override
Expand All @@ -107,14 +115,17 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
final Optional<String> lineDelimiter =
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
final RowType physicalRowType = (RowType) consumedDataType.getLogicalType();
final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian);
return new RawFormatSerializationSchema(
fieldType, charsetName, isBigEndian, lineDelimiter.orElse(null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,14 @@ public class RawFormatOptions {
.defaultValue(StandardCharsets.UTF_8.displayName())
.withDescription("Defines the string charset.");

/**
 * Optional delimiter used to split a message into rows on read and appended to every value on
 * write. It has no default value; the option is simply absent when not configured.
 */
public static final ConfigOption<String> LINE_DELIMITER =
        ConfigOptions.key("line-delimiter")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Optional line delimiter. The value is used literally: it is neither interpreted as a "
                                + "regular expression nor unescaped, so the actual delimiter characters "
                                + "(e.g. a real line feed) must be provided. "
                                + "When set, deserialization splits each message by this delimiter and emits "
                                + "one RowData per part. Serialization appends the delimiter after each row's value.");

private RawFormatOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RawType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

/** Serialization schema that serializes an {@link RowData} object into raw (byte based) value. */
Expand All @@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData

private final boolean isBigEndian;

@Nullable private final String lineDelimiter;

/** Pre-computed delimiter bytes, or {@code null} if no delimiter is set. */
@Nullable private final byte[] delimiterBytes;

/**
 * Creates a serialization schema without a line delimiter by delegating to the four-argument
 * constructor.
 *
 * @param serializedType logical type of the field to serialize
 * @param charsetName charset name handed to the converter
 * @param isBigEndian endianness flag handed to the converter
 */
public RawFormatSerializationSchema(
        LogicalType serializedType, String charsetName, boolean isBigEndian) {
    this(serializedType, charsetName, isBigEndian, /* lineDelimiter */ null);
}

public RawFormatSerializationSchema(
LogicalType serializedType,
String charsetName,
boolean isBigEndian,
@Nullable String lineDelimiter) {
this.serializedType = serializedType;
this.converter = createConverter(serializedType, charsetName, isBigEndian);
this.charsetName = charsetName;
this.isBigEndian = isBigEndian;
this.lineDelimiter = lineDelimiter;
this.delimiterBytes =
lineDelimiter != null ? lineDelimiter.getBytes(Charset.forName(charsetName)) : null;
}

@Override
Expand All @@ -63,7 +82,13 @@ public void open(InitializationContext context) throws Exception {
@Override
public byte[] serialize(RowData row) {
try {
return converter.convert(row);
byte[] valueBytes = converter.convert(row);
if (delimiterBytes == null || valueBytes == null) {
return valueBytes;
}
byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
return result;
} catch (IOException e) {
throw new RuntimeException("Could not serialize row '" + row + "'. ", e);
}
Expand All @@ -80,12 +105,13 @@ public boolean equals(Object o) {
RawFormatSerializationSchema that = (RawFormatSerializationSchema) o;
return serializedType.equals(that.serializedType)
&& charsetName.equals(that.charsetName)
&& isBigEndian == that.isBigEndian;
&& isBigEndian == that.isBigEndian
&& Objects.equals(lineDelimiter, that.lineDelimiter);
}

/**
 * Computes the hash code from the serialized type, the charset name, the endianness flag and
 * the line delimiter.
 */
@Override
public int hashCode() {
return Objects.hash(serializedType, charsetName, isBigEndian);
return Objects.hash(serializedType, charsetName, isBigEndian, lineDelimiter);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,29 @@ void testInvalidFieldTypes() {
.hasMessage("The 'raw' format doesn't supports 'MAP<INT, STRING>' as column type.");
}

/**
 * Checks that the 'raw.line-delimiter' table option ends up in both the deserialization and
 * the serialization schema created from the table options.
 */
@Test
void testLineDelimiterOption() {
    final String delimiter = "\n";
    final Map<String, String> tableOptions =
            getModifiedOptions(options -> options.put("raw.line-delimiter", delimiter));

    // The decoder created from the options must equal one built directly with the delimiter.
    final DeserializationSchema<RowData> actualDeser =
            createDeserializationSchema(SCHEMA, tableOptions);
    final RawFormatDeserializationSchema expectedDeser =
            new RawFormatDeserializationSchema(
                    ROW_TYPE.getTypeAt(0),
                    InternalTypeInfo.of(ROW_TYPE),
                    "UTF-8",
                    true,
                    delimiter);
    assertThat(actualDeser).isEqualTo(expectedDeser);

    // The same holds for the encoder.
    final SerializationSchema<RowData> actualSer =
            createSerializationSchema(SCHEMA, tableOptions);
    final RawFormatSerializationSchema expectedSer =
            new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), "UTF-8", true, delimiter);
    assertThat(actualSer).isEqualTo(expectedSer);
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Loading