diff --git a/CHANGLOG.md b/CHANGLOG.md index e5aba50..a504e52 100644 --- a/CHANGLOG.md +++ b/CHANGLOG.md @@ -1,3 +1,9 @@ +## [0.1.25] - 2026-01-22 +### Added +- add to_runnable decorator +- lcc tool message add name and tool_call_id +- support set finish time of span + ## [0.1.24] - 2026-01-16 ### Added - client init set default client if not exist diff --git a/cozeloop/decorator/__init__.py b/cozeloop/decorator/__init__.py index 61d4add..121ccfd 100644 --- a/cozeloop/decorator/__init__.py +++ b/cozeloop/decorator/__init__.py @@ -5,3 +5,4 @@ coze_loop_decorator= CozeLoopDecorator() observe = coze_loop_decorator.observe +to_runnable = coze_loop_decorator.to_runnable diff --git a/cozeloop/decorator/decorator.py b/cozeloop/decorator/decorator.py index de9772a..59fa3b4 100644 --- a/cozeloop/decorator/decorator.py +++ b/cozeloop/decorator/decorator.py @@ -4,6 +4,8 @@ from typing import Optional, Callable, Any, overload, Dict, Generic, Iterator, TypeVar, List, cast, AsyncIterator from functools import wraps +from langchain_core.runnables import RunnableLambda, RunnableConfig + from cozeloop import Client, Span, start_span from cozeloop.decorator.utils import is_async_func, is_gen_func, is_async_gen_func, is_class_func @@ -311,6 +313,182 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any): else: return decorator(func) + def to_runnable( + self, + func: Callable = None, + ) -> Callable: + """ + Decorator to be RunnableLambda. + + :param func: The function to be decorated, Requirements are as follows: + 1. When the func is called, parameter config(RunnableConfig) is required, you must use the config containing cozeloop callback handler of 'current request', otherwise, the trace may be lost! + + Examples: + @to_runnable + def runnable_func(my_input: dict) -> str: + return input + + async def scorer_leader(state: MyState) -> dict | str: + await runnable_func({"a": "111", "b": 222, "c": "333"}, config=state.config) # config is required + """ + + def decorator(func: Callable): + + @wraps(func) + def sync_wrapper(*args: Any, **kwargs: Any): + config = kwargs.pop("config", None) + config = _convert_config(config) + res = None + try: + extra = {} + if len(args) > 0 and is_class_func(func): + extra = {"_inner_class_self": args[0]} + args = args[1:] + inp = {} + if len(args) > 0: + inp['args'] = args + if len(kwargs) > 0: + inp['kwargs'] = kwargs + res = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, **extra) + if hasattr(res, "__iter__"): + return res + except StopIteration: + pass + except Exception as e: + raise e + finally: + if res is not None: + return res + + @wraps(func) + async def async_wrapper(*args: Any, **kwargs: Any): + config = kwargs.pop("config", None) + config = _convert_config(config) + res = None + try: + extra = {} + if len(args) > 0 and is_class_func(func): + extra = {"_inner_class_self": args[0]} + args = args[1:] + inp = {} + if len(args) > 0: + inp['args'] = args + if len(kwargs) > 0: + inp['kwargs'] = kwargs + res = await RunnableLambda(_param_wrapped_func_async).ainvoke(input=inp, config=config, **extra) + if hasattr(res, "__aiter__"): + return res + except StopIteration: + pass + except StopAsyncIteration: + pass + except Exception as e: + if e.args and e.args[0] == 'coroutine raised StopIteration': # coroutine StopIteration + pass + else: + raise e + finally: + if res is not None: + return res + + @wraps(func) + def gen_wrapper(*args: Any, **kwargs: Any): + config = kwargs.pop("config", None) + config = _convert_config(config) + try: + extra = {} + if len(args) > 0 and is_class_func(func): + extra = {"_inner_class_self": args[0]} + args = args[1:] + inp = {} + if len(args) > 0: + inp['args'] = args + if len(kwargs) > 0: + inp['kwargs'] = kwargs + gen = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, *extra) + try: + for item in gen: + yield item + except StopIteration: + pass + except Exception as e: + raise e + + @wraps(func) + async def async_gen_wrapper(*args: Any, **kwargs: Any): + config = kwargs.pop("config", None) + config = _convert_config(config) + try: + extra = {} + if len(args) > 0 and is_class_func(func): + extra = {"_inner_class_self": args[0]} + args = args[1:] + inp = {} + if len(args) > 0: + inp['args'] = args + if len(kwargs) > 0: + inp['kwargs'] = kwargs + gen = RunnableLambda(_param_wrapped_func_async).invoke(input=inp, config=config, **extra) + items = [] + try: + async for item in gen: + items.append(item) + yield item + finally: + pass + except StopIteration: + pass + except StopAsyncIteration: + pass + except Exception as e: + if e.args and e.args[0] == 'coroutine raised StopIteration': + pass + else: + raise e + + # for convert parameter + def _param_wrapped_func(input_dict: dict, **kwargs) -> Any: + real_args = input_dict.get("args", ()) + real_kwargs = input_dict.get("kwargs", {}) + + inner_class_self = kwargs.get("_inner_class_self", None) + if inner_class_self is not None: + real_args = (inner_class_self, *real_args) + + return func(*real_args, **real_kwargs) + + async def _param_wrapped_func_async(input_dict: dict, **kwargs) -> Any: + real_args = input_dict.get("args", ()) + real_kwargs = input_dict.get("kwargs", {}) + + inner_class_self = kwargs.get("_inner_class_self", None) + if inner_class_self is not None: + real_args = (inner_class_self, *real_args) + + return await func(*real_args, **real_kwargs) + + def _convert_config(config: RunnableConfig = None) -> RunnableConfig | None: + if config is None: + config = RunnableConfig(run_name=func.__name__) + config['run_name'] = func.__name__ + elif isinstance(config, dict): + config['run_name'] = func.__name__ + return config + + if is_async_gen_func(func): + return async_gen_wrapper + if is_gen_func(func): + return gen_wrapper + elif is_async_func(func): + return async_wrapper + else: + return sync_wrapper + + if func is None: + return decorator + else: + return decorator(func) + class _CozeLoopTraceStream(Generic[S]): def __init__( diff --git a/cozeloop/integration/langchain/trace_callback.py b/cozeloop/integration/langchain/trace_callback.py index 0010a12..8d0dbe6 100644 --- a/cozeloop/integration/langchain/trace_callback.py +++ b/cozeloop/integration/langchain/trace_callback.py @@ -14,7 +14,7 @@ from langchain_core.outputs import LLMResult, ChatGeneration from langchain_core.agents import AgentFinish, AgentAction from langchain_core.prompt_values import PromptValue, ChatPromptValue -from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage +from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage, ToolMessage from langchain_core.prompts import AIMessagePromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate from langchain_core.outputs import ChatGenerationChunk, GenerationChunk @@ -581,6 +581,15 @@ def _convert_inputs(inputs: Any) -> Any: if inputs.content != '': format_inputs['content'] = inputs.content return format_inputs + if isinstance(inputs, ToolMessage): + """ + Must be before BaseMessage. + """ + content = {"content": inputs.content} + if inputs.artifact is not None: + content['artifact'] = _convert_inputs(inputs.artifact) # artifact is existed when response_format="content_and_artifact". + message = Message(role=inputs.type, content=content) + return message if isinstance(inputs, BaseMessage): message = Message(role=inputs.type, content=inputs.content, tool_calls=inputs.additional_kwargs.get('tool_calls', [])) diff --git a/cozeloop/integration/langchain/trace_model/llm_model.py b/cozeloop/integration/langchain/trace_model/llm_model.py index 3e12a86..7b01304 100644 --- a/cozeloop/integration/langchain/trace_model/llm_model.py +++ b/cozeloop/integration/langchain/trace_model/llm_model.py @@ -53,6 +53,8 @@ class Message: tool_calls: List[ToolCall] = None metadata: Optional[dict] = None reasoning_content: Optional[str] = None + name: Optional[str] = None + tool_call_id: Optional[str] = None def __post_init__(self): if self.role is not None and (self.role == 'AIMessageChunk' or self.role == 'ai'): @@ -155,7 +157,7 @@ def __init__(self, messages: List[Union[BaseMessage, List[BaseMessage]]], invoca if message.additional_kwargs is not None and message.additional_kwargs.get('name', ''): name = message.additional_kwargs.get('name', '') tool_call = ToolCall(id=message.tool_call_id, type=message.type, function=ToolFunction(name=name)) - self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call])) + self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call], name=name, tool_call_id=message.tool_call_id)) else: self._messages.append(Message(role=message.type, content=message.content)) diff --git a/cozeloop/internal/trace/noop_span.py b/cozeloop/internal/trace/noop_span.py index 561fb0b..86743e5 100644 --- a/cozeloop/internal/trace/noop_span.py +++ b/cozeloop/internal/trace/noop_span.py @@ -125,6 +125,9 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None: def set_deployment_env(self, deployment_env: str) -> None: pass + def set_finish_time(self, finish_time: datetime) -> None: + pass + def __enter__(self): return self diff --git a/cozeloop/internal/trace/span.py b/cozeloop/internal/trace/span.py index a4f1328..3a0de67 100644 --- a/cozeloop/internal/trace/span.py +++ b/cozeloop/internal/trace/span.py @@ -78,6 +78,7 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac self.space_id = space_id self.parent_span_id = parent_span_id self.start_time = start_time if start_time else datetime.now() + self.finish_time: datetime = None self.duration = duration self.tag_map = tag_map if tag_map else {} self.system_tag_map = system_tag_map if system_tag_map else {} @@ -396,6 +397,10 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None: def set_deployment_env(self, deployment_env: str) -> None: self.set_tags({DEPLOYMENT_ENV: deployment_env}) + def set_finish_time(self, finish_time: datetime) -> None: + self.finish_time = finish_time + + def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[str], int): validate_map = {} cut_off_keys = [] @@ -541,7 +546,10 @@ def set_stat_info(self): if input_tokens > 0 or output_tokens > 0: self.set_tags({TOKENS: int(input_tokens) + int(output_tokens)}) - duration = int((datetime.now().timestamp() - self.start_time.timestamp()) * 1000000) + finish_time_stamp = datetime.now().timestamp() + if self.finish_time is not None: + finish_time_stamp = self.finish_time.timestamp() + duration = int((finish_time_stamp - self.start_time.timestamp()) * 1000000) with self.lock: self.duration = duration diff --git a/cozeloop/span.py b/cozeloop/span.py index 3f2c278..6ec0cb0 100644 --- a/cozeloop/span.py +++ b/cozeloop/span.py @@ -186,11 +186,18 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None: Set system tags. DO NOT use this method unless you know what you are doing. """ + @abstractmethod def set_deployment_env(self, deployment_env: str) -> None: """ Set the deployment environment of the span, identify custom environments. """ + @abstractmethod + def set_finish_time(self, finish_time: datetime) -> None: + """ + Set the finish time of the span. + """ + class Span(CommonSpanSetter, SpanContext): """ diff --git a/examples/trace/simple.py b/examples/trace/simple.py index ff2f744..12f9e4a 100644 --- a/examples/trace/simple.py +++ b/examples/trace/simple.py @@ -4,6 +4,7 @@ import logging import os import time +from datetime import datetime, timedelta import cozeloop @@ -100,6 +101,7 @@ def do_simple_demo(): span.set_error(str(e)) # 3. span finish + span.set_finish_time(datetime.now() + timedelta(seconds=3)) # set finish time as your need to change duration span.finish() # 4. (optional) flush or close diff --git a/pyproject.toml b/pyproject.toml index 0bf9ce7..62d9fa8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cozeloop" -version = "0.1.24" +version = "0.1.25" description = "coze loop sdk" authors = ["JiangQi715 "] license = "MIT"