Skip to content

Commit d11713a

Browse files
fix(adk): forward opts in taskTool.InvokableRun to emit internal events (#654)
1 parent e825e71 commit d11713a

File tree

3 files changed

+186
-1
lines changed

3 files changed

+186
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ output/*
4242
# Trae files
4343
.trae
4444

45+
.DS_Store

adk/prebuilt/deep/deep_test.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.uber.org/mock/gomock"
2626

2727
"github.com/cloudwego/eino/adk"
28+
"github.com/cloudwego/eino/adk/prebuilt/planexecute"
2829
"github.com/cloudwego/eino/components/model"
2930
"github.com/cloudwego/eino/components/tool"
3031
mockModel "github.com/cloudwego/eino/internal/mock/components/model"
@@ -105,3 +106,186 @@ func (s *spySubAgent) Run(ctx context.Context, _ *adk.AgentInput, _ ...adk.Agent
105106
gen.Close()
106107
return it
107108
}
109+
110+
func TestDeepAgentWithPlanExecuteSubAgent_InternalEventsEmitted(t *testing.T) {
111+
ctx := context.Background()
112+
113+
ctrl := gomock.NewController(t)
114+
defer ctrl.Finish()
115+
116+
deepModel := mockModel.NewMockToolCallingChatModel(ctrl)
117+
plannerModel := mockModel.NewMockToolCallingChatModel(ctrl)
118+
executorModel := mockModel.NewMockToolCallingChatModel(ctrl)
119+
replannerModel := mockModel.NewMockToolCallingChatModel(ctrl)
120+
121+
deepModel.EXPECT().WithTools(gomock.Any()).Return(deepModel, nil).AnyTimes()
122+
plannerModel.EXPECT().WithTools(gomock.Any()).Return(plannerModel, nil).AnyTimes()
123+
executorModel.EXPECT().WithTools(gomock.Any()).Return(executorModel, nil).AnyTimes()
124+
replannerModel.EXPECT().WithTools(gomock.Any()).Return(replannerModel, nil).AnyTimes()
125+
126+
plannerModel.EXPECT().Stream(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
127+
func(ctx context.Context, input []*schema.Message, opts ...interface{}) (*schema.StreamReader[*schema.Message], error) {
128+
sr, sw := schema.Pipe[*schema.Message](1)
129+
go func() {
130+
defer sw.Close()
131+
planJSON := `{"steps":["step1"]}`
132+
msg := schema.AssistantMessage("", []schema.ToolCall{
133+
{
134+
ID: "plan_call_1",
135+
Type: "function",
136+
Function: schema.FunctionCall{
137+
Name: "plan",
138+
Arguments: planJSON,
139+
},
140+
},
141+
})
142+
sw.Send(msg, nil)
143+
}()
144+
return sr, nil
145+
},
146+
).Times(1)
147+
148+
executorModel.EXPECT().Generate(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
149+
func(ctx context.Context, msgs []*schema.Message, opts ...model.Option) (*schema.Message, error) {
150+
return schema.AssistantMessage("executed step1", nil), nil
151+
},
152+
).Times(1)
153+
154+
replannerModel.EXPECT().Stream(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
155+
func(ctx context.Context, input []*schema.Message, opts ...interface{}) (*schema.StreamReader[*schema.Message], error) {
156+
sr, sw := schema.Pipe[*schema.Message](1)
157+
go func() {
158+
defer sw.Close()
159+
responseJSON := `{"response":"final response"}`
160+
msg := schema.AssistantMessage("", []schema.ToolCall{
161+
{
162+
ID: "respond_call_1",
163+
Type: "function",
164+
Function: schema.FunctionCall{
165+
Name: "respond",
166+
Arguments: responseJSON,
167+
},
168+
},
169+
})
170+
sw.Send(msg, nil)
171+
}()
172+
return sr, nil
173+
},
174+
).Times(1)
175+
176+
planner, err := planexecute.NewPlanner(ctx, &planexecute.PlannerConfig{
177+
ToolCallingChatModel: plannerModel,
178+
})
179+
assert.NoError(t, err)
180+
181+
executor, err := planexecute.NewExecutor(ctx, &planexecute.ExecutorConfig{
182+
Model: executorModel,
183+
})
184+
assert.NoError(t, err)
185+
186+
replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{
187+
ChatModel: replannerModel,
188+
})
189+
assert.NoError(t, err)
190+
191+
planExecuteAgent, err := planexecute.New(ctx, &planexecute.Config{
192+
Planner: planner,
193+
Executor: executor,
194+
Replanner: replanner,
195+
})
196+
assert.NoError(t, err)
197+
198+
namedPlanExecuteAgent := &namedPlanExecuteAgent{
199+
ResumableAgent: planExecuteAgent,
200+
name: "plan_execute_subagent",
201+
description: "a plan execute subagent",
202+
}
203+
204+
deepModelCalls := 0
205+
deepModel.EXPECT().Generate(gomock.Any(), gomock.Any(), gomock.Any()).
206+
DoAndReturn(func(ctx context.Context, msgs []*schema.Message, opts ...model.Option) (*schema.Message, error) {
207+
deepModelCalls++
208+
if deepModelCalls == 1 {
209+
c := schema.ToolCall{ID: "id-1", Type: "function"}
210+
c.Function.Name = taskToolName
211+
c.Function.Arguments = fmt.Sprintf(`{"subagent_type":"%s","description":"execute the plan"}`, namedPlanExecuteAgent.name)
212+
return schema.AssistantMessage("", []schema.ToolCall{c}), nil
213+
}
214+
return schema.AssistantMessage("done", nil), nil
215+
}).AnyTimes()
216+
217+
deepAgent, err := New(ctx, &Config{
218+
Name: "deep",
219+
Description: "deep agent",
220+
ChatModel: deepModel,
221+
Instruction: "you are deep agent",
222+
SubAgents: []adk.Agent{namedPlanExecuteAgent},
223+
ToolsConfig: adk.ToolsConfig{EmitInternalEvents: true},
224+
MaxIteration: 5,
225+
WithoutWriteTodos: true,
226+
WithoutGeneralSubAgent: true,
227+
})
228+
assert.NoError(t, err)
229+
230+
r := adk.NewRunner(ctx, adk.RunnerConfig{Agent: deepAgent})
231+
it := r.Run(ctx, []adk.Message{schema.UserMessage("hi")})
232+
233+
var events []*adk.AgentEvent
234+
for {
235+
event, ok := it.Next()
236+
if !ok {
237+
break
238+
}
239+
events = append(events, event)
240+
}
241+
242+
assert.Greater(t, len(events), 0, "should have at least one event")
243+
244+
var deepAgentEvents []*adk.AgentEvent
245+
var plannerEvents []*adk.AgentEvent
246+
var executorEvents []*adk.AgentEvent
247+
var replannerEvents []*adk.AgentEvent
248+
var planExecuteEvents []*adk.AgentEvent
249+
250+
for _, event := range events {
251+
switch event.AgentName {
252+
case "deep":
253+
deepAgentEvents = append(deepAgentEvents, event)
254+
case "planner":
255+
plannerEvents = append(plannerEvents, event)
256+
case "executor":
257+
executorEvents = append(executorEvents, event)
258+
case "replanner":
259+
replannerEvents = append(replannerEvents, event)
260+
case "plan_execute_replan", "execute_replan":
261+
planExecuteEvents = append(planExecuteEvents, event)
262+
}
263+
}
264+
265+
assert.Greater(t, len(deepAgentEvents), 0, "should have events from deep agent")
266+
267+
assert.Greater(t, len(plannerEvents), 0, "planner internal events should be emitted when EmitInternalEvents is true")
268+
assert.Greater(t, len(executorEvents), 0, "executor internal events should be emitted when EmitInternalEvents is true")
269+
assert.Greater(t, len(replannerEvents), 0, "replanner internal events should be emitted when EmitInternalEvents is true")
270+
271+
t.Logf("Total events: %d", len(events))
272+
t.Logf("Deep agent events: %d", len(deepAgentEvents))
273+
t.Logf("Planner events: %d", len(plannerEvents))
274+
t.Logf("Executor events: %d", len(executorEvents))
275+
t.Logf("Replanner events: %d", len(replannerEvents))
276+
t.Logf("PlanExecute events: %d", len(planExecuteEvents))
277+
}
278+
279+
type namedPlanExecuteAgent struct {
280+
adk.ResumableAgent
281+
name string
282+
description string
283+
}
284+
285+
func (n *namedPlanExecuteAgent) Name(_ context.Context) string {
286+
return n.name
287+
}
288+
289+
func (n *namedPlanExecuteAgent) Description(_ context.Context) string {
290+
return n.description
291+
}

adk/prebuilt/deep/task_tool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (t *taskTool) InvokableRun(ctx context.Context, argumentsInJSON string, opt
157157
return "", err
158158
}
159159

160-
return a.InvokableRun(ctx, params)
160+
return a.InvokableRun(ctx, params, opts...)
161161
}
162162

163163
func defaultTaskToolDescription(ctx context.Context, subAgents []adk.Agent) (string, error) {

0 commit comments

Comments
 (0)