Skills 与主代理的交互
主代理与 Skills 的交互模式、通信与状态管理
交互机制概述
Skills 与主代理的交互是 Claude Code 系统的核心。主代理负责协调和管理 Skills,Skills 提供具体功能实现。
交互模式
主动调用模式
调用流程
- 任务识别:主代理接收用户请求,分析任务类型和需求
- Skill 选择:从可用 Skills 中选择最合适的 Skill
- 参数准备:准备 Skill 需要的参数和上下文信息
- Skill 调用:调用选定的 Skill,传递参数和上下文
- 结果处理:接收 Skill 执行结果并整合
- 响应生成:生成最终响应返回给用户
代码示例
class MainAgent:
def __init__(self):
self.skills = load_skills()
self.context_manager = ContextManager()
def process_request(self, user_request):
# 任务识别
task = self.analyze_task(user_request)
# Skill 选择
skill = self.select_skill(task)
# 参数准备
context = self.context_manager.collect_context(skill, task)
parameters = self.prepare_parameters(task, context)
# Skill 调用
result = skill.execute(parameters, context)
# 结果处理
processed_result = self.process_result(result, context)
# 响应生成
response = self.generate_response(processed_result)
return response被动调用模式
调用流程
- 用户指定:用户明确指定要使用的 Skill
- 参数验证:验证用户提供的参数是否有效
- 上下文收集:收集 Skill 需要的上下文信息
- Skill 执行:执行指定的 Skill
- 结果返回:直接返回 Skill 执行结果
代码示例
class MainAgent:
def execute_skill(self, skill_name, user_parameters):
# 验证 Skill 存在
if skill_name not in self.skills:
raise SkillNotFoundError(skill_name)
skill = self.skills[skill_name]
# 参数验证
validated_params = skill.validate_parameters(user_parameters)
# 上下文收集
context = self.context_manager.collect_context(skill, validated_params)
# Skill 执行
result = skill.execute(validated_params, context)
# 结果返回
return result嵌套调用模式
调用层次示例
主代理
└─> 部署 Skill
├─> 测试 Skill
│ └─> 代码分析 Skill
│ └─> 文档检查 Skill
├─> 构建 Skill
│ └─> 依赖检查 Skill
└─> 验证 Skill
└─> 健康检查 Skill代码示例
class DeploymentSkill(Skill):
def execute(self, parameters, context):
# 调用测试 Skill
test_result = self.call_skill('test', context)
if not test_result.success:
return DeploymentResult(success=False, error='Tests failed')
# 调用构建 Skill
build_result = self.call_skill('build', context)
if not build_result.success:
return DeploymentResult(success=False, error='Build failed')
# 执行部署
deploy_result = self.deploy(build_result.artifact)
# 调用验证 Skill
verify_result = self.call_skill('verify', context)
return DeploymentResult(
success=verify_result.success,
deploy_result=deploy_result,
verify_result=verify_result,
)通信机制
消息传递
请求消息
{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:00Z",
"type": "skill_request",
"skill_name": "code-review",
"parameters": {
"file": "src/main.py",
"strict": true
},
"context": {
"project": {},
"code": {},
"user": {}
}
}响应消息
{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:15Z",
"type": "skill_response",
"status": "success",
"result": {
"issues": [],
"summary": {}
},
"metadata": {
"execution_time": 15.2,
"memory_used": "256MB"
}
}错误消息
{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:10Z",
"type": "skill_error",
"error": {
"code": "FILE_NOT_FOUND",
"message": "File src/main.py not found",
"details": {}
}
}消息队列
class MessageQueue:
def __init__(self):
self.queue = asyncio.Queue()
self.handlers = {}
async def send(self, message):
await self.queue.put(message)
async def receive(self):
return await self.queue.get()
def register_handler(self, message_type, handler):
self.handlers[message_type] = handler
async def process_messages(self):
while True:
message = await self.receive()
handler = self.handlers.get(message.type)
if handler:
await handler(message)事件驱动
事件类型
## 事件类型
### Skill 事件
- skill_started: Skill 开始执行
- skill_progress: Skill 执行进度更新
- skill_completed: Skill 执行完成
- skill_failed: Skill 执行失败
### 上下文事件
- context_updated: 上下文更新
- context_invalidated: 上下文失效
### 工具事件
- tool_called: 工具被调用
- tool_completed: 工具执行完成
- tool_failed: 工具执行失败事件处理
class EventHandler:
def __init__(self):
self.listeners = defaultdict(list)
def on(self, event_type, callback):
self.listeners[event_type].append(callback)
async def emit(self, event_type, data):
for callback in self.listeners.get(event_type, []):
await callback(data)
async def handle_skill_started(self, event):
print(f"Skill {event.skill_name} started")
async def handle_skill_progress(self, event):
print(f"Progress: {event.progress}%")
async def handle_skill_completed(self, event):
print(f"Skill {event.skill_name} completed")流式通信
流式输出
class StreamingSkill(Skill):
async def execute_stream(self, parameters, context):
# 步骤 1
yield {"step": 1, "message": "Analyzing code..."}
result1 = await self.analyze_code(parameters, context)
# 步骤 2
yield {"step": 2, "message": "Checking security..."}
result2 = await self.check_security(result1, context)
# 步骤 3
yield {"step": 3, "message": "Generating report..."}
result3 = await self.generate_report(result2, context)
# 最终结果
yield {"step": 4, "message": "Completed", "result": result3}流式消费
async def consume_stream(skill, parameters, context):
async for chunk in skill.execute_stream(parameters, context):
if "message" in chunk:
print(chunk["message"])
if "result" in chunk:
return chunk["result"]状态管理
执行状态
状态类型
## 执行状态
### 状态定义
- PENDING: 等待执行
- RUNNING: 正在执行
- PAUSED: 已暂停
- COMPLETED: 已完成
- FAILED: 执行失败
- CANCELLED: 已取消
### 状态转换
PENDING → RUNNING → COMPLETED
PENDING → RUNNING → FAILED
RUNNING → PAUSED → RUNNING
RUNNING → CANCELLED状态管理
class ExecutionState:
def __init__(self):
self.state = 'PENDING'
self.start_time = None
self.end_time = None
self.progress = 0
self.error = None
def start(self):
self.state = 'RUNNING'
self.start_time = datetime.now()
def complete(self):
self.state = 'COMPLETED'
self.end_time = datetime.now()
def fail(self, error):
self.state = 'FAILED'
self.error = error
self.end_time = datetime.now()
def update_progress(self, progress):
self.progress = progress
def get_duration(self):
if self.start_time and self.end_time:
return (self.end_time - self.start_time).total_seconds()
return None上下文状态
上下文快照
class ContextSnapshot:
def __init__(self, context):
self.timestamp = datetime.now()
self.context = copy.deepcopy(context)
self.version = self.generate_version()
def generate_version(self):
return hashlib.md5(
json.dumps(self.context, sort_keys=True).encode(),
).hexdigest()
def compare(self, other_snapshot):
return self.version == other_snapshot.version上下文恢复
class ContextManager:
def __init__(self):
self.snapshots = []
self.current_context = {}
def create_snapshot(self):
snapshot = ContextSnapshot(self.current_context)
self.snapshots.append(snapshot)
return snapshot
def restore_snapshot(self, snapshot):
self.current_context = copy.deepcopy(snapshot.context)
def rollback_to(self, version):
for snapshot in reversed(self.snapshots):
if snapshot.version == version:
self.restore_snapshot(snapshot)
return True
return False会话状态
会话管理
class SessionManager:
def __init__(self):
self.sessions = {}
self.current_session_id = None
def create_session(self):
session_id = generate_id()
self.sessions[session_id] = {
'id': session_id,
'created_at': datetime.now(),
'context': {},
'history': [],
'state': 'ACTIVE',
}
self.current_session_id = session_id
return session_id
def get_session(self, session_id):
return self.sessions.get(session_id)
def update_session(self, session_id, updates):
if session_id in self.sessions:
self.sessions[session_id].update(updates)
def close_session(self, session_id):
if session_id in self.sessions:
self.sessions[session_id]['state'] = 'CLOSED'
self.sessions[session_id]['closed_at'] = datetime.now()错误处理
错误传播
错误类型
## 错误类型
### Skill 错误
- SkillNotFoundError: Skill 不存在
- SkillExecutionError: Skill 执行失败
- SkillTimeoutError: Skill 执行超时
### 参数错误
- ParameterValidationError: 参数验证失败
- MissingParameterError: 缺少必需参数
- InvalidParameterError: 参数值无效
### 上下文错误
- ContextNotFoundError: 上下文不存在
- ContextInvalidError: 上下文无效
- ContextTimeoutError: 上下文获取超时错误处理策略
class ErrorHandler:
def __init__(self):
self.retries = {}
self.fallbacks = {}
def handle_error(self, error, context):
error_type = type(error).__name__
# 检查是否应该重试
if self.should_retry(error_type):
return self.retry(error, context)
# 检查是否有回退方案
if self.has_fallback(error_type):
return self.fallback(error, context)
# 否则抛出错误
raise error
def should_retry(self, error_type):
return error_type in self.retries
def retry(self, error, context):
retry_config = self.retries[type(error).__name__]
max_attempts = retry_config.get('max_attempts', 3)
delay = retry_config.get('delay', 1)
attempt = context.get('attempt', 0) + 1
if attempt < max_attempts:
context['attempt'] = attempt
time.sleep(delay * attempt)
return 'RETRY'
return error
def has_fallback(self, error_type):
return error_type in self.fallbacks
def fallback(self, error, context):
fallback_func = self.fallbacks[type(error).__name__]
return fallback_func(error, context)错误恢复
恢复策略
## 恢复策略
### 自动恢复
- 重试机制
- 回退方案
- 降级处理
### 手动恢复
- 用户确认
- 参数修正
- 上下文调整
### 状态恢复
- 快照恢复
- 断点续传
- 事务回滚恢复实现
class RecoveryManager:
def __init__(self):
self.checkpoints = {}
def create_checkpoint(self, execution_id, state):
self.checkpoints[execution_id] = {
'timestamp': datetime.now(),
'state': copy.deepcopy(state),
}
def restore_checkpoint(self, execution_id):
if execution_id in self.checkpoints:
return copy.deepcopy(self.checkpoints[execution_id]['state'])
return None
def recover_from_error(self, error, execution_id):
# 恢复到检查点
state = self.restore_checkpoint(execution_id)
if state:
# 尝试恢复执行
return self.resume_execution(state)
# 如果没有检查点,尝试其他恢复策略
return self.alternative_recovery(error)性能优化
并行执行
class ParallelExecutor:
def __init__(self, max_workers=4):
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def execute_parallel(self, tasks):
futures = []
for task in tasks:
future = self.executor.submit(task.execute)
futures.append(future)
results = []
for future in futures:
result = await asyncio.wrap_future(future)
results.append(result)
return results依赖管理
class DependencyManager:
def __init__(self):
self.dependencies = {}
def add_dependency(self, task, depends_on):
if task not in self.dependencies:
self.dependencies[task] = []
self.dependencies[task].extend(depends_on)
def get_execution_order(self, tasks):
order = []
visited = set()
def visit(task):
if task in visited:
return
visited.add(task)
for dep in self.dependencies.get(task, []):
visit(dep)
order.append(task)
for task in tasks:
visit(task)
return order总结
Skills 与主代理的交互机制涵盖交互模式、通信机制、状态管理、错误处理与性能优化。理解这些机制有助于优化性能、增强可靠性、改善体验,并为功能扩展提供基础。