Revert "[PD] prepare request in prefill instance by multi threads"#7821
Revert "[PD] prepare request in prefill instance by multi threads"#7821Jiang-Jia-Jun wants to merge 1 commit into
Conversation
This reverts commit 4e7a46e.
|
Thanks for your contribution! |
There was a problem hiding this comment.
Pull request overview
本 PR 回滚了 #7724("prepare request in prefill instance by multi threads"),将相关功能恢复到此前实现:移除 EngineServicePrepareMixin 抽象、删除 FD_PREFILL_PREPARE_REQ_THREAD_NUM 环境变量,并恢复使用 ThreadPoolExecutor(max_workers=1) 的单线程 _fetch_request 路径。同时恢复 engine_worker_queue 中 finished_add_cache_task 系列队列/锁/flag 与 cache_messager 中 put_finished_add_cache_task_req、_maybe_wait_for_cache_task 移除等行为,并同步更新单元测试。
Changes:
- 删除
fastdeploy/engine/common_engine_prepare_mixin.py,将单线程 fetch 流程内联回EngineService._schedule_request_to_worker_v1。 - 在
engine_worker_queue.py中重新引入finished_add_cache_task_*相关共享资源和put/get_finished_add_cache_task_req接口;cache_messager恢复以 barrier + 队列方式上报已添加的 cache task。 - 移除
FD_PREFILL_PREPARE_REQ_THREAD_NUM环境变量,并同步调整tests/engine/test_common_engine.py、tests/inter_communicator/test_e2w_queue.py、tests/cache_manager/test_cache_messager.py。
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| fastdeploy/engine/common_engine.py | 移除 mixin 继承,将 _fetch_request 内联回 _schedule_request_to_worker_v1,恢复单 ThreadPoolExecutor 方式 |
| fastdeploy/engine/common_engine_prepare_mixin.py | 整体删除该文件(多线程 prepare 实现回滚) |
| fastdeploy/envs.py | 删除 FD_PREFILL_PREPARE_REQ_THREAD_NUM 环境变量 |
| fastdeploy/inter_communicator/engine_worker_queue.py | 恢复 finished_add_cache_task 队列/锁/flag/barrier 及 put/get 接口 |
| fastdeploy/cache_manager/cache_messager.py | 恢复在 add_cache_task 后通过 put_finished_add_cache_task_req 通知,并删除 _maybe_wait_for_cache_task |
| tests/engine/test_common_engine.py | 测试恢复对内联 _schedule_request_to_worker_v1 与 ThreadPoolExecutor 的 patch 路径 |
| tests/inter_communicator/test_e2w_queue.py | 增加 finished_add_cache_task 队列流程的测试用例 |
| tests/cache_manager/test_cache_messager.py | 移除按文件路径动态导入逻辑,断言新增 finished_add_cache_task 上报行为 |
CI报告基于以下代码生成(30分钟更新一次): 1 任务总览
2 任务状态汇总2.1 Required任务 : 1/8 通过
2.2 可选任务 — 22/26 通过
3 失败详情(仅 required)Approval — 代码规范/审批检查(置信度: 高)Approval
根因详情: 关键日志: 修复建议:
修复建议摘要: 请RD成员(jiangjiajun等)Approve,确认Cherry-Pick规范 关联变更: Revert PR中包含 |
PaddlePaddle-bot
left a comment
There was a problem hiding this comment.
🤖 Paddle-CI-Agent | pr_review |
2026-05-14 20:17:31
📋 Review 摘要
PR 概述:回滚 PR #7724(PD 分离场景下 prefill 实例多线程准备请求),改为在 _schedule_request_to_worker_v1 内用 ThreadPoolExecutor(max_workers=1) 内联方式执行,并以 barrier 替代原有的自旋等待。
变更范围:fastdeploy/engine/、fastdeploy/cache_manager/、fastdeploy/inter_communicator/
影响面 Tag:[PD Disaggregation] [Engine] [KVCache]
📝 PR 规范检查
PR 标题缺少官方 Tag,且目标分支为 release/2.6 应使用 Cherry-Pick 格式;PR 描述仅有一行 "Reverts #7724",缺少所有必填 section。
标题建议(可直接复制):
[Cherry-Pick][PD Disaggregation] Revert "prepare request in prefill instance by multi threads"(#7724)
PR 描述建议(可直接复制,必须复刻 checklist §D2 模板的完整结构):
## Motivation
Revert PR #7724 which introduced multi-threaded request preparation in the prefill instance. The original multi-thread approach caused stability issues that require rollback. This PR reverts to a `ThreadPoolExecutor(max_workers=1)` design while retaining barrier-based synchronization between the cache messager and engine.
## Modifications
- `fastdeploy/engine/common_engine.py`: Remove separate `_prepare_request_v1` thread; merge fetch-request logic inline into `_schedule_request_to_worker_v1` using `ThreadPoolExecutor(max_workers=1)`; remove `EngineServicePrepareMixin` dependency
- `fastdeploy/engine/common_engine_prepare_mixin.py`: Delete entire mixin file (282 lines)
- `fastdeploy/cache_manager/cache_messager.py`: Remove `_maybe_wait_for_cache_task` spin-wait; replace with `finish_add_cache_task_barrier`-based synchronization
- `fastdeploy/inter_communicator/engine_worker_queue.py`: Add `finished_add_cache_task_list`, related barriers/locks/flags; add `put_finished_add_cache_task_req` / `get_finished_add_cache_task_req` methods
- `fastdeploy/envs.py`: Remove `FD_PREFILL_PREPARE_REQ_THREAD_NUM` env variable
- Tests updated accordingly
## Usage or Command
N/A
## Accuracy Tests
N/A
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [x] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.问题
| 级别 | 文件 | 概述 |
|---|---|---|
| 📝 PR 规范 | PR 标题 / 描述 | 标题缺少 Tag,描述缺少所有必填 section |
| ❓ 疑问 | engine_worker_queue.py:759 |
put_finished_add_cache_task_req 返回类型注解 -> None 但实际返回 bool |
| ❓ 疑问 | engine_worker_queue.py:779 |
get_finished_add_cache_task_req 返回类型注解 -> str 但实际返回 list |
| 🟡 建议 | common_engine.py:1008 |
need_check_req_ids 在 L1005 赋值后立刻被 L1008 覆盖,存在冗余赋值 |
总体评价
逻辑结构清晰,用 barrier 替换自旋等待的思路合理,测试也做了相应更新。除 PR 规范问题外,发现两处类型注解错误和一处冗余赋值,建议修复后合入。
| self.finish_send_cache_lock.release() | ||
| return response | ||
|
|
||
| def put_finished_add_cache_task_req(self, req_ids) -> None: |
There was a problem hiding this comment.
❓ 疑问 返回类型注解 -> None 与实际返回值不符。
该方法末尾 return all_client_put,实际返回 bool,但注解写的是 -> None,docstring 也未描述返回值。
建议修复:
def put_finished_add_cache_task_req(self, req_ids) -> bool:
"""
Put finished request ID into the queue.
Args:
req_ids: Request ID to be added to the queue
Returns:
bool: Whether all clients have put their finished requests
"""| self.finish_add_cache_task_lock.release() | ||
| return all_client_put | ||
|
|
||
| def get_finished_add_cache_task_req(self) -> str: |
There was a problem hiding this comment.
❓ 疑问 返回类型注解 -> str 与实际返回值不符。
该方法实际返回 list(初始化为 response = [],从列表中获取后也是列表类型),docstring 描述的也是 str: Finished request ID,三者不一致。
建议修复:
def get_finished_add_cache_task_req(self) -> list:
"""
Get finished request IDs from the queue.
Returns:
list: List of finished request IDs
"""| need_check_req_ids = [task.request_id for task in tasks] | ||
| self.split_connector.send_cache_info_to_messager(tasks, 0) | ||
| # ensure cache tasks has sent to cache_messager | ||
| need_check_req_ids = [task.request_id for task in tasks] |
There was a problem hiding this comment.
🟡 建议 need_check_req_ids 被连续赋值两次,第一次(L1005)赋值后立刻被第二次(L1008)覆盖,第一次赋值无意义。
# L1005: 此行赋值被 L1008 立刻覆盖,属于冗余代码
need_check_req_ids = [task.request_id for task in tasks]
self.split_connector.send_cache_info_to_messager(tasks, 0)
# ensure cache tasks has sent to cache_messager
need_check_req_ids = [task.request_id for task in tasks] # <-- 实际生效的赋值建议删除 L1005 的赋值,仅保留 L1008 处的赋值。
Reverts #7724