From 7ee60ac0b4d82b6ff1f185f4955bdd735a4e7d83 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 19 Feb 2026 09:45:55 +0000 Subject: [PATCH 1/3] feat: add e2e test for ACP --- e2e/acp_echo.go | 145 ++++++++++++++++++++++++++++++++++++ e2e/echo_test.go | 28 +++++++ e2e/testdata/acp_basic.json | 6 ++ 3 files changed, 179 insertions(+) create mode 100644 e2e/acp_echo.go create mode 100644 e2e/testdata/acp_basic.json diff --git a/e2e/acp_echo.go b/e2e/acp_echo.go new file mode 100644 index 00000000..a7986e75 --- /dev/null +++ b/e2e/acp_echo.go @@ -0,0 +1,145 @@ +//go:build ignore + +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "strings" + + acp "github.com/coder/acp-go-sdk" +) + +// ScriptEntry defines a single entry in the test script. +type ScriptEntry struct { + ExpectMessage string `json:"expectMessage"` + ThinkDurationMS int64 `json:"thinkDurationMS"` + ResponseMessage string `json:"responseMessage"` +} + +// acpEchoAgent implements the ACP Agent interface for testing. 
+type acpEchoAgent struct { + script []ScriptEntry + scriptIndex int + conn *acp.AgentSideConnection + sessionID acp.SessionId +} + +var _ acp.Agent = (*acpEchoAgent)(nil) + +func main() { + if len(os.Args) != 2 { + fmt.Fprintln(os.Stderr, "Usage: acp_echo ") + os.Exit(1) + } + + script, err := loadScript(os.Args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "Error loading script: %v\n", err) + os.Exit(1) + } + + if len(script) == 0 { + fmt.Fprintln(os.Stderr, "Script is empty") + os.Exit(1) + } + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + go func() { + <-sigCh + os.Exit(0) + }() + + agent := &acpEchoAgent{ + script: script, + } + + conn := acp.NewAgentSideConnection(agent, os.Stdout, os.Stdin) + agent.conn = conn + + <-conn.Done() +} + +func (a *acpEchoAgent) Initialize(_ context.Context, _ acp.InitializeRequest) (acp.InitializeResponse, error) { + return acp.InitializeResponse{ + ProtocolVersion: acp.ProtocolVersionNumber, + AgentCapabilities: acp.AgentCapabilities{}, + }, nil +} + +func (a *acpEchoAgent) Authenticate(_ context.Context, _ acp.AuthenticateRequest) (acp.AuthenticateResponse, error) { + return acp.AuthenticateResponse{}, nil +} + +func (a *acpEchoAgent) Cancel(_ context.Context, _ acp.CancelNotification) error { + return nil +} + +func (a *acpEchoAgent) NewSession(_ context.Context, _ acp.NewSessionRequest) (acp.NewSessionResponse, error) { + a.sessionID = "test-session" + return acp.NewSessionResponse{ + SessionId: a.sessionID, + }, nil +} + +func (a *acpEchoAgent) Prompt(ctx context.Context, params acp.PromptRequest) (acp.PromptResponse, error) { + // Extract text from prompt + var promptText string + for _, block := range params.Prompt { + if block.Text != nil { + promptText = block.Text.Text + break + } + } + promptText = strings.TrimSpace(promptText) + + if a.scriptIndex >= len(a.script) { + return acp.PromptResponse{ + StopReason: acp.StopReasonEndTurn, + }, nil + } + + entry := a.script[a.scriptIndex] + 
expected := strings.TrimSpace(entry.ExpectMessage) + + // Empty ExpectMessage matches any prompt + if expected != "" && expected != promptText { + return acp.PromptResponse{}, fmt.Errorf("expected message %q but got %q", expected, promptText) + } + + a.scriptIndex++ + + // Send response via session update + if err := a.conn.SessionUpdate(ctx, acp.SessionNotification{ + SessionId: params.SessionId, + Update: acp.UpdateAgentMessageText(entry.ResponseMessage), + }); err != nil { + return acp.PromptResponse{}, err + } + + return acp.PromptResponse{ + StopReason: acp.StopReasonEndTurn, + }, nil +} + +func (a *acpEchoAgent) SetSessionMode(_ context.Context, _ acp.SetSessionModeRequest) (acp.SetSessionModeResponse, error) { + return acp.SetSessionModeResponse{}, nil +} + +func loadScript(scriptPath string) ([]ScriptEntry, error) { + data, err := os.ReadFile(scriptPath) + if err != nil { + return nil, fmt.Errorf("failed to read script file: %w", err) + } + + var script []ScriptEntry + if err := json.Unmarshal(data, &script); err != nil { + return nil, fmt.Errorf("failed to parse script JSON: %w", err) + } + + return script, nil +} diff --git a/e2e/echo_test.go b/e2e/echo_test.go index 765521cf..fd44d32a 100644 --- a/e2e/echo_test.go +++ b/e2e/echo_test.go @@ -100,6 +100,34 @@ func TestE2E(t *testing.T) { require.Equal(t, script[0].ExpectMessage, strings.TrimSpace(msgResp.Messages[1].Content)) require.Equal(t, script[0].ResponseMessage, strings.TrimSpace(msgResp.Messages[2].Content)) }) + + t.Run("acp_basic", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + script, apiClient := setup(ctx, t, &params{ + cmdFn: func(ctx context.Context, t testing.TB, serverPort int, binaryPath, cwd, scriptFilePath string) (string, []string) { + return binaryPath, []string{ + "server", + fmt.Sprintf("--port=%d", serverPort), + "--experimental-acp", + "--", "go", "run", filepath.Join(cwd, "acp_echo.go"), scriptFilePath, + } + }, + }) 
+ messageReq := agentapisdk.PostMessageParams{ + Content: "This is a test message.", + Type: agentapisdk.MessageTypeUser, + } + _, err := apiClient.PostMessage(ctx, messageReq) + require.NoError(t, err, "Failed to send message via SDK") + require.NoError(t, waitAgentAPIStable(ctx, t, apiClient, operationTimeout, "post message")) + msgResp, err := apiClient.GetMessages(ctx) + require.NoError(t, err, "Failed to get messages via SDK") + require.Len(t, msgResp.Messages, 2) + require.Equal(t, script[0].ExpectMessage, strings.TrimSpace(msgResp.Messages[0].Content)) + require.Equal(t, script[0].ResponseMessage, strings.TrimSpace(msgResp.Messages[1].Content)) + }) } type params struct { diff --git a/e2e/testdata/acp_basic.json b/e2e/testdata/acp_basic.json new file mode 100644 index 00000000..22dd8d98 --- /dev/null +++ b/e2e/testdata/acp_basic.json @@ -0,0 +1,6 @@ +[ + { + "expectMessage": "This is a test message.", + "responseMessage": "Echo: This is a test message." + } +] From a33d95e883b85b201672996be0a7c83f1ee917ec Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 25 Feb 2026 17:49:43 +0000 Subject: [PATCH 2/3] fix race in ACPConversation: wait for chunk after Prompt() returns --- x/acpio/acp_conversation.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go index 3d514f2f..2ee2a0a5 100644 --- a/x/acpio/acp_conversation.go +++ b/x/acpio/acp_conversation.go @@ -6,6 +6,7 @@ import ( "slices" "strings" "sync" + "time" st "github.com/coder/agentapi/lib/screentracker" "github.com/coder/quartz" @@ -31,7 +32,8 @@ type ACPConversation struct { agentIO ChunkableAgentIO messages []st.ConversationMessage nextID int // monotonically increasing message ID - prompting bool // true while agent is processing + prompting bool // true while agent is processing + chunkReceived chan struct{} // signals that handleChunk has accumulated a chunk streamingResponse strings.Builder logger 
*slog.Logger emitter st.Emitter @@ -68,6 +70,7 @@ func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *s initialPrompt: initialPrompt, emitter: emitter, clock: clock, + chunkReceived: make(chan struct{}, 1), } return c } @@ -202,6 +205,12 @@ func (c *ACPConversation) handleChunk(chunk string) { screen := c.streamingResponse.String() c.mu.Unlock() + // Signal that a chunk has been received (non-blocking; a pending signal is sufficient). + select { + case c.chunkReceived <- struct{}{}: + default: + } + c.emitter.EmitMessages(messages) c.emitter.EmitStatus(status) c.emitter.EmitScreen(screen) @@ -209,6 +218,12 @@ func (c *ACPConversation) handleChunk(chunk string) { // executePrompt runs the actual agent request and returns any error. func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error { + // Drain any stale signal before sending the prompt. + select { + case <-c.chunkReceived: + default: + } + var err error for _, part := range messageParts { if c.ctx.Err() != nil { @@ -221,6 +236,13 @@ func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error { } } + // The ACP SDK dispatches SessionUpdate notifications as goroutines, so + // the chunk may arrive after conn.Prompt() returns. Wait up to 100ms. 
+ select { + case <-c.chunkReceived: + case <-time.After(100 * time.Millisecond): + } + c.mu.Lock() c.prompting = false From 587fbba2bdcafe904426608d53f802fd1350b515 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 25 Feb 2026 18:07:20 +0000 Subject: [PATCH 3/3] fix: use clock.NewTimer instead of time.After to avoid resource leak --- x/acpio/acp_conversation.go | 4 +++- x/acpio/acp_conversation_test.go | 32 +++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go index 2ee2a0a5..ed742eea 100644 --- a/x/acpio/acp_conversation.go +++ b/x/acpio/acp_conversation.go @@ -238,10 +238,12 @@ func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error { // The ACP SDK dispatches SessionUpdate notifications as goroutines, so // the chunk may arrive after conn.Prompt() returns. Wait up to 100ms. + timer := c.clock.NewTimer(100 * time.Millisecond) select { case <-c.chunkReceived: - case <-time.After(100 * time.Millisecond): + case <-timer.C: } + timer.Stop() c.mu.Lock() c.prompting = false diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go index 0632dd17..b662ee0f 100644 --- a/x/acpio/acp_conversation_test.go +++ b/x/acpio/acp_conversation_test.go @@ -227,6 +227,9 @@ func Test_Send_AddsUserMessage(t *testing.T) { assert.Equal(t, "hello", messages[0].Message) assert.Equal(t, screentracker.ConversationRoleAgent, messages[1].Role) + // Signal a chunk so executePrompt's timer wait doesn't hang on the mock clock. + mock.SimulateChunks("hello response") + // Unblock the write to let Send complete close(done) require.NoError(t, <-errCh) @@ -290,6 +293,9 @@ func Test_Send_RejectsDuplicateSend(t *testing.T) { err := conv.Send(screentracker.MessagePartText{Content: "second"}) assert.ErrorIs(t, err, screentracker.ErrMessageValidationChanging) + // Signal a chunk so executePrompt's timer wait doesn't hang on the mock clock. 
+ mock.SimulateChunks("first response") + // Unblock the write to let the test complete cleanly close(done) require.NoError(t, <-errCh) @@ -318,6 +324,9 @@ func Test_Status_ChangesWhileProcessing(t *testing.T) { // Status should be changing while processing assert.Equal(t, screentracker.ConversationStatusChanging, conv.Status()) + // Signal a chunk so executePrompt's timer wait doesn't hang on the mock clock. + mock.SimulateChunks("test response") + // Unblock the write close(done) @@ -428,6 +437,9 @@ func Test_InitialPrompt_SentOnStart(t *testing.T) { assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) assert.Equal(t, "initial prompt", messages[0].Message) + // Signal a chunk so executePrompt's timer wait doesn't hang on the mock clock. + mock.SimulateChunks("initial response") + // Unblock the write to let the test complete cleanly close(done) } @@ -457,6 +469,9 @@ func Test_Messages_AreCopied(t *testing.T) { originalMessages := conv.Messages() assert.Equal(t, "test", originalMessages[0].Message) + // Signal a chunk so executePrompt's timer wait doesn't hang on the mock clock. + mock.SimulateChunks("test response") + // Unblock the write to let Send complete close(done) require.NoError(t, <-errCh) @@ -518,12 +533,15 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { // Send a second message — IDs must not reuse the removed agent message's ID (1). mock.mu.Lock() mock.writeErr = nil - mock.writeBlock = nil - mock.writeStarted = nil mock.mu.Unlock() - - err := conv.Send(screentracker.MessagePartText{Content: "retry"}) - require.NoError(t, err) + started2, done2 := mock.BlockWrite() + errCh2 := make(chan error, 1) + go func() { errCh2 <- conv.Send(screentracker.MessagePartText{Content: "retry"}) }() + <-started2 + // Signal a chunk so executePrompt's timer wait doesn't hang on the mock clock. 
+ mock.SimulateChunks("retry response") + close(done2) + require.NoError(t, <-errCh2) messages = conv.Messages() require.Len(t, messages, 3, "first user + second user + second agent") @@ -548,6 +566,10 @@ func Test_LateChunkAfterError_DoesNotCorruptUserMessage(t *testing.T) { mock.mu.Lock() mock.writeErr = assert.AnError mock.mu.Unlock() + + // Signal a chunk before unblocking; the error path still waits on chunkReceived + // or the timer, so pre-signaling avoids a hang on the mock clock. + mock.SimulateChunks("unexpected chunk") close(done) require.ErrorIs(t, <-errCh, assert.AnError)