# Execution control max_steps: int = Field(default=10, description="Maximum steps before termination") current_step: int = Field(default=0, description="Current step in execution")
duplicate_threshold: int = 2
这段代码定义了 BaseAgent 类的核心属性:
基本信息:
name:代理的唯一名称,必须提供
description:可选的代理描述
提示系统:
system_prompt:系统级指令提示,用于初始化代理
next_step_prompt:决定下一步行动的提示
依赖组件:
llm:语言模型实例,默认创建一个新的 LLM 实例
memory:代理的内存存储,用于保存消息历史
state:当前代理状态,默认为 IDLE
执行控制:
max_steps:终止前的最大步骤数,默认为 10
current_step:当前执行步骤,默认为 0
duplicate_threshold:检测重复响应的阈值,默认为 2
1.1.2. 状态管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@asynccontextmanager asyncdefstate_context(self, new_state: AgentState): """Context manager for safe agent state transitions.""" ifnotisinstance(new_state, AgentState): raise ValueError(f"Invalid state: {new_state}")
previous_state = self.state self.state = new_state try: yield except Exception as e: self.state = AgentState.ERROR # Transition to ERROR on failure raise e finally: self.state = previous_state # Revert to previous state
classManus(BrowserAgent): """ A versatile general-purpose agent that uses planning to solve various tasks. This agent extends BrowserAgent with a comprehensive set of tools and capabilities, including Python execution, web browsing, file operations, and information retrieval to handle a wide range of user requests. """
name: str = "Manus" description: str = ( "A versatile agent that can solve various tasks using multiple tools" )
```Python async def think(self) -> bool: """Process current state and decide next actions with appropriate context.""" # Store original prompt original_prompt = self.next_step_prompt
# Only check recent messages (last 3) for browser activity recent_messages = self.memory.messages[-3:] if self.memory.messages else [] browser_in_use = any( "browser_use" in msg.content.lower() for msg in recent_messages if hasattr(msg, "content") and isinstance(msg.content, str) )
if browser_in_use: # Override with browser-specific prompt temporarily to get browser context self.next_step_prompt = BROWSER_NEXT_STEP_PROMPT
# Call parent's think method result = await super().think()
# Restore original prompt self.next_step_prompt = original_prompt
asyncdefexecute(self, input_text: str) -> str: """Execute the planning flow with agents.""" try: ifnotself.primary_agent: raise ValueError("No primary agent available")
# Create initial plan if input provided if input_text: awaitself._create_initial_plan(input_text)
# Verify plan was created successfully ifself.active_plan_id notinself.planning_tool.plans: logger.error( f"Plan creation failed. Plan ID {self.active_plan_id} not found in planning tool." ) returnf"Failed to create plan for: {input_text}"
result = "" whileTrue: # Get current step to execute self.current_step_index, step_info = awaitself._get_current_step_info()
# Exit if no more steps or plan completed ifself.current_step_index isNone: result += awaitself._finalize_plan() break
# Execute current step with appropriate agent step_type = step_info.get("type") if step_info elseNone executor = self.get_executor(step_type) step_result = awaitself._execute_step(executor, step_info) result += step_result + "\n"
# Check if agent wants to terminate ifhasattr(executor, "state") and executor.state == AgentState.FINISHED: break
return result except Exception as e: logger.error(f"Error in PlanningFlow: {str(e)}") returnf"Execution failed: {str(e)}"
asyncdef_create_initial_plan(self, request: str) -> None: """Create an initial plan based on the request using the flow's LLM and PlanningTool.""" logger.info(f"Creating initial plan with ID: {self.active_plan_id}")
# Create a system message for plan creation system_message = Message.system_message( "You are a planning assistant. Create a concise, actionable plan with clear steps. " "Focus on key milestones rather than detailed sub-steps. " "Optimize for clarity and efficiency." )
# Create a user message with the request user_message = Message.user_message( f"Create a reasonable plan with clear steps to accomplish the task: {request}" )
# Process tool calls if present if response.tool_calls: for tool_call in response.tool_calls: if tool_call.function.name == "planning": # Parse the arguments args = tool_call.function.arguments ifisinstance(args, str): try: args = json.loads(args) except json.JSONDecodeError: logger.error(f"Failed to parse tool arguments: {args}") continue
# Ensure plan_id is set correctly and execute the tool args["plan_id"] = self.active_plan_id
# Execute the tool via ToolCollection instead of directly result = awaitself.planning_tool.execute(**args)
asyncdef_execute_step(self, executor: BaseAgent, step_info: dict) -> str: """Execute the current step with the specified agent using agent.run().""" # Prepare context for the agent with current plan status plan_status = awaitself._get_plan_text() step_text = step_info.get("text", f"Step {self.current_step_index}")
# Create a prompt for the agent to execute the current step step_prompt = f""" CURRENT PLAN STATUS: {plan_status} YOUR CURRENT TASK: You are now working on step {self.current_step_index}: "{step_text}" Please execute this step using the appropriate tools. When you're done, provide a summary of what you accomplished. """
# Use agent.run() to execute the step try: step_result = await executor.run(step_prompt)
# Mark the step as completed after successful execution awaitself._mark_step_completed()
def__bool__(self): returnany(getattr(self, field) for field inself.__fields__)
def__add__(self, other: "ToolResult"): defcombine_fields( field: Optional[str], other_field: Optional[str], concatenate: bool = True ): if field and other_field: if concatenate: return field + other_field raise ValueError("Cannot combine tool results") return field or other_field
defget_executor(self, step_type: Optional[str] = None) -> BaseAgent: # If step type is provided and matches an agent key, use that agent if step_type and step_type inself.agents: returnself.agents[step_type]
# Otherwise use the first available executor or fall back to primary agent for key inself.executor_keys: if key inself.agents: returnself.agents[key]
# Fallback to primary agent returnself.primary_agent
4.1.2. 策略模式
不同的代理实现(如 Manus)可以看作是不同的策略,可以根据需要选择不同的代理:
1 2 3 4 5
classManus(BrowserAgent): # ... 特定的实现 ...
asyncdefthink(self) -> bool: # ... 特定的思考策略 ...
4.1.3. 观察者模式
代理的状态变化和工具执行结果通过回调和事件通知机制传递:
1 2 3 4 5
asyncdef_mark_step_completed(self) -> None: # ... 标记步骤完成 ... logger.info( f"Marked step {self.current_step_index} as completed in plan {self.active_plan_id}" )