Skills Performance Optimization

Caching, parallelism, incremental processing, and monitoring strategies

Overview

Performance optimization of Skills is key to improving the overall efficiency of Claude Code. This section collects common optimization strategies together with implementation examples.

Optimization Strategies

Cache Optimization

Multi-Level Caching

Cache tiers

  • L1 cache (memory): fastest access, smallest capacity, holds hot data
  • L2 cache (local disk): medium speed and capacity, holds warm data
  • L3 cache (remote storage): slowest access, largest capacity, holds cold data

Cache policies

  • LRU (least recently used)
  • LFU (least frequently used)
  • TTL (time to live)
  • Manual invalidation
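
Before wiring up multiple tiers, each policy can be exercised on its own. The sketch below combines LRU eviction with a TTL using the cachetools library; the 256-entry limit, the 300-second TTL, and the expensive_analysis function are illustrative assumptions, not values from this guide.

from cachetools import TTLCache

# Evicts by recency once 256 entries are held, and by age after 300 seconds
cache = TTLCache(maxsize=256, ttl=300)

def expensive_analysis(path):
  # Placeholder for a real skill invocation
  return f"analysis of {path}"

def cached_analysis(path):
  if path in cache:
    return cache[path]  # hit: served from memory
  result = expensive_analysis(path)
  cache[path] = result  # miss: compute, then store with a TTL
  return result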

Cache implementation

# Assumes cachetools.LRUCache for L1; DiskCache and RedisCache are
# project-specific async wrappers around local disk and Redis storage.
from cachetools import LRUCache

class MultiLevelCache:
  def __init__(self):
    self.l1_cache = LRUCache(maxsize=100)
    self.l2_cache = DiskCache(cache_dir='/tmp/skills_cache')
    self.l3_cache = RedisCache(host='localhost', port=6379)

  async def get(self, key):
    # L1 cache: in-memory, synchronous
    value = self.l1_cache.get(key)
    if value is not None:
      return value

    # L2 cache: local disk; promote hits to L1
    value = await self.l2_cache.get(key)
    if value is not None:
      self.l1_cache[key] = value
      return value

    # L3 cache: remote store; promote hits to L1 and L2
    value = await self.l3_cache.get(key)
    if value is not None:
      self.l1_cache[key] = value
      await self.l2_cache.set(key, value)
      return value

    return None

  async def set(self, key, value, ttl=3600):
    # Write through every tier; the in-memory LRU has no TTL of its own
    self.l1_cache[key] = value
    await self.l2_cache.set(key, value, ttl)
    await self.l3_cache.set(key, value, ttl)

  async def invalidate(self, key):
    # Invalidate every tier
    if key in self.l1_cache:
      del self.l1_cache[key]
    await self.l2_cache.delete(key)
    await self.l3_cache.delete(key)

Cache key design

import hashlib
import json

class CacheKeyGenerator:
  @staticmethod
  def generate(skill_name, parameters, context_version):
    # MD5 is used only as a fast, stable digest, not for security
    key_parts = [
      skill_name,
      hashlib.md5(json.dumps(parameters, sort_keys=True).encode()).hexdigest(),
      context_version,
    ]
    return ':'.join(key_parts)

  @staticmethod
  def generate_context_key(context):
    key_parts = [
      'context',
      hashlib.md5(json.dumps(context, sort_keys=True).encode()).hexdigest(),
    ]
    return ':'.join(key_parts)
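
A quick usage sketch; the skill name and parameters below are made up for illustration, and the resulting key has the shape skill-name:parameter-digest:context-version.

# Hypothetical inputs; prints something like 'code-review:9b3f...:v1'
key = CacheKeyGenerator.generate(
  'code-review',
  {'file': 'src/app.py', 'depth': 2},
  'v1',
)
print(key)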

Parallel Execution

Task parallelism

import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelSkillExecutor:
  def __init__(self, max_workers=4):
    self.max_workers = max_workers
    self.executor = ThreadPoolExecutor(max_workers=max_workers)

  async def execute_parallel(self, tasks):
    # Run blocking task.execute calls in the thread pool and collect
    # results in completion order without blocking the event loop
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(self.executor, task.execute) for task in tasks]
    results = []
    for future in asyncio.as_completed(futures):
      results.append(await future)
    return results

  async def execute_parallel_with_order(self, tasks):
    # Same idea, but results come back in submission order
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(self.executor, task.execute) for task in tasks]
    return await asyncio.gather(*futures)

Pipelined execution

class PipelineExecutor:
  def __init__(self, stages):
    self.stages = stages
    # One queue feeding each stage, plus one holding final results
    self.queues = [asyncio.Queue() for _ in range(len(stages) + 1)]

  async def add_task(self, task):
    await self.queues[0].put(task)

  async def worker(self, stage_index):
    while True:
      task = await self.queues[stage_index].get()
      if task is None:
        # Sentinel: propagate downstream and shut this stage down
        await self.queues[stage_index + 1].put(None)
        break
      result = await self.stages[stage_index].process(task)
      await self.queues[stage_index + 1].put(result)

  async def run(self):
    workers = [asyncio.create_task(self.worker(i)) for i in range(len(self.stages))]
    await asyncio.gather(*workers)

  async def shutdown(self):
    # Push the sentinel into the first queue; it ripples through all stages
    await self.queues[0].put(None)
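
A usage sketch, assuming each stage exposes an async process method; UppercaseStage exists only for this demo.

class UppercaseStage:
  async def process(self, task):
    return task.upper()

async def main():
  pipeline = PipelineExecutor([UppercaseStage()])
  await pipeline.add_task('hello')
  await pipeline.shutdown()  # enqueue the sentinel after the last task
  await pipeline.run()       # stages drain their queues, then exit
  print(await pipeline.queues[-1].get())  # 'HELLO'

asyncio.run(main())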

Incremental Processing

Change detection

import hashlib

class ChangeDetector:
  def __init__(self):
    self.file_hashes = {}
    self.context_hashes = {}

  def detect_file_changes(self, files):
    changes = []
    for file in files:
      current_hash = self.calculate_file_hash(file)
      previous_hash = self.file_hashes.get(file)

      if previous_hash is None:
        changes.append({'type': 'added', 'file': file})
      elif current_hash != previous_hash:
        changes.append({'type': 'modified', 'file': file})

      self.file_hashes[file] = current_hash

    # Files tracked before but absent from this run were deleted
    current_files = set(files)
    for file in list(self.file_hashes.keys()):
      if file not in current_files:
        changes.append({'type': 'deleted', 'file': file})
        del self.file_hashes[file]

    return changes

  def calculate_file_hash(self, file_path):
    # Content digest; MD5 is sufficient for change detection
    with open(file_path, 'rb') as f:
      return hashlib.md5(f.read()).hexdigest()

Incremental updates

import copy

class IncrementalUpdater:
  def __init__(self):
    self.previous_context = None
    self.previous_results = None

  async def update(self, new_context, changes):
    if self.previous_context is None:
      # First run: no baseline yet, so process everything;
      # full_process is expected to populate the previous_* fields
      return await self.full_process(new_context)

    # Reprocess only the changed entries
    updated_results = copy.deepcopy(self.previous_results)
    for change in changes:
      if change['type'] in ('added', 'modified'):
        updated_results[change['file']] = await self.process_change(change)
      elif change['type'] == 'deleted':
        updated_results.pop(change['file'], None)

    self.previous_context = new_context
    self.previous_results = updated_results
    return updated_results
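
The detector and updater compose naturally. In this sketch, full_process and process_change are assumed to be filled in by the concrete skill.

detector = ChangeDetector()
updater = IncrementalUpdater()

async def process(files, context):
  # Hash-compare against the previous run, then reprocess only the deltas
  changes = detector.detect_file_changes(files)
  return await updater.update(context, changes)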

Resource Optimization

Memory management

class MemoryManager:
  def __init__(self, max_memory_mb=1024):
    self.max_memory = max_memory_mb * 1024 * 1024
    self.current_memory = 0
    self.allocations = {}

  def allocate(self, key, size):
    # Evict older allocations until the new one fits
    if self.current_memory + size > self.max_memory:
      self.free_memory(size)

    self.allocations[key] = size
    self.current_memory += size

  def free_memory(self, required_size):
    # Dict insertion order makes this FIFO eviction; swap in an
    # OrderedDict with move_to_end for true LRU behavior
    freed = 0
    for key in list(self.allocations.keys()):
      if freed >= required_size:
        break
      freed += self.allocations[key]
      del self.allocations[key]

    self.current_memory -= freed

  def get_memory_usage(self):
    # Current usage in MB
    return self.current_memory / (1024 * 1024)

Connection pooling

import asyncio

class ConnectionPool:
  def __init__(self, max_connections=10):
    self.max_connections = max_connections
    self.connections = []
    self.available = asyncio.Queue()
    self.lock = asyncio.Lock()

  async def acquire(self):
    # Fast path: reuse an idle connection
    if not self.available.empty():
      return await self.available.get()

    # Grow the pool under the lock until the cap is reached
    async with self.lock:
      if len(self.connections) < self.max_connections:
        conn = await self.create_connection()
        self.connections.append(conn)
        return conn

    # Pool is full: wait for a connection to be released
    return await self.available.get()

  async def release(self, connection):
    await self.available.put(connection)

  async def create_connection(self):
    # Placeholder: connect_to_service stands in for the real client setup
    return await self.connect_to_service()

Algorithm Optimization

Common complexity classes

O(1), constant time

  • Array indexing
  • Hash-table lookup
  • Stack push/pop

O(log n), logarithmic time

  • Binary search
  • Balanced-tree operations
  • Heap operations

O(n), linear time

  • Array traversal
  • Linked-list traversal
  • Linear search

O(n log n), linearithmic time

  • Quicksort (average case)
  • Merge sort
  • Heapsort

O(n²), quadratic time

  • Bubble sort
  • Nested loops over the same input
  • Traversing an n × n matrix
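
As a concrete illustration of moving down a complexity class, the sketch below replaces an O(n²) nested-loop duplicate check with an O(n) set-based pass; the function names are ours, not part of any Skills API.

def has_duplicates_quadratic(items):
  # O(n^2): every pair is compared
  for i in range(len(items)):
    for j in range(i + 1, len(items)):
      if items[i] == items[j]:
        return True
  return False

def has_duplicates_linear(items):
  # O(n): one pass with O(1) average-time set membership checks
  seen = set()
  for item in items:
    if item in seen:
      return True
    seen.add(item)
  return False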

Optimization example

class OptimizedCodeAnalyzer:
  def __init__(self):
    self.symbol_index = {}
    self.dependency_graph = {}

  def analyze(self, code):
    # Build an inverted index so later lookups avoid rescanning code;
    # extract_symbols and extract_dependencies are left to the caller
    symbols = self.extract_symbols(code)
    for symbol in symbols:
      if symbol not in self.symbol_index:
        self.symbol_index[symbol] = []
      self.symbol_index[symbol].append(code)

    # Model dependencies as a graph keyed by dependency target
    dependencies = self.extract_dependencies(code)
    for dep in dependencies:
      if dep not in self.dependency_graph:
        self.dependency_graph[dep] = []
      self.dependency_graph[dep].append(code)

  def find_usages(self, symbol):
    # O(1) average-case hash lookup
    return self.symbol_index.get(symbol, [])

  def find_dependents(self, code):
    # O(1) average-case hash lookup
    return self.dependency_graph.get(code, [])

Data Structure Optimization

Choosing a data structure

Arrays

  • Good for: random access, fixed-size data
  • Poor for: frequent insertion and deletion

Linked lists

  • Good for: frequent insertion and deletion
  • Poor for: random access

Hash tables

  • Good for: fast lookup, key-value data
  • Poor for: ordered traversal

Trees

  • Good for: ordered data, hierarchical structure
  • Poor for: flat, simple data

Graphs

  • Good for: relational data, network structure
  • Poor for: simple linear data

Optimized implementation

from collections import defaultdict

class OptimizedContextStore:
  def __init__(self):
    self.file_index = {}
    self.symbol_index = defaultdict(list)
    self.dependency_graph = defaultdict(set)
    self.reverse_dependency_graph = defaultdict(set)

  def add_file(self, file_path, content):
    self.file_index[file_path] = content
    # Inverted index: symbol -> files that define or use it
    symbols = self.extract_symbols(content)
    for symbol in symbols:
      self.symbol_index[symbol].append(file_path)

    # Forward and reverse edges make both query directions O(1)
    dependencies = self.extract_dependencies(content)
    for dep in dependencies:
      self.dependency_graph[file_path].add(dep)
      self.reverse_dependency_graph[dep].add(file_path)

  def get_files_by_symbol(self, symbol):
    return self.symbol_index.get(symbol, [])

  def get_dependents(self, file_path):
    return self.reverse_dependency_graph.get(file_path, set())

  def get_dependencies(self, file_path):
    return self.dependency_graph.get(file_path, set())

Performance Monitoring

Metric Collection

Performance metrics

from collections import defaultdict
from datetime import datetime

class PerformanceMonitor:
  def __init__(self):
    self.metrics = defaultdict(list)

  def record_execution_time(self, skill_name, duration):
    self.metrics[f"{skill_name}_execution_time"].append({
      'value': duration,
      'timestamp': datetime.now(),
    })

  def record_memory_usage(self, skill_name, memory_mb):
    self.metrics[f"{skill_name}_memory_usage"].append({
      'value': memory_mb,
      'timestamp': datetime.now(),
    })

  def record_cache_hit_rate(self, cache_name, hit_rate):
    self.metrics[f"{cache_name}_hit_rate"].append({
      'value': hit_rate,
      'timestamp': datetime.now(),
    })

  def get_average(self, metric_name):
    values = [m['value'] for m in self.metrics[metric_name]]
    return sum(values) / len(values) if values else 0

  def get_percentile(self, metric_name, percentile):
    values = sorted(m['value'] for m in self.metrics[metric_name])
    if not values:
      return 0
    # Clamp so percentile=100 does not index past the end
    index = min(int(len(values) * percentile / 100), len(values) - 1)
    return values[index]
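
One convenient way to feed the monitor is a decorator around skill entry points. This is our sketch rather than part of any Skills API; the 'code-review' name is illustrative.

import time
from functools import wraps

monitor = PerformanceMonitor()

def timed_skill(skill_name):
  # Records the wall-clock duration of each call into the shared monitor
  def decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
      start = time.perf_counter()
      try:
        return func(*args, **kwargs)
      finally:
        monitor.record_execution_time(skill_name, time.perf_counter() - start)
    return wrapper
  return decorator

@timed_skill('code-review')
def review(file_path):
  ...  # skill body goes here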

Real-time monitoring

import asyncio
import psutil

class RealTimeMonitor:
  def __init__(self, interval=1):
    self.interval = interval
    self.running = False
    self.callbacks = []

  def add_callback(self, callback):
    self.callbacks.append(callback)

  async def start(self):
    # Poll system metrics and fan them out to every registered callback
    self.running = True
    while self.running:
      metrics = self.collect_metrics()
      for callback in self.callbacks:
        await callback(metrics)
      await asyncio.sleep(self.interval)

  def stop(self):
    self.running = False

  def collect_metrics(self):
    return {
      'cpu_usage': psutil.cpu_percent(),
      'memory_usage': psutil.virtual_memory().percent,
      'disk_io': psutil.disk_io_counters(),
      'network_io': psutil.net_io_counters(),
    }
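
A usage sketch with a simple threshold alert; the 90% cutoff and 5-second interval are arbitrary examples.

async def alert_on_high_memory(metrics):
  # Fires whenever system memory crosses the example threshold
  if metrics['memory_usage'] > 90:
    print(f"memory pressure: {metrics['memory_usage']}%")

system_monitor = RealTimeMonitor(interval=5)
system_monitor.add_callback(alert_on_high_memory)
# asyncio.run(system_monitor.start())  # runs until system_monitor.stop()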

Performance Analysis

Bottleneck identification

from collections import defaultdict

class BottleneckAnalyzer:
  def __init__(self):
    self.execution_times = defaultdict(list)
    self.call_counts = defaultdict(int)

  def record_call(self, skill_name, duration):
    self.execution_times[skill_name].append(duration)
    self.call_counts[skill_name] += 1

  def analyze(self):
    report = {}

    for skill_name in self.execution_times:
      times = self.execution_times[skill_name]
      call_count = self.call_counts[skill_name]

      report[skill_name] = {
        'total_time': sum(times),
        'average_time': sum(times) / len(times),
        'max_time': max(times),
        'min_time': min(times),
        'call_count': call_count,
        'time_percentage': self.calculate_percentage(skill_name),
      }

    return report

  def calculate_percentage(self, skill_name):
    # Share of total recorded time attributable to this skill
    total_time = sum(sum(times) for times in self.execution_times.values())
    skill_time = sum(self.execution_times[skill_name])
    return (skill_time / total_time * 100) if total_time > 0 else 0

Performance reports

from datetime import datetime

class PerformanceReporter:
  def __init__(self, analyzer):
    # Report over an analyzer that has already recorded calls;
    # constructing a fresh one here would always yield an empty report
    self.analyzer = analyzer

  def generate_report(self):
    analysis = self.analyzer.analyze()
    report = []
    report.append('# Performance Report')
    report.append(f"Generated: {datetime.now()}")
    report.append('')
    report.append('## Execution Summary')
    # Slowest skills first
    for skill_name, metrics in sorted(
      analysis.items(),
      key=lambda x: x[1]['total_time'],
      reverse=True,
    ):
      report.append(f"\n### {skill_name}")
      report.append(f"- Total Time: {metrics['total_time']:.2f}s")
      report.append(f"- Average Time: {metrics['average_time']:.2f}s")
      report.append(f"- Max Time: {metrics['max_time']:.2f}s")
      report.append(f"- Call Count: {metrics['call_count']}")
      report.append(f"- Time Percentage: {metrics['time_percentage']:.2f}%")
    return '\n'.join(report)

Optimization in Practice

Code Review Optimization

Strategies

  • Incremental review: review only changed files and reuse prior results
  • Parallel review: run reviews concurrently to cut wall-clock time
  • Result caching: multi-level cache with sensible TTLs
  • Smart filtering: prioritize the files that matter most

Implementation example

import asyncio

class OptimizedCodeReview:
  def __init__(self):
    self.cache = MultiLevelCache()
    self.change_detector = ChangeDetector()

  async def review(self, files):
    # Detect changes and review only added or modified files
    changes = self.change_detector.detect_file_changes(files)
    changed_files = [c['file'] for c in changes if c['type'] in ('added', 'modified')]
    # review_file is a coroutine, so asyncio.gather runs reviews concurrently
    return await asyncio.gather(*(self.review_file(f) for f in changed_files))

  async def review_file(self, file_path):
    # Serve from the multi-level cache when possible;
    # generate_cache_key and execute_review are left to the concrete skill
    cache_key = self.generate_cache_key(file_path)
    cached_result = await self.cache.get(cache_key)
    if cached_result:
      return cached_result
    result = await self.execute_review(file_path)
    await self.cache.set(cache_key, result)
    return result

Documentation Generation Optimization

Strategies

  • Incremental generation: regenerate only changed documents
  • Template caching: precompile and reuse templates
  • Batch processing: group writes to reduce I/O
  • Compressed output: use an efficient compression codec (see the sketch after the implementation example)

Implementation example

from cachetools import LRUCache

class OptimizedDocGenerator:
  def __init__(self):
    # DiffCalculator is a project-specific helper for comparing runs
    self.template_cache = LRUCache(maxsize=50)
    self.diff_calculator = DiffCalculator()

  async def generate_docs(self, files, previous_docs=None):
    if previous_docs is None:
      return await self.full_generate(files)

    # Work out which files actually changed since the last run
    diffs = self.diff_calculator.calculate_diffs(files, previous_docs)

    # Regenerate only the changed documents; reuse the rest verbatim
    results = {}
    for file_path, diff in diffs.items():
      if diff['changed']:
        results[file_path] = await self.generate_doc(file_path)
      else:
        results[file_path] = previous_docs[file_path]

    return results

  async def generate_doc(self, file_path):
    template = await self.get_template(file_path)
    content = await self.render_template(template, file_path)
    return content

  async def get_template(self, file_path):
    # Cache compiled templates so repeated loads hit memory
    template_name = self.get_template_name(file_path)
    if template_name in self.template_cache:
      return self.template_cache[template_name]
    template = await self.load_template(template_name)
    self.template_cache[template_name] = template
    return template
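
The compressed-output strategy is not shown above. Here is a minimal sketch using the standard library's gzip module, with the .gz path convention as our assumption.

import gzip

def write_doc_compressed(path, content):
  # Stores the document as UTF-8 gzip; read it back with gzip.open(..., 'rt')
  compressed_path = path + '.gz'
  with gzip.open(compressed_path, 'wt', encoding='utf-8') as f:
    f.write(content)
  return compressed_path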

Test Generation Optimization

Strategies

  • Smart analysis: target untested code paths
  • Test deduplication: merge near-identical tests
  • Parallel execution: shard the test suite (see the sketch after the implementation example)
  • Result caching: snapshots plus incremental reruns

Implementation example

class OptimizedTestGenerator:
  def __init__(self):
    # CoverageAnalyzer, TestDeduplicator, and ResultCache are
    # project-specific components; only their interfaces matter here
    self.coverage_analyzer = CoverageAnalyzer()
    self.test_deduplicator = TestDeduplicator()
    self.result_cache = ResultCache()

  async def generate_tests(self, files):
    # Analyze coverage and generate tests only for untested code
    coverage = await self.coverage_analyzer.analyze(files)
    untested_code = coverage.get_untested_code()
    tests = []
    for code in untested_code:
      tests.append(await self.generate_test(code))
    # Merge near-identical tests before returning
    return self.test_deduplicator.deduplicate(tests)

  async def generate_test(self, code):
    # Reuse a cached test when the same code was seen before
    cache_key = self.generate_cache_key(code)
    cached_test = await self.result_cache.get(cache_key)
    if cached_test:
      return cached_test
    test = await self.create_test(code)
    await self.result_cache.set(cache_key, test)
    return test
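
The parallel-execution strategy relies on splitting the suite into shards. A minimal round-robin sharder, with names of our choosing:

def shard_tests(tests, num_shards):
  # Round-robin assignment keeps shard sizes within one test of each other
  shards = [[] for _ in range(num_shards)]
  for i, test in enumerate(tests):
    shards[i % num_shards].append(test)
  return shards

# Each shard can then run in its own worker process or CI job
shards = shard_tests([f"test_{i}" for i in range(10)], num_shards=3)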
