From 01a12ef48ebfb43d251ffeaed73273cc67efb25e Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Tue, 27 Jan 2026 15:22:51 +0800 Subject: [PATCH 1/4] fix: fix ray redundant execution --- graphgen/engine.py | 49 +++++++++++++++++-- .../operators/generate/generate_service.py | 7 ++- .../operators/partition/partition_service.py | 7 +-- graphgen/run.py | 44 +---------------- 4 files changed, 56 insertions(+), 51 deletions(-) diff --git a/graphgen/engine.py b/graphgen/engine.py index 2f1abf61..f4486eb9 100644 --- a/graphgen/engine.py +++ b/graphgen/engine.py @@ -8,12 +8,38 @@ import ray import ray.data from ray.data import DataContext +from ray.data.block import Block +from ray.data.datasource.filename_provider import FilenameProvider from graphgen.bases import Config, Node from graphgen.common import init_llm, init_storage from graphgen.utils import logger +class NodeFilenameProvider(FilenameProvider): + def __init__(self, node_id: str): + self.node_id = node_id + + def get_filename_for_block( + self, block: Block, write_uuid: str, task_index: int, block_index: int + ) -> str: + # format: {node_id}_{write_uuid}_{task_index:06}_{block_index:06}.json + return f"{self.node_id}_{write_uuid}_{task_index:06d}_{block_index:06d}.jsonl" + + def get_filename_for_row( + self, + row: Dict[str, Any], + write_uuid: str, + task_index: int, + block_index: int, + row_index: int, + ) -> str: + raise NotImplementedError( + f"Row-based filenames are not supported by write_json. " + f"Node: {self.node_id}, write_uuid: {write_uuid}" + ) + + class Engine: def __init__( self, config: Dict[str, Any], functions: Dict[str, Callable], **ray_init_kwargs @@ -263,13 +289,28 @@ def func_wrapper(row_or_batch: Dict[str, Any]) -> Dict[str, Any]: f"Unsupported node type {node.type} for node {node.id}" ) - def execute(self, initial_ds: ray.data.Dataset) -> Dict[str, ray.data.Dataset]: + def execute(self, initial_ds: ray.data.Dataset, output_dir: str): sorted_nodes = self._topo_sort(self.config.nodes) for node in sorted_nodes: + logger.info("Executing node %s of type %s", node.id, node.type) self._execute_node(node, initial_ds) if getattr(node, "save_output", False): - self.datasets[node.id] = self.datasets[node.id].materialize() + node_output_path = os.path.join(output_dir, f"{node.id}") + os.makedirs(node_output_path, exist_ok=True) + logger.info("Saving output of node %s to %s", node.id, node_output_path) + + ds = self.datasets[node.id] + ds.write_json( + node_output_path, + filename_provider=NodeFilenameProvider(node.id), + pandas_json_args_fn=lambda: { + "orient": "records", + "lines": True, + "force_ascii": False, + }, + ) + logger.info("Node %s output saved to %s", node.id, node_output_path) - output_nodes = [n for n in sorted_nodes if getattr(n, "save_output", False)] - return {node.id: self.datasets[node.id] for node in output_nodes} + # ray will lazy read the dataset + self.datasets[node.id] = ray.data.read_json(node_output_path) diff --git a/graphgen/operators/generate/generate_service.py b/graphgen/operators/generate/generate_service.py index 104ab88f..839e25da 100644 --- a/graphgen/operators/generate/generate_service.py +++ b/graphgen/operators/generate/generate_service.py @@ -1,3 +1,5 @@ +import json + import pandas as pd from graphgen.bases import BaseLLMWrapper, BaseOperator @@ -85,7 +87,10 @@ def generate(self, items: list[dict]) -> list[dict]: :return: QA pairs """ logger.info("[Generation] mode: %s, batches: %d", self.method, len(items)) - items = [(item["nodes"], item["edges"]) for item in items] + # items = [(item["nodes"], item["edges"]) for item in items] + items = [ + (json.loads(item["nodes"]), json.loads(item["edges"])) for item in items + ] results = run_concurrent( self.generator.generate, items, diff --git a/graphgen/operators/partition/partition_service.py b/graphgen/operators/partition/partition_service.py index ff215fce..6622e411 100644 --- a/graphgen/operators/partition/partition_service.py +++ b/graphgen/operators/partition/partition_service.py @@ -89,9 +89,10 @@ def partition(self) -> Iterable[pd.DataFrame]: yield pd.DataFrame( { - "nodes": [batch[0]], - "edges": [batch[1]], - } + "nodes": json.dumps(batch[0]), + "edges": json.dumps(batch[1]), + }, + index=[0], ) logger.info("Total communities partitioned: %d", count) diff --git a/graphgen/run.py b/graphgen/run.py index 6b303ee1..26e752ae 100644 --- a/graphgen/run.py +++ b/graphgen/run.py @@ -2,13 +2,10 @@ import os import time from importlib import resources -from typing import Any, Dict import ray import yaml from dotenv import load_dotenv -from ray.data.block import Block -from ray.data.datasource.filename_provider import FilenameProvider from graphgen.engine import Engine from graphgen.operators import operators @@ -32,30 +29,6 @@ def save_config(config_path, global_config): ) -class NodeFilenameProvider(FilenameProvider): - def __init__(self, node_id: str): - self.node_id = node_id - - def get_filename_for_block( - self, block: Block, write_uuid: str, task_index: int, block_index: int - ) -> str: - # format: {node_id}_{write_uuid}_{task_index:06}_{block_index:06}.json - return f"{self.node_id}_{write_uuid}_{task_index:06d}_{block_index:06d}.jsonl" - - def get_filename_for_row( - self, - row: Dict[str, Any], - write_uuid: str, - task_index: int, - block_index: int, - row_index: int, - ) -> str: - raise NotImplementedError( - f"Row-based filenames are not supported by write_json. " - f"Node: {self.node_id}, write_uuid: {write_uuid}" - ) - - def main(): parser = argparse.ArgumentParser() parser.add_argument( @@ -91,22 +64,7 @@ def main(): engine = Engine(config, operators) ds = ray.data.from_items([]) - results = engine.execute(ds) - - for node_id, dataset in results.items(): - logger.info("Saving results for node %s", node_id) - node_output_path = os.path.join(output_path, f"{node_id}") - os.makedirs(node_output_path, exist_ok=True) - dataset.write_json( - node_output_path, - filename_provider=NodeFilenameProvider(node_id), - pandas_json_args_fn=lambda: { - "force_ascii": False, - "orient": "records", - "lines": True, - }, - ) - logger.info("Node %s results saved to %s", node_id, node_output_path) + engine.execute(ds, output_dir=output_path) save_config(os.path.join(output_path, "config.yaml"), config) logger.info("GraphGen completed successfully. Data saved to %s", output_path) From 804ad573d55d392484e317b2be0d685e0f61d92c Mon Sep 17 00:00:00 2001 From: chenzihong <58508660+ChenZiHong-Gavin@users.noreply.github.com> Date: Tue, 27 Jan 2026 15:25:26 +0800 Subject: [PATCH 2/4] Update graphgen/engine.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- graphgen/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphgen/engine.py b/graphgen/engine.py index f4486eb9..879e48a7 100644 --- a/graphgen/engine.py +++ b/graphgen/engine.py @@ -23,7 +23,7 @@ def __init__(self, node_id: str): def get_filename_for_block( self, block: Block, write_uuid: str, task_index: int, block_index: int ) -> str: - # format: {node_id}_{write_uuid}_{task_index:06}_{block_index:06}.json + # format: {node_id}_{write_uuid}_{task_index:06}_{block_index:06}.jsonl return f"{self.node_id}_{write_uuid}_{task_index:06d}_{block_index:06d}.jsonl" def get_filename_for_row( From 01350bb2f4f05add72c2a57694abf214917eae68 Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Tue, 27 Jan 2026 15:26:14 +0800 Subject: [PATCH 3/4] fix: delete useless code --- graphgen/operators/generate/generate_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/graphgen/operators/generate/generate_service.py b/graphgen/operators/generate/generate_service.py index 839e25da..2107876b 100644 --- a/graphgen/operators/generate/generate_service.py +++ b/graphgen/operators/generate/generate_service.py @@ -87,7 +87,6 @@ def generate(self, items: list[dict]) -> list[dict]: :return: QA pairs """ logger.info("[Generation] mode: %s, batches: %d", self.method, len(items)) - # items = [(item["nodes"], item["edges"]) for item in items] items = [ (json.loads(item["nodes"]), json.loads(item["edges"])) for item in items ] From 6102033fa9babd0ba1a53e4e253f71c13d2fefca Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Tue, 27 Jan 2026 15:56:40 +0800 Subject: [PATCH 4/4] fix: update webui --- graphgen/engine.py | 6 +++++- webui/app.py | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/graphgen/engine.py b/graphgen/engine.py index 879e48a7..d09eb106 100644 --- a/graphgen/engine.py +++ b/graphgen/engine.py @@ -289,7 +289,9 @@ def func_wrapper(row_or_batch: Dict[str, Any]) -> Dict[str, Any]: f"Unsupported node type {node.type} for node {node.id}" ) - def execute(self, initial_ds: ray.data.Dataset, output_dir: str): + def execute( + self, initial_ds: ray.data.Dataset, output_dir: str + ) -> Dict[str, ray.data.Dataset]: sorted_nodes = self._topo_sort(self.config.nodes) for node in sorted_nodes: @@ -314,3 +316,5 @@ def execute(self, initial_ds: ray.data.Dataset, output_dir: str): # ray will lazy read the dataset self.datasets[node.id] = ray.data.read_json(node_output_path) + + return self.datasets diff --git a/webui/app.py b/webui/app.py index 08f1907a..140122e4 100644 --- a/webui/app.py +++ b/webui/app.py @@ -1,7 +1,7 @@ +import gc import json import os import sys -import gc import tempfile from importlib.resources import files @@ -188,7 +188,7 @@ def run_graphgen(params: WebuiParams, progress=gr.Progress()): ds = ray.data.from_items([]) # Execute pipeline - results = engine.execute(ds) + results = engine.execute(ds, output_dir=working_dir) # 5. Process Output # Extract the result from the 'generate' node