Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b4321eb
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
d19096d
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
a32901a
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
42ee2ba
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
c976caa
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
570d670
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 10, 2025
5915428
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
e7da7e5
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 6, 2025
b0f381c
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
804055d
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
2e8578e
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 7, 2025
2c44cee
fix(server): disable server-role in StandardTaskScheduler
Tsukilc Nov 10, 2025
925c384
fix(server): fix npe in non-auth mode
Tsukilc Jan 11, 2026
3974048
fix(server): fix npe in non-auth mode
Tsukilc Jan 11, 2026
a349f62
fix(server): fix npe in non-auth mode
Tsukilc Jan 11, 2026
3e7bc6f
fix(server): remove server.id
Tsukilc Jan 11, 2026
e6cc98b
Merge branch 'fix/scheduler' of https://github.com/hugegraph/hugegrap…
Tsukilc Jan 11, 2026
e6f6487
fix(server): remove task.scheduler_type
Tsukilc Jan 11, 2026
b325dba
fix(server): remove task.scheduler_type
Tsukilc Jan 11, 2026
68b906a
Merge branch 'master' of https://github.com/Tsukilc/incubator-hugegra…
Tsukilc Jan 11, 2026
1113520
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
5ffd20b
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
a110112
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
a31e937
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
d89b9bd
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
5807fb7
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 15, 2026
f8fc58a
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
6dd52e4
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
af85bef
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
b332674
fix(server): fix some issues of the distributed scheduler
Tsukilc Jan 16, 2026
cac78f4
Merge branch 'master' into pr/2937
imbajin Feb 3, 2026
28e0390
fix(server): fix some issues of the distributed scheduler
Tsukilc Feb 10, 2026
5e30cac
Merge branch 'fix/scheduler' of https://github.com/Tsukilc/incubator-…
Tsukilc Feb 10, 2026
b70788f
Revert "fix(server): fix some issues of the distributed scheduler"
Tsukilc Feb 11, 2026
7ba40bd
fix(server): fix some issues of the distributed scheduler
Tsukilc Feb 11, 2026
9ad8c57
Merge branch 'master' of https://github.com/hugegraph/hugegraph into …
Tsukilc Mar 10, 2026
71978ac
fix(server): delete config
Tsukilc Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker/configs/server1-conf/graphs/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ serializer=binary
pd.peers=127.0.0.1:8686,127.0.0.1:8687,127.0.0.1:8688

# task config
task.scheduler_type=local
task.schedule_period=10
task.retry=0
task.wait_timeout=10
1 change: 0 additions & 1 deletion docker/configs/server2-conf/graphs/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ serializer=binary
pd.peers=127.0.0.1:8686,127.0.0.1:8687,127.0.0.1:8688

# task config
task.scheduler_type=local
task.schedule_period=10
task.retry=0
task.wait_timeout=10
1 change: 0 additions & 1 deletion docker/configs/server3-conf/graphs/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ serializer=binary
pd.peers=127.0.0.1:8686,127.0.0.1:8687,127.0.0.1:8688

# task config
task.scheduler_type=local
task.schedule_period=10
task.retry=0
task.wait_timeout=10
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ store=hugegraph
pd.peers=$PD_PEERS_LIST$

# task config
task.scheduler_type=local
task.schedule_period=10
task.retry=0
task.wait_timeout=10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ public class ServerOptions extends OptionHolder {
public static final ConfigOption<String> SERVER_ID =
new ConfigOption<>(
"server.id",
"The id of hugegraph-server.",
disallowEmpty(),
"server-1"
"The id of hugegraph-server, auto-generated if not specified.",
null,
""
);
public static final ConfigOption<String> SERVER_ROLE =
new ConfigOption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.exception.ExistedException;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.io.HugeGraphSONModule;
import org.apache.hugegraph.k8s.K8sDriver;
Expand Down Expand Up @@ -195,7 +197,17 @@ public final class GraphManager {
public GraphManager(HugeConfig conf, EventHub hub) {
LOG.info("Init graph manager");
E.checkArgumentNotNull(conf, "The config can't be null");

// Auto-generate server.id if not configured.
// Random generation is to prevent duplicate id error reports.This id is currently
// meaningless and needs to be completely removed serverInfoManager in
// the future
String server = conf.get(ServerOptions.SERVER_ID);
if (StringUtils.isEmpty(server)) {
server = "server-" + UUID.randomUUID().toString().substring(0, 8);
LOG.info("Auto-generated server.id: {}", server);
conf.setProperty(ServerOptions.SERVER_ID.name(), server);
}
String role = conf.get(ServerOptions.SERVER_ROLE);

this.config = conf;
Expand All @@ -206,10 +218,6 @@ public GraphManager(HugeConfig conf, EventHub hub) {
conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S);
this.startIgnoreSingleGraphError = conf.get(
ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR);
E.checkArgument(server != null && !server.isEmpty(),
"The server name can't be null or empty");
E.checkArgument(role != null && !role.isEmpty(),
"The server role can't be null or empty");
this.graphsDir = conf.get(ServerOptions.GRAPHS);
this.cluster = conf.get(ServerOptions.CLUSTER);
this.graphSpaces = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -276,7 +284,7 @@ private static String serviceId(String graphSpace, Service.ServiceType type,
.replace("_", "-").toLowerCase();
}

private boolean usePD() {
public boolean usePD() {
return this.PDExist;
}

Expand Down Expand Up @@ -1557,6 +1565,14 @@ private void loadGraph(String name, String graphConfPath) {
String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS);
config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

// Transfer `pd.peers` from server config to graph config
// Only inject if not already configured in graph config
if (!config.containsKey("pd.peers")) {
String pdPeers = this.conf.get(ServerOptions.PD_PEERS);
config.addProperty("pd.peers", pdPeers);
}

this.transferRoleWorkerConfig(config);

Graph graph = GraphFactory.open(config);
Expand Down Expand Up @@ -1637,10 +1653,6 @@ private void checkBackendVersionOrExit(HugeConfig config) {
private void initNodeRole() {
String id = config.get(ServerOptions.SERVER_ID);
String role = config.get(ServerOptions.SERVER_ROLE);
E.checkArgument(StringUtils.isNotEmpty(id),
"The server name can't be null or empty");
E.checkArgument(StringUtils.isNotEmpty(role),
"The server role can't be null or empty");

NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
boolean supportRoleElection = !nodeRole.computer() &&
Expand Down Expand Up @@ -1960,7 +1972,7 @@ public HugeGraph graph(String graphSpace, String name) {
} else if (graph instanceof HugeGraph) {
return (HugeGraph) graph;
}
throw new NotSupportException("graph instance of %s", graph.getClass());
throw new NotFoundException(String.format("Graph '%s' does not exist", name));
}

public void dropGraphLocal(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph {
private final BackendStoreProvider storeProvider;
private final TinkerPopTransaction tx;
private final RamTable ramtable;
private final String schedulerType;
private volatile boolean started;
private volatile boolean closed;
private volatile GraphMode mode;
Expand Down Expand Up @@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) {
this.closed = false;
this.mode = GraphMode.NONE;
this.readMode = GraphReadMode.OLTP_ONLY;
this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);

LockUtil.init(this.spaceGraphName());

Expand Down Expand Up @@ -315,6 +313,7 @@ public String backend() {
return this.storeProvider.type();
}

@Override
public BackendStoreInfo backendStoreInfo() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
// TODO: pass storeProvider.metaStore()
Expand Down Expand Up @@ -465,6 +464,7 @@ public void updateTime(Date updateTime) {
this.updateTime = updateTime;
}

@Override
public void waitStarted() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.schemaTransaction();
Expand Down Expand Up @@ -1629,7 +1629,9 @@ public <T> void submitEphemeralJob(EphemeralJob<T> job) {

@Override
public String schedulerType() {
return StandardHugeGraph.this.schedulerType;
// Use distributed scheduler for hstore backend, otherwise use local
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schedulerType() method now determines the scheduler type based on whether the backend is hstore. However, this logic change is undocumented. Consider adding a comment explaining why hstore backends require distributed scheduling while other backends use local scheduling, as this is an important architectural decision.

Suggested change
// Use distributed scheduler for hstore backend, otherwise use local
/*
* HStore is a distributed backend: data and tasks may be handled by
* multiple graph servers that must coordinate scheduling and state.
* For this reason we require a distributed task scheduler when the
* backend is hstore so that jobs can be balanced and recovered
* across nodes. For other backends, the graph is served by a single
* server instance and tasks are executed locally, so a local
* in-process scheduler is sufficient and avoids the overhead of
* distributed coordination.
*/

Copilot uses AI. Check for mistakes.
// After the merger of rocksdb and hstore, consider whether to change this logic
return StandardHugeGraph.this.isHstore() ? "distributed" : "local";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,7 @@ public class CoreOptions extends OptionHolder {
rangeInt(1, 500),
1
);
public static final ConfigOption<String> SCHEDULER_TYPE =
new ConfigOption<>(
"task.scheduler_type",
"The type of scheduler used in distribution system.",
allowValues("local", "distributed"),
"local"
);

public static final ConfigOption<Boolean> TASK_SYNC_DELETION =
new ConfigOption<>(
"task.sync_deletion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hugegraph.type.define.NodeRole;
import org.apache.hugegraph.util.E;

// TODO: rename to GlobalNodeRoleInfo
// TODO: We need to completely delete the startup of master-worker
public final class GlobalMasterInfo {

private static final NodeInfo NO_MASTER = new NodeInfo(false, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.hugegraph.masterelection;

import java.util.Objects;

import org.apache.hugegraph.task.TaskManager;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import java.util.Objects;

public class StandardRoleListener implements RoleListener {

private static final Logger LOG = Log.logger(StandardRoleListener.class);
Expand All @@ -36,7 +36,6 @@ public class StandardRoleListener implements RoleListener {
public StandardRoleListener(TaskManager taskManager,
GlobalMasterInfo roleInfo) {
this.taskManager = taskManager;
this.taskManager.enableRoleElection();
this.roleInfo = roleInfo;
this.selfIsMaster = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.slf4j.Logger;

public class DistributedTaskScheduler extends TaskAndResultScheduler {

private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
private final long schedulePeriod;
private final ExecutorService taskDbExecutor;
Expand Down Expand Up @@ -118,6 +121,11 @@ private static boolean sleep(long ms) {
public void cronSchedule() {
// Perform periodic scheduling tasks

// Check closed flag first to exit early
if (this.closed.get()) {
return;
}

if (!this.graph.started() || this.graph.closed()) {
return;
}
Expand Down Expand Up @@ -253,6 +261,10 @@ public <V> Future<?> schedule(HugeTask<V> task) {
return this.ephemeralTaskExecutor.submit(task);
}

// Validate task state before saving to ensure correct exception type
E.checkState(task.type() != null, "Task type can't be null");
E.checkState(task.name() != null, "Task name can't be null");

// Process schema task
// Handle gremlin task
// Handle OLAP calculation tasks
Expand Down Expand Up @@ -284,14 +296,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ 业务逻辑设计问题 - delete 方法的强制删除逻辑不一致

DistributedTaskScheduler.delete() 方法第 286-305 行,删除逻辑与原实现存在重大差异:

原逻辑:

  • force=false: 设置状态为 DELETING,返回 null
  • force=true: 直接从数据库删除

新逻辑:

if (!force) {
    if (!task.completed() && task.status() != TaskStatus.DELETING) {
        throw new IllegalArgumentException(
                String.format("Can't delete incomplete task '%s' in status %s, " +
                              "Please try to cancel the task first",
                              id, task.status()));
    }
}
return this.deleteFromDB(id);

问题:

  1. 移除了 DELETING 状态的设置逻辑,这可能破坏依赖定时清理的代码
  2. 非强制删除现在会直接删除完成的任务,而不是先标记为 DELETING
  3. StandardTaskScheduler 的实现可能不一致

建议:重新考虑删除流程,保持与原有逻辑的兼容性,或在 PR 描述中明确说明此行为变更

}

/**
* Note: This method will update the status of the input task.
*
* @param task
* @param <V>
*/
@Override
public <V> void cancel(HugeTask<V> task) {
// Update status to CANCELLING
if (!task.completed()) {
// Task not completed, can only execute status not CANCELLING
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
E.checkArgumentNotNull(task, "Task can't be null");

if (task.completed() || task.cancelling()) {
return;
}

LOG.info("Cancel task '{}' in status {}", task.id(), task.status());

// Check if task is running locally, cancel it directly if so
HugeTask<?> runningTask = this.runningTasks.get(task.id());
if (runningTask != null) {
boolean cancelled = runningTask.cancel(true);
if (cancelled) {
task.overwriteStatus(TaskStatus.CANCELLED);
Comment on lines +318 to +320
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancel method now updates the task status directly by calling task.overwriteStatus(TaskStatus.CANCELLED) on line 320 when the task is running locally. However, this doesn't persist the status change to the database. The task status should be saved to ensure consistency between in-memory state and persisted state.

Suggested change
boolean cancelled = runningTask.cancel(true);
if (cancelled) {
task.overwriteStatus(TaskStatus.CANCELLED);
TaskStatus previousStatus = task.status();
boolean cancelled = runningTask.cancel(true);
if (cancelled) {
if (this.updateStatus(task.id(), previousStatus,
TaskStatus.CANCELLED)) {
task.overwriteStatus(TaskStatus.CANCELLED);
} else {
LOG.info("Failed to persist cancelled status for task '{}', " +
"status may have changed from {}",
task.id(), previousStatus);
}

Copilot uses AI. Check for mistakes.
}
LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
return;
}

// Task not running locally, update status to CANCELLING
// for cronSchedule() or other nodes to handle
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
LOG.info("Failed to cancel task '{}', status may have changed from {}",
task.id(), currentStatus);
} else {
LOG.info("cancel task({}) error, task has completed", task.id());
task.overwriteStatus(TaskStatus.CANCELLING);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 代码可维护性 - TODO 注释应该更具体

第 334 行的 TODO 注释过于模糊:

//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;

问题:

  1. 未说明为什么要移除 serverInfoManager
  2. 未说明移除的时间节点或前提条件
  3. 注释掉的代码应该删除,而不是保留

建议:

Suggested change
}
// TODO(issue-XXX): Remove serverInfoManager.close() after migrating to
// pure single-node architecture. Currently kept for backward compatibility.
return this.serverManager().close();


Expand All @@ -316,14 +355,18 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {

@Override
public <V> HugeTask<V> delete(Id id, boolean force) {
if (!force) {
// Change status to DELETING, perform the deletion operation through automatic
// scheduling.
HugeTask<?> task = this.taskWithoutResult(id);

if (!force && !task.completed()) {
// Check task status: can't delete running tasks without force
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
} else {
return this.deleteFromDB(id);
// Already in DELETING status, delete directly from DB
// Completed tasks can also be deleted directly
}

// Delete from DB directly for completed/DELETING tasks or force=true
return this.deleteFromDB(id);
}

@Override
Expand Down Expand Up @@ -353,6 +396,18 @@ public boolean close() {
cronFuture.cancel(false);
}

// Wait for cron task to complete to ensure all transactions are closed
try {
cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
} catch (CancellationException e) {
// Task was cancelled, this is expected
LOG.debug("Cron task was cancelled");
} catch (TimeoutException e) {
LOG.warn("Cron task did not complete in time when closing scheduler");
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Exception while waiting for cron task to complete", e);
}

if (!this.taskDbExecutor.isShutdown()) {
this.call(() -> {
try {
Expand All @@ -363,7 +418,10 @@ public boolean close() {
this.graph.closeTx();
});
}
return true;

//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,6 @@ public static HugeServerInfo fromVertex(Vertex vertex) {
return serverInfo;
}

public <V> boolean suitableFor(HugeTask<V> task, long now) {
if (task.computer() != this.role.computer()) {
return false;
}
return this.updateTime.getTime() + EXPIRED_INTERVAL >= now &&
this.load() + task.load() <= this.maxLoad;
}

public static Schema schema(HugeGraphParams graph) {
return new Schema(graph);
}
Expand Down
Loading
Loading