Skip to content

Commit bafba42

Browse files
authored
[server] Add configuration options for multiple remote data directories (#2757)
1 parent e9dbcf1 commit bafba42

6 files changed

Lines changed: 311 additions & 69 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,59 @@ public class ConfigOptions {
9696
.stringType()
9797
.noDefaultValue()
9898
.withDescription(
99-
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
100-
+ " in a Fluss supported filesystem.");
99+
"The directory used for storing the kv snapshot data files and remote log for log tiered storage"
100+
+ " in a Fluss supported filesystem. "
101+
+ "When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration. "
102+
+ "For new clusters, it is recommended to use `remote.data.dirs` instead. "
103+
+ "If `remote.data.dirs` is configured, this value will be ignored.");
104+
105+
public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
106+
key("remote.data.dirs")
107+
.stringType()
108+
.asList()
109+
.defaultValues()
110+
.withDescription(
111+
"A comma-separated list of directories in Fluss supported filesystems "
112+
+ "for storing the kv snapshot data files and remote log files of tables/partitions. "
113+
+ "If configured, when a new table or a new partition is created, "
114+
+ "one of the directories from this list will be selected according to the strategy "
115+
+ "specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). "
116+
+ "If not configured, the system uses `"
117+
+ REMOTE_DATA_DIR.key()
118+
+ "` as the sole remote data directory for all data.");
119+
120+
public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
121+
key("remote.data.dirs.strategy")
122+
.enumType(RemoteDataDirStrategy.class)
123+
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
124+
.withDescription(
125+
String.format(
126+
"The strategy for selecting the remote data directory from `%s`. "
127+
+ "The candidate strategies are: %s, the default strategy is %s.\n"
128+
+ "%s: this strategy employs a round-robin approach to select one from the available remote directories.\n"
129+
+ "%s: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`.",
130+
REMOTE_DATA_DIRS.key(),
131+
Arrays.toString(RemoteDataDirStrategy.values()),
132+
RemoteDataDirStrategy.ROUND_ROBIN,
133+
RemoteDataDirStrategy.ROUND_ROBIN,
134+
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN));
135+
136+
public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
137+
key("remote.data.dirs.weights")
138+
.intType()
139+
.asList()
140+
.defaultValues()
141+
.withDescription(
142+
"The weights of the remote data directories. "
143+
+ "This is a list of weights corresponding to the `"
144+
+ REMOTE_DATA_DIRS.key()
145+
+ "` in the same order. When `"
146+
+ REMOTE_DATA_DIRS_STRATEGY.key()
147+
+ "` is set to `"
148+
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
149+
+ "`, this must be configured, and its size must be equal to `"
150+
+ REMOTE_DATA_DIRS.key()
151+
+ "`; otherwise, it will be ignored.");
101152

102153
public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
103154
key("remote.fs.write-buffer-size")
@@ -2076,4 +2127,10 @@ private static class ConfigOptionsHolder {
20762127
public static ConfigOption<?> getConfigOption(String key) {
20772128
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
20782129
}
2130+
2131+
/** Remote data dir select strategy for Fluss. */
2132+
public enum RemoteDataDirStrategy {
2133+
ROUND_ROBIN,
2134+
WEIGHTED_ROUND_ROBIN
2135+
}
20792136
}

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.annotation.VisibleForTesting;
22+
import org.apache.fluss.exception.IllegalConfigurationException;
23+
import org.apache.fluss.fs.FsPath;
2224

2325
import java.lang.reflect.Field;
2426
import java.util.Arrays;
2527
import java.util.HashMap;
2628
import java.util.List;
2729
import java.util.Map;
30+
import java.util.Optional;
2831

2932
/** Utilities of Fluss {@link ConfigOptions}. */
3033
@Internal
@@ -77,4 +80,118 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
7780
}
7881
return options;
7982
}
83+
84+
/**
 * Validates the configuration of a coordinator server.
 *
 * <p>This currently applies only the checks shared by all servers, by delegating to {@link
 * #validateServerConfigs(Configuration)}.
 *
 * @param conf the configuration to validate
 * @throws IllegalConfigurationException if any of the shared server checks fails
 */
public static void validateCoordinatorConfigs(Configuration conf) {
85+
validateServerConfigs(conf);
86+
}
87+
88+
public static void validateTabletConfigs(Configuration conf) {
89+
validateServerConfigs(conf);
90+
91+
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
92+
if (!serverId.isPresent()) {
93+
throw new IllegalConfigurationException(
94+
String.format(
95+
"Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID.key()));
96+
}
97+
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
98+
}
99+
100+
/** Validate common server configs. */
101+
protected static void validateServerConfigs(Configuration conf) {
102+
// Validate remote.data.dir and remote.data.dirs
103+
String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
104+
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
105+
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null
106+
&& conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) {
107+
throw new IllegalConfigurationException(
108+
String.format(
109+
"Either %s or %s must be configured.",
110+
ConfigOptions.REMOTE_DATA_DIR.key(),
111+
ConfigOptions.REMOTE_DATA_DIRS.key()));
112+
}
113+
114+
if (remoteDataDir != null) {
115+
// Must validate that remote.data.dir is a valid FsPath
116+
try {
117+
new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
118+
} catch (Exception e) {
119+
throw new IllegalConfigurationException(
120+
String.format(
121+
"Invalid configuration for %s.",
122+
ConfigOptions.REMOTE_DATA_DIR.key()),
123+
e);
124+
}
125+
}
126+
127+
// Validate remote.data.dirs
128+
for (int i = 0; i < remoteDataDirs.size(); i++) {
129+
String dir = remoteDataDirs.get(i);
130+
try {
131+
new FsPath(dir);
132+
} catch (Exception e) {
133+
throw new IllegalConfigurationException(
134+
String.format(
135+
"Invalid remote path for %s at index %d.",
136+
ConfigOptions.REMOTE_DATA_DIRS.key(), i),
137+
e);
138+
}
139+
}
140+
141+
// Validate remote.data.dirs.strategy
142+
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
143+
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
144+
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
145+
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
146+
if (!remoteDataDirs.isEmpty()) {
147+
if (remoteDataDirs.size() != weights.size()) {
148+
throw new IllegalConfigurationException(
149+
String.format(
150+
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
151+
ConfigOptions.REMOTE_DATA_DIRS.key(),
152+
remoteDataDirs.size(),
153+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
154+
weights.size()));
155+
}
156+
157+
// Validate all weights are no less than 0
158+
for (int i = 0; i < weights.size(); i++) {
159+
if (weights.get(i) < 0) {
160+
throw new IllegalConfigurationException(
161+
String.format(
162+
"All weights in '%s' must be no less than 0, but found %d at index %d.",
163+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
164+
weights.get(i),
165+
i));
166+
}
167+
}
168+
}
169+
}
170+
171+
validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
172+
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
173+
validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
174+
validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
175+
176+
if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
177+
throw new IllegalConfigurationException(
178+
String.format(
179+
"Invalid configuration for %s, it must be less than or equal %d bytes.",
180+
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
181+
}
182+
}
183+
184+
/**
 * Reads the integer value of {@code option} from {@code conf} and checks it against {@code
 * minValue}.
 *
 * @param conf the configuration to read the value from
 * @param option the integer option to check
 * @param minValue the smallest accepted value (inclusive)
 * @throws IllegalConfigurationException if the configured value is less than {@code minValue}
 */
private static void validMinValue(
185+
Configuration conf, ConfigOption<Integer> option, int minValue) {
186+
validMinValue(option, conf.get(option), minValue);
187+
}
188+
189+
/**
 * Checks that {@code value} is not less than {@code minValue}.
 *
 * @param option the option the value belongs to; only its key is used, for the error message
 * @param value the value to check
 * @param minValue the smallest accepted value (inclusive)
 * @throws IllegalConfigurationException if {@code value} is less than {@code minValue}
 */
private static void validMinValue(ConfigOption<Integer> option, int value, int minValue) {
190+
if (value < minValue) {
191+
throw new IllegalConfigurationException(
192+
String.format(
193+
"Invalid configuration for %s, it must be greater than or equal %d.",
194+
option.key(), minValue));
195+
}
196+
}
80197
}

fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717

1818
package org.apache.fluss.config;
1919

20+
import org.apache.fluss.exception.IllegalConfigurationException;
21+
2022
import org.junit.jupiter.api.Test;
2123

24+
import java.util.Arrays;
25+
import java.util.Collections;
2226
import java.util.Map;
2327

2428
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS;
2529
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
2630
import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions;
31+
import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
32+
import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs;
2733
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2835

2936
/** Test for {@link FlussConfigUtils}. */
3037
class FlussConfigUtilsTest {
@@ -49,4 +56,123 @@ void testExtractOptions() {
4956
});
5057
assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size());
5158
}
59+
60+
@Test
61+
void testValidateCoordinatorConfigs() {
62+
// Test empty configuration
63+
Configuration emptyConf = new Configuration();
64+
assertThatThrownBy(() -> validateCoordinatorConfigs(emptyConf))
65+
.isInstanceOf(IllegalConfigurationException.class)
66+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
67+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
68+
.hasMessageContaining("must be configured");
69+
70+
// Test configuration with only REMOTE_DATA_DIR set
71+
Configuration remoteDataDirConf = new Configuration();
72+
remoteDataDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
73+
validateCoordinatorConfigs(remoteDataDirConf);
74+
75+
// Test invalid REMOTE_DATA_DIR
76+
Configuration invalidRemoteDirConf = new Configuration();
77+
invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "123://invalid.com");
78+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirConf))
79+
.isInstanceOf(IllegalConfigurationException.class)
80+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
81+
.hasMessageContaining("Invalid configuration for remote.data.dir");
82+
83+
// Test configuration with only REMOTE_DATA_DIRS set
84+
Configuration remoteDataDirsConf = new Configuration();
85+
remoteDataDirsConf.set(
86+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
87+
validateCoordinatorConfigs(remoteDataDirConf);
88+
89+
// Test REMOTE_DATA_DIRS contains invalid path
90+
Configuration invalidRemoteDirsConf = new Configuration();
91+
invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
92+
invalidRemoteDirsConf.set(
93+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "123://invalid.com"));
94+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf))
95+
.isInstanceOf(IllegalConfigurationException.class)
96+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
97+
.hasMessageContaining("Invalid remote path for");
98+
99+
// Test WEIGHTED_ROUND_ROBIN with mismatched sizes
100+
Configuration mismatchedWeightsConf = new Configuration();
101+
mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
102+
mismatchedWeightsConf.set(
103+
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
104+
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
105+
mismatchedWeightsConf.set(
106+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
107+
mismatchedWeightsConf.set(
108+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1));
109+
assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf))
110+
.isInstanceOf(IllegalConfigurationException.class)
111+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
112+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key());
113+
114+
// Test WEIGHTED_ROUND_ROBIN with matched sizes
115+
Configuration matchedWeightsConf = new Configuration();
116+
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
117+
matchedWeightsConf.set(
118+
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
119+
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
120+
matchedWeightsConf.set(
121+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
122+
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 2));
123+
validateCoordinatorConfigs(matchedWeightsConf);
124+
125+
// Test negative weight
126+
Configuration negativeWeightConf = new Configuration();
127+
negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
128+
negativeWeightConf.set(
129+
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
130+
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
131+
negativeWeightConf.set(
132+
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
133+
negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(-1, 2));
134+
assertThatThrownBy(() -> validateCoordinatorConfigs(negativeWeightConf))
135+
.isInstanceOf(IllegalConfigurationException.class)
136+
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
137+
.hasMessageContaining(
138+
"All weights in 'remote.data.dirs.weights' must be no less than 0");
139+
140+
// Test invalid DEFAULT_REPLICATION_FACTOR
141+
Configuration invalidReplicationConf = new Configuration();
142+
invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
143+
invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0);
144+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf))
145+
.isInstanceOf(IllegalConfigurationException.class)
146+
.hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())
147+
.hasMessageContaining("must be greater than or equal 1");
148+
149+
// Test invalid KV_MAX_RETAINED_SNAPSHOTS
150+
Configuration invalidSnapshotConf = new Configuration();
151+
invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
152+
invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0);
153+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf))
154+
.isInstanceOf(IllegalConfigurationException.class)
155+
.hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())
156+
.hasMessageContaining("must be greater than or equal 1");
157+
158+
// Test invalid SERVER_IO_POOL_SIZE
159+
Configuration invalidIoPoolConf = new Configuration();
160+
invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
161+
invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0);
162+
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf))
163+
.isInstanceOf(IllegalConfigurationException.class)
164+
.hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key())
165+
.hasMessageContaining("must be greater than or equal 1");
166+
}
167+
168+
@Test
169+
void testValidateTabletConfigs() {
170+
Configuration conf = new Configuration();
171+
conf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
172+
conf.set(ConfigOptions.TABLET_SERVER_ID, -1);
173+
assertThatThrownBy(() -> validateTabletConfigs(conf))
174+
.isInstanceOf(IllegalConfigurationException.class)
175+
.hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key())
176+
.hasMessageContaining("it must be greater than or equal 0");
177+
}
52178
}

0 commit comments

Comments
 (0)