ЧАСТЬ IV: МАСШТАБИРОВАНИЕ

Оптимизация производительности

Глава 22. Оптимизация производительности

"Первая версия должна работать. Вторая версия — быстро работать."


22.1. Измеряй, потом оптимизируй

Правило оптимизации

"Преждевременная оптимизация — корень всех зол" — Donald Knuth

Правильный подход:

  1. Сделайте чтобы работало
  2. Измерьте производительность
  3. Найдите узкие места
  4. Оптимизируйте узкие места
  5. Повторите 2-4

Инструменты измерения

Python profiling:

import cProfile
import pstats

# Профилирование функции
def my_function():
    # Ваш код
    pass

cProfile.run('my_function()', 'profile_stats')

# Анализ
stats = pstats.Stats('profile_stats')
stats.sort_stats('cumulative')
stats.print_stats(10)  # Топ-10 медленных функций

Line profiler (детально):

pip install line_profiler
from line_profiler import LineProfiler

lp = LineProfiler()
lp.add_function(my_function)
lp.run('my_function()')
lp.print_stats()

22.2. Caching — самая мощная оптимизация

Зачем кэшировать

Без кэша:

# Каждый запрос → вызов LLM ($$$, медленно)
result1 = llm("What is Python?")  # 2 секунды, $0.002
result2 = llm("What is Python?")  # 2 секунды, $0.002

С кэшем:

# Первый запрос → LLM
result1 = cached_llm("What is Python?")  # 2 секунды, $0.002

# Повторный запрос → из кэша
result2 = cached_llm("What is Python?")  # 0.001 секунды, $0

Ускорение: 2000x, экономия: 100%

In-memory кэш (функции)

from functools import lru_cache

@lru_cache(maxsize=1000)
def get_user(user_id: int):
    # Медленный запрос к БД
    return db.query(User).get(user_id)

# Первый вызов → запрос к БД
user1 = get_user(123)  # Медленно

# Повторный → из кэша
user2 = get_user(123)  # Мгновенно

Redis для distributed кэша

Зачем Redis:

  • Несколько серверов используют один кэш
  • Кэш переживает перезапуск приложения
  • TTL (время жизни) для записей
import redis
import json
import hashlib

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cached_llm_call(prompt: str, ttl: int = 3600):
    # Создаём ключ кэша
    cache_key = hashlib.md5(prompt.encode()).hexdigest()

    # Проверяем кэш
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Вызываем LLM
    response = llm(prompt)

    # Сохраняем в кэш (TTL = 1 час)
    redis_client.setex(cache_key, ttl, json.dumps(response))

    return response

# Использование
result = cached_llm_call("What is Python?")

Semantic кэширование для LLM

Проблема: "What is Python?" и "Can you explain Python?" — разные промпты, но похожий смысл.

Решение: Semantic cache

from openai import OpenAI
import numpy as np

client = OpenAI()

class SemanticCache:
    def __init__(self, similarity_threshold=0.95):
        self.cache = []  # [(embedding, response)]
        self.threshold = similarity_threshold

    def get_embedding(self, text):
        response = client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        return response.data[0].embedding

    def cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def get(self, prompt):
        prompt_emb = self.get_embedding(prompt)

        # Ищем похожий промпт в кэше
        for cached_emb, cached_response in self.cache:
            similarity = self.cosine_similarity(prompt_emb, cached_emb)
            if similarity >= self.threshold:
                return cached_response  # Cache hit!

        return None  # Cache miss

    def set(self, prompt, response):
        prompt_emb = self.get_embedding(prompt)
        self.cache.append((prompt_emb, response))

# Использование
cache = SemanticCache()

def llm_with_semantic_cache(prompt):
    # Проверяем кэш
    cached = cache.get(prompt)
    if cached:
        return cached

    # Вызываем LLM
    response = llm(prompt)

    # Сохраняем
    cache.set(prompt, response)

    return response

# Оба промпта вернут кэшированный ответ
result1 = llm_with_semantic_cache("What is Python?")
result2 = llm_with_semantic_cache("Can you explain Python?")  # Cache hit!

22.3. Async и параллелизм

Sync vs Async

Синхронный код (медленно):

def process_requests(prompts):
    results = []
    for prompt in prompts:
        result = llm(prompt)  # Ждём ответ
        results.append(result)
    return results

# 10 запросов * 2 секунды = 20 секунд
prompts = ["Prompt 1", "Prompt 2", ..., "Prompt 10"]
results = process_requests(prompts)

Асинхронный код (быстро):

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def llm_async(prompt):
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_requests_async(prompts):
    # Запускаем все запросы параллельно
    tasks = [llm_async(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

# 10 запросов параллельно = ~2 секунды (10x быстрее!)
prompts = ["Prompt 1", "Prompt 2", ..., "Prompt 10"]
results = asyncio.run(process_requests_async(prompts))

Ускорение: 10x

FastAPI с async

from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/generate")
async def generate(prompt: str):
    # Асинхронный вызов
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )

    return {"response": response.choices[0].message.content}

# Может обрабатывать сотни запросов одновременно

Параллелизм с multiprocessing

Для CPU-bound задач:

from multiprocessing import Pool

def process_item(item):
    # CPU-intensive операция
    result = expensive_computation(item)
    return result

# Используем все CPU cores
with Pool() as pool:
    results = pool.map(process_item, items)

22.4. Оптимизация LLM запросов

1. Выбор модели

ЗадачаМодельСкоростьЦена
Простая классификацияgpt-3.5-turbo⚡⚡⚡$
Сложная генерацияgpt-4$$$
Code generationgpt-4⚡⚡$$$
Embeddingstext-embedding-ada-002⚡⚡⚡$

Правило: Используйте самую простую модель, которая справляется с задачей.

2. Сократите промпт

Длинный промпт:

prompt = """
You are an AI assistant. I need you to help me with a task.
The task is to classify the following text into categories.
The categories are: positive, negative, neutral.
Please analyze the sentiment carefully and provide your answer.
Remember to be accurate.

Text: "I love this product!"

Please provide your answer in the following format:
Category: [your answer]

Короткий промпт (тот же результат):

prompt = """Classify sentiment (positive/negative/neutral):
Text: "I love this product!"
Answer:"""

Экономия токенов: 70%

3. Streaming для UX

from openai import OpenAI

client = OpenAI()

def stream_response(prompt):
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True  # Включаем streaming
    )

    for chunk in response:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Использование
for token in stream_response("Write a story"):
    print(token, end="", flush=True)

Пользователь видит ответ сразу, а не ждёт 10 секунд.

4. Batch обработка

Неэффективно:

for item in items:
    result = llm(f"Process: {item}")

Эффективно:

# Обрабатываем батчами
batch_prompt = f"""
Process these items:
1. {items[0]}
2. {items[1]}
3. {items[2]}
...
"""
result = llm(batch_prompt)

Меньше запросов = быстрее + дешевле.


22.5. Database оптимизация

Индексы

Без индекса:

SELECT * FROM users WHERE email = 'alice@example.com';
-- Scan всех строк: O(n)

С индексом:

CREATE INDEX idx_users_email ON users(email);

SELECT * FROM users WHERE email = 'alice@example.com';
-- Lookup: O(log n)

Ускорение: 1000x на больших таблицах.

Connection pooling

Плохо: Создавать новое соединение для каждого запроса

def get_user(user_id):
    conn = create_connection()  # Медленно!
    result = conn.execute(...)
    conn.close()
    return result

Хорошо: Использовать connection pool

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://...',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

def get_user(user_id):
    with engine.connect() as conn:  # Берём из pool
        result = conn.execute(...)
    return result  # Возвращаем в pool

Lazy loading vs Eager loading

N+1 проблема (медленно):

# 1 запрос для orders
orders = db.query(Order).all()

# N запросов для users
for order in orders:
    print(order.user.name)  # Каждый раз запрос к БД!

# Итого: 1 + N запросов

Eager loading (быстро):

from sqlalchemy.orm import joinedload

# 1 запрос с JOIN
orders = db.query(Order).options(joinedload(Order.user)).all()

for order in orders:
    print(order.user.name)  # Уже загружено!

# Итого: 1 запрос

22.6. Load balancing и scaling

Horizontal scaling (добавляем серверы)

Before:
[Server] ← all requests

After:
          Load Balancer
               │
       ┌───────┼───────┐
       ↓       ↓       ↓
   [Server1][Server2][Server3]

Nginx как load balancer:

upstream backend {
    server server1.example.com;
    server server2.example.com;
    server server3.example.com;
}

server {
    location / {
        proxy_pass http://backend;
    }
}

Auto-scaling в Kubernetes

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-app
  minReplicas: 2
  maxReplicas: 50
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

Автоматически добавляет pods при высокой нагрузке.


22.7. CDN для статики

Проблема: Медленная загрузка

User (Tokyo) → Server (US) → 200ms latency

Решение: CDN (Content Delivery Network)

User (Tokyo) → CDN Edge (Tokyo) → 10ms latency

CDN кэширует статические файлы (JS, CSS, images) близко к пользователям.

Настройка Cloudflare:

  1. Добавить домен в Cloudflare
  2. Включить CDN (автоматически)
  3. Настроить кэширование

Cache-Control headers:

from fastapi import FastAPI
from fastapi.responses import FileResponse

@app.get("/static/{file_path}")
def serve_static(file_path: str):
    response = FileResponse(f"static/{file_path}")
    response.headers["Cache-Control"] = "public, max-age=31536000"  # 1 год
    return response

22.8. Monitoring производительности

Метрики для отслеживания

  1. Latency — время ответа
  2. Throughput — запросов в секунду
  3. Error rate — процент ошибок
  4. Resource usage — CPU, память

Prometheus + Grafana

from prometheus_client import Counter, Histogram, start_http_server
import time

# Метрики
REQUEST_COUNT = Counter('requests_total', 'Total requests')
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency')

@app.middleware("http")
async def track_metrics(request, call_next):
    start_time = time.time()

    response = await call_next(request)

    # Записываем метрики
    REQUEST_COUNT.inc()
    REQUEST_LATENCY.observe(time.time() - start_time)

    return response

# Expose metrics endpoint
start_http_server(8001)  # http://localhost:8001/metrics

Grafana отображает метрики в красивых dashboards.


Ключевые выводы главы

Сначала измеряй: Профилирование перед оптимизацией

Caching — #1 оптимизация: In-memory, Redis, semantic cache

Async для I/O: 10x ускорение для сетевых запросов

Оптимизируй LLM: Выбор модели, короткие промпты, батчинг

Database: Индексы, connection pooling, eager loading

Scaling: Load balancing, auto-scaling, CDN

Мониторинг: Prometheus + Grafana для метрик

Правило: Оптимизируйте узкие места, не всё подряд


Следующая глава: Security — защита AI-приложений