Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions fastdeploy/spec_decode/mtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ def _initialize_forward_meta(self, step_use_cudagraph: bool = False, is_dummy_ru
step_use_cudagraph and self.draft_model_use_cudagraph and not (substep > 0 and is_dummy_run)
)

def _initialize_forward_meta_xpu(self):
def _initialize_forward_meta_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = False, substep: int = 0):

self.forward_meta.decoder_batch_ids = (self.model_inputs["decoder_batch_ids"],)
self.forward_meta.decoder_tile_ids_per_batch = (self.model_inputs["decoder_tile_ids_per_batch"],)
Expand All @@ -719,6 +719,14 @@ def _initialize_forward_meta_xpu(self):
# Initialize attention meta data
for attn_backend in self.attn_backends:
attn_backend.init_attention_metadata(self.forward_meta)

# Notes(liuzichang):
Comment on lines 698 to +723
# 1. CUDA Graph capture sizes must be recorded in descending order (large → small).
# 2. In multi-step execution, only the first step should be captured.
self.forward_meta.step_use_cudagraph = (
step_use_cudagraph and self.draft_model_use_cudagraph and not (substep > 0 and is_dummy_run)
)


def exist_prefill(self):
"""
Expand Down Expand Up @@ -1087,7 +1095,8 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
self.model_inputs["draft_tokens"],
self.model_inputs["seq_lens_encoder"],
self.model_inputs["seq_lens_decoder"],
num_speculative_tokens=self.speculative_config.num_speculative_tokens,
# num_speculative_tokens=self.speculative_config.num_speculative_tokens,
num_speculative_tokens=0,
Comment on lines +1098 to +1099
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 num_speculative_tokens 被硬编码为 0,原始动态值被注释掉。

update_model_inputs 接收到 num_speculative_tokens=0 后,依赖该字段的注意力 mask 构造或 token 计数逻辑可能产生错误结果,相当于在推理阶段也禁用了投机 token 数量感知。若这是 XPU 投机解码的特殊需求(如 attention mask 生成方式不同),请补充注释说明原因;若为调试遗留,请恢复为 self.speculative_config.num_speculative_tokens

)

if self.enable_mm:
Expand All @@ -1107,7 +1116,10 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
)
self.model_inputs["attn_mask_offsets"].copy_(attn_mask_offsets, False)

self._initialize_forward_meta_xpu()
self._initialize_forward_meta_xpu(step_use_cudagraph=step_use_cudagraph, is_dummy_run=is_dummy_run, substep=substep)

# Padding inputs for cuda graph
self.padding_cudagraph_inputs()
# Get sampling metadata
self.sampling_metadata = SamplingMetadata(
temperature=self.model_inputs["temperature"],
Expand Down Expand Up @@ -1136,6 +1148,9 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
previous_hidden_states=self.model_inputs["target_hidden_states"],
forward_meta=self.forward_meta,
)

if self.forward_meta.step_use_cudagraph:
model_output = model_output[: self.real_token_num]
hidden_states = xpu_process_output(model_output, self.forward_meta, self.model_inputs)
# 4. Compute logits, Sample
logits = self.model.compute_logits(hidden_states, forward_meta=self.forward_meta)
Expand Down
68 changes: 35 additions & 33 deletions fastdeploy/worker/xpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1141,23 +1141,7 @@ def _prepare_inputs(self, is_dummy_run=False) -> None:
if self.use_cudagraph:
# Update Batch type for cuda graph for only_decode_batch
if_only_decode = self.only_decode()

only_decode_use_cudagraph = self.use_cudagraph and if_only_decode
# Update config about moe for better performance
# TODO(wanglongzhi):Modifying the config at runtime is not appropriate; it needs to be moved to forward_meta. It will be used in MoEMethodBase.apply()
if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed":
self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill"
if self.speculative_decoding:
self.proposer.fd_config.parallel_config.moe_phase.phase = "decode" if if_only_decode else "prefill"

# Update Batch type for cuda graph for only_prefill_batch
only_prefill_use_cudagraph = self.use_cudagraph and self.cudagraph_only_prefill and self.only_prefill()

self.forward_meta.step_use_cudagraph = (
only_prefill_use_cudagraph
if self.cudagraph_only_prefill
else only_decode_use_cudagraph and self.forward_meta.ids_remove_padding.shape[0] > 0
)
self.forward_meta.step_use_cudagraph = self.use_cudagraph and if_only_decode and self.forward_meta.ids_remove_padding.shape[0] > 0
Comment on lines 1141 to +1144
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 cudagraph_only_prefill 路径被静默移除。

原代码支持:

self.forward_meta.step_use_cudagraph = (
    only_prefill_use_cudagraph
    if self.cudagraph_only_prefill
    else only_decode_use_cudagraph and ...
)

新代码只保留了 decode-only 路径,cudagraph_only_prefill=True 的配置将无法触发 cudagraph。请确认 XPU 平台是否已不再使用该配置,若是请同步更新文档或配置注释。


# Update bad tokens len
max_bad_tokens_len = paddle.max(self.share_inputs["bad_tokens_len"])
Expand All @@ -1169,8 +1153,8 @@ def _prepare_inputs(self, is_dummy_run=False) -> None:
self.forward_meta.kv_signal_sender = self.share_inputs["kv_signal_sender"]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 envs.FD_XPU_ENABLE_MIXED_EP_MODE 环境变量守卫被移除。

原条件:splitwise_role == "mixed" and envs.FD_XPU_ENABLE_MIXED_EP_MODE
新条件:use_ep and splitwise_role == "mixed"

这意味着在 use_ep=True + splitwise_role=mixed 的场景下,即使 FD_XPU_ENABLE_MIXED_EP_MODE=False,MoE phase 切换逻辑也会生效,可能改变已有部署的行为。此变更是否为有意为之?建议在 PR 描述中说明。

if (
self.fd_config.scheduler_config.splitwise_role == "mixed" and envs.FD_XPU_ENABLE_MIXED_EP_MODE
): # Centralized scenario: the phase is initialized as "prefill" by default. During inference runtime, different types of batches can achieve phase switching at this point.
self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed"
):
Comment on lines +1156 to +1157
if_only_decode = self.only_decode()
self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill"
Comment on lines +1157 to 1159

Expand Down Expand Up @@ -1506,14 +1490,28 @@ def capture_model(self) -> None:
capture_sizes = self.cudagraph_capture_sizes.copy()

try:
for batch_size in sorted(capture_sizes, reverse=True):
self._dummy_run(
num_tokens=self.scheduler_config.max_num_batched_tokens,
batch_size=batch_size,
expected_decode_len=expected_decode_len,
in_capturing=True,
)
logger.info(f"Warm up the model with the batch size:{batch_size}, num tokens:{expected_decode_len}")
if self.speculative_decoding and self.spec_method in [SpecMethod.MTP, SpecMethod.SUFFIX]:
for capture_size in sorted(capture_sizes, reverse=True):
expected_decode_len = (self.speculative_config.num_speculative_tokens + 1) * 2
self._dummy_run(
num_tokens=self.fd_config.get_max_chunk_tokens(),
batch_size=int(capture_size / (self.speculative_config.num_speculative_tokens + 1)),
in_capturing=True,
expected_decode_len=expected_decode_len,
# accept_all_drafts=True,
)
logger.info(
f"Warm up the model with the num_tokens:{capture_size}, expected_decode_len:{expected_decode_len}"
Comment on lines +1496 to +1504
)
else:
for batch_size in sorted(capture_sizes, reverse=True):
Comment on lines 1492 to +1507
self._dummy_run(
num_tokens=self.scheduler_config.max_num_batched_tokens,
batch_size=batch_size,
expected_decode_len=expected_decode_len,
in_capturing=True,
)
logger.info(f"Warm up the model with the batch size:{batch_size}, num tokens:{expected_decode_len}")
except RuntimeError as e:
if "out of memory" in str(e):
raise RuntimeError(
Expand Down Expand Up @@ -1562,11 +1560,12 @@ class at the server level, which is too granular for ModelRunner.
self.share_inputs["kv_signal_sender"] = sender
# 1. Prepare inputs of model and decoder.
self._prepare_inputs(is_dummy_run=is_dummy_run)

# 2. Padding inputs for cuda graph
self.padding_cudagraph_inputs()
if is_dummy_run:
self.forward_meta.step_use_cudagraph = in_capturing and self.forward_meta.step_use_cudagraph
# 2. Padding inputs for cuda grph
self.padding_cudagraph_inputs()
else:
self.forward_meta.step_use_cudagraph = self.forward_meta.step_use_cudagraph and self.real_token_num <= self.fd_config.graph_opt_config.max_capture_size

# NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state.
# This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
Expand All @@ -1575,8 +1574,6 @@ class at the server level, which is too granular for ModelRunner.
self._execute_empty_input(self.forward_meta)
return None

# 2. Padding inputs for cuda grph

model_inputs = {}
model_inputs["ids_remove_padding"] = self.share_inputs["ids_remove_padding"]
if self.enable_mm:
Expand Down Expand Up @@ -1696,7 +1693,12 @@ class at the server level, which is too granular for ModelRunner.
# 6. Draft model propose
if self.speculative_decoding and self.proposer is not None:
if self.spec_method == SpecMethod.MTP:
self.proposer.run(full_hidden_states=model_output)
self.proposer.run(
full_hidden_states=model_output,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 残留 debug 注释含拼写错误,建议合入前清理。

# tep_use_cudagraph=False,tep_use_cudagraph 应为 step_use_cudagraph(缺少首字母 s)。同样,capture_model 中的 # accept_all_drafts=True, 也是残留的调试注释,若不需要请一并移除。

step_use_cudagraph=self.forward_meta.step_use_cudagraph,
# tep_use_cudagraph=False,
is_dummy_run=is_dummy_run,
)
else:
self.proposer.run(share_inputs=self.share_inputs)

Expand Down