Skip to content
6 changes: 3 additions & 3 deletions docs/usage/log.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ FastDeploy generates the following log files during deployment. Below is an expl
By default, logs are stored in the `log` directory under the execution path. To specify a custom directory, set the environment variable `FD_LOG_DIR`.

## Inference Service Logs
* `backup_env.*.json` : Records environment variables set during instance startup. The number of files matches the number of GPU cards.
* `envlog.*` : Logs environment variables set during instance startup. The number of files matches the number of GPU cards.
* `log/paddle/backup_env.*.json` : Records environment variables set during instance startup. The number of files matches the number of GPU cards.
* `log/paddle/envlog.*` : Logs environment variables set during instance startup. The number of files matches the number of GPU cards.
* `console.log` : Records model startup time and other information. This log is also printed to the console.
* `data_processor.log` : Logs input/output data encoding and decoding details.
* `fastdeploy.log` : Records configuration information during instance startup, as well as request and response details during runtime.
* `workerlog.*` : Tracks model loading progress and inference operator errors. Each GPU card has a corresponding file.
* `log/paddle/workerlog.*` : Tracks model loading progress and inference operator errors. Each GPU card has a corresponding file. `log/workerlog.0` is a convenient alias to `log/paddle/workerlog.0`.
* `worker_process.log` : Logs engine inference data for each iteration.
* `cache_manager.log` : Records KV Cache logical index allocation for each request and cache hit status.
* `launch_worker.log` : Logs model startup information and error messages.
Expand Down
6 changes: 3 additions & 3 deletions docs/zh/usage/log.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ FastDeploy 在部署过程中,会产生如下日志文件,各日志含义说
默认日志目录为执行目录下的 `log` 文件夹,若需要指定可设置环境变量 `FD_LOG_DIR`。

## 推理服务日志
* `backup_env.*.json` : 记录当前实例启动时设置的环境变量,文件个数与卡数相同
* `envlog.*` : 记录当前实例启动时设置的环境变量,文件个数与卡数相同
* `log/paddle/backup_env.*.json` : 记录当前实例启动时设置的环境变量,文件个数与卡数相同
* `log/paddle/envlog.*` : 记录当前实例启动时设置的环境变量,文件个数与卡数相同
* `console.log` : 记录模型启动耗时等信息,该日志信息会被打印到控制台
* `data_processor.log` : 记录输入数据及输出输出编码解码的内容
* `fastdeploy.log` : 记录当前实例启动的各个 config 的信息,运行中记录用户请求的 request 及 response 信息
* `workerlog.*` : 记录模型启动加载进度及推理算子报错信息,每个卡对应一个文件
* `log/paddle/workerlog.*` : 记录模型启动加载进度及推理算子报错信息,每个卡对应一个文件。`log/workerlog.0` 是 `log/paddle/workerlog.0` 的便捷映射。
* `worker_process.log` : 记录引擎每一轮推理的数据
* `cache_manager.log` : 记录每一个请求分配 KV Cache 的逻辑索引,以及当前请求的命中情况
* `launch_worker.log` : 记录模型启动信息及报错信息
Expand Down
46 changes: 23 additions & 23 deletions fastdeploy/cache_manager/cache_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ def read_storage_task(self, task: ReadStorageTask):
match_block_num = self.storage_backend.query(
task.task_id, task.token_ids, task.start_read_block_idx, task.timeout
)
logger.info(f"Matched {match_block_num} blocks in cache storage for read task {task.task_id}")
logger.debug(f"Matched {match_block_num} blocks in cache storage for read task {task.task_id}")

k_cache_keys = k_cache_keys[:match_block_num]
v_cache_keys = v_cache_keys[:match_block_num]
Expand All @@ -749,7 +749,7 @@ def read_storage_task(self, task: ReadStorageTask):
cpu_block_ids,
task.timeout,
)
logger.info(
logger.debug(
f"Successfully read {len(valid_gpu_block_ids)} blocks from cache storage for task {task.task_id}"
)
except Exception as e:
Expand All @@ -761,9 +761,9 @@ def read_storage_task(self, task: ReadStorageTask):
try:
if (self.rank == 0) and self.storage_backend_type == "attention_store":
self.storage_backend.flush_token_index(task.task_id, task.token_ids, 0, True)
logger.info(f"Report cache index in HBM to cache storage for task {task.task_id}")
logger.debug(f"Report cache index in HBM to cache storage for task {task.task_id}")
except Exception as e:
logger.info(
logger.debug(
f"Failed to report cache index in HBM to cache storage for task {task.task_id}, error: {e}"
)

Expand Down Expand Up @@ -914,10 +914,10 @@ def write_back_storage_task(self, task: WriteStorageTask):
)
elif self.storage_backend_type == "attention_store":
match_block_num = self.storage_backend.query(task.task_id, task.token_ids, 0, task.timeout)
logger.info(f"Matched {match_block_num} blocks in cache storage for write task {task.task_id}")
logger.debug(f"Matched {match_block_num} blocks in cache storage for write task {task.task_id}")

if match_block_num >= len(k_cache_keys):
logger.info(f"No uncached keys found for task {task.task_id}")
logger.debug(f"No uncached keys found for task {task.task_id}")
gpu_block_ids = []
else:
try:
Expand All @@ -940,7 +940,7 @@ def write_back_storage_task(self, task: WriteStorageTask):
cpu_block_ids,
task.timeout,
)
logger.info(
logger.debug(
f"Successfully wrote {write_block_num} blocks to cache storage for task {task.task_id}"
)
except Exception as e:
Expand All @@ -950,9 +950,9 @@ def write_back_storage_task(self, task: WriteStorageTask):
try:
if (self.rank == 0) and self.storage_backend_type == "attention_store":
self.storage_backend.flush_token_index(task.task_id, task.token_ids, 0, False)
logger.info(f"Report cache index out HBM to cache storage for task {task.task_id}")
logger.debug(f"Report cache index out HBM to cache storage for task {task.task_id}")
except Exception as e:
logger.info(
logger.debug(
f"Failed to report cache index out HBM to cache storage for task {task.task_id}, error: {e}"
)

Expand Down Expand Up @@ -993,7 +993,7 @@ def _do_swap_to_cpu_task(
self.cache_task_queue.swap_to_cpu_barrier2.reset()
self.cache_task_queue.put_transfer_done_signal(result)
logger.debug(f"_do_swap_to_cpu_task: put_transfer_done_signal {result}")
logger.info(f"_do_swap_to_cpu_task: put_transfer_done_signal for transfer_task_id {transfer_task_id}")
logger.debug(f"_do_swap_to_cpu_task: put_transfer_done_signal for transfer_task_id {transfer_task_id}")

def _do_swap_to_gpu_task(
self,
Expand Down Expand Up @@ -1021,7 +1021,7 @@ def _do_swap_to_gpu_task(
self.cache_task_queue.swap_to_gpu_barrier2.reset()
self.cache_task_queue.put_transfer_done_signal(result)
logger.debug(f"_do_swap_to_gpu_task: put_transfer_done_signal {result}")
logger.info(f"_do_swap_to_gpu_task: put_transfer_done_signal for transfer_task_id {transfer_task_id}")
logger.debug(f"_do_swap_to_gpu_task: put_transfer_done_signal for transfer_task_id {transfer_task_id}")

def check_work_status(self, time_interval_threashold=envs.FD_CACHE_PROC_EXIT_TIMEOUT):
"""
Expand Down Expand Up @@ -1163,7 +1163,7 @@ def do_data_transfer(self):
continue

except Exception as e:
logger.info(f"do_data_transfer: error: {e}, {str(traceback.format_exc())}")
logger.debug(f"do_data_transfer: error: {e}, {str(traceback.format_exc())}")

def _transfer_data(
self,
Expand Down Expand Up @@ -1267,15 +1267,15 @@ def _transfer_data(
1,
)
else:
logger.warning(
logger.debug(
f"transfer data: Get unexpected event type {event_type}, only SWAP2CPU and SWAP2GPU supported"
)
except Exception as e:
logger.error(f"transfer data: error: {e}")
raise e
end_time = time.time()
elasped_time = end_time - start_time
logger.info(
logger.debug(
f"transfer data: transfer_task_id {transfer_task_id} event_type {event_type}: "
+ f"transfer {len(gpu_block_ids)} blocks done elapsed_time {elasped_time:.4f}"
)
Expand All @@ -1291,7 +1291,7 @@ def check_cache_status(self, args):
# TODO XPU support RL
if unset_data_ipc is None:
return
logger.info("[RL] Launch a thread to clear/restore kv cache when model weights are cleared/updated.")
logger.debug("[RL] Launch a thread to clear/restore kv cache when model weights are cleared/updated.")
while True:
# handle cache clearing/restoring
if self.kv_cache_status_signal.value[0] == KVCacheStatus.CLEARING:
Expand All @@ -1301,7 +1301,7 @@ def check_cache_status(self, args):
self.pause()

# clear cpu caches
logger.info("[RL] start clearing caches")
logger.debug("[RL] start clearing caches")
logger.debug("[RL] start clearing cpu caches")
if self.num_cpu_blocks > 0 and envs.FD_ENABLE_SWAP_SPACE_CLEARING:
paddle.set_device("cpu")
Expand All @@ -1326,10 +1326,10 @@ def check_cache_status(self, args):
# clear gpu caches
logger.debug("[RL] start clearing gpu caches")
if args.create_cache_tensor:
logger.info("[RL] waiting for gpu runner to unlink cuda ipc")
logger.debug("[RL] waiting for gpu runner to unlink cuda ipc")
while self.cache_ready_signal.value[self.rank] != 0:
time.sleep(0.1)
logger.info("[RL] stop waiting! gpu runner has unlinked cuda ipc")
logger.debug("[RL] stop waiting! gpu runner has unlinked cuda ipc")
paddle.set_device(f"gpu:{self.device}")
self.gpu_cache_kvs.clear()
self.gpu_cache_k_tensors.clear()
Expand All @@ -1347,7 +1347,7 @@ def check_cache_status(self, args):

while np.sum(self.cache_ready_signal.value) != 0:
time.sleep(0.1)
logger.info("[RL] all ranks cleared caches!")
logger.debug("[RL] all ranks cleared caches!")

# reset kv_cache_status_signal
self.kv_cache_status_signal.value[0] = KVCacheStatus.CLEARED
Expand All @@ -1361,7 +1361,7 @@ def check_cache_status(self, args):
assert args.splitwise_role == "mixed", "Only mixed mode supports updating cache."
try:
# restore cpu cache
logger.info("[RL] start restoring caches")
logger.debug("[RL] start restoring caches")
logger.debug("[RL] start restoring cpu caches")
if self.num_cpu_blocks > 0 and envs.FD_ENABLE_SWAP_SPACE_CLEARING:
self._init_cpu_cache(args)
Expand All @@ -1382,12 +1382,12 @@ def check_cache_status(self, args):
version_file_path = os.path.join(args.model_path, "version.yaml")
assert os.path.exists(version_file_path), f"version.yaml not found at {version_file_path}"
self.key_prefix = get_key_prefix_from_version(version_file_path)
logger.info(f"Update key_prefix of cache storage to {self.key_prefix}")
logger.debug(f"Update key_prefix of cache storage to {self.key_prefix}")

# wait for all ranks caches to be ready
while np.sum(self.cache_ready_signal.value) != args.mp_num:
time.sleep(0.1)
logger.info("[RL] all ranks restored caches!")
logger.debug("[RL] all ranks restored caches!")

# resume transfer
self.resume()
Expand Down Expand Up @@ -1430,7 +1430,7 @@ def _log_memory(self, context: str):
curr_alloc = paddle.device.cuda.memory_allocated() / (1024**3)
curr_reserved = paddle.device.cuda.memory_reserved() / (1024**3)

logger.warning(
logger.debug(
f"GPU memory usage {context}:"
f"max_allocated: {max_alloc:.2f}GB "
f"max_reserved: {max_reserved:.2f}GB "
Expand Down
8 changes: 3 additions & 5 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def launch_cache_manager(
+ f" --write_policy {cache_config.write_policy}"
+ f" --max_model_len {self.config.model_config.max_model_len}"
+ f" --model_path {self.config.model_config.model}"
+ f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
+ f" >>{log_dir}/cache_manager.log 2>&1"
)
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
Expand All @@ -346,10 +346,8 @@ def launch_cache_manager(
exit_code = cache_manager_processes[-1].poll()
if exit_code is None:
logger.info("Launch cache transfer manager successful")
else:
logger.info(
"Launch cache transfer manager failed, see launch_cache_transfer_manager.log for more information"
)
else:
logger.info("Launch cache transfer manager failed, see cache_manager.log for more information")

# Start additional threads
if cache_config.kvcache_storage_backend or self.num_cpu_blocks > 0:
Expand Down
65 changes: 26 additions & 39 deletions fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@
from fastdeploy.utils import (
ceil_div,
check_unified_ckpt,
get_console_only_logger,
get_host_ip,
get_logger,
parse_ports,
print_config_dict,
)

logger = get_logger("config", "config.log")
logger = get_console_only_logger("config")

TaskOption = Literal["auto", "generate", "embedding", "embed"]

Expand Down Expand Up @@ -596,10 +597,7 @@ def print(self):
"""
Print all configuration information.
"""
logger.info("Model Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
print_config_dict("Model Configuration Information", self.__dict__)


class ParallelConfig:
Expand Down Expand Up @@ -707,10 +705,7 @@ def print(self):
print all config

"""
logger.info("Parallel Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
print_config_dict("Parallel Configuration Information", self.__dict__)


class SpeculativeConfig:
Expand Down Expand Up @@ -835,10 +830,7 @@ def print(self):
print all config

"""
logger.info("Speculative Decoding Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
print_config_dict("Speculative Decoding Configuration Information", self.__dict__)

def check_legality_parameters(
self,
Expand Down Expand Up @@ -1338,10 +1330,7 @@ def print(self):
"""
Print all configuration information.
"""
logger.info("EPLB Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
print_config_dict("EPLB Configuration Information", self.__dict__)


class CacheConfig:
Expand Down Expand Up @@ -1515,10 +1504,7 @@ def print(self):
print all config

"""
logger.info("Cache Configuration Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
print_config_dict("Cache Configuration Information", self.__dict__)


class RouterConfig:
Expand Down Expand Up @@ -1594,10 +1580,7 @@ def print(self):
print all config

"""
logger.info("Fasedeploy Commit Information :")
for k, v in self.__dict__.items():
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
print_config_dict("Fasedeploy Commit Information", self.__dict__)


class StructuredOutputsConfig:
Expand Down Expand Up @@ -2120,23 +2103,27 @@ def print(self):
"""
print all config
"""
logger.info("=================== Configuration Information ===============")
config_dict = {}
for k, v in self.__dict__.items():
if k == "generation_config" and v is not None:
for gck, gcv in v.to_dict().items():
logger.info("{:<20}:{:<6}{}".format(gck, "", gcv))
elif (
k == "cache_config"
or k == "model_config"
or k == "scheduler_config"
or k == "parallel_config"
or k == "commit_config"
config_dict[k] = v.to_dict()
elif k in (
"cache_config",
"model_config",
"scheduler_config",
"parallel_config",
"commit_config",
"speculative_config",
"graph_opt_config",
"load_config",
"eplb_config",
"structured_outputs_config",
):
if v is not None:
v.print()
config_dict[k] = v.__dict__ if v is not None else None
else:
logger.info("{:<20}:{:<6}{}".format(k, "", v))
logger.info("=============================================================")
config_dict[k] = v

print_config_dict("Configuration Information", config_dict)

def init_cache_info(self):
"""
Expand Down
Loading
Loading