Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
r.getTableId(),
r.getSchemaId(),
TableDescriptor.fromJsonBytes(r.getTableJson()),
// For backward compatibility, results returned by old
// clusters do not include the remote data dir
r.hasRemoteDataDir() ? r.getRemoteDataDir() : null,
r.getCreatedTime(),
r.getModifiedTime()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,12 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
pbPartitionInfo ->
new PartitionInfo(
pbPartitionInfo.getPartitionId(),
toResolvedPartitionSpec(
pbPartitionInfo.getPartitionSpec())))
toResolvedPartitionSpec(pbPartitionInfo.getPartitionSpec()),
// For backward compatibility, results returned by old
// clusters do not include the remote data dir
pbPartitionInfo.hasRemoteDataDir()
Comment thread
LiebingYu marked this conversation as resolved.
? pbPartitionInfo.getRemoteDataDir()
: null))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.row.BinaryString.fromString;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
Expand Down Expand Up @@ -169,6 +170,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
.distributedBy(3)
.logFormat(logFormat)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down Expand Up @@ -313,6 +315,7 @@ void testComplexTypeFetch() throws Exception {
.distributedBy(3)
.logFormat(LogFormat.ARROW)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
DATA1_TABLE_INFO.getNumBuckets(),
DATA1_TABLE_INFO.getProperties(),
DATA1_TABLE_INFO.getCustomProperties(),
DATA1_TABLE_INFO.getRemoteDataDir(),
DATA1_TABLE_INFO.getComment().orElse(null),
DATA1_TABLE_INFO.getCreatedTime(),
DATA1_TABLE_INFO.getModifiedTime()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.testutils.DataTestUtils.genLogFile;
import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
Expand Down Expand Up @@ -218,6 +219,7 @@ void testProjection(String format) throws Exception {
.distributedBy(3)
.logFormat(logFormat)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -96,6 +97,7 @@ class RecordAccumulatorTest {
.distributedBy(3)
.property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ public static boolean isAlterableTableOption(String key) {
return ALTERABLE_TABLE_OPTIONS.contains(key);
}

/**
* Returns the default remote data directory from the configuration. Used as a fallback for
* tables or partitions that do not contain remote data directory metadata.
*
* @param conf the Fluss configuration
* @return the default remote data directory path, never {@code null} if the configuration is
* valid (i.e., at least one of {@code remote.data.dir} or {@code remote.data.dirs} is set)
* @throws IllegalConfigurationException if the configuration is invalid (i.e., both {@code
* remote.data.dir} and {@code remote.data.dirs} are unset)
* @see ConfigOptions#REMOTE_DATA_DIR
* @see ConfigOptions#REMOTE_DATA_DIRS
*/
public static String getDefaultRemoteDataDir(Configuration conf) {
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
if (!remoteDataDirs.isEmpty()) {
return remoteDataDirs.get(0);
}

String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
if (remoteDataDir == null) {
throw new IllegalConfigurationException(
String.format(
"Either %s or %s must be configured.",
ConfigOptions.REMOTE_DATA_DIR.key(),
ConfigOptions.REMOTE_DATA_DIRS.key()));
}
return remoteDataDir;
}

@VisibleForTesting
static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
Map<String, ConfigOption<?>> options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,27 @@

import org.apache.fluss.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* Information of a partition metadata, includes the partition's name and the partition id that
* represents the unique identifier of the partition.
* Information of a partition metadata, includes partition id (unique identifier of the partition),
* partition name, remote data dir for partitioned data storage, etc.
*
* @since 0.2
*/
@PublicEvolving
public class PartitionInfo {
private final long partitionId;
private final ResolvedPartitionSpec partitionSpec;
private final @Nullable String remoteDataDir;

public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
public PartitionInfo(
long partitionId, ResolvedPartitionSpec partitionSpec, @Nullable String remoteDataDir) {
this.partitionId = partitionId;
this.partitionSpec = partitionSpec;
this.remoteDataDir = remoteDataDir;
}

/** Get the partition id. The id is globally unique in the Fluss cluster. */
Expand All @@ -58,6 +63,11 @@ public PartitionSpec getPartitionSpec() {
return partitionSpec.toPartitionSpec();
}

@Nullable
public String getRemoteDataDir() {
return remoteDataDir;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -67,16 +77,25 @@ public boolean equals(Object o) {
return false;
}
PartitionInfo that = (PartitionInfo) o;
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
return partitionId == that.partitionId
&& Objects.equals(partitionSpec, that.partitionSpec)
&& Objects.equals(remoteDataDir, that.remoteDataDir);
}

@Override
public int hashCode() {
return Objects.hash(partitionId, partitionSpec);
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
}

@Override
public String toString() {
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
return "Partition{name='"
+ getPartitionName()
+ '\''
+ ", id="
+ partitionId
+ ", remoteDataDir="
+ remoteDataDir
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class TableInfo {
private final Configuration properties;
private final TableConfig tableConfig;
private final Configuration customProperties;
private final @Nullable String remoteDataDir;
private final @Nullable String comment;

private final long createdTime;
Expand All @@ -74,6 +75,7 @@ public TableInfo(
int numBuckets,
Configuration properties,
Configuration customProperties,
@Nullable String remoteDataDir,
@Nullable String comment,
long createdTime,
long modifiedTime) {
Expand All @@ -90,6 +92,7 @@ public TableInfo(
this.properties = properties;
this.tableConfig = new TableConfig(properties);
this.customProperties = customProperties;
this.remoteDataDir = remoteDataDir;
this.comment = comment;
this.createdTime = createdTime;
this.modifiedTime = modifiedTime;
Expand Down Expand Up @@ -263,6 +266,12 @@ public Configuration getCustomProperties() {
return customProperties;
}

/** Returns the remote data directory of the table. */
@Nullable
public String getRemoteDataDir() {
return remoteDataDir;
}

/** Returns the comment/description of the table. */
public Optional<String> getComment() {
return Optional.ofNullable(comment);
Expand Down Expand Up @@ -314,6 +323,7 @@ public static TableInfo of(
long tableId,
int schemaId,
TableDescriptor tableDescriptor,
String remoteDataDir,
long createdTime,
long modifiedTime) {
Schema schema = tableDescriptor.getSchema();
Expand All @@ -335,6 +345,7 @@ public static TableInfo of(
numBuckets,
Configuration.fromMap(tableDescriptor.getProperties()),
Configuration.fromMap(tableDescriptor.getCustomProperties()),
remoteDataDir,
tableDescriptor.getComment().orElse(null),
createdTime,
modifiedTime);
Expand All @@ -358,6 +369,7 @@ public boolean equals(Object o) {
&& Objects.equals(partitionKeys, that.partitionKeys)
&& Objects.equals(properties, that.properties)
&& Objects.equals(customProperties, that.customProperties)
&& Objects.equals(remoteDataDir, that.remoteDataDir)
&& Objects.equals(comment, that.comment);
}

Expand All @@ -376,6 +388,7 @@ public int hashCode() {
numBuckets,
properties,
customProperties,
remoteDataDir,
comment);
}

Expand All @@ -402,6 +415,8 @@ public String toString() {
+ properties
+ ", customProperties="
+ customProperties
+ ", remoteDataDir="
+ remoteDataDir
+ ", comment='"
+ comment
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class TestData {
public static final short DEFAULT_SCHEMA_ID = 1;
public static final long BASE_OFFSET = 0L;
public static final byte DEFAULT_MAGIC = CURRENT_LOG_MAGIC_VALUE;
public static final String DEFAULT_REMOTE_DATA_DIR = "/tmp/fluss/remote-data";
// ---------------------------- data1 and related table info begin ---------------------------
public static final List<Object[]> DATA1 =
Arrays.asList(
Expand Down Expand Up @@ -93,6 +94,7 @@ public final class TestData {
DATA1_TABLE_ID,
1,
DATA1_TABLE_DESCRIPTOR,
DEFAULT_REMOTE_DATA_DIR,
currentMillis,
currentMillis);

Expand All @@ -118,6 +120,7 @@ public final class TestData {
PARTITION_TABLE_ID,
1,
DATA1_PARTITIONED_TABLE_DESCRIPTOR,
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());

Expand Down Expand Up @@ -148,6 +151,7 @@ public final class TestData {
DATA1_TABLE_ID_PK,
1,
DATA1_TABLE_DESCRIPTOR_PK,
DEFAULT_REMOTE_DATA_DIR,
currentMillis,
currentMillis);

Expand Down Expand Up @@ -217,6 +221,7 @@ public final class TestData {
DATA2_TABLE_ID,
1,
DATA2_TABLE_DESCRIPTOR,
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
// -------------------------------- data2 info end ------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.fluss.metadata.TablePath.detectInvalidName;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition;
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
Expand Down Expand Up @@ -84,7 +85,8 @@ void testValidatePartitionValues() {
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR)
.build();
TableInfo tableInfo = TableInfo.of(DATA1_TABLE_PATH, 1L, 1, descriptor, 1L, 1L);
TableInfo tableInfo =
TableInfo.of(DATA1_TABLE_PATH, 1L, 1, descriptor, DEFAULT_REMOTE_DATA_DIR, 1L, 1L);
assertThatThrownBy(
() ->
validatePartitionSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -82,14 +83,15 @@ private static TableInfo createTableInfo(int numBuckets, boolean isPartitioned)
numBuckets,
new Configuration(),
new Configuration(),
DEFAULT_REMOTE_DATA_DIR,
null, // comment
System.currentTimeMillis(),
System.currentTimeMillis());
}

private static PartitionInfo createPartitionInfo(long partitionId, String partitionName) {
ResolvedPartitionSpec spec = ResolvedPartitionSpec.fromPartitionValue("pt", partitionName);
return new PartitionInfo(partitionId, spec);
return new PartitionInfo(partitionId, spec, DEFAULT_REMOTE_DATA_DIR);
}

// ==================== FRESH_START Tests ====================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.stream.Collectors;

import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -646,13 +647,15 @@ void testPartitionsExpiredInFlussButExistInLake(
ResolvedPartitionSpec.fromPartitionName(
Collections.singletonList(isPrimaryKeyTable ? "date" : "name"),
partitionName);
lakePartitionInfos.add(new PartitionInfo(partitionId, partitionSpec));
lakePartitionInfos.add(
new PartitionInfo(partitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR));
}
ResolvedPartitionSpec partitionSpec =
ResolvedPartitionSpec.fromPartitionName(
Collections.singletonList(isPrimaryKeyTable ? "date" : "name"),
hybridPartitionName);
lakePartitionInfos.add(new PartitionInfo(hybridPartitionId, partitionSpec));
lakePartitionInfos.add(
new PartitionInfo(hybridPartitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR));

LakeSource<LakeSplit> lakeSource =
new TestingLakeSource(DEFAULT_BUCKET_NUM, lakePartitionInfos);
Expand Down
Loading