diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index c17b8821ce2..1fcc4f2d4b4 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -1086,8 +1086,8 @@ def to_dict(self):
         """
         return {k: v for k, v in asdict(self).items()}
 
-    def record_recv_first_token(self):
-        cur_time = time.time()
+    def record_recv_first_token(self, cur_time: float = None):
+        cur_time = time.time() if cur_time is None else cur_time
         self.record_recv_token(cur_time)
         self.engine_recv_first_token_time = cur_time
 
@@ -1099,8 +1099,8 @@ def record_recv_token(self, cur_time: float = None):
         if self.inference_start_time:
             self.model_forward_time = cur_time - self.inference_start_time
 
-    def record_decode_recv_second_token(self):
-        cur_time = time.time()
+    def record_decode_recv_second_token(self, cur_time: float = None):
+        cur_time = time.time() if cur_time is None else cur_time
         self.record_recv_token(cur_time)
         self.decode_recv_second_token_time = cur_time
 
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index d6429521f05..e88ca93a949 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -325,10 +325,12 @@ async def chat_completion_stream_generator(
                     raise ValueError("{}".format(res["error_msg"]))
 
                 if inference_start_time[idx] == 0:
-                    arrival_time = res["metrics"]["first_token_time"]
-                    inference_start_time[idx] = res["metrics"]["inference_start_time"]
+                    arrival_time = res["metrics"]["first_token_time"] or 0
+                    inference_start_time[idx] = res["metrics"]["inference_start_time"] or 0
                 else:
-                    arrival_time = res["metrics"]["engine_recv_latest_token_time"] - inference_start_time[idx]
+                    arrival_time = (res["metrics"]["engine_recv_latest_token_time"] or 0) - inference_start_time[
+                        idx
+                    ]
                 if first_iteration:
                     num_prompt_tokens = len(prompt_token_ids)
                     num_cached_tokens = res.get("num_cached_tokens", 0)
@@ -468,7 +470,7 @@ async def chat_completion_stream_generator(
                         trace_carrier = res.get("trace_carrier")
                         if trace_carrier:
                             tracing.trace_set_proc_propagate_context(request_id, trace_carrier)
-                            start_time = res["metrics"]["engine_recv_latest_token_time"]
+                            start_time = res["metrics"]["engine_recv_latest_token_time"] or 0
                             tracing.trace_report_span(
                                 tracing.TraceSpanName.POSTPROCESSING,
                                 request_id,
@@ -480,7 +482,7 @@ async def chat_completion_stream_generator(
                             del res["trace_carrier"]
                     num_choices -= 1
                     main_process_metrics.e2e_request_latency.observe(
-                        time.time() - res["metrics"]["request_start_time"]
+                        time.time() - (res["metrics"]["request_start_time"] or 0)
                     )
                     if previous_num_tokens[idx] != max_tokens:
                         choice.finish_reason = "stop"
@@ -715,7 +717,7 @@ async def chat_completion_full_generator(
         trace_carrier = data.get("trace_carrier")
         if trace_carrier:
             tracing.trace_set_proc_propagate_context(request_id, trace_carrier)
-            start_time = data["metrics"]["engine_recv_latest_token_time"]
+            start_time = data["metrics"]["engine_recv_latest_token_time"] or 0
             tracing.trace_report_span(
                 tracing.TraceSpanName.POSTPROCESSING,
                 request_id,
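Note on the `request.py` hunks above: accepting an optional `cur_time` lets the token processor take a single `time.time()` reading and inject it into several recorders, so `engine_recv_first_token_time` and the fields set by `record_recv_token` agree exactly instead of drifting by a few microseconds. A minimal sketch of the pattern, using a hypothetical `TokenTimes` class rather than the real metrics object:

```python
import time


class TokenTimes:
    """Sketch of the optional-timestamp pattern above."""

    def __init__(self):
        self.first = None
        self.latest = None

    def record_first(self, cur_time: float = None):
        # Defaulting to None keeps old call sites working; callers that
        # already hold a timestamp pass it in for consistency.
        cur_time = time.time() if cur_time is None else cur_time
        self.record(cur_time)
        self.first = cur_time

    def record(self, cur_time: float = None):
        cur_time = time.time() if cur_time is None else cur_time
        self.latest = cur_time


now = time.time()
t = TokenTimes()
t.record_first(now)
assert t.first == t.latest == now  # one clock read, consistent fields
```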
start_time = data["metrics"]["engine_recv_latest_token_time"] + start_time = data["metrics"]["engine_recv_latest_token_time"] or 0 tracing.trace_report_span( tracing.TraceSpanName.POSTPROCESSING, request_id, @@ -561,10 +561,12 @@ async def completion_stream_generator( await self._call_process_response_dict(res, request, stream=True) if inference_start_time[idx] == 0: - arrival_time = res["metrics"]["first_token_time"] - inference_start_time[idx] = res["metrics"]["inference_start_time"] + arrival_time = res["metrics"]["first_token_time"] or 0 + inference_start_time[idx] = res["metrics"]["inference_start_time"] or 0 else: - arrival_time = res["metrics"]["engine_recv_latest_token_time"] - inference_start_time[idx] + arrival_time = (res["metrics"]["engine_recv_latest_token_time"] or 0) - inference_start_time[ + idx + ] await self._process_echo_logic(request, idx, res["outputs"]) output = res["outputs"] @@ -655,7 +657,7 @@ async def completion_stream_generator( trace_carrier = res.get("trace_carrier") if trace_carrier: tracing.trace_set_proc_propagate_context(request_id, trace_carrier) - start_time = res["metrics"]["engine_recv_latest_token_time"] + start_time = res["metrics"]["engine_recv_latest_token_time"] or 0 tracing.trace_report_span( tracing.TraceSpanName.POSTPROCESSING, request_id, diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 6be28f1f3be..8b5c1f85e9d 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -207,6 +207,7 @@ def _validate_split_kv_size(value: int) -> int: "FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS": lambda: int(os.getenv("FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS", "500")), "FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE": lambda: int(os.getenv("FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE", "64")), "FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: float(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")), + "FD_ENABLE_OBSERVABILITY": lambda: os.getenv("FD_ENABLE_OBSERVABILITY", "1") == "1", "FD_XPU_MOE_FFN_QUANT_TYPE_MAP": lambda: os.getenv("FD_XPU_MOE_FFN_QUANT_TYPE_MAP", ""), # Whether to enable low latency in mixed scenario "FD_XPU_ENABLE_MIXED_EP_MODE": lambda: bool(int(os.getenv("FD_XPU_ENABLE_MIXED_EP_MODE", "0"))), diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index fd5328dc4f5..befcd43e083 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -51,7 +51,7 @@ from fastdeploy.spec_decode import SpecMethod from fastdeploy.trace.constants import LoggingEventName from fastdeploy.trace.trace_logger import print as trace_print -from fastdeploy.utils import get_base_request_id, llm_logger, spec_logger +from fastdeploy.utils import llm_logger, spec_logger from fastdeploy.worker.output import LogprobsLists RECOVERY_STOP_SIGNAL = -3 @@ -132,6 +132,9 @@ def __init__(self, cfg, cached_generated_tokens, engine_worker_queue, split_conn self.accept_token_num_per_head_per_request = {} self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS + # observability toggle + self._observability_enabled = envs.FD_ENABLE_OBSERVABILITY + # health monitor self.timestamp_for_alive_before_handle_batch = None self.timestamp_for_alive_after_handle_batch = None @@ -226,7 +229,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: task_id = task.request_id token_id_list = token_ids.tolist() - self._record_metrics(task, current_time, token_id_list) + self._record_prometheus_metrics_on_token(task, current_time, token_id_list) for token_id in token_id_list: recovery_stop = token_id == 
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 6be28f1f3be..8b5c1f85e9d 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -207,6 +207,7 @@ def _validate_split_kv_size(value: int) -> int:
     "FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS": lambda: int(os.getenv("FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS", "500")),
     "FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE": lambda: int(os.getenv("FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE", "64")),
     "FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: float(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")),
+    "FD_ENABLE_OBSERVABILITY": lambda: os.getenv("FD_ENABLE_OBSERVABILITY", "1") == "1",
     "FD_XPU_MOE_FFN_QUANT_TYPE_MAP": lambda: os.getenv("FD_XPU_MOE_FFN_QUANT_TYPE_MAP", ""),
     # Whether to enable low latency in mixed scenario
     "FD_XPU_ENABLE_MIXED_EP_MODE": lambda: bool(int(os.getenv("FD_XPU_ENABLE_MIXED_EP_MODE", "0"))),
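On the `envs.py` entry: the registry maps each name to a lambda, so the environment is read at access time rather than at import time, and the default of `"1"` means observability stays on unless explicitly disabled. A reduced sketch of the lazy-registry pattern, with hypothetical `_ENV`/`get_env` names:

```python
import os

# Each value is a thunk: os.getenv runs when the flag is read, not when
# this module is imported.
_ENV = {
    "FD_ENABLE_OBSERVABILITY": lambda: os.getenv("FD_ENABLE_OBSERVABILITY", "1") == "1",
}


def get_env(name: str):
    return _ENV[name]()


print(get_env("FD_ENABLE_OBSERVABILITY"))  # True (default "1")
os.environ["FD_ENABLE_OBSERVABILITY"] = "0"
print(get_env("FD_ENABLE_OBSERVABILITY"))  # False, picked up after import
```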
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index fd5328dc4f5..befcd43e083 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -51,7 +51,7 @@
 from fastdeploy.spec_decode import SpecMethod
 from fastdeploy.trace.constants import LoggingEventName
 from fastdeploy.trace.trace_logger import print as trace_print
-from fastdeploy.utils import get_base_request_id, llm_logger, spec_logger
+from fastdeploy.utils import llm_logger, spec_logger
 from fastdeploy.worker.output import LogprobsLists
 
 RECOVERY_STOP_SIGNAL = -3
@@ -132,6 +132,9 @@ def __init__(self, cfg, cached_generated_tokens, engine_worker_queue, split_conn
         self.accept_token_num_per_head_per_request = {}
         self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS
 
+        # observability toggle
+        self._observability_enabled = envs.FD_ENABLE_OBSERVABILITY
+
         # health monitor
         self.timestamp_for_alive_before_handle_batch = None
         self.timestamp_for_alive_after_handle_batch = None
@@ -226,7 +229,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
         task_id = task.request_id
         token_id_list = token_ids.tolist()
 
-        self._record_metrics(task, current_time, token_id_list)
+        self._record_prometheus_metrics_on_token(task, current_time, token_id_list)
         for token_id in token_id_list:
             recovery_stop = token_id == RECOVERY_STOP_SIGNAL
             if recovery_stop:
@@ -246,48 +249,14 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
                     result.error_msg = "Recover is not supported, the result is incomplete!"
 
                 # Calculate statistics for the combined log
-                is_decode = self.cfg.scheduler_config.splitwise_role == "decode"
-                inference_start_time = task.metrics.get_inference_start_time(is_decode)
-                task.metrics.cal_cost_time()
-                e2e_time = current_time - inference_start_time
-                token_ratio = self.tokens_counter[task_id] / e2e_time
-
-                # Get cache information
-                gpu_cache = getattr(task.metrics, "gpu_cache_token_num", 0)
-                cpu_cache = getattr(task.metrics, "cpu_cache_token_num", 0)
-                total_cached = gpu_cache + cpu_cache
-
-                # Build cached detail dict
-                cached_detail = f'{{"CachedToken": {total_cached}, "GPU": {gpu_cache}, "CPU": {cpu_cache}}}'
-
-                # Print combined log with all required information
-                ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
-                log_request(
-                    RequestLogLevel.LIFECYCLE,
-                    message=(
-                        "Request={request_id}, InputToken={input_tokens}, "
-                        "CachedDetail={cached_detail}, OutputToken={output_tokens}, "
-                        "TokenRatio={token_ratio}, TTFT={ttft}, "
-                        "E2E={e2e_time}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
-                        "PreemptedCount={preempted_count}"
-                    ),
-                    request_id=task_id,
-                    input_tokens=task.prompt_token_ids_len,
-                    cached_detail=cached_detail,
-                    output_tokens=self.tokens_counter[task_id],
-                    token_ratio=f"{token_ratio:.2f}",
-                    ttft=f"{ttft:.2f}",
-                    e2e_time=f"{e2e_time:.2f}",
-                    is_prefill=is_prefill,
-                    recovery_stop=recovery_stop,
-                    preempted_count=getattr(task.metrics, "preempted_count", 0),
-                )
+                self._record_task_metrics_on_completion(task, current_time, recovery_stop)
+                self._record_trace_on_completion(task)
+                self._record_prometheus_metrics_on_completion(task, current_time)
+                self._log_request_on_completion(task, current_time, recovery_stop)
 
-                main_process_metrics.request_token_ratio.observe(token_ratio)
                 llm_logger.info(self.resource_manager.info())
                 if self.cfg.speculative_config.method:
                     self._compute_speculative_status()
-                self._record_completion_metrics(task, current_time)
                 self._recycle_resources(task_id, batch_id, task, result, is_prefill)
                 break
         return result
@@ -340,16 +309,14 @@ def _process_batch_output_use_zmq(self, receive_datas):
 
                 current_time = time.time()
                 if self.tokens_counter[task_id] == 0:
-                    task.metrics.record_recv_first_token()
-                    task.metrics.cal_cost_time()
-                    metrics = copy.copy(task.metrics)
-                    self._record_first_token_metrics(task, current_time)
+                    rid = task_id.split("_")[0]
+                    self._record_task_metrics_on_first_token(task, current_time)
+                    self._record_trace_on_first_token(task, rid)
+                    self._record_prometheus_metrics_on_first_token(task, current_time)
                 else:
-                    task.metrics.record_recv_token()
-                    if self.tokens_counter[task_id] == 1 and self.cfg.scheduler_config.splitwise_role == "decode":
-                        task.metrics.record_decode_recv_second_token()
-                    metrics = copy.copy(task.metrics)
+                    self._record_task_metrics_on_subsequent_token(task, current_time)
+                metrics = copy.copy(task.metrics)
 
                 if task.pooling_params is not None:
                     pooler_output = stream_data.pooler_output
                     if isinstance(pooler_output, np.ndarray):
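`_process_batch_output_use_zmq` now derives the base request id inline via `task_id.split("_")[0]`, which is why the `get_base_request_id` import disappears in the first hunk. Assuming ids of the form `"<rid>_<suffix>"` (an assumption here, not stated in the patch), the inline split behaves the same for suffixed and plain ids:

```python
def base_rid(task_id: str) -> str:
    # "abc123_0" -> "abc123"; an id with no "_" comes back unchanged.
    return task_id.split("_")[0]


assert base_rid("abc123_0") == "abc123"
assert base_rid("abc123") == "abc123"
```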
@@ -806,14 +773,8 @@ def _process_batch_output(self):
             task = self.resource_manager.tasks_list[i]
             task_id = task.request_id
             is_prefill = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "prefill"
-            is_decode = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "decode"
-
-            rid = get_base_request_id(task_id)
-            trace_carrier = task.trace_carrier
-            metrics = task.metrics
-            t = metrics.inference_start_time
-            ts = int(t * 1_000_000_000) if t is not None else 0
-            tracing.trace_set_proc_propagate_context(rid, trace_carrier, ts)
+
+            rid = self._setup_trace_context(task)
             if self.cfg.speculative_config.method:
                 self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i])
                 if accept_num[i] == PREEMPTED_TOKEN_ID:  # in MTP, means preemption has happened in worker
@@ -909,32 +870,20 @@ def _process_batch_output(self):
                 current_time = time.time()
                 trace_carrier = None
                 if self.tokens_counter[task_id] == 0:
-                    task.metrics.record_recv_first_token()
-                    task.metrics.cal_cost_time()
-                    metrics = copy.copy(task.metrics)
                     log_request(
                         RequestLogLevel.STAGES,
-                        message="task:{request_id} start recode first token",
+                        message="task: {request_id} start record first token",
                         request_id=task.request_id,
                     )
-                    self._record_first_token_metrics(task, current_time)
-
-                    tracing.trace_report_span(
-                        name=tracing.TraceSpanName.PREFILL,
-                        rid=rid,
-                        start_time_ns=int(task.metrics.inference_start_time * 1e9),
-                        end_time_ns=int(time.time() * 1e9),
-                        thread_finish_flag=False,
-                    )
-
+                    self._record_task_metrics_on_first_token(task, current_time)
+                    self._record_trace_on_first_token(task, rid)
+                    self._record_prometheus_metrics_on_first_token(task, current_time)
                 else:
-                    task.metrics.record_recv_token()
-                    if self.tokens_counter[task_id] == 1 and self.cfg.scheduler_config.splitwise_role == "decode":
-                        task.metrics.record_decode_recv_second_token()
-                    metrics = copy.copy(task.metrics)
+                    self._record_task_metrics_on_subsequent_token(task, current_time)
+                    self._record_prometheus_metrics_on_token(task, current_time, token_ids)
 
                 self.number_of_output_tokens += len(token_ids)
-                self._record_metrics(task, current_time, token_ids)
+                metrics = copy.copy(task.metrics)
                 result = RequestOutput(
                     request_id=task_id,
                     output_type=mtype,
@@ -995,59 +944,17 @@ def _process_batch_output(self):
                     result.finished = True
                     trace_carrier = tracing.trace_get_proc_propagate_context(rid=rid)
                     result.trace_carrier = trace_carrier
-                    tracing.trace_report_span(
-                        name=tracing.TraceSpanName.DECODE,
-                        rid=rid,
-                        start_time_ns=int(task.metrics.inference_start_time * 1e9),
-                        end_time_ns=int(time.time() * 1e9),
-                        thread_finish_flag=True,
-                    )
+
                     if recovery_stop:
                         result.error_msg = "Recover is not supported, the result is incomplete!"
-                    # Calculate statistics for the combined log
-                    inference_start_time = task.metrics.get_inference_start_time(is_decode)
-                    task.metrics.cal_cost_time()
-                    e2e_time = current_time - inference_start_time
-                    token_ratio = self.tokens_counter[task_id] / e2e_time
-
-                    # Get cache information
-                    gpu_cache = getattr(task.metrics, "gpu_cache_token_num", 0)
-                    cpu_cache = getattr(task.metrics, "cpu_cache_token_num", 0)
-                    total_cached = gpu_cache + cpu_cache
-
-                    # Build cached detail dict
-                    cached_detail = f'{{"CachedToken": {total_cached}, "GPU": {gpu_cache}, "CPU": {cpu_cache}}}'
+                    self._record_task_metrics_on_completion(task, current_time, recovery_stop)
+                    self._record_trace_on_completion(task, rid)
+                    self._record_prometheus_metrics_on_completion(task, current_time)
+                    self._log_request_on_completion(task, current_time, recovery_stop)
 
-                    # Print combined log with all required information
-                    ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
-                    ttft_s = ttft + task.metrics.time_in_queue
-                    log_request(
-                        RequestLogLevel.LIFECYCLE,
-                        message=(
-                            "Request={request_id}, InputToken={input_tokens}, "
-                            "CachedDetail={cached_detail}, OutputToken={output_tokens}, "
-                            "TokenRatio={token_ratio}, TTFT={ttft}, TTFT_S={ttft_s}, "
-                            "E2E={e2e_time}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
-                            "PreemptedCount={preempted_count}"
-                        ),
-                        request_id=task_id,
-                        input_tokens=task.prompt_token_ids_len,
-                        cached_detail=cached_detail,
-                        output_tokens=self.tokens_counter[task_id],
-                        token_ratio=f"{token_ratio:.2f}",
-                        ttft=f"{ttft:.2f}",
-                        ttft_s=f"{ttft_s:.2f}",
-                        e2e_time=f"{e2e_time:.2f}",
-                        is_prefill=is_prefill,
-                        recovery_stop=recovery_stop,
-                        preempted_count=getattr(task.metrics, "preempted_count", 0),
-                    )
-
-                    main_process_metrics.request_token_ratio.observe(token_ratio)
                     if self.cfg.speculative_config.method:
                         self._compute_speculative_status(result)
-                    self._record_completion_metrics(task, current_time)
                     log_request(
                         RequestLogLevel.STAGES,
                         message="task {request_id} received eos token. Recycling.",
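The span reporting consolidated out of `_process_batch_output` keeps timestamps as `time.time()` floats inside the engine and converts to integer nanoseconds only at the tracing boundary, with a zero fallback for a missing start time (see `_setup_trace_context` below). A sketch of that conversion:

```python
import time
from typing import Optional


def to_ns(t: Optional[float]) -> int:
    # Mirrors `int(t * 1_000_000_000) if t is not None else 0`: absent
    # timestamps become 0 instead of raising.
    return int(t * 1_000_000_000) if t is not None else 0


start = time.time()
end = start + 0.25
print(to_ns(end) - to_ns(start))  # ~250_000_000 ns for a 250 ms span
print(to_ns(None))                # 0
```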
Recycling.", @@ -1078,8 +985,128 @@ def _process_batch_output(self): self._record_speculative_decoding_metrics(accept_num) self.postprocess(batch_result, mtype) - def _record_metrics(self, task, current_time, token_ids): - """Record all metrics for a task""" + def _setup_trace_context(self, task): + """Set up trace propagation context for a task and return rid""" + task_id = task.request_id + rid = task_id.split("_")[0] + if not self._observability_enabled: + return rid + trace_carrier = task.trace_carrier + t = task.metrics.inference_start_time + ts = int(t * 1_000_000_000) if t is not None else 0 + tracing.trace_set_proc_propagate_context(rid, trace_carrier, ts) + return rid + + def _record_trace_on_first_token(self, task, rid): + """Emit trace events and report PREFILL span when first token is received""" + if not self._observability_enabled: + return + trace_print(LoggingEventName.FIRST_TOKEN_GENERATED, task.request_id, getattr(task, "user", "")) + trace_print(LoggingEventName.DECODE_START, task.request_id, getattr(task, "user", "")) + tracing.trace_report_span( + name=tracing.TraceSpanName.PREFILL, + rid=rid, + start_time_ns=int(task.metrics.inference_start_time * 1e9), + end_time_ns=int(time.time() * 1e9), + thread_finish_flag=False, + ) + + def _record_trace_on_completion(self, task, rid=None): + """Emit trace events when request completes""" + if not self._observability_enabled: + return + role = self.cfg.scheduler_config.splitwise_role + if role in ("mixed", "decode"): + trace_print(LoggingEventName.INFERENCE_END, task.request_id, getattr(task, "user", "")) + if role == "prefill": + trace_print(LoggingEventName.PREFILL_INFERENCE_END, task.request_id, getattr(task, "user", "")) + elif role == "decode": + trace_print(LoggingEventName.DECODE_INFERENCE_END, task.request_id, getattr(task, "user", "")) + trace_print(LoggingEventName.POSTPROCESSING_START, task.request_id, getattr(task, "user", "")) + tracing.trace_report_span( + name=tracing.TraceSpanName.DECODE, + rid=rid, + start_time_ns=int(task.metrics.inference_start_time * 1e9), + end_time_ns=int(time.time() * 1e9), + thread_finish_flag=True, + ) + + def _record_task_metrics_on_first_token(self, task, current_time): + """Record first token arrival in task metrics""" + if not self._observability_enabled: + return + task.metrics.record_recv_first_token(current_time) + task.metrics.cal_cost_time() + + def _record_task_metrics_on_subsequent_token(self, task, current_time): + """Record subsequent token arrival in task metrics""" + if not self._observability_enabled: + return + task_id = task.request_id + task.metrics.record_recv_token(current_time) + if self.tokens_counter[task_id] == 1 and self.cfg.scheduler_config.splitwise_role == "decode": + task.metrics.record_decode_recv_second_token(current_time) + + def _record_task_metrics_on_completion(self, task, current_time, recovery_stop=False): + """Record completion metrics into task.metrics""" + if not self._observability_enabled: + return + task.metrics.cal_cost_time() + + def _log_request_on_completion(self, task, current_time, recovery_stop=False): + """Log LIFECYCLE info when request completes""" + if not self._observability_enabled: + return + task_id = task.request_id + metrics = task.metrics + role = self.cfg.scheduler_config.splitwise_role + is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill" + + e2e_time = current_time - metrics.get_inference_start_time(role == "decode") + token_ratio = self.tokens_counter[task_id] / e2e_time + + 
@@ -1088,17 +1115,10 @@ def _record_metrics(self, task, current_time, token_ids):
         # Record generation metrics
         main_process_metrics.generation_tokens_total.inc(len(token_ids))
 
-    def _record_first_token_metrics(self, task, current_time):
-        """Record metrics for first token"""
-        metrics = task.metrics
-        trace_print(LoggingEventName.FIRST_TOKEN_GENERATED, task.request_id, getattr(task, "user", ""))
-        trace_print(LoggingEventName.DECODE_START, task.request_id, getattr(task, "user", ""))
-        main_process_metrics.time_to_first_token.observe(current_time - metrics.arrival_time)
-        main_process_metrics.request_queue_time.observe(metrics.inference_start_time - metrics.preprocess_end_time)
-        main_process_metrics.request_prefill_time.observe(current_time - metrics.inference_start_time)
-
-    def _record_completion_metrics(self, task, current_time):
-        """Record metrics when request completes"""
+    def _record_prometheus_metrics_on_completion(self, task, current_time):
+        """Report prometheus metrics when request completes"""
+        if not self._observability_enabled:
+            return
         role = self.cfg.scheduler_config.splitwise_role
         metrics = task.metrics
 
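The completion-path metrics below observe elapsed seconds on histograms, guarded so a phase that never happened is simply not recorded. A sketch of the same shape, assuming prometheus_client-style collectors behind `main_process_metrics` (the collector name here is hypothetical):

```python
from prometheus_client import Histogram

request_decode_time = Histogram("request_decode_time_seconds", "Decode phase latency")

engine_recv_first_token_time = 100.0  # falsy if no first token was recorded
current_time = 102.5
if engine_recv_first_token_time:
    # Only observe when the decode phase actually occurred.
    request_decode_time.observe(current_time - engine_recv_first_token_time)
```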
"")) - elif role == "decode": - trace_print(LoggingEventName.DECODE_INFERENCE_END, task.request_id, getattr(task, "user", "")) - trace_print(LoggingEventName.POSTPROCESSING_START, task.request_id, getattr(task, "user", "")) + e2e_time = current_time - metrics.get_inference_start_time(role == "decode") + token_ratio = self.tokens_counter[task.request_id] / e2e_time main_process_metrics.request_success_total.inc() main_process_metrics.request_inference_time.observe(current_time - metrics.inference_start_time) main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id]) + main_process_metrics.request_token_ratio.observe(token_ratio) def _record_speculative_decoding_metrics(self, accept_num): """Record metrics of speculative decoding""" + if not self._observability_enabled: + return if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"): main_process_metrics._init_speculative_metrics( self.cfg.speculative_config.method,