rabbitmq operations architecture debugging

RabbitMQ死信队列:设计、监控与失败消息的重新处理

DLQ会悄无声息地积压失败消息,直到数周后有人发现。本文介绍如何正确配置死信队列、监控其增长,以及如何构建一套不会让问题更糟的重处理策略。

Qarote Team
9 min read

事后复盘的时间线每次读起来都一样。14号部署了一个有bug的订单处理器。消费者开始拒绝格式错误的事件。DLQ在18天内从零积压到47,000条消息。没有人注意到,直到一位客户发现了异常。

死信队列是失败消息的归宿——当没有人盯着的时候,它们就在这里悄悄堆积。主队列上没有任何告警。吞吐量看起来一切正常。消费者在运行。DLQ的深度没有出现在任何人的仪表盘上。三周后,你在复盘会议上解释为什么47,000条订单事件从未被处理。

本文将介绍DLQ的实际工作原理、如何避开常见陷阱进行配置,以及如何构建一套不会火上浇油的重处理策略。

Qarote 可以按队列监控DLQ深度和增长速率——包括在任何死信队列开始接收消息时发出告警。 了解DLQ监控的工作方式 →


死信队列到底是什么

死信队列并不是RabbitMQ的特殊构造。它就是一个绑定到普通交换机的普通队列。触发消息进入死信的原因有三种:

1. basic.nackbasic.reject,且 requeue=false 2. 消息TTL过期。 3. 队列长度限制超出。

{
  "x-death": [
    {
      "count": 1,
      "reason": "rejected",
      "queue": "my-queue",
      "time": "2026-04-14T09:23:11Z",
      "exchange": "my-exchange",
      "routing-keys": ["my-queue"]
    }
  ],
  "x-first-death-reason": "rejected",
  "x-first-death-queue": "my-queue",
  "x-first-death-exchange": "my-exchange"
}

如何正确配置DLQ

rabbitmqadmin declare exchange name=my-dlx type=direct
rabbitmqadmin declare queue name=my-queue.dlq
rabbitmqadmin declare binding source=my-dlx destination=my-queue.dlq routing_key=my-queue
rabbitmqadmin declare queue name=my-queue \
  arguments='{"x-dead-letter-exchange":"my-dlx","x-dead-letter-routing-key":"my-queue"}'
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="my-dlx", exchange_type="direct")
channel.queue_declare(queue="my-queue.dlq")
channel.queue_bind(queue="my-queue.dlq", exchange="my-dlx", routing_key="my-queue")
channel.queue_declare(queue="my-queue", arguments={"x-dead-letter-exchange": "my-dlx", "x-dead-letter-routing-key": "my-queue"})

对于已有队列:

rabbitmqctl set_policy dlx-policy "^my-queue$" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

消息进入DLQ的三种原因

被拒绝(x-first-death-reason: rejected

检查 x-death[0].count。大于1说明存在循环。参见 RabbitMQ消费者无法处理消息

TTL过期(x-first-death-reason: expired

将过期时间戳与消费者中断的时间窗口交叉比对。

队列长度限制(x-first-death-reason: maxlen

最旧的消息优先被移入死信。重新处理前请确认幂等性是否安全。参见 如何调试RabbitMQ队列积压RabbitMQ内存告警


如何检查DLQ中的消息

curl -u guest:guest -X POST \
  http://localhost:15672/api/queues/%2F/my-queue.dlq/get \
  -H 'content-type: application/json' \
  -d '{"count":5,"ackmode":"ack_requeue_true","encoding":"auto"}' \
  | jq '.[] | {payload: .payload, death_reason: .properties.headers["x-first-death-reason"], death_count: (.properties.headers["x-death"] | length)}'

监控DLQ增长

- alert: RabbitMQDLQGrowing
  expr: |
    rate(rabbitmq_queue_messages_published_total{queue=~".*dlq.*|.*dead.*|.*failed.*"}[5m]) > 0
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "DLQ {{ $labels.queue }} receiving messages at {{ $value }}/sec"

参见 如何配置真正能在关键时刻触发的RabbitMQ告警


重处理策略

1. Shovel插件

rabbitmqctl set_parameter shovel my-reprocess \
  '{"src-protocol":"amqp091","src-uri":"amqp://guest:guest@localhost","src-queue":"my-queue.dlq","dest-protocol":"amqp091","dest-uri":"amqp://guest:guest@localhost","dest-exchange":"my-exchange","dest-exchange-key":"my-queue"}'

2. 基于消费者的重处理(推荐)

import pika, json
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)

def reprocess(ch, method, properties, body):
    try:
        data = json.loads(body)
        death_count = len(properties.headers.get("x-death", []))
        if death_count > 5:
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return
        fixed_data = migrate_schema(data)
        ch.basic_publish(exchange="my-exchange", routing_key="my-queue", body=json.dumps(fixed_data))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue="my-queue.dlq", on_message_callback=reprocess)
channel.start_consuming()

3. 选择性重新入队

curl -u guest:guest -X POST \
  http://localhost:15672/api/queues/%2F/my-queue.dlq/get \
  -H 'content-type: application/json' \
  -d '{"count":1,"ackmode":"ack_requeue_false","encoding":"auto"}'

常见的DLQ设计错误

  • 未配置DLQ——被nack的消息悄无声息地丢失
  • DLQ没有消费者
  • DLQ使用与主队列相同的交换机——形成无限循环
  • 没有针对DLQ增长设置告警
  • DLQ本身没有TTL

简而言之

使用 x-dead-letter-exchange 配置DLQ。死信的三种原因:被拒绝、TTL过期、超出长度限制。重新处理前先读取 x-death 头信息。优先使用基于消费者的重处理方式。对增长速率设置告警。

Qarote 会在DLQ增长速率异常时立即告警,让你在消费者开始拒绝消息时第一时间知晓——而不是三周后才在复盘会议上发现。

了解Qarote的DLQ监控工作方式 →

Tired of debugging RabbitMQ blind?

Qarote gives you a real-time view of queues, consumers, and alarms — free.

Get started free