最后更新:2026-03-26
生产环境下一个异步任务从“出生”到“消亡”的完整旅程:由客户端触发,进入 Broker 队列, 被 Worker 执行并写入 ResultBackend,最终由客户端查询结果。
将业务函数注册为 Celery Task,生成唯一任务名。
序列化任务名称、参数、ETA、重试策略等元数据。
客户端立刻返回 AsyncResult,不阻塞主业务。
消息进入 RabbitMQ / Redis,路由到指定队列。
Broker 维护队列顺序,等待 Worker 领取。
Worker 常驻 TCP 连接,轮询获取可执行消息。
执行任务逻辑,处理重试、超时、异常策略。
执行完成后写入结果后端,或更新重试状态。
保存状态、返回值、异常堆栈等。
客户端使用任务 ID 查询状态与结果。
结果过期或被清理,任务生命周期结束。
把一个普通函数用 @app.task 标记后,Celery 会把它“登记成任务”:给它分配一个唯一的任务名(后续靠任务名找到并执行它)。同时也会记录“发消息的默认规则”:当你调用 delay/apply_async 时,要把任务名、参数、task_id、倒计时/ETA、以及需要的元信息打包成一条可传输的字符串(常见是 JSON)。另外还会记录“发到哪里”:默认走哪个 queue、使用什么 routing_key(或在代码里显式指定),这样 Broker 才知道把消息放进哪个队列,Worker 才会从对应队列里取到并执行。最后再记录任务在执行生命周期里没有成功完成即任务失败时怎么处理(重试次数/间隔、超时等)。
调用 delay/apply_async:把“执行意图”封装成异步指令——任务名、Args/Kwargs、task_id、routing_key、ETA/countdown 等元数据;再通过 Kombu 将消息非阻塞投递到 Broker。调用会立刻返回 AsyncResult(追踪句柄,持有 task_id),之后可用它查询状态(PENDING/STARTED/SUCCESS/FAILURE)或取结果;同时 apply_async 还能在调用时覆盖默认路由(指定 queue/priority 等),把同一任务按场景分发到不同 Worker 集群。
生产者把消息推给 Broker:RabbitMQ 遵循 AMQP,把消息先投递到 Exchange(默认 celery),再由 Exchange 按 routing_key + binding 规则决定进入哪个 Queue;Redis 没有 Exchange 层,通常把“队列名”直接映射为一个 List Key,消息会直接写入该 Key 对应的列表。
核心动作:队列(Queue)主动绑定(Binding)到交换机(Exchange),并在绑定时写下自己的过滤规则(Routing Key)。
routing_keyerror,只有消息 key=error 才进入stock.#,可匹配 stock.usd/stock.cnRedis 用 List 模拟队列:Producer 执行 LPUSH 把消息塞到列表左端;Worker 执行 BRPOP 阻塞式从右端弹出消息(队列为空时连接挂起,不消耗 CPU;有新消息时立刻被“弹醒”)。因为没有 Exchange,路由通常在客户端侧就决定好写入哪个 List Key。
segment 文件并维护索引;堆积很大时会触发 paging,把部分消息从内存换到磁盘,用“变慢”换“服务不挂”。fsync 强制把 OS 缓存刷到物理磁盘。只有在“已经安全落盘”之后,RabbitMQ 才会向生产者确认收到——因此断电重启后还能从磁盘把未完成消息恢复回来。代价是吞吐下降(磁盘 IOPS 限制)。在 Celery 中通常可通过 app.conf.task_default_delivery_mode = 'persistent' 开启。分发:Broker 把队列里的消息“交给”哪些 Worker、以什么节奏交付。背压:当 Worker 忙/慢时,限制消息从 Broker(中间件)进入 Worker(执行进程)内存的速度,避免内存被撑爆或延迟失控。
prefetch_count(预取数量),RabbitMQ 会按配额把消息推过去。prefetch=4 时,RabbitMQ 最多同时让该 Worker 持有 4 条未确认消息;只要 Worker ACK 了一条,才再补推下一条。BRPOP 阻塞式拉取:有任务立即弹出;没任务就挂起等待,不忙轮询。目标:防止 Worker 中途挂掉导致“任务丢失”。两种 Broker 的实现思路不同:
Unacked(待确认),对其他 Worker 不可见。ACK,RabbitMQ 才真正从队列/磁盘中移除该消息。Ready,重新排队并投递给其他健康 Worker(Redelivery)。RPOPLPUSH(或新版 BLMOVE)把消息从主队列原子移动到“处理中/备份队列”(常见 key 形如 unacked:*)。这是任务的“高光时刻”:消息进入 Worker 后,会被还原为可执行的调用,并按以下阶段推进:
content-type(如 application/json),把消息体解包为 args/kwargsSoftTimeLimitExceeded 给清理机会;hard time limit 可能强制终止self.retry() 会按策略(如指数退避)重新投递带 ETA(预计到达时间) 的任务SUCCESS/FAILUREtask_ignore_result=True(或任务级 ignore_result=True),避免结果在 Backend 大量堆积acks_late=True:执行完成后再 ACK(更可靠,Worker 中途挂了可触发重投);相对地“收到即 ACK”吞吐更高但更易丢当你调用 AsyncResult.get() 时,本质是在“查任务结果 + 等它就绪”,以及在结果写入后“按策略回收”。
interval(默认约 0.5s)循环向 Backend 发起读取(例如 Redis 的 GET celery-task-meta-<task_id> 或查询 DB 记录)。timeout 防止无限等待;propagate 决定任务失败时是否把异常在客户端重新抛出。task_id 相关的通知通道;Worker 写完结果后发布消息,客户端被瞬间唤醒,显著降低无效请求与 CPU 压力。EXPIRE,由 result_expires 控制保留时间(默认通常是 1 天)。时间到后 Key 自动物理删除。celery.backend_cleanup(通常配合 celery beat)定时执行删除过期行。task_ignore_result:最关键优化。默认建议全局开启;只在确实需要返回值的任务上用 @app.task(ignore_result=False) 显式打开。result_expires:结果保留时长(秒)。并发高时建议设短一些(例如 3600=1h)以降低堆积风险。task_ignore_result=True,又从不调用 .get()/.forget(),结果会在 Backend 持续堆积直到过期,最终导致缓存/数据库被撑爆。A:目标是“消息至少成功处理一次”,允许重复,不允许静默丢失。关键是把 发送确认 → 存储持久化 → 消费确认 → 崩溃恢复 → 幂等去重 串成闭环。
durable=true(RabbitMQ 重启后队列定义仍保留);同时消息设为 delivery_mode=2(消息持久化),两者都开启才具备“重启后可恢复”的基础。另一个关键点是队列类型:classic queue 更轻更快但高可用能力相对弱,quorum queue 会在多节点间复制队列日志(类似 Raft 的主从一致性思路),单点故障下更抗风险,适合订单/支付/计费等关键链路。appendfsync always:每次写命令都执行一次 fsync,数据安全性最高(几乎不丢),但磁盘 I/O 开销极大,吞吐容易受限。appendfsync everysec:每秒执行一次 fsync,通常最多丢 1~2 秒数据,安全与性能折中,生产最常用。appendfsync no:交由操作系统决定刷盘时机,性能最好,但可能丢几十秒数据,不建议关键链路使用。BRPOPLPUSH/BLMOVE 将消息从 main 原子移动到 processing(处理中队列),避免“弹出即丢”。XREADGROUP + XACK,未确认消息进入 PEL(pending entries list)。visibility_timeout 的消息回捞到主队列(或转移给其他消费者)。Business_ID(业务主键),消费前先落“去重记录”(唯一索引冲突即判重)。A:死信队列不是“失败终点”,而是“故障隔离 + 诊断补偿入口”。目标是:主链路持续可用,异常消息可追踪、可回放、可审计。
x-dead-letter-exchange(必填,指定“消息出事后要转投到哪个交换机”)。x-dead-letter-routing-key(选填,指定“转投时改用哪个路由键”;不填则沿用原路由键)。dead_letter_key(List/Stream/ZSet 均可)。business_id、retry_count、first_fail_at、last_error、origin_queue、trace_id。