3. Паттерны применения и архитектура
Эта тема — про то, как проектировать messaging‑архитектуру на RabbitMQ: очередь задач (work queue), pub/sub, RPC поверх очередей, event-driven подход и обязательная идемпотентность. Везде — короткие примеры кода и production‑практики.
Быстрый выбор паттерна (шпаргалка)
| Нужно | Паттерн | Как в RabbitMQ |
|---|---|---|
| Асинхронно выполнить работу | Work queue | Одна очередь tasks, несколько consumer’ов, prefetch, manual ack |
| Разослать событие многим | Pub/Sub | fanout exchange → несколько очередей подписчиков |
| Селективная подписка по типу события | Topic-based pub/sub | topic exchange + routing keys (orders.created) |
| “Запрос‑ответ” через брокер | RPC over RabbitMQ | reply queue + correlation_id, timeout, idempotency |
| Интеграция доменных событий | Event-driven | контракт событий, versioning, DLQ, replay стратегия |
Важно: если нужна строгая гарантия порядка глобально или очень высокий throughput с партиционированием, RabbitMQ может быть не лучшим выбором (Kafka/NATS JetStream — по задаче). Для большинства интеграций и асинхронных задач RabbitMQ отлично подходит.
Work queues: очередь задач (job queue)
Когда использовать
- «Сделай работу и не блокируй API»: ресайз изображений, отправка email, генерация отчётов, фоновая обработка.
Базовая схема
producer → exchange (direct/default) → tasks queue → N consumers
Production‑настройки (минимум)
- manual ack (ack после успешной обработки)
- prefetch (fair dispatch)
- durable queue + persistent messages
- DLQ для “плохих” задач и ограничение retry
Мини‑пример (Python consumer)
import pika
import json
conn = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
ch = conn.channel()
ch.queue_declare(queue="tasks", durable=True)
ch.basic_qos(prefetch_count=10)
def on_message(ch, method, props, body):
job = json.loads(body.decode("utf-8"))
try:
# обработка job
ch.basic_ack(method.delivery_tag)
except Exception:
# “плохие” данные -> DLQ (requeue=False)
ch.basic_nack(method.delivery_tag, requeue=False)
ch.basic_consume("tasks", on_message_callback=on_message, auto_ack=False)
ch.start_consuming()
Best practice: если задачи долгие/дорогие — добавляйте heartbeat/timeout, а в самих задачах храните статус в БД, чтобы ретраи не выполняли работу дважды.
Pub/Sub: широковещательные события
Когда использовать
- «Событие произошло, многим нужно узнать»:
user.created,order.paid,invoice.issued.
Схема в RabbitMQ
Fanout exchange:
producer → exchange(fanout) → queue_A → consumer_A
producer → exchange(fanout) → queue_B → consumer_B
Каждый подписчик должен иметь свою очередь, иначе один consumer «съест» события другого (это будет work queue, а не pub/sub).
Пример (концептуально)
exchange: events.fanout (type=fanout, durable)
queues:
billing.events
analytics.events
bindings:
events.fanout -> billing.events
events.fanout -> analytics.events
Production best practices:
- События должны иметь контракт (schema), версию (
event_version) и метаданные (event_id,occurred_at). - У каждого consumer’а — DLQ и политика retry.
- Не публиковать PII/секреты в события «как есть».
Topic routing: селективная подписка
Когда использовать
- Разные consumer’ы хотят получать часть событий по шаблону: например,
orders.*илиpayments.#.
Схема
producer → exchange(topic) → queues (bindings по patterns)
Пример:
- Billing подписывается на
orders.paid - Analytics на
orders.#
Production best practices:
- Нормализуйте routing keys:
domain.event.action(orders.created,payments.failed). - Не делайте routing key «свалкой» (слишком много сегментов, сложно сопровождать).
RPC over RabbitMQ (request/reply)
Когда использовать (редко)
RPC через брокер имеет смысл, когда:
- нет прямой сети между сервисами, но есть доступ к брокеру
- нужна очередность/буферизация запросов
Во многих случаях проще HTTP/gRPC. Если используете RPC — обязательно добавляйте timeouts и защиту от утечек reply‑очередей.
Базовая идея
Producer отправляет request с:
reply_to: очередь для ответовcorrelation_id: идентификатор запроса
Consumer отвечает в reply_to, копируя correlation_id.
Мини‑пример (Python client)
import pika
import uuid
conn = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
ch = conn.channel()
result = ch.queue_declare(queue="", exclusive=True) # временная reply queue
callback_queue = result.method.queue
corr_id = str(uuid.uuid4())
response = None
def on_response(ch, method, props, body):
global response
if props.correlation_id == corr_id:
response = body
ch.basic_consume(queue=callback_queue, on_message_callback=on_response, auto_ack=True)
ch.basic_publish(
exchange="",
routing_key="rpc.queue",
properties=pika.BasicProperties(
reply_to=callback_queue,
correlation_id=corr_id,
delivery_mode=2,
),
body=b"ping",
)
# В production тут должен быть timeout + event loop.
while response is None:
conn.process_data_events(time_limit=1)
print("got:", response)
conn.close()
Production best practices:
- Timeout обязателен, иначе клиент может зависнуть навсегда.
- Reply queue должна быть exclusive + auto-delete (или server‑named), чтобы не накапливать мусор.
- Consumer должен быть идемпотентным — клиент может повторить RPC при таймауте.
Event-driven архитектура: как делать «по-взрослому»
Событие vs команда
- Command (команда) — “сделай”: одна цель, один владелец, чаще work queue.
- Event (событие) — “произошло”: множество подписчиков, pub/sub.
Versioning и совместимость
Best practice:
- добавляйте поля только backward compatible
- не удаляйте поля резко; делайте миграции в несколько шагов
- используйте
event_type+event_versionи schema registry (хотя бы в виде репозитория со схемами)
Дедупликация и идемпотентность
Идемпотентность — обязательна для at-least-once:
- в событиях:
event_id(UUID) - в команде/задаче:
job_idили business key - consumer хранит “processed ids” (БД/Redis) и пропускает дубликаты
Простой пример дедупа (псевдо):
if redis.setnx(event_id, 1) == 0: # уже было
ack
else:
redis.expire(event_id, 86400)
handle(event)
ack
Production best practices (проектирование)
- Выделяйте vhost по окружениям (
/prod,/staging) и разделяйте права. - Одна очередь на одного logical consumer group (для pub/sub — очередь на каждого подписчика).
- Ограничивайте ретраи (max attempts + DLQ), не допускайте бесконечных requeue.
- Измеряйте backlog и latency: рост
messages_ready= сигнал, что система не успевает. - Тестируйте схемы отказов: падение consumer’ов, рестарт брокера, сеть/timeout к БД.