Skills 与主代理的交互

主代理与 Skills 的交互模式、通信与状态管理

交互机制概述

Skills 与主代理的交互是 Claude Code 系统的核心。主代理负责协调和管理 Skills,Skills 提供具体功能实现。

交互模式

主动调用模式

调用流程

  • 任务识别:主代理接收用户请求,分析任务类型和需求
  • Skill 选择:从可用 Skills 中选择最合适的 Skill
  • 参数准备:准备 Skill 需要的参数和上下文信息
  • Skill 调用:调用选定的 Skill,传递参数和上下文
  • 结果处理:接收 Skill 执行结果并整合
  • 响应生成:生成最终响应返回给用户

代码示例

class MainAgent:
  def __init__(self):
    self.skills = load_skills()
    self.context_manager = ContextManager()

  def process_request(self, user_request):
    # 任务识别
    task = self.analyze_task(user_request)

    # Skill 选择
    skill = self.select_skill(task)

    # 参数准备
    context = self.context_manager.collect_context(skill, task)
    parameters = self.prepare_parameters(task, context)

    # Skill 调用
    result = skill.execute(parameters, context)

    # 结果处理
    processed_result = self.process_result(result, context)

    # 响应生成
    response = self.generate_response(processed_result)

    return response

被动调用模式

调用流程

  • 用户指定:用户明确指定要使用的 Skill
  • 参数验证:验证用户提供的参数是否有效
  • 上下文收集:收集 Skill 需要的上下文信息
  • Skill 执行:执行指定的 Skill
  • 结果返回:直接返回 Skill 执行结果

代码示例

class MainAgent:
  def execute_skill(self, skill_name, user_parameters):
    # 验证 Skill 存在
    if skill_name not in self.skills:
      raise SkillNotFoundError(skill_name)

    skill = self.skills[skill_name]

    # 参数验证
    validated_params = skill.validate_parameters(user_parameters)

    # 上下文收集
    context = self.context_manager.collect_context(skill, validated_params)

    # Skill 执行
    result = skill.execute(validated_params, context)

    # 结果返回
    return result

嵌套调用模式

调用层次示例

主代理
└─> 部署 Skill
    ├─> 测试 Skill
    │   └─> 代码分析 Skill
    │       └─> 文档检查 Skill
    ├─> 构建 Skill
    │   └─> 依赖检查 Skill
    └─> 验证 Skill
        └─> 健康检查 Skill

代码示例

class DeploymentSkill(Skill):
  def execute(self, parameters, context):
    # 调用测试 Skill
    test_result = self.call_skill('test', context)

    if not test_result.success:
      return DeploymentResult(success=False, error='Tests failed')

    # 调用构建 Skill
    build_result = self.call_skill('build', context)

    if not build_result.success:
      return DeploymentResult(success=False, error='Build failed')

    # 执行部署
    deploy_result = self.deploy(build_result.artifact)

    # 调用验证 Skill
    verify_result = self.call_skill('verify', context)

    return DeploymentResult(
      success=verify_result.success,
      deploy_result=deploy_result,
      verify_result=verify_result,
    )

通信机制

消息传递

请求消息

{
  "message_id": "msg_123456",
  "timestamp": "2024-01-15T10:30:00Z",
  "type": "skill_request",
  "skill_name": "code-review",
  "parameters": {
    "file": "src/main.py",
    "strict": true
  },
  "context": {
    "project": {},
    "code": {},
    "user": {}
  }
}

响应消息

{
  "message_id": "msg_123456",
  "timestamp": "2024-01-15T10:30:15Z",
  "type": "skill_response",
  "status": "success",
  "result": {
    "issues": [],
    "summary": {}
  },
  "metadata": {
    "execution_time": 15.2,
    "memory_used": "256MB"
  }
}

错误消息

{
  "message_id": "msg_123456",
  "timestamp": "2024-01-15T10:30:10Z",
  "type": "skill_error",
  "error": {
    "code": "FILE_NOT_FOUND",
    "message": "File src/main.py not found",
    "details": {}
  }
}

消息队列

class MessageQueue:
  def __init__(self):
    self.queue = asyncio.Queue()
    self.handlers = {}

  async def send(self, message):
    await self.queue.put(message)

  async def receive(self):
    return await self.queue.get()

  def register_handler(self, message_type, handler):
    self.handlers[message_type] = handler

  async def process_messages(self):
    while True:
      message = await self.receive()
      handler = self.handlers.get(message.type)
      if handler:
        await handler(message)

事件驱动

事件类型

## 事件类型

### Skill 事件
- skill_started: Skill 开始执行
- skill_progress: Skill 执行进度更新
- skill_completed: Skill 执行完成
- skill_failed: Skill 执行失败

### 上下文事件
- context_updated: 上下文更新
- context_invalidated: 上下文失效

### 工具事件
- tool_called: 工具被调用
- tool_completed: 工具执行完成
- tool_failed: 工具执行失败

事件处理

class EventHandler:
  def __init__(self):
    self.listeners = defaultdict(list)

  def on(self, event_type, callback):
    self.listeners[event_type].append(callback)

  async def emit(self, event_type, data):
    for callback in self.listeners.get(event_type, []):
      await callback(data)

  async def handle_skill_started(self, event):
    print(f"Skill {event.skill_name} started")

  async def handle_skill_progress(self, event):
    print(f"Progress: {event.progress}%")

  async def handle_skill_completed(self, event):
    print(f"Skill {event.skill_name} completed")

流式通信

流式输出

class StreamingSkill(Skill):
  async def execute_stream(self, parameters, context):
    # 步骤 1
    yield {"step": 1, "message": "Analyzing code..."}
    result1 = await self.analyze_code(parameters, context)

    # 步骤 2
    yield {"step": 2, "message": "Checking security..."}
    result2 = await self.check_security(result1, context)

    # 步骤 3
    yield {"step": 3, "message": "Generating report..."}
    result3 = await self.generate_report(result2, context)

    # 最终结果
    yield {"step": 4, "message": "Completed", "result": result3}

流式消费

async def consume_stream(skill, parameters, context):
  async for chunk in skill.execute_stream(parameters, context):
    if "message" in chunk:
      print(chunk["message"])
    if "result" in chunk:
      return chunk["result"]

状态管理

执行状态

状态类型

## 执行状态

### 状态定义
- PENDING: 等待执行
- RUNNING: 正在执行
- PAUSED: 已暂停
- COMPLETED: 已完成
- FAILED: 执行失败
- CANCELLED: 已取消

### 状态转换
PENDING → RUNNING → COMPLETED
PENDING → RUNNING → FAILED
RUNNING → PAUSED → RUNNING
RUNNING → CANCELLED

状态管理

class ExecutionState:
  def __init__(self):
    self.state = 'PENDING'
    self.start_time = None
    self.end_time = None
    self.progress = 0
    self.error = None

  def start(self):
    self.state = 'RUNNING'
    self.start_time = datetime.now()

  def complete(self):
    self.state = 'COMPLETED'
    self.end_time = datetime.now()

  def fail(self, error):
    self.state = 'FAILED'
    self.error = error
    self.end_time = datetime.now()

  def update_progress(self, progress):
    self.progress = progress

  def get_duration(self):
    if self.start_time and self.end_time:
      return (self.end_time - self.start_time).total_seconds()
    return None

上下文状态

上下文快照

class ContextSnapshot:
  def __init__(self, context):
    self.timestamp = datetime.now()
    self.context = copy.deepcopy(context)
    self.version = self.generate_version()

  def generate_version(self):
    return hashlib.md5(
      json.dumps(self.context, sort_keys=True).encode(),
    ).hexdigest()

  def compare(self, other_snapshot):
    return self.version == other_snapshot.version

上下文恢复

class ContextManager:
  def __init__(self):
    self.snapshots = []
    self.current_context = {}

  def create_snapshot(self):
    snapshot = ContextSnapshot(self.current_context)
    self.snapshots.append(snapshot)
    return snapshot

  def restore_snapshot(self, snapshot):
    self.current_context = copy.deepcopy(snapshot.context)

  def rollback_to(self, version):
    for snapshot in reversed(self.snapshots):
      if snapshot.version == version:
        self.restore_snapshot(snapshot)
        return True
    return False

会话状态

会话管理

class SessionManager:
  def __init__(self):
    self.sessions = {}
    self.current_session_id = None

  def create_session(self):
    session_id = generate_id()
    self.sessions[session_id] = {
      'id': session_id,
      'created_at': datetime.now(),
      'context': {},
      'history': [],
      'state': 'ACTIVE',
    }
    self.current_session_id = session_id
    return session_id

  def get_session(self, session_id):
    return self.sessions.get(session_id)

  def update_session(self, session_id, updates):
    if session_id in self.sessions:
      self.sessions[session_id].update(updates)

  def close_session(self, session_id):
    if session_id in self.sessions:
      self.sessions[session_id]['state'] = 'CLOSED'
      self.sessions[session_id]['closed_at'] = datetime.now()

错误处理

错误传播

错误类型

## 错误类型

### Skill 错误
- SkillNotFoundError: Skill 不存在
- SkillExecutionError: Skill 执行失败
- SkillTimeoutError: Skill 执行超时

### 参数错误
- ParameterValidationError: 参数验证失败
- MissingParameterError: 缺少必需参数
- InvalidParameterError: 参数值无效

### 上下文错误
- ContextNotFoundError: 上下文不存在
- ContextInvalidError: 上下文无效
- ContextTimeoutError: 上下文获取超时

错误处理策略

class ErrorHandler:
  def __init__(self):
    self.retries = {}
    self.fallbacks = {}

  def handle_error(self, error, context):
    error_type = type(error).__name__

    # 检查是否应该重试
    if self.should_retry(error_type):
      return self.retry(error, context)

    # 检查是否有回退方案
    if self.has_fallback(error_type):
      return self.fallback(error, context)

    # 否则抛出错误
    raise error

  def should_retry(self, error_type):
    return error_type in self.retries

  def retry(self, error, context):
    retry_config = self.retries[type(error).__name__]
    max_attempts = retry_config.get('max_attempts', 3)
    delay = retry_config.get('delay', 1)

    attempt = context.get('attempt', 0) + 1
    if attempt < max_attempts:
      context['attempt'] = attempt
      time.sleep(delay * attempt)
      return 'RETRY'

    return error

  def has_fallback(self, error_type):
    return error_type in self.fallbacks

  def fallback(self, error, context):
    fallback_func = self.fallbacks[type(error).__name__]
    return fallback_func(error, context)

错误恢复

恢复策略

## 恢复策略

### 自动恢复
- 重试机制
- 回退方案
- 降级处理

### 手动恢复
- 用户确认
- 参数修正
- 上下文调整

### 状态恢复
- 快照恢复
- 断点续传
- 事务回滚

恢复实现

class RecoveryManager:
  def __init__(self):
    self.checkpoints = {}

  def create_checkpoint(self, execution_id, state):
    self.checkpoints[execution_id] = {
      'timestamp': datetime.now(),
      'state': copy.deepcopy(state),
    }

  def restore_checkpoint(self, execution_id):
    if execution_id in self.checkpoints:
      return copy.deepcopy(self.checkpoints[execution_id]['state'])
    return None

  def recover_from_error(self, error, execution_id):
    # 恢复到检查点
    state = self.restore_checkpoint(execution_id)
    if state:
      # 尝试恢复执行
      return self.resume_execution(state)

    # 如果没有检查点,尝试其他恢复策略
    return self.alternative_recovery(error)

性能优化

并行执行

class ParallelExecutor:
  def __init__(self, max_workers=4):
    self.max_workers = max_workers
    self.executor = ThreadPoolExecutor(max_workers=max_workers)

  async def execute_parallel(self, tasks):
    futures = []
    for task in tasks:
      future = self.executor.submit(task.execute)
      futures.append(future)

    results = []
    for future in futures:
      result = await asyncio.wrap_future(future)
      results.append(result)

    return results

依赖管理

class DependencyManager:
  def __init__(self):
    self.dependencies = {}

  def add_dependency(self, task, depends_on):
    if task not in self.dependencies:
      self.dependencies[task] = []
    self.dependencies[task].extend(depends_on)

  def get_execution_order(self, tasks):
    order = []
    visited = set()

    def visit(task):
      if task in visited:
        return
      visited.add(task)

      for dep in self.dependencies.get(task, []):
        visit(dep)

      order.append(task)

    for task in tasks:
      visit(task)

    return order

总结

Skills 与主代理的交互机制涵盖交互模式、通信机制、状态管理、错误处理与性能优化。理解这些机制有助于优化性能、增强可靠性、改善体验,并为功能扩展提供基础。

On this page