[RL] pause: use abort pipeline with scheduling loop alive for graceful termination #7753

jackyYang6 wants to merge 2 commits into
Conversation
Thanks for your contribution!

CI report generated from the code below (refreshed every 30 minutes):

1 Task overview: all Required tasks passed (this PR has no Required tasks configured); 1 optional task failed, which does not block merging.
2 Task status summary
2.1 Required tasks: 0/0 passed
2.2 Optional tasks: 1/2 passed
3 Failure details (required only): no required task failures.
…l termination

Replace the old preempted_all + error_response approach in _control_pause with a two-phase design:

Phase 1: Block new requests via _rejecting_new_requests (NOT is_paused)
- Scheduling loop keeps running so _trigger_abort can process
- add_abort_req_ids(ALL) marks all requests for abort
- Scheduling loop catches them via _trigger_abort as they cycle through

Phase 2: After drain, set is_paused=True to fully stop scheduling loop
- Handle scheduler-only stragglers with direct _send_error_response
- Wait for output queue empty, then reset

Depends-on: PaddlePaddle#7615 (refact abort_requests to fire-and-forget)
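The two phases above can be illustrated with a minimal single-threaded sketch. In the real engine the scheduling loop runs in its own thread; here it is stepped manually inside the drain loop, and all names (MiniEngine, _abort_ids, scheduling_step) are illustrative, not the PR's actual implementation:

```python
import time

class MiniEngine:
    """Toy model of the two-phase pause; names are illustrative."""

    def __init__(self):
        self.is_paused = False                # phase 2: stops the scheduling loop
        self._rejecting_new_requests = False  # phase 1: only blocks intake
        self.requests = {"r1", "r2"}          # inflight request ids
        self._abort_ids = set()

    def add_request(self, req_id):
        # Intake gate: new work is rejected during either phase.
        if self.is_paused or self._rejecting_new_requests:
            return False
        self.requests.add(req_id)
        return True

    def scheduling_step(self):
        # The loop keeps running during phase 1, so marked aborts can drain.
        if self.is_paused:
            return
        done = self.requests & self._abort_ids
        self.requests -= done  # the real abort pipeline emits partial results here

    def control_pause(self):
        # Phase 1: block new requests, mark everything inflight for abort.
        self._rejecting_new_requests = True
        self._abort_ids.update(self.requests)
        while self.requests:        # analogous to _wait_inflight_drained
            self.scheduling_step()
            time.sleep(0.001)
        # Phase 2: nothing inflight, now it is safe to stop the loop.
        self.is_paused = True

engine = MiniEngine()
engine.control_pause()
# After pause: inflight set drained, loop stopped, intake rejected.
```

The key property is that is_paused is only set after the drain loop observes an empty inflight set, so the abort pipeline is never starved by the pause itself.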
Codecov Report

❌ Patch coverage is

Additional details and impacted files

@@           Coverage Diff            @@
##           develop   #7753   +/-   ##
==========================================
  Coverage         ?   71.56%
==========================================
  Files            ?      396
  Lines            ?    55609
  Branches         ?     8694
==========================================
  Hits             ?    39797
  Misses           ?    13074
  Partials         ?     2738
Flags with carried forward coverage won't be shown.
PaddlePaddle-bot
left a comment
🤖 Paddle-CI-Agent | pr_review |
2026-05-11 17:22:12
📋 Review Summary
PR overview: refactors the pause flow for RL scenarios — decouples request rejection (_rejecting_new_requests) from pausing the scheduling loop (is_paused), and gracefully terminates inflight requests through the abort pipeline, fixing both the deadlock and the loss of partial results.
Scope of change: engine/common_engine.py, engine/sched/resource_manager_v1.py, entrypoints/, router/router.py
Impact tags: [RL] [Engine] [Scheduler] [APIServer]
📝 PR Convention Check
The title's [RL] tag is compliant, and the description is structurally complete (Motivation / Modifications / Usage or Command / Accuracy Tests / Checklist are all present with substantive content). The checklist item "Format your code, run pre-commit before commit" is unchecked — please confirm that pre-commit was run, or explain why it was skipped.
Issues
| Level | File | Summary |
|---|---|---|
| 🔴 Bug | fastdeploy/engine/common_engine.py:1325 | When req_id is None, the warning is logged but there is no continue, so add_abort_req_ids(None) is still called |
| 🟡 Suggestion | fastdeploy/engine/common_engine.py:1504 | _wait_inflight_drained() has no timeout guard and will hang forever if a worker crashes |
| ❓ Question | fastdeploy/entrypoints/openai/api_server.py:500 | The /v1/abort_requests response changed from JSON to an empty 200, which is a breaking change |
Overall assessment
The two-phase pause design is clearly reasoned, effectively resolves the deadlock and the loss of partial results, and the test data is thorough. One logic bug (the req_id=None path is not actually skipped) must be fixed before merging.
    self.llm_logger.warning(
        "Receive abort request without request_id, skip invalid abort message"
    )
    self.llm_logger.info(f"Receive abort request, req_id: {req_id}")
🔴 Bug: when req_id is None, there is no continue after the warning, so add_abort_req_ids(None) is still called.
Internally, add_abort_req_ids executes waiting_abort_req_id_set.add(None) for non-list input. This does not crash immediately, but it leaves an invalid None entry in the abort set that may interfere with later abort processing. The warning says "skip invalid abort message", yet nothing is actually skipped.
Suggested fix:
    req_id = data.get("request_id", None)
    if not req_id:
        self.llm_logger.warning(
            "Receive abort request without request_id, skip invalid abort message"
        )
        continue  # actually skip, consistent with the warning's semantics
    self.llm_logger.info(f"Receive abort request, req_id: {req_id}")
    self.resource_manager.add_abort_req_ids(req_id)

        No timeout — abort pipeline will complete. Aligned with SGLang's poll-until-drained.
        """
        start_time = time.time()
        while self.resource_manager.requests or self.scheduler.requests:
🟡 Suggestion: _wait_inflight_drained() has no timeout guard.
The current implementation assumes the abort pipeline always completes, but it can block forever in scenarios such as:
- the GPU worker process exits abnormally without writing back an ack
- ZMQ communication stalls
Consider following the original _control_pause's 60-second-timeout approach and adding a configurable timeout with a warning as a fallback:
    timeout = 60
    start_time = time.time()
    while self.resource_manager.requests or self.scheduler.requests:
        time.sleep(0.005)
        if time.time() - start_time > timeout:
            self.llm_logger.error(
                f"_wait_inflight_drained timeout after {timeout}s, forcing pause"
            )
            break

    control_response = await app.state.engine_client.run_control_method(control_request)
    return control_response.to_api_json_response()
    await app.state.engine_client.abort_reqs(req_ids=req_ids or [], abort_all=abort_all)
    return Response(status_code=200)
❓ Question: the /v1/abort_requests response format changed from {"request_id": ..., "status": ..., "result": {"aborted": [...], "not_found": [...]}} to an empty HTTP 200, which is a breaking change.
Please confirm:
- Do any downstream consumers (RL framework, monitoring scripts, integration tests) depend on the old JSON response body?
- Should this change be noted in the CHANGELOG or API documentation?
Depends-on: #7615 (refact abort_requests to fire-and-forget)
Motivation
In RL scenarios, the upstream framework calls abort_request followed by pause to stop the engine. The old _control_pause implementation had two critical issues:

- Lost partial results: preempted_all() + _send_error_response(500) discarded already-inferred tokens, returning an error instead of partial results to clients.
- Deadlock with abort pipeline: setting is_paused=True at the start blocked the scheduling loop (_pause_cond.wait_for), which prevented _trigger_abort from processing abort requests — causing a 30s timeout deadlock.

The new design separates "reject new requests" (_rejecting_new_requests) from "pause scheduling loop" (is_paused), allowing the abort pipeline to complete naturally before the engine state is reset. This ensures partial inference results are returned to clients via token_processor._put_abort_results (200 "Aborted") through the normal output path.

Modifications
fastdeploy/engine/common_engine.py:
- Add self._rejecting_new_requests = False in __init__ to decouple request rejection from the scheduling-loop pause
- Gate request intake with if self.is_paused or self._rejecting_new_requests:
- Rewrite _control_pause()
- Add _wait_inflight_drained(), which polls until resource_manager.requests is empty (no timeout, aligned with SGLang)
- Add _wait_output_queue_empty()

Execution flow
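For contrast, the old ordering can be reproduced in a toy single-threaded model: because the loop checks the pause flag before it ever reaches the abort-processing code, setting is_paused first starves the very mechanism that pause waits on. All names here are illustrative, mirroring identifiers mentioned in the Motivation rather than the real implementation:

```python
import threading

class OldPauseEngine:
    """Toy repro of the old deadlock; not the real engine code."""

    def __init__(self):
        self.is_paused = False
        self.inflight = {"r1"}
        self._pause_cond = threading.Condition()

    def scheduling_loop_step(self):
        with self._pause_cond:
            # Old behaviour: the loop blocks as soon as is_paused is set...
            if self.is_paused:
                return "blocked"
        # ...so this abort-processing path is never reached while paused,
        # and the inflight set can never drain.
        self.inflight.clear()
        return "aborted"

engine = OldPauseEngine()
engine.is_paused = True              # old _control_pause set this first
state = engine.scheduling_loop_step()
# state is "blocked" while engine.inflight is still non-empty:
# the drain cannot complete, hence the 30s timeout deadlock.
```

The new design simply flips the order: the loop stays live (only intake is gated) until the inflight set is empty, and only then is the pause flag raised.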
Usage or Command
Accuracy Tests
Functional verification with 20 concurrent streaming requests (max_num_seqs=8):
- _rejecting_new_requests window rejection
[RL],[Engine]pre-commitbefore commit.test_control_pause_and_resume_paths)releasebranch, make sure the PR has been submitted to thedevelopbranch.