事后复盘的时间线每次读起来都一样。14号部署了一个有bug的订单处理器。消费者开始拒绝格式错误的事件。DLQ在18天内从零积压到47,000条消息。没有人注意到,直到一位客户发现了异常。
死信队列是失败消息的归宿——当没有人盯着的时候,它们就在这里悄悄堆积。主队列上没有任何告警。吞吐量看起来一切正常。消费者在运行。DLQ的深度没有出现在任何人的仪表盘上。三周后,你在复盘会议上解释为什么47,000条订单事件从未被处理。
本文将介绍DLQ的实际工作原理、如何避开常见陷阱进行配置,以及如何构建一套不会火上浇油的重处理策略。
Qarote 可以按队列监控DLQ深度和增长速率——包括在任何死信队列开始接收消息时发出告警。 了解DLQ监控的工作方式 →
死信队列到底是什么
死信队列并不是RabbitMQ的特殊构造。它就是一个绑定到普通交换机的普通队列。触发消息进入死信的原因有三种:
1. basic.nack 或 basic.reject,且 requeue=false。
2. 消息TTL过期。
3. 队列长度限制超出。
{
"x-death": [
{
"count": 1,
"reason": "rejected",
"queue": "my-queue",
"time": "2026-04-14T09:23:11Z",
"exchange": "my-exchange",
"routing-keys": ["my-queue"]
}
],
"x-first-death-reason": "rejected",
"x-first-death-queue": "my-queue",
"x-first-death-exchange": "my-exchange"
}
如何正确配置DLQ
rabbitmqadmin declare exchange name=my-dlx type=direct
rabbitmqadmin declare queue name=my-queue.dlq
rabbitmqadmin declare binding source=my-dlx destination=my-queue.dlq routing_key=my-queue
rabbitmqadmin declare queue name=my-queue \
arguments='{"x-dead-letter-exchange":"my-dlx","x-dead-letter-routing-key":"my-queue"}'
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="my-dlx", exchange_type="direct")
channel.queue_declare(queue="my-queue.dlq")
channel.queue_bind(queue="my-queue.dlq", exchange="my-dlx", routing_key="my-queue")
channel.queue_declare(queue="my-queue", arguments={"x-dead-letter-exchange": "my-dlx", "x-dead-letter-routing-key": "my-queue"})
对于已有队列:
rabbitmqctl set_policy dlx-policy "^my-queue$" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
消息进入DLQ的三种原因
被拒绝(x-first-death-reason: rejected)
检查 x-death[0].count。大于1说明存在循环。参见 RabbitMQ消费者无法处理消息。
TTL过期(x-first-death-reason: expired)
将过期时间戳与消费者中断的时间窗口交叉比对。
队列长度限制(x-first-death-reason: maxlen)
最旧的消息优先被移入死信。重新处理前请确认幂等性是否安全。参见 如何调试RabbitMQ队列积压 和 RabbitMQ内存告警。
如何检查DLQ中的消息
curl -u guest:guest -X POST \
http://localhost:15672/api/queues/%2F/my-queue.dlq/get \
-H 'content-type: application/json' \
-d '{"count":5,"ackmode":"ack_requeue_true","encoding":"auto"}' \
| jq '.[] | {payload: .payload, death_reason: .properties.headers["x-first-death-reason"], death_count: (.properties.headers["x-death"] | length)}'
监控DLQ增长
- alert: RabbitMQDLQGrowing
expr: |
rate(rabbitmq_queue_messages_published_total{queue=~".*dlq.*|.*dead.*|.*failed.*"}[5m]) > 0
for: 5m
labels:
severity: warning
annotations:
summary: "DLQ {{ $labels.queue }} receiving messages at {{ $value }}/sec"
重处理策略
1. Shovel插件
rabbitmqctl set_parameter shovel my-reprocess \
'{"src-protocol":"amqp091","src-uri":"amqp://guest:guest@localhost","src-queue":"my-queue.dlq","dest-protocol":"amqp091","dest-uri":"amqp://guest:guest@localhost","dest-exchange":"my-exchange","dest-exchange-key":"my-queue"}'
2. 基于消费者的重处理(推荐)
import pika, json
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
def reprocess(ch, method, properties, body):
try:
data = json.loads(body)
death_count = len(properties.headers.get("x-death", []))
if death_count > 5:
ch.basic_ack(delivery_tag=method.delivery_tag)
return
fixed_data = migrate_schema(data)
ch.basic_publish(exchange="my-exchange", routing_key="my-queue", body=json.dumps(fixed_data))
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_consume(queue="my-queue.dlq", on_message_callback=reprocess)
channel.start_consuming()
3. 选择性重新入队
curl -u guest:guest -X POST \
http://localhost:15672/api/queues/%2F/my-queue.dlq/get \
-H 'content-type: application/json' \
-d '{"count":1,"ackmode":"ack_requeue_false","encoding":"auto"}'
常见的DLQ设计错误
- 未配置DLQ——被nack的消息悄无声息地丢失
- DLQ没有消费者
- DLQ使用与主队列相同的交换机——形成无限循环
- 没有针对DLQ增长设置告警
- DLQ本身没有TTL
简而言之
使用 x-dead-letter-exchange 配置DLQ。死信的三种原因:被拒绝、TTL过期、超出长度限制。重新处理前先读取 x-death 头信息。优先使用基于消费者的重处理方式。对增长速率设置告警。
Qarote 会在DLQ增长速率异常时立即告警,让你在消费者开始拒绝消息时第一时间知晓——而不是三周后才在复盘会议上发现。