Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGLOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [0.1.25] - 2026-01-22
### Added
- add to_runnable decorator
- lcc tool message add name and tool_call_id
- support set finish time of span

## [0.1.24] - 2026-01-16
### Added
- client init set default client if not exist
Expand Down
1 change: 1 addition & 0 deletions cozeloop/decorator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

coze_loop_decorator= CozeLoopDecorator()
observe = coze_loop_decorator.observe
to_runnable = coze_loop_decorator.to_runnable
178 changes: 178 additions & 0 deletions cozeloop/decorator/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Optional, Callable, Any, overload, Dict, Generic, Iterator, TypeVar, List, cast, AsyncIterator
from functools import wraps

from langchain_core.runnables import RunnableLambda, RunnableConfig

from cozeloop import Client, Span, start_span
from cozeloop.decorator.utils import is_async_func, is_gen_func, is_async_gen_func, is_class_func

Expand Down Expand Up @@ -311,6 +313,182 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any):
else:
return decorator(func)

def to_runnable(
self,
func: Callable = None,
) -> Callable:
"""
Decorator to be RunnableLambda.

:param func: The function to be decorated, Requirements are as follows:
1. When the func is called, parameter config(RunnableConfig) is required, you must use the config containing cozeloop callback handler of 'current request', otherwise, the trace may be lost!

Examples:
@to_runnable
def runnable_func(my_input: dict) -> str:
return input

async def scorer_leader(state: MyState) -> dict | str:
await runnable_func({"a": "111", "b": 222, "c": "333"}, config=state.config) # config is required
"""

def decorator(func: Callable):

@wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
res = None
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
res = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, **extra)
if hasattr(res, "__iter__"):
return res
except StopIteration:
pass
except Exception as e:
raise e
finally:
if res is not None:
return res

@wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
res = None
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
res = await RunnableLambda(_param_wrapped_func_async).ainvoke(input=inp, config=config, **extra)
if hasattr(res, "__aiter__"):
return res
except StopIteration:
pass
except StopAsyncIteration:
pass
except Exception as e:
if e.args and e.args[0] == 'coroutine raised StopIteration': # coroutine StopIteration
pass
else:
raise e
finally:
if res is not None:
return res

@wraps(func)
def gen_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
gen = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, *extra)
try:
for item in gen:
yield item
except StopIteration:
pass
except Exception as e:
raise e

@wraps(func)
async def async_gen_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
gen = RunnableLambda(_param_wrapped_func_async).invoke(input=inp, config=config, **extra)
items = []
try:
async for item in gen:
items.append(item)
yield item
finally:
pass
except StopIteration:
pass
except StopAsyncIteration:
pass
except Exception as e:
if e.args and e.args[0] == 'coroutine raised StopIteration':
pass
else:
raise e

# for convert parameter
def _param_wrapped_func(input_dict: dict, **kwargs) -> Any:
real_args = input_dict.get("args", ())
real_kwargs = input_dict.get("kwargs", {})

inner_class_self = kwargs.get("_inner_class_self", None)
if inner_class_self is not None:
real_args = (inner_class_self, *real_args)

return func(*real_args, **real_kwargs)

async def _param_wrapped_func_async(input_dict: dict, **kwargs) -> Any:
real_args = input_dict.get("args", ())
real_kwargs = input_dict.get("kwargs", {})

inner_class_self = kwargs.get("_inner_class_self", None)
if inner_class_self is not None:
real_args = (inner_class_self, *real_args)

return await func(*real_args, **real_kwargs)

def _convert_config(config: RunnableConfig = None) -> RunnableConfig | None:
if config is None:
config = RunnableConfig(run_name=func.__name__)
config['run_name'] = func.__name__
elif isinstance(config, dict):
config['run_name'] = func.__name__
return config

if is_async_gen_func(func):
return async_gen_wrapper
if is_gen_func(func):
return gen_wrapper
elif is_async_func(func):
return async_wrapper
else:
return sync_wrapper

if func is None:
return decorator
else:
return decorator(func)


class _CozeLoopTraceStream(Generic[S]):
def __init__(
Expand Down
11 changes: 10 additions & 1 deletion cozeloop/integration/langchain/trace_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from langchain_core.outputs import LLMResult, ChatGeneration
from langchain_core.agents import AgentFinish, AgentAction
from langchain_core.prompt_values import PromptValue, ChatPromptValue
from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage
from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage, ToolMessage
from langchain_core.prompts import AIMessagePromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate
from langchain_core.outputs import ChatGenerationChunk, GenerationChunk

Expand Down Expand Up @@ -581,6 +581,15 @@ def _convert_inputs(inputs: Any) -> Any:
if inputs.content != '':
format_inputs['content'] = inputs.content
return format_inputs
if isinstance(inputs, ToolMessage):
"""
Must be before BaseMessage.
"""
content = {"content": inputs.content}
if inputs.artifact is not None:
content['artifact'] = _convert_inputs(inputs.artifact) # artifact is existed when response_format="content_and_artifact".
message = Message(role=inputs.type, content=content)
return message
if isinstance(inputs, BaseMessage):
message = Message(role=inputs.type, content=inputs.content,
tool_calls=inputs.additional_kwargs.get('tool_calls', []))
Expand Down
4 changes: 3 additions & 1 deletion cozeloop/integration/langchain/trace_model/llm_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class Message:
tool_calls: List[ToolCall] = None
metadata: Optional[dict] = None
reasoning_content: Optional[str] = None
name: Optional[str] = None
tool_call_id: Optional[str] = None

def __post_init__(self):
if self.role is not None and (self.role == 'AIMessageChunk' or self.role == 'ai'):
Expand Down Expand Up @@ -155,7 +157,7 @@ def __init__(self, messages: List[Union[BaseMessage, List[BaseMessage]]], invoca
if message.additional_kwargs is not None and message.additional_kwargs.get('name', ''):
name = message.additional_kwargs.get('name', '')
tool_call = ToolCall(id=message.tool_call_id, type=message.type, function=ToolFunction(name=name))
self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call]))
self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call], name=name, tool_call_id=message.tool_call_id))
else:
self._messages.append(Message(role=message.type, content=message.content))

Expand Down
3 changes: 3 additions & 0 deletions cozeloop/internal/trace/noop_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
def set_deployment_env(self, deployment_env: str) -> None:
pass

def set_finish_time(self, finish_time: datetime) -> None:
pass

def __enter__(self):
return self

Expand Down
10 changes: 9 additions & 1 deletion cozeloop/internal/trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac
self.space_id = space_id
self.parent_span_id = parent_span_id
self.start_time = start_time if start_time else datetime.now()
self.finish_time: datetime = None
self.duration = duration
self.tag_map = tag_map if tag_map else {}
self.system_tag_map = system_tag_map if system_tag_map else {}
Expand Down Expand Up @@ -396,6 +397,10 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
def set_deployment_env(self, deployment_env: str) -> None:
self.set_tags({DEPLOYMENT_ENV: deployment_env})

def set_finish_time(self, finish_time: datetime) -> None:
self.finish_time = finish_time


def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[str], int):
validate_map = {}
cut_off_keys = []
Expand Down Expand Up @@ -541,7 +546,10 @@ def set_stat_info(self):
if input_tokens > 0 or output_tokens > 0:
self.set_tags({TOKENS: int(input_tokens) + int(output_tokens)})

duration = int((datetime.now().timestamp() - self.start_time.timestamp()) * 1000000)
finish_time_stamp = datetime.now().timestamp()
if self.finish_time is not None:
finish_time_stamp = self.finish_time.timestamp()
duration = int((finish_time_stamp - self.start_time.timestamp()) * 1000000)
with self.lock:
self.duration = duration

Expand Down
7 changes: 7 additions & 0 deletions cozeloop/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,18 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
Set system tags. DO NOT use this method unless you know what you are doing.
"""

@abstractmethod
def set_deployment_env(self, deployment_env: str) -> None:
"""
Set the deployment environment of the span, identify custom environments.
"""

@abstractmethod
def set_finish_time(self, finish_time: datetime) -> None:
"""
Set the finish time of the span.
"""


class Span(CommonSpanSetter, SpanContext):
"""
Expand Down
2 changes: 2 additions & 0 deletions examples/trace/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import time
from datetime import datetime, timedelta

import cozeloop

Expand Down Expand Up @@ -100,6 +101,7 @@ def do_simple_demo():
span.set_error(str(e))

# 3. span finish
span.set_finish_time(datetime.now() + timedelta(seconds=3)) # set finish time as your need to change duration
span.finish()

# 4. (optional) flush or close
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cozeloop"
version = "0.1.24"
version = "0.1.25"
description = "coze loop sdk"
authors = ["JiangQi715 <jiangqi.rrt@bytedance.com>"]
license = "MIT"
Expand Down