Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5420577
phase 1 refactor
griffinmilsap Feb 27, 2026
13226e1
bugfix
griffinmilsap Mar 2, 2026
d285340
phase-1 impl.
griffinmilsap Mar 2, 2026
cfe283c
topic and relay implementation
griffinmilsap Mar 2, 2026
70c0101
adjust examples
griffinmilsap Mar 2, 2026
4b83da0
Merge branch 'fix/collection-topics-relays' into feature/session-meta…
griffinmilsap Mar 2, 2026
2393c08
merged #228
griffinmilsap Mar 2, 2026
c548977
registration/snapshot foundation
griffinmilsap Mar 14, 2026
0ffd196
graphserver is ASOT for settings
griffinmilsap Mar 15, 2026
c7c2160
process control routing
griffinmilsap Mar 15, 2026
ab746c5
first process-control commands implemented
griffinmilsap Mar 16, 2026
ce8e192
profiling backend
griffinmilsap Mar 16, 2026
bf9e009
profiling hooks; trace/streams
griffinmilsap Mar 16, 2026
9c2dc79
topology change subscription api
griffinmilsap Mar 16, 2026
e183452
more flexible profile trace configurability
griffinmilsap Mar 16, 2026
0f90668
GraphContext settings control APIs
griffinmilsap Mar 16, 2026
35ed232
added settings structure and pydantic/param compat
griffinmilsap Mar 16, 2026
038843b
a few bugfixes and some code condensation
griffinmilsap Mar 16, 2026
01780c2
using UUIDs and errors on unit collision
griffinmilsap Mar 17, 2026
5655288
fix: profiling registry
griffinmilsap Mar 17, 2026
2b5ff31
better errors for high-level API name collisions, and fixed brittle t…
griffinmilsap Mar 17, 2026
09810f1
Include num_buffers in publisher profiling snapshots
griffinmilsap Mar 19, 2026
39bb093
modified the toy example to add some dynamic settings
griffinmilsap Mar 19, 2026
66e8261
Add trace sample sequence IDs for timing alignment
griffinmilsap Mar 19, 2026
690e12e
Improve high-rate profiling trace throughput and fairness
griffinmilsap Mar 20, 2026
553a353
address PROCESS_SETTINGS_UPDATE leakage
griffinmilsap Mar 23, 2026
7efabd6
profiling age-out on snapshot
griffinmilsap Mar 24, 2026
32924aa
better global topic output
griffinmilsap Mar 30, 2026
1b84fed
quicker/better hotpath and a/b testing
griffinmilsap Apr 1, 2026
72075e8
Merge branch 'codex/hotpath-ab-suite' into fix/process-control-perf
griffinmilsap Apr 1, 2026
e912e98
less expensive bookkeeping
griffinmilsap Apr 1, 2026
c2b7017
Merge branch 'codex/hotpath-ab-suite' into feature/process-control
griffinmilsap Apr 1, 2026
bf638b8
profiling refactor for perf
griffinmilsap Apr 1, 2026
91db698
small enhancements to hotpath
griffinmilsap Apr 1, 2026
4fc86d3
Merge branch 'fix/process-control-perf' into feature/process-control
griffinmilsap Apr 1, 2026
d2cc166
fix metadata registration race
griffinmilsap Apr 1, 2026
e25d249
fixed shutdown buffer error
griffinmilsap Apr 2, 2026
0b97f75
Fix Python 3.10 subscriber update timeout
griffinmilsap Apr 2, 2026
71071b3
maybe fix windows-only(?) test failure
griffinmilsap Apr 2, 2026
b0cd6cc
Merge pull request #234 from ezmsg-org/feature/process-control
griffinmilsap Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions examples/ezmsg_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ async def listen(self, msg: int) -> None:


class PassthroughCollection(ez.Collection):
INPUT = ez.InputStream(int)
OUTPUT = ez.OutputStream(int)
INPUT = ez.InputTopic(int)
OUTPUT = ez.OutputTopic(int)

def network(self) -> ez.NetworkDefinition:
return ((self.INPUT, self.OUTPUT),)
Expand Down Expand Up @@ -136,7 +136,7 @@ def configure(self) -> None:


class PubNoSubCollection(ez.Collection):
OUTPUT = ez.OutputStream(int)
OUTPUT = ez.OutputTopic(int)
GENERATE = Generator()
LOG = DebugLog()

Expand All @@ -148,7 +148,7 @@ def network(self) -> ez.NetworkDefinition:


class SubNoPubCollection(ez.Collection):
INPUT = ez.InputStream(int)
INPUT = ez.InputTopic(int)
LISTEN = Listener()

def network(self) -> ez.NetworkDefinition:
Expand All @@ -175,7 +175,7 @@ class PubNoSubPassthroughCollection(ez.Collection):
COLLECTION = PubNoSubCollection()
PASSTHROUGH = PassthroughCollection()

OUTPUT = ez.OutputStream(int)
OUTPUT = ez.OutputTopic(int)

def network(self) -> ez.NetworkDefinition:
return (
Expand All @@ -188,7 +188,7 @@ class SubNoPubPassthroughCollection(ez.Collection):
COLLECTION = SubNoPubCollection()
PASSTHROUGH = PassthroughCollection()

INPUT = ez.InputStream(int)
INPUT = ez.InputTopic(int)

def network(self) -> ez.NetworkDefinition:
return (
Expand Down
49 changes: 39 additions & 10 deletions examples/ezmsg_toy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,64 @@ class LFOSettings(ez.Settings):
update_rate: float = 2.0 # Hz, update rate


class LFOState(ez.State):
start_time: float
cur_settings: LFOSettings


class LFO(ez.Unit):
SETTINGS = LFOSettings
STATE = LFOState

OUTPUT = ez.OutputStream(float)

INPUT_SETTINGS = ez.InputStream(LFOSettings)

async def initialize(self) -> None:
self.start_time = time.time()
self.STATE.cur_settings = self.SETTINGS
self.STATE.start_time = time.time()

@ez.subscriber(INPUT_SETTINGS)
async def on_settings(self, msg: LFOSettings) -> None:
self.STATE.cur_settings = msg

@ez.publisher(OUTPUT)
async def generate(self) -> AsyncGenerator:
while True:
t = time.time() - self.start_time
yield self.OUTPUT, math.sin(2.0 * math.pi * self.SETTINGS.freq * t)
await asyncio.sleep(1.0 / self.SETTINGS.update_rate)
t = time.time() - self.STATE.start_time
yield self.OUTPUT, math.sin(2.0 * math.pi * self.STATE.cur_settings.freq * t)
await asyncio.sleep(1.0 / self.STATE.cur_settings.update_rate)


# MESSAGE GENERATOR
class MessageGeneratorSettings(ez.Settings):
message: str


class MessageGeneratorState(ez.State):
cur_settings: MessageGeneratorSettings


class MessageGenerator(ez.Unit):
SETTINGS = MessageGeneratorSettings
STATE = MessageGeneratorState

OUTPUT = ez.OutputStream(str)
INPUT_SETTINGS = ez.InputStream(MessageGeneratorSettings)

async def initialize(self) -> None:
self.STATE.cur_settings = self.SETTINGS

@ez.subscriber(INPUT_SETTINGS)
async def on_settings(self, msg: MessageGeneratorSettings) -> None:
self.STATE.cur_settings = msg

@ez.publisher(OUTPUT)
async def spawn_message(self) -> AsyncGenerator:
while True:
await asyncio.sleep(1.0)
ez.logger.info(f"Spawning {self.SETTINGS.message}")
yield self.OUTPUT, self.SETTINGS.message
ez.logger.info(f"Spawning {self.STATE.cur_settings.message}")
yield self.OUTPUT, self.STATE.cur_settings.message

@ez.publisher(OUTPUT)
async def spawn_once(self) -> AsyncGenerator:
Expand Down Expand Up @@ -123,8 +149,8 @@ class ModifierCollection(ez.Collection):
"""This collection will subscribe to messages
and append the most recent LFO output"""

INPUT = ez.InputStream(str)
OUTPUT = ez.OutputStream(str)
INPUT = ez.InputTopic(str)
OUTPUT = ez.OutputTopic(str)

SIN = LFO()
# SIN2 = LFO()
Expand Down Expand Up @@ -152,6 +178,8 @@ class TestSystemSettings(ez.Settings):
class TestSystem(ez.Collection):
SETTINGS = TestSystemSettings

OUTPUT_PING = ez.OutputTopic(str)

# Publishers
PING = MessageGenerator()
FOO = MessageGenerator()
Expand All @@ -173,6 +201,7 @@ def configure(self) -> None:
# Define Connections
def network(self) -> ez.NetworkDefinition:
return (
(self.PING.OUTPUT, self.OUTPUT_PING),
(self.PING.OUTPUT, self.PINGSUB1.INPUT),
(self.PING.OUTPUT, self.MODIFIER_COLLECTION.INPUT),
(self.MODIFIER_COLLECTION.OUTPUT, self.PINGSUB2.INPUT),
Expand All @@ -193,7 +222,7 @@ def process_components(self):
ez.run(
SYSTEM=system,
connections=[
# Make PING.OUTPUT available on a topic ezmsg_attach.py
(system.PING.OUTPUT, "GLOBAL_PING_TOPIC"),
# Make a system output available on a topic ezmsg_attach.py
(system.OUTPUT_PING, "GLOBAL_PING_TOPIC"),
],
)
Loading
Loading