Глава 22. Оптимизация производительности
"Первая версия должна работать. Вторая версия — быстро работать."
22.1. Измеряй, потом оптимизируй
Правило оптимизации
"Преждевременная оптимизация — корень всех зол" — Donald Knuth
Правильный подход:
- Сделайте чтобы работало
- Измерьте производительность
- Найдите узкие места
- Оптимизируйте узкие места
- Повторите 2-4
Инструменты измерения
Python profiling:
import cProfile
import pstats
# Профилирование функции
def my_function():
# Ваш код
pass
cProfile.run('my_function()', 'profile_stats')
# Анализ
stats = pstats.Stats('profile_stats')
stats.sort_stats('cumulative')
stats.print_stats(10) # Топ-10 медленных функций
Line profiler (детально):
pip install line_profiler
from line_profiler import LineProfiler
lp = LineProfiler()
lp.add_function(my_function)
lp.run('my_function()')
lp.print_stats()
22.2. Caching — самая мощная оптимизация
Зачем кэшировать
Без кэша:
# Каждый запрос → вызов LLM ($$$, медленно)
result1 = llm("What is Python?") # 2 секунды, $0.002
result2 = llm("What is Python?") # 2 секунды, $0.002
С кэшем:
# Первый запрос → LLM
result1 = cached_llm("What is Python?") # 2 секунды, $0.002
# Повторный запрос → из кэша
result2 = cached_llm("What is Python?") # 0.001 секунды, $0
Ускорение: 2000x, экономия: 100%
In-memory кэш (функции)
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_user(user_id: int):
# Медленный запрос к БД
return db.query(User).get(user_id)
# Первый вызов → запрос к БД
user1 = get_user(123) # Медленно
# Повторный → из кэша
user2 = get_user(123) # Мгновенно
Redis для distributed кэша
Зачем Redis:
- Несколько серверов используют один кэш
- Кэш переживает перезапуск приложения
- TTL (время жизни) для записей
import redis
import json
import hashlib
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cached_llm_call(prompt: str, ttl: int = 3600):
# Создаём ключ кэша
cache_key = hashlib.md5(prompt.encode()).hexdigest()
# Проверяем кэш
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Вызываем LLM
response = llm(prompt)
# Сохраняем в кэш (TTL = 1 час)
redis_client.setex(cache_key, ttl, json.dumps(response))
return response
# Использование
result = cached_llm_call("What is Python?")
Semantic кэширование для LLM
Проблема: "What is Python?" и "Can you explain Python?" — разные промпты, но похожий смысл.
Решение: Semantic cache
from openai import OpenAI
import numpy as np
client = OpenAI()
class SemanticCache:
def __init__(self, similarity_threshold=0.95):
self.cache = [] # [(embedding, response)]
self.threshold = similarity_threshold
def get_embedding(self, text):
response = client.embeddings.create(
input=text,
model="text-embedding-ada-002"
)
return response.data[0].embedding
def cosine_similarity(self, a, b):
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def get(self, prompt):
prompt_emb = self.get_embedding(prompt)
# Ищем похожий промпт в кэше
for cached_emb, cached_response in self.cache:
similarity = self.cosine_similarity(prompt_emb, cached_emb)
if similarity >= self.threshold:
return cached_response # Cache hit!
return None # Cache miss
def set(self, prompt, response):
prompt_emb = self.get_embedding(prompt)
self.cache.append((prompt_emb, response))
# Использование
cache = SemanticCache()
def llm_with_semantic_cache(prompt):
# Проверяем кэш
cached = cache.get(prompt)
if cached:
return cached
# Вызываем LLM
response = llm(prompt)
# Сохраняем
cache.set(prompt, response)
return response
# Оба промпта вернут кэшированный ответ
result1 = llm_with_semantic_cache("What is Python?")
result2 = llm_with_semantic_cache("Can you explain Python?") # Cache hit!
22.3. Async и параллелизм
Sync vs Async
Синхронный код (медленно):
def process_requests(prompts):
results = []
for prompt in prompts:
result = llm(prompt) # Ждём ответ
results.append(result)
return results
# 10 запросов * 2 секунды = 20 секунд
prompts = ["Prompt 1", "Prompt 2", ..., "Prompt 10"]
results = process_requests(prompts)
Асинхронный код (быстро):
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def llm_async(prompt):
response = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def process_requests_async(prompts):
# Запускаем все запросы параллельно
tasks = [llm_async(prompt) for prompt in prompts]
results = await asyncio.gather(*tasks)
return results
# 10 запросов параллельно = ~2 секунды (10x быстрее!)
prompts = ["Prompt 1", "Prompt 2", ..., "Prompt 10"]
results = asyncio.run(process_requests_async(prompts))
Ускорение: 10x
FastAPI с async
from fastapi import FastAPI
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
@app.post("/generate")
async def generate(prompt: str):
# Асинхронный вызов
response = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return {"response": response.choices[0].message.content}
# Может обрабатывать сотни запросов одновременно
Параллелизм с multiprocessing
Для CPU-bound задач:
from multiprocessing import Pool
def process_item(item):
# CPU-intensive операция
result = expensive_computation(item)
return result
# Используем все CPU cores
with Pool() as pool:
results = pool.map(process_item, items)
22.4. Оптимизация LLM запросов
1. Выбор модели
| Задача | Модель | Скорость | Цена |
|---|---|---|---|
| Простая классификация | gpt-3.5-turbo | ⚡⚡⚡ | $ |
| Сложная генерация | gpt-4 | ⚡ | $$$ |
| Code generation | gpt-4 | ⚡⚡ | $$$ |
| Embeddings | text-embedding-ada-002 | ⚡⚡⚡ | $ |
Правило: Используйте самую простую модель, которая справляется с задачей.
2. Сократите промпт
Длинный промпт:
prompt = """
You are an AI assistant. I need you to help me with a task.
The task is to classify the following text into categories.
The categories are: positive, negative, neutral.
Please analyze the sentiment carefully and provide your answer.
Remember to be accurate.
Text: "I love this product!"
Please provide your answer in the following format:
Category: [your answer]
Короткий промпт (тот же результат):
prompt = """Classify sentiment (positive/negative/neutral):
Text: "I love this product!"
Answer:"""
Экономия токенов: 70%
3. Streaming для UX
from openai import OpenAI
client = OpenAI()
def stream_response(prompt):
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True # Включаем streaming
)
for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
# Использование
for token in stream_response("Write a story"):
print(token, end="", flush=True)
Пользователь видит ответ сразу, а не ждёт 10 секунд.
4. Batch обработка
Неэффективно:
for item in items:
result = llm(f"Process: {item}")
Эффективно:
# Обрабатываем батчами
batch_prompt = f"""
Process these items:
1. {items[0]}
2. {items[1]}
3. {items[2]}
...
"""
result = llm(batch_prompt)
Меньше запросов = быстрее + дешевле.
22.5. Database оптимизация
Индексы
Без индекса:
SELECT * FROM users WHERE email = 'alice@example.com';
-- Scan всех строк: O(n)
С индексом:
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'alice@example.com';
-- Lookup: O(log n)
Ускорение: 1000x на больших таблицах.
Connection pooling
Плохо: Создавать новое соединение для каждого запроса
def get_user(user_id):
conn = create_connection() # Медленно!
result = conn.execute(...)
conn.close()
return result
Хорошо: Использовать connection pool
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://...',
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
def get_user(user_id):
with engine.connect() as conn: # Берём из pool
result = conn.execute(...)
return result # Возвращаем в pool
Lazy loading vs Eager loading
N+1 проблема (медленно):
# 1 запрос для orders
orders = db.query(Order).all()
# N запросов для users
for order in orders:
print(order.user.name) # Каждый раз запрос к БД!
# Итого: 1 + N запросов
Eager loading (быстро):
from sqlalchemy.orm import joinedload
# 1 запрос с JOIN
orders = db.query(Order).options(joinedload(Order.user)).all()
for order in orders:
print(order.user.name) # Уже загружено!
# Итого: 1 запрос
22.6. Load balancing и scaling
Horizontal scaling (добавляем серверы)
Before:
[Server] ← all requests
After:
Load Balancer
│
┌───────┼───────┐
↓ ↓ ↓
[Server1][Server2][Server3]
Nginx как load balancer:
upstream backend {
server server1.example.com;
server server2.example.com;
server server3.example.com;
}
server {
location / {
proxy_pass http://backend;
}
}
Auto-scaling в Kubernetes
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: ai-app-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ai-app
minReplicas: 2
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Автоматически добавляет pods при высокой нагрузке.
22.7. CDN для статики
Проблема: Медленная загрузка
User (Tokyo) → Server (US) → 200ms latency
Решение: CDN (Content Delivery Network)
User (Tokyo) → CDN Edge (Tokyo) → 10ms latency
CDN кэширует статические файлы (JS, CSS, images) близко к пользователям.
Настройка Cloudflare:
- Добавить домен в Cloudflare
- Включить CDN (автоматически)
- Настроить кэширование
Cache-Control headers:
from fastapi import FastAPI
from fastapi.responses import FileResponse
@app.get("/static/{file_path}")
def serve_static(file_path: str):
response = FileResponse(f"static/{file_path}")
response.headers["Cache-Control"] = "public, max-age=31536000" # 1 год
return response
22.8. Monitoring производительности
Метрики для отслеживания
- Latency — время ответа
- Throughput — запросов в секунду
- Error rate — процент ошибок
- Resource usage — CPU, память
Prometheus + Grafana
from prometheus_client import Counter, Histogram, start_http_server
import time
# Метрики
REQUEST_COUNT = Counter('requests_total', 'Total requests')
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency')
@app.middleware("http")
async def track_metrics(request, call_next):
start_time = time.time()
response = await call_next(request)
# Записываем метрики
REQUEST_COUNT.inc()
REQUEST_LATENCY.observe(time.time() - start_time)
return response
# Expose metrics endpoint
start_http_server(8001) # http://localhost:8001/metrics
Grafana отображает метрики в красивых dashboards.
Ключевые выводы главы
✅ Сначала измеряй: Профилирование перед оптимизацией
✅ Caching — #1 оптимизация: In-memory, Redis, semantic cache
✅ Async для I/O: 10x ускорение для сетевых запросов
✅ Оптимизируй LLM: Выбор модели, короткие промпты, батчинг
✅ Database: Индексы, connection pooling, eager loading
✅ Scaling: Load balancing, auto-scaling, CDN
✅ Мониторинг: Prometheus + Grafana для метрик
✅ Правило: Оптимизируйте узкие места, не всё подряд
Следующая глава: Security — защита AI-приложений