[XPU] add build_sampling_params op. #7738
Conversation
Thanks for your contribution!
Pull request overview
This PR adds a build_sampling_params custom op to the XPU backend, replacing the previous Python-side sampling-parameter padding logic with an XPU kernel, and moves the infer_seed update into the op so the seed stepping matches the GPU strategy (especially in speculative-decoding scenarios).
Changes:
- Add an XPU `build_sampling_params` kernel + plugin wrapper + Paddle static op, and wire it into the XPU speculative verify (TARGET_MATCH) path.
- Introduce `increment_value` on the XPU ModelRunner side (aligned with GPU: 4 in non-speculative mode, (num_speculative_tokens + 1) * 4 in speculative mode; see the sketch below) and adjust when `infer_seed` is updated.
- Add the `custom_ops/xpu_ops/test/test_build_sampling_params.py` unit test, which compares against a Python reference implementation and covers multiple batch shapes plus seed wrap-around.
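For reference, a minimal Python sketch of the GPU-aligned seed stepping described above is shown here; the names `speculative_decoding`, `num_speculative_tokens`, and `max_infer_seed` are illustrative assumptions, not the exact identifiers used in this PR.

```python
# Minimal sketch of the seed-stepping rule the PR aligns with (names are assumptions).
def compute_increment_value(speculative_decoding: bool, num_speculative_tokens: int) -> int:
    # 4 per step in normal decoding; (num_speculative_tokens + 1) * 4 when speculating.
    return 4 if not speculative_decoding else (num_speculative_tokens + 1) * 4

def step_infer_seed(infer_seed: int, increment_value: int, max_infer_seed: int) -> int:
    # infer_seed advances modulo max_infer_seed so it stays within the valid seed range.
    return (infer_seed + increment_value) % max_infer_seed
```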
PR metadata check (needs follow-up)
- The title already contains the `[XPU]` tag and matches the required format.
- The "Modifications / Usage or Command / Accuracy Tests" sections of the description are not filled in. If this op can affect sampling results or reproducibility, please add an accuracy comparison plus the run command/environment; if no unit test is added or the XPU CI cannot be run, state the reason (this PR does add a unit-test file, but the description should still explain how to run it).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fastdeploy/worker/xpu_model_runner.py | Computes and passes down increment_value, and adjusts the infer_seed update logic in the speculative-decoding case |
| fastdeploy/model_executor/layers/sample/sampler.py | The XPU verify (TARGET_MATCH) path now uses build_sampling_params and forwards increment_value |
| custom_ops/xpu_ops/test/test_build_sampling_params.py | New XPU op unit test, validated against a Python reference implementation |
| custom_ops/xpu_ops/src/plugin/src/wrapper/mtp_wrapper/build_sampling_params.cpp | New plugin wrapper (CPU + XPU3 dispatch) |
| custom_ops/xpu_ops/src/plugin/src/kernel/kunlun3cpp/mtp_kernel/build_sampling_params.xpu | New Kunlun3 XPU kernel implementation |
| custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h | Exports the build_sampling_params declaration |
| custom_ops/xpu_ops/src/ops/mtp/build_sampling_params.cc | New Paddle static op registration and call bridging |
```
# 7. Updata 'infer_seed' and step_paddle()
self.share_inputs["infer_seed"].add_(self.infer_seed_increment)
self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED
if not self.speculative_decoding:
```
| share_inputs["seq_lens_this_time"], | ||
| share_inputs["seq_lens_encoder"], | ||
| token_num_output_cpu=int(share_inputs["cu_seqlens_q_output"][-1]), | ||
| increment_value=increment_value, |
```
api::Context* ctx = xpu_ctx->x_context();
if (top_p.is_cpu()) {
  ctx = new api::Context(api::kCPU);
```
The CI report is generated from the code below (refreshed every 30 minutes):
1 Task overview: CI still has 1 Required task failing, 1 Required task running, and 1 Required task pending, so merging is blocked for now; please follow up.
2 Task status summary
2.1 Required tasks: 7/10 passed
2.2 Optional tasks — 22/26 passed
3 Failure details (required only): Approval — approval workflow (confidence: high): Approval
Root cause details: Key logs: Suggested fix:
Fix summary: one FD RD and one Paddle RD each need to click Approve on the PR page. Link: view logs
Codecov Report
❌ Patch coverage is

Additional details and impacted files

```
@@           Coverage Diff            @@
##            develop    #7738   +/- ##
=========================================
  Coverage          ?   71.61%
=========================================
  Files             ?      396
  Lines             ?    55702
  Branches          ?     8709
=========================================
  Hits              ?    39891
  Misses            ?    13070
  Partials          ?     2741
```

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
```
RequestFuncOutput(no=2347, request_id='None', generated_text='', reasoning_content='', success=False, latency=0.0, end_timestamp=0.0, output_tokens=0, ttft=0.0, arrival_time=[], itl=[], tpot=0.0, prompt_len=0, prompt_tokens=0, reasoning_tokens=0, res_ttft=0, error='{"error":{"message":"request[chatcmpl-814e8d96-3da8-46b0-b4da-31925c313041] generator error: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192), Traceback (most recent call last):\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/openai/serving_chat.py\\", line 168, in create_chat_completion\\n prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 300, in format_and_add_data\\n await self.add_requests(request)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 390, in add_requests\\n raise EngineError(error_msg, error_code=400)\\nfastdeploy.utils.EngineError: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192)\\n","type":"invalid_request_error","param":null,"code":null}}', metrics={}, tool_calls=[], output_ids=[])
RequestFuncOutput(no=2347, request_id='None', generated_text='', reasoning_content='', success=False, latency=0.0, end_timestamp=0.0, output_tokens=0, ttft=0.0, arrival_time=[], itl=[], tpot=0.0, prompt_len=0, prompt_tokens=0, reasoning_tokens=0, res_ttft=0, error='{"error":{"message":"request[chatcmpl-799cdf97-ab7e-4823-80e4-1833bf5f7d90] generator error: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192), Traceback (most recent call last):\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/openai/serving_chat.py\\", line 168, in create_chat_completion\\n prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 300, in format_and_add_data\\n await self.add_requests(request)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 390, in add_requests\\n raise EngineError(error_msg, error_code=400)\\nfastdeploy.utils.EngineError: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192)\\n","type":"invalid_request_error","param":null,"code":null}}', metrics={}, tool_calls=[], output_ids=[])
```
Force-pushed from 651d7cb to cfc5936 (Compare)
```
_, next_tokens = top_k_top_p_sampling(
    probs,
    top_p=top_p,
    top_k=top_k,
    top_p=sampling_metadata.top_p,
    top_k=sampling_metadata.top_k,
    top_k_list=sampling_metadata.top_k_list,
    topp_seed=topp_seed,
    topp_seed=sampling_metadata.topp_seed,
)
```
```
sampling_metadata.seed,
paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]),
paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]),
share_inputs["seq_lens_this_time"],
share_inputs["seq_lens_encoder"],
token_num_output_cpu=int(share_inputs["cu_seqlens_q_output"][-1]),
increment_value=increment_value,
)
```
| self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED | ||
| if not self.speculative_decoding: | ||
| self.share_inputs["infer_seed"].add_(self.infer_seed_increment) | ||
| self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED |
```cpp
int64_t pad_idx = 0;
for (int bi = 0; bi < bs; bi++) {
  bool is_decoder = (seq_lens_encoder[bi] == 0);
  int repeat = is_decoder ? seq_lens_this_time[bi] : 1;
  int64_t bi_seed = infer_seed[bi];
  for (int local_pos = 0; local_pos < repeat; local_pos++) {
    int64_t offset = is_decoder ? static_cast<int64_t>(local_pos) * 4 : 0LL;
    top_p_padding[pad_idx] = top_p[bi];
    top_k_padding[pad_idx] = top_k[bi];
    topp_seed[pad_idx] = (bi_seed + offset) % BUILD_SAMPLING_MAX_INFER_SEED;
    pad_idx++;
  }
  infer_seed[bi] =
      (infer_seed[bi] + increment_value) % BUILD_SAMPLING_MAX_INFER_SEED;
}
```
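The unit test is described as comparing against a Python reference implementation; the sketch below shows what such a reference could look like, mirroring the CPU loop above. The function name `build_sampling_params_ref` and the use of NumPy arrays are assumptions for illustration, not necessarily what test_build_sampling_params.py does.

```python
import numpy as np

# Illustrative Python reference mirroring the CPU loop above (names are assumptions).
def build_sampling_params_ref(top_p, top_k, infer_seed, seq_lens_this_time,
                              seq_lens_encoder, increment_value, max_infer_seed):
    top_p_pad, top_k_pad, topp_seed = [], [], []
    infer_seed = list(infer_seed)
    for bi in range(len(seq_lens_this_time)):
        is_decoder = seq_lens_encoder[bi] == 0
        repeat = int(seq_lens_this_time[bi]) if is_decoder else 1
        for local_pos in range(repeat):
            offset = local_pos * 4 if is_decoder else 0
            top_p_pad.append(top_p[bi])
            top_k_pad.append(top_k[bi])
            topp_seed.append((int(infer_seed[bi]) + offset) % max_infer_seed)
        # The seed is stepped once per batch item, in place, exactly as in the kernel.
        infer_seed[bi] = (int(infer_seed[bi]) + increment_value) % max_infer_seed
    return (np.array(top_p_pad, dtype=np.float32),
            np.array(top_k_pad, dtype=np.int64),
            np.array(topp_seed, dtype=np.int64),
            np.array(infer_seed, dtype=np.int64))
```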
```cpp
// pad_start computation: a shared prefix-sum scratch buffer is not available
// here, and clusters run concurrently, so we cannot share a global accumulator.
// Instead, core 0 of each cluster independently sums the token counts of the
// first `bi` batches via a sequential scan over seq_lens_this_time /
// seq_lens_encoder. This is O(bs) per cluster, but bs is typically small (<=512).

for (int bi = clusterid; bi < bs; bi += nclusters) {
  if (cid == 0) {
    // Read per-batch parameters from global memory.
    float lm_top_p;
    int64_t lm_top_k;
    int64_t lm_seed;
    int lm_slt;  // seq_lens_this_time[bi]
    int lm_sle;  // seq_lens_encoder[bi]

    GM2LM_ASYNC(top_p + bi, &lm_top_p, sizeof(float));
    GM2LM_ASYNC(top_k + bi, &lm_top_k, sizeof(int64_t));
    GM2LM_ASYNC(infer_seed + bi, &lm_seed, sizeof(int64_t));
    GM2LM_ASYNC(seq_lens_this_time + bi, &lm_slt, sizeof(int));
    GM2LM(seq_lens_encoder + bi, &lm_sle, sizeof(int));  // sync barrier

    bool is_decoder = (lm_sle == 0);
    int repeat = is_decoder ? lm_slt : 1;

    // Compute pad_start = sum of token counts for batches [0, bi).
    int pad_start = 0;
    for (int k = 0; k < bi; k++) {
      int slt_k, sle_k;
      GM2LM_ASYNC(seq_lens_this_time + k, &slt_k, sizeof(int));
      GM2LM(seq_lens_encoder + k, &sle_k, sizeof(int));
      pad_start += (sle_k == 0) ? slt_k : 1;
    }
```
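To make the pad_start rule described in the kernel comment concrete, here is a small worked example; the sequence lengths are made-up numbers for illustration only.

```python
# Hypothetical lengths, for illustration only.
seq_lens_this_time = [3, 5, 2]
seq_lens_encoder = [0, 7, 0]  # nonzero => prefill/encoder step, which contributes 1 slot

# pad_start for batch 2 = sum of slot counts of batches 0 and 1:
# batch 0 is a decoder step -> 3 slots; batch 1 is a prefill step -> 1 slot.
pad_start_for_batch_2 = sum(slt if sle == 0 else 1
                            for slt, sle in zip(seq_lens_this_time[:2], seq_lens_encoder[:2]))
assert pad_start_for_batch_2 == 4
```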
PaddlePaddle-bot left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-11 15:46:02
📋 Review summary
PR overview: replaces the Python padding_sampling_params implementation on XPU with an XPU kernel (build_sampling_params), moves the infer_seed update logic into the kernel, and aligns the increment_value stepping with the GPU strategy.
Scope of changes: custom_ops/xpu_ops/, fastdeploy/model_executor/layers/sample/sampler.py, fastdeploy/worker/xpu_model_runner.py
Impact tags: [XPU] [OP]
📝 PR convention check
The ## Modifications and ## Usage or Command sections are empty (template comments only), and none of the Checklist items are checked.
Suggested title (ready to copy):
[XPU][OP] Add build_sampling_params XPU kernel to replace Python padding_sampling_params
Suggested PR description (ready to copy; it must reproduce the full structure of the checklist §D2 template):
## Motivation
Replace the Python implementation of padding_sampling_params on XPU with an XPU kernel implementation, build_sampling_params. In addition, move the infer_seed update into build_sampling_params and align the infer_seed increment_value stepping with the GPU implementation.
## Modifications
- Add `custom_ops/xpu_ops/src/ops/mtp/build_sampling_params.cc`: Paddle custom op registration (`PD_BUILD_STATIC_OP`), declaring inputs/outputs/attributes
- Add `custom_ops/xpu_ops/src/plugin/src/kernel/kunlun3cpp/mtp_kernel/build_sampling_params.xpu`: XPU3 kernel; each cluster handles one batch item, filling top_p/top_k/topp_seed in parallel and updating infer_seed in place
- Add `custom_ops/xpu_ops/src/plugin/src/wrapper/mtp_wrapper/build_sampling_params.cpp`: dual CPU/XPU3 wrapper, including a CPU reference implementation
- Update `custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h`: add the `build_sampling_params` function declaration
- Update `fastdeploy/model_executor/layers/sample/sampler.py`: `_verify_and_sample_xpu` switches to the XPU kernel; `_normal_sample_xpu` drops `padding_sampling_params`; `forward_xpu` gains an `increment_value` parameter
- Update `fastdeploy/worker/xpu_model_runner.py`: compute `increment_value` dynamically; in speculative-decoding mode the seed update is handed over to the kernel
- Add `custom_ops/xpu_ops/test/test_build_sampling_params.py`: 6 unit tests covering decoder-only, encoder-only, mixed, single-item, seed-overflow, and single-token-per-batch scenarios
## Usage or Command
N/A (internal XPU kernel replacement; no external interface changes)
## Accuracy Tests
Verified that the INT64 modulo inside the XPU kernel works correctly (see the image in the PR).
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [x] Add unit tests. Please write the reason in this PR if no unit tests.
- [x] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.

Issues
| Level | File | Summary |
|---|---|---|
| 📝 PR convention | — | The ## Modifications and ## Usage or Command sections are empty, and no Checklist items are checked |
| 🟡 Suggestion | fastdeploy/worker/xpu_model_runner.py:171 | In speculative-decoding mode infer_seed_increment is no longer used, yet it is still filled with the speculative increment_value, which is easy to misread |
Overall assessment
The implementation design is sound overall: the XPU3 kernel and the CPU wrapper are semantically consistent, and the in-place infer_seed update and seed-offset logic correctly align with the GPU path. The unit tests cover 6 typical scenarios (decoder-only/encoder-only, mixed, seed overflow, etc.). The build system discovers new files automatically via GLOB_RECURSE/os.walk, so no manual registration is needed. It is recommended to fill in the Modifications and Usage sections of the PR description.
```python
self.increment_value = (
    4 if not self.speculative_decoding else (self.speculative_config.num_speculative_tokens + 1) * 4
)
```
🟡 Suggestion: infer_seed_increment is no longer actually used in speculative-decoding mode (the seed update has moved into the build_sampling_params kernel), but this tensor is still filled with the speculative increment_value, which is easy to misread.
It is recommended either to keep the tensor's fill_value at 4 in speculative-decoding mode (the non-speculative step), or to add a comment stating that it only takes effect in non-speculative mode:
```python
# infer_seed_increment is only used in non-speculative mode;
# in speculative mode, the seed update is handled inside build_sampling_params kernel.
self.infer_seed_increment = paddle.full(
    shape=[self.scheduler_config.max_num_seqs, 1],
    fill_value=4,  # always 4; speculative mode updates seed in-kernel
    dtype="int64",
).cpu()
```
Motivation
Replace the Python implementation of padding_sampling_params on XPU with an XPU kernel implementation, build_sampling_params. In addition, move the infer_seed update into build_sampling_params and align the infer_seed increment_value stepping with the GPU implementation.
Modifications
Usage or Command
Accuracy Tests
Verified that the INT64 modulo inside the XPU kernel works correctly:

Checklist
- [ ] Add at least a tag in the PR title.
  - Tag list: [`[FDConfig]`, `[APIServer]`, `[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.