diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 109f5f9729c..0aa0931479c 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -695,7 +695,7 @@ def _initialize_forward_meta(self, step_use_cudagraph: bool = False, is_dummy_ru
             step_use_cudagraph and self.draft_model_use_cudagraph and not (substep > 0 and is_dummy_run)
         )
 
-    def _initialize_forward_meta_xpu(self):
+    def _initialize_forward_meta_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = False, substep: int = 0):
 
         self.forward_meta.decoder_batch_ids = (self.model_inputs["decoder_batch_ids"],)
         self.forward_meta.decoder_tile_ids_per_batch = (self.model_inputs["decoder_tile_ids_per_batch"],)
@@ -719,6 +719,14 @@ def _initialize_forward_meta_xpu(self):
 
         # Initialize attention meta data
         for attn_backend in self.attn_backends:
             attn_backend.init_attention_metadata(self.forward_meta)
+
+        # Notes(liuzichang):
+        # 1. CUDA Graph capture sizes must be recorded in descending order (large → small).
+        # 2. In multi-step execution, only the first step should be captured.
+        self.forward_meta.step_use_cudagraph = (
+            step_use_cudagraph and self.draft_model_use_cudagraph and not (substep > 0 and is_dummy_run)
+        )
+
     def exist_prefill(self):
         """
@@ -1087,7 +1095,8 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
             self.model_inputs["draft_tokens"],
             self.model_inputs["seq_lens_encoder"],
             self.model_inputs["seq_lens_decoder"],
-            num_speculative_tokens=self.speculative_config.num_speculative_tokens,
+            # num_speculative_tokens=self.speculative_config.num_speculative_tokens,
+            num_speculative_tokens=0,
         )
 
         if self.enable_mm:
@@ -1107,7 +1116,10 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
             )
             self.model_inputs["attn_mask_offsets"].copy_(attn_mask_offsets, False)
 
-        self._initialize_forward_meta_xpu()
+        self._initialize_forward_meta_xpu(step_use_cudagraph=step_use_cudagraph, is_dummy_run=is_dummy_run, substep=substep)
+
+        # Padding inputs for cuda graph
+        self.padding_cudagraph_inputs()
         # Get sampling metadata
         self.sampling_metadata = SamplingMetadata(
             temperature=self.model_inputs["temperature"],
@@ -1136,6 +1148,9 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
                 previous_hidden_states=self.model_inputs["target_hidden_states"],
                 forward_meta=self.forward_meta,
             )
+
+            if self.forward_meta.step_use_cudagraph:
+                model_output = model_output[: self.real_token_num]
             hidden_states = xpu_process_output(model_output, self.forward_meta, self.model_inputs)
             # 4. Compute logits, Sample
             logits = self.model.compute_logits(hidden_states, forward_meta=self.forward_meta)
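# --- Annotation (not part of the patch) -------------------------------------
# A minimal sketch of why `_propose_xpu` slices the draft-model output when a
# CUDA Graph is replayed. It assumes, as the padding step above implies, that
# inputs were padded from the real token count up to the captured size, so the
# trailing rows of `model_output` are padding. Names and sizes here are
# illustrative only, not FastDeploy APIs.
import paddle

real_token_num = 96       # valid tokens in this step (before padding)
captured_token_num = 128  # token count the graph was captured with

model_output = paddle.randn([captured_token_num, 4096])  # replayed output
model_output = model_output[:real_token_num]             # drop the padded tail
assert model_output.shape[0] == real_token_num
# -----------------------------------------------------------------------------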
diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py
index c6dacb44f83..474a4e497ea 100644
--- a/fastdeploy/worker/xpu_model_runner.py
+++ b/fastdeploy/worker/xpu_model_runner.py
@@ -1141,23 +1141,7 @@ def _prepare_inputs(self, is_dummy_run=False) -> None:
         if self.use_cudagraph:
             # Update Batch type for cuda graph for only_decode_batch
             if_only_decode = self.only_decode()
-
-            only_decode_use_cudagraph = self.use_cudagraph and if_only_decode
-            # Update config about moe for better performance
-            # TODO(wanglongzhi):Modifying the config at runtime is not appropriate; it needs to be moved to forward_meta. It will be used in MoEMethodBase.apply()
-            if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed":
-                self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill"
-                if self.speculative_decoding:
-                    self.proposer.fd_config.parallel_config.moe_phase.phase = "decode" if if_only_decode else "prefill"
-
-            # Update Batch type for cuda graph for only_prefill_batch
-            only_prefill_use_cudagraph = self.use_cudagraph and self.cudagraph_only_prefill and self.only_prefill()
-
-            self.forward_meta.step_use_cudagraph = (
-                only_prefill_use_cudagraph
-                if self.cudagraph_only_prefill
-                else only_decode_use_cudagraph and self.forward_meta.ids_remove_padding.shape[0] > 0
-            )
+            self.forward_meta.step_use_cudagraph = self.use_cudagraph and if_only_decode and self.forward_meta.ids_remove_padding.shape[0] > 0
 
         # Update bad tokens len
         max_bad_tokens_len = paddle.max(self.share_inputs["bad_tokens_len"])
@@ -1169,8 +1153,8 @@ def _prepare_inputs(self, is_dummy_run=False) -> None:
             self.forward_meta.kv_signal_sender = self.share_inputs["kv_signal_sender"]
 
         if (
-            self.fd_config.scheduler_config.splitwise_role == "mixed" and envs.FD_XPU_ENABLE_MIXED_EP_MODE
-        ):  # Centralized scenario: the phase is initialized as "prefill" by default. During inference runtime, different types of batches can achieve phase switching at this point.
+            self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed"
+        ):
             if_only_decode = self.only_decode()
             self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill"
 
@@ -1506,14 +1490,28 @@ def capture_model(self) -> None:
 
         capture_sizes = self.cudagraph_capture_sizes.copy()
         try:
-            for batch_size in sorted(capture_sizes, reverse=True):
-                self._dummy_run(
-                    num_tokens=self.scheduler_config.max_num_batched_tokens,
-                    batch_size=batch_size,
-                    expected_decode_len=expected_decode_len,
-                    in_capturing=True,
-                )
-                logger.info(f"Warm up the model with the batch size:{batch_size}, num tokens:{expected_decode_len}")
+            if self.speculative_decoding and self.spec_method in [SpecMethod.MTP, SpecMethod.SUFFIX]:
+                for capture_size in sorted(capture_sizes, reverse=True):
+                    expected_decode_len = (self.speculative_config.num_speculative_tokens + 1) * 2
+                    self._dummy_run(
+                        num_tokens=self.fd_config.get_max_chunk_tokens(),
+                        batch_size=int(capture_size / (self.speculative_config.num_speculative_tokens + 1)),
+                        in_capturing=True,
+                        expected_decode_len=expected_decode_len,
+                        # accept_all_drafts=True,
+                    )
+                    logger.info(
+                        f"Warm up the model with the capture size:{capture_size}, expected_decode_len:{expected_decode_len}"
+                    )
+            else:
+                for batch_size in sorted(capture_sizes, reverse=True):
+                    self._dummy_run(
+                        num_tokens=self.scheduler_config.max_num_batched_tokens,
+                        batch_size=batch_size,
+                        expected_decode_len=expected_decode_len,
+                        in_capturing=True,
+                    )
+                    logger.info(f"Warm up the model with the batch size:{batch_size}, num tokens:{expected_decode_len}")
         except RuntimeError as e:
             if "out of memory" in str(e):
                 raise RuntimeError(
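# --- Annotation (not part of the patch) -------------------------------------
# The speculative-decoding capture branch above sizes its dummy runs from the
# draft length. A sketch of the arithmetic, in plain Python: each decode step
# carries (num_speculative_tokens + 1) tokens per request, so a graph captured
# for `capture_size` tokens serves capture_size // (num_speculative_tokens + 1)
# requests, and two full steps are warmed up. Example values are hypothetical.
num_speculative_tokens = 1  # e.g. one MTP draft token per step

for capture_size in sorted([128, 64, 32], reverse=True):  # large -> small
    batch_size = capture_size // (num_speculative_tokens + 1)
    expected_decode_len = (num_speculative_tokens + 1) * 2
    print(f"capture_size={capture_size} batch_size={batch_size} expected_decode_len={expected_decode_len}")
# -----------------------------------------------------------------------------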
@@ -1562,11 +1560,12 @@ class at the server level, which is too granular for ModelRunner.
             self.share_inputs["kv_signal_sender"] = sender
         # 1. Prepare inputs of model and decoder.
         self._prepare_inputs(is_dummy_run=is_dummy_run)
-
+        # 2. Padding inputs for cuda graph
+        self.padding_cudagraph_inputs()
         if is_dummy_run:
             self.forward_meta.step_use_cudagraph = in_capturing and self.forward_meta.step_use_cudagraph
-            # 2. Padding inputs for cuda grph
-            self.padding_cudagraph_inputs()
+        else:
+            self.forward_meta.step_use_cudagraph = self.forward_meta.step_use_cudagraph and self.real_token_num <= self.fd_config.graph_opt_config.max_capture_size
 
         # NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state.
         # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
@@ -1575,8 +1574,6 @@ class at the server level, which is too granular for ModelRunner.
             self._execute_empty_input(self.forward_meta)
             return None
 
-        # 2. Padding inputs for cuda grph
-
         model_inputs = {}
         model_inputs["ids_remove_padding"] = self.share_inputs["ids_remove_padding"]
         if self.enable_mm:
@@ -1696,7 +1693,12 @@ class at the server level, which is too granular for ModelRunner.
 
         # 6. Draft model propose
         if self.speculative_decoding and self.proposer is not None:
             if self.spec_method == SpecMethod.MTP:
-                self.proposer.run(full_hidden_states=model_output)
+                self.proposer.run(
+                    full_hidden_states=model_output,
+                    step_use_cudagraph=self.forward_meta.step_use_cudagraph,
+                    # step_use_cudagraph=False,
+                    is_dummy_run=is_dummy_run,
+                )
             else:
                 self.proposer.run(share_inputs=self.share_inputs)
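# --- Annotation (not part of the patch) -------------------------------------
# A condensed sketch of the `step_use_cudagraph` gate after this change,
# folding together the `_prepare_inputs` and `_dummy_run` conditions. This is
# a hypothetical free function for illustration; the real logic lives on the
# model runner and `forward_meta`.
def step_use_cudagraph(
    use_cudagraph: bool,
    only_decode: bool,
    num_input_tokens: int,
    real_token_num: int,
    max_capture_size: int,
) -> bool:
    # Replay a captured graph only for non-empty, pure-decode batches whose
    # unpadded token count fits within the largest captured size.
    return (
        use_cudagraph
        and only_decode
        and num_input_tokens > 0
        and real_token_num <= max_capture_size
    )
# -----------------------------------------------------------------------------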