Exploring Celery + Redis for Background Jobs
Purpose
The purpose of this lab was to explore Celery + Some Broker as a practical task queue option for the Labs Reviewer backend.
At that point, the Agent Backend Python was using FastAPI BackgroundTasks to run long markdown processing jobs. That works well for a first version, but the work still runs inside the API process. I wanted to test what it would look like to move that work into a separated worker process, using Redis as the broker and Celery as the task execution layer.
This was not a full production architecture redesign. The goal was narrower: compare Redis and RabbitMQ for this use case, implement Celery without losing the current BackgroundTasks path, validate that a worker could consume jobs from Redis, and define what happens when a task reaches a final failure.
The result is a transition-friendly implementation. Local development can still use BackgroundTasks, while production-like environments can run the same workflow through Celery + Redis.
Current Architecture
Just to be clear about the current architecture, before the queue implementation the system already had three main applications: a React frontend, a Node.js authentication backend, and a Python backend responsible for the agent workflow.
+------------------+ login +----------------------+
| Frontend | -------------------> | Auth Backend Node |
| React + Vite | | Express + JWT |
+------------------+ +----------------------+
|
| authenticated lab request
v
+------------------------+
| Agent Backend Python |
| FastAPI + Agents |
+------------------------+
|
| FastAPI BackgroundTasks
v
+------------------------+
| Markdown Processing |
| agents + output files |
+------------------------+
For this lab, the important part is the last step. Markdown processing was still attached to the API process through FastAPI BackgroundTasks. The queue experiment focused on moving that work behind Celery while keeping the request and tracking flow the same for the frontend.
Redis vs RabbitMQ
I researched two common Celery broker options: Celery + RabbitMQ and Celery + Redis.
Celery + RabbitMQ is a traditional and robust combination. RabbitMQ is a purpose-built message broker with strong queue semantics. Celery + Redis is also very common because Redis is simple to operate and often already exists in the infrastructure.
Redis was tempting because I already had experience with it. But the purpose of these labs is also to learn, so I compared both options using a few criteria: advantages, disadvantages, development time, production cost, and scalability.
| Topic | Celery + RabbitMQ | Celery + Redis |
| --------------------------------- | -------------------------------------------------------------------- | --------------------------------------------------- |
| Best fit | Production task queue with stronger broker semantics | Fast and simple task queue for small messages |
| Development speed | Medium | Fastest |
| Broker maturity for queues | Purpose-built broker | Data store used as a broker |
| Celery support | Stable, with monitoring and remote control | Stable, with monitoring and remote control |
| Message size | Better for larger messages according to Celery broker guidance | Best for small messages; large messages can congest |
| Reliability tools | Acks, publisher confirms, durable queues, dead lettering, quorum queues | Visibility timeout, Redis persistence, app-level idempotency |
| Operational complexity | Higher | Lower |
| Production cost at small scale | Usually higher than basic Redis | Often lower, especially if Redis already exists |
| Production cost at larger backlog | More predictable because queue data can live on disk | Memory can become expensive |
| Horizontal worker scaling | Strong | Strong, with more duplicate-delivery caveats |
| High availability | RabbitMQ clustering and quorum queues | Redis Sentinel, Redis Cluster, or managed HA |
| Recommended for this backend | Best long-term choice | Best quick first iteration |
Redis
Redis is the fastest path to implement. Celery's Redis setup is simple: install celery[redis] and configure a redis://... broker URL. Redis can also be used as Celery's result backend, although in this project MongoDB should remain the user-facing status store.
The tradeoff is that Redis is an in-memory data store first, not a dedicated AMQP broker. Broker memory pressure matters more, and Celery-on-Redis has caveats around visibility timeout, durability, and duplicate delivery. If a task is not acknowledged before the visibility timeout, it can be redelivered and executed again. Long ETA, countdown, and retry tasks need special care.
RabbitMQ
RabbitMQ is designed around queues, acknowledgements, routing, delivery semantics, and broker observability. It supports consumer acknowledgements, publisher confirms, routing keys, exchanges, queue TTL, queue length limits, priorities, dead-letter exchanges, and durable queues.
The tradeoff is operational complexity. A RabbitMQ setup needs users, virtual hosts, permissions, exchanges, queues, management, monitoring, and production planning around memory alarms, disk alarms, cluster sizing, connection limits, quorum queues, and upgrades.
RabbitMQ also scales well, but the queue topology matters. A single hot queue can become a bottleneck. At higher scale, the system should use multiple queues, route by task type, and run multiple Celery workers.
For this lab, RabbitMQ felt heavier than necessary. The goal was to move from FastAPI BackgroundTasks to Celery while keeping the first production-like version practical. I chose Redis as the broker for the first iteration, while keeping RabbitMQ as the stronger long-term option if the workload grows.
Redis Implementation
I wanted to implement Celery + Redis without losing the current BackgroundTasks behavior. To do that, I used the Strategy Pattern again. The API service should not be full of if TASK_DISPATCHER == ... checks. It should depend on a dispatcher contract, and the selected strategy should decide how the job is enqueued.
These environment variables configure the dispatch mode:
TASK_DISPATCHER=background_tasks
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
TASK_DISPATCHERdefines which dispatch strategy the application should use. It can point to the currentbackground_tasksimplementation or to the newceleryimplementation.CELERY_BROKER_URLis the Redis connection used by Celery as the message broker. In this example, Celery publishes pending tasks to Redis database0, and Celery workers consume them from there.CELERY_RESULT_BACKENDis where Celery stores task execution results and task states. In this example, Redis database1separates result data from broker data. MongoDB still remains the user-facing source of truth for process status.
The generic task infrastructure lives in core/tasks. It is not specific to markdown generation or to the Labs feature. It only knows how to enqueue work using the configured strategy.
Task Dispatcher
The central file is core/tasks/task_dispatcher.py. It defines a TaskDispatcher protocol with a single enqueue(...) method and a build_task_dispatcher() function that reads settings.TASK_DISPATCHER.
"""Generic task dispatcher contract and configuration-based builder."""
from typing import Any, Protocol
import core.tasks.constants as constants
from core.config import settings
from core.tasks.exceptions import TaskDispatcherConfigurationError
class TaskDispatcher(Protocol):
"""Generic task dispatcher contract."""
async def enqueue(self, **kwargs: Any) -> None:
...
def build_task_dispatcher() -> TaskDispatcher:
"""Build the generic task dispatcher configured by TASK_DISPATCHER."""
dispatcher = settings.TASK_DISPATCHER.strip().lower()
if dispatcher == constants.DISPATCHER_BACKGROUND_TASKS:
from core.tasks.background_task_dispatcher import BackgroundTasksDispatcher
return BackgroundTasksDispatcher()
if dispatcher == constants.DISPATCHER_CELERY:
from core.tasks.celery_task_dispatcher import CeleryTaskDispatcher
if not settings.CELERY_RESULT_BACKEND.strip():
raise TaskDispatcherConfigurationError(
"CELERY_RESULT_BACKEND is required when TASK_DISPATCHER=celery."
)
return CeleryTaskDispatcher()
raise TaskDispatcherConfigurationError(
f"Unsupported TASK_DISPATCHER: {settings.TASK_DISPATCHER}."
)
When the value is background_tasks, the application returns a BackgroundTasksDispatcher. When the value is celery, it validates the Celery configuration and returns a CeleryTaskDispatcher. If the value is unknown, the application fails with a clear configuration error instead of silently choosing the wrong execution mode.
BackgroundTasks Dispatcher
core/tasks/background_task_dispatcher.py keeps the old behavior. It receives a BackgroundTaskSubmission, which contains the Python function to run and its positional arguments. Then it calls background_tasks.add_task(...).
"""FastAPI BackgroundTasks dispatch strategy."""
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from fastapi import BackgroundTasks
@dataclass(frozen=True)
class BackgroundTaskSubmission:
"""Task payload for FastAPI BackgroundTasks dispatch."""
function: Callable[..., Any]
args: tuple[Any, ...] = ()
class BackgroundTasksDispatcher:
"""Schedule work with FastAPI BackgroundTasks."""
async def enqueue(
self,
*,
submission: BackgroundTaskSubmission,
background_tasks: BackgroundTasks,
) -> None:
background_tasks.add_task(
submission.function,
*submission.args,
)
This works because FastAPI BackgroundTasks runs inside the API application process after the HTTP response is sent. In this mode, the code can pass live Python objects, such as the writer agent, translator agent, metadata agent, and process status service, because everything still lives in the same process.
Celery Task Dispatcher
core/tasks/celery_task_dispatcher.py is the new scalable path. It receives a CeleryTaskSubmission, which contains a Celery task object and the positional arguments needed by that task. Instead of calling the processing function directly, it calls .delay(...).
"""Celery task dispatch strategy."""
from dataclasses import dataclass
from typing import Any, Protocol
from core.tasks.exceptions import TaskDispatchEnqueueError
class CeleryDelayableTask(Protocol):
def delay(self, *args: Any, **kwargs: Any) -> Any:
...
@dataclass(frozen=True)
class CeleryTaskSubmission:
"""Task payload for Celery dispatch."""
task: CeleryDelayableTask
args: tuple[Any, ...] = ()
class CeleryTaskDispatcher:
"""Schedule work with Celery."""
async def enqueue(
self,
*,
submission: CeleryTaskSubmission,
) -> None:
try:
submission.task.delay(*submission.args)
except Exception as exc:
raise TaskDispatchEnqueueError("Failed to enqueue Celery task.") from exc
This is the key difference: the job crosses a process boundary through Redis. Because of that, the payload must be serializable. The API does not send agent instances or service objects to Celery. It sends values such as context, output_path, and process_status_id.
Connecting the Dispatcher to the Labs Workflow
The bridge between the generic infrastructure and the real Labs workflow is labs/tasks/factory.py.
The service no longer enqueues "Celery" or "BackgroundTasks" directly. It creates a MarkdownOrganizationJob, which represents the application-level job: the markdown context, the output path, and the process status id. Then MarkdownOrganizationTaskDispatcher adapts that job into both possible submission formats:
- for
BackgroundTasks, it points toMarkdownHelper.process_and_save_markdown_with_statusand passes the in-memory agents and services as arguments; - for
Celery, it points to theprocess_markdown_jobCelery task and passes only the serializable fields needed by the worker.
Both submissions are prepared at the adapter layer, but only the configured dispatcher uses the one it needs. This keeps LabPostService focused on the product flow: validate the request, create the process status record in MongoDB, build the markdown job, enqueue it, and return process_id to the frontend.
In the Celery path, the worker receives the job from Redis and runs labs.process_markdown_job. Inside the worker process, the application initializes MongoDB, rebuilds the markdown processing dependencies, calls the same MarkdownHelper.process_and_save_markdown_with_status(...), and closes the MongoDB connection at the end.
That means the processing behavior stays the same. What changed is where the work runs and how it is dispatched.
Testing worker service
To manage the services, I created a Docker Compose setup with the API, worker, Redis, and MongoDB. The important service for this lab is worker and redis, because worker is the consumer that reads tasks from the Redis broker service.
services:
api:
build: .
env_file:
- .env
worker:
build: .
env_file:
- .env
command: celery -A core.tasks.celery_app.celery_app worker --loglevel=info
environment:
TASK_DISPATCHER: celery
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/1
MONGODB_URI: mongodb://mongodb:27017
MONGODB_DATABASE: labs_reviewer
depends_on:
redis:
condition: service_healthy
mongodb:
condition: service_started
redis:
image: redis:7-alpine
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 10
mongodb:
image: mongo:latest
ports:
- "27017:27017"
Running docker compose ps shows the worker service:
The worker logs can be followed with:
docker compose logs -f worker
The logs showed that the worker connected to Redis and started listening to the broker queue:
[2026-06-20 13:24:33,986: INFO/MainProcess] Connected to redis://redis:6379/0
worker-1 | [2026-06-20 13:24:33,993: INFO/MainProcess] mingle: searching for neighbors
worker-1 | [2026-06-20 13:24:35,008: INFO/MainProcess] mingle: all alone
worker-1 | [2026-06-20 13:24:35,022: INFO/MainProcess] celery@79080456f82e ready.
This confirmed that the worker was correctly configured. It connected to redis://redis:6379/0, the same Redis database configured as CELERY_BROKER_URL.
After sending a new process through the API, the worker received the Celery task:
That was the most important evidence of queue consumption. The API published a labs.process_markdown_job message into Redis, and the Celery worker consumed it from the broker. The flow was no longer running only inside the FastAPI process with BackgroundTasks; it crossed the Redis queue and was picked up by a separated worker process.
The logs also showed that Celery delegated execution to one of its worker processes:
The ForkPoolWorker-8 prefix matters. Celery's main process received the task, then delegated the real work to a worker process. That is the expected architecture: the API enqueues the job, and the worker service performs the expensive processing.
Now, in the middle of the logs we could notice there were also configuration warnings related to an invalid OpenAI model name:
Those warnings were not related to Celery or Redis. They only indicated that the configured model was invalid for the OpenAI provider, so the application fallback logic used gpt-4o-mini. Queue consumption still worked.
Finally, the worker completed the task successfully:
At this point, the backend was in a transition-friendly state. Local development can keep TASK_DISPATCHER=background_tasks, which is simple and fast. Production-like environments can run with TASK_DISPATCHER=celery, Redis, and a Celery worker. The API contract does not change, the frontend still tracks the process through the same process_id, and MongoDB remains the user-facing source of truth.
Dead Letter Queue
After validating that the worker consumed tasks from Redis, I wanted to test what happens when a task keeps failing after all retry attempts.
I had looked up RabbitMQ, and I found out that one common solution is a broker-level Dead Letter Queue through dead-letter exchanges and routing keys. However, in this implementation Redis is the Celery broker, and Redis does not provide the same native dead-letter queue semantics as RabbitMQ. For this first version, I implemented an application-level DLQ.
The flow is:
Celery task fails
-> Celery retries the task
-> task fails again
-> after the final retry, the task is considered a final failure
-> the application writes a compact failure payload to Redis
-> the process status is marked as failed in MongoDB
This keeps Redis as the broker while still giving the system a place to inspect failed jobs after Celery gives up retrying them.
The generic implementation lives in core/tasks/dead_letter.py. I created a custom Celery task base called DeadLetterTask, which centralizes the behavior that every task can reuse when it reaches a final failure.
class DeadLetterTask(Task):
"""Celery task base that writes final failures to a Redis DLQ."""
abstract = True
max_retries = constants.CELERY_TASK_MAX_RETRIES
_redis_client = None
def on_failure(self, exc, task_id, args, kwargs, einfo):
if self._is_final_failure():
if settings.CELERY_DLQ_ENABLED:
try:
self.write_dead_letter(exc, task_id, args, kwargs)
except Exception:
logger.exception(
"Failed to write Celery task to dead letter queue",
extra={"task_id": task_id, "task_name": self.name},
)
try:
self.on_final_failure(exc, task_id, args, kwargs, einfo)
except Exception:
logger.exception(
"Failed to run Celery task final failure hook",
extra={"task_id": task_id, "task_name": self.name},
)
super().on_failure(exc, task_id, args, kwargs, einfo)
on_failure(...) is called by Celery every time the task fails. The DLQ write only happens when _is_final_failure() returns true. This prevents the task from being sent to the DLQ during temporary failures while Celery still has retries available.
The maximum retry count is centralized in core/tasks/constants.py:
CELERY_TASK_MAX_RETRIES = 3
After the task reaches the configured retry limit, DeadLetterTask writes the failed payload to Redis:
def write_dead_letter(self, exc, task_id, args, kwargs) -> None:
"""Persist a compact final-failure payload to Redis."""
self._get_redis_client().lpush(
self.dead_letter_key(),
json.dumps(
self.build_dead_letter_payload(exc, task_id, args, kwargs),
default=str,
),
)
The DLQ key is built from the configured prefix and the Celery task name:
def dead_letter_key(self) -> str:
prefix = settings.CELERY_DLQ_KEY_PREFIX.strip()
return f"{prefix}:{self.name}"
For the markdown task, failed messages are stored under:
dlq:labs.process_markdown_job
The payload is intentionally compact and operational. It stores the task id, task name, sanitized args and kwargs, exception, retry count, queue name, and failure timestamp.
return {
"task_id": task_id,
"task_name": self.name,
"args": self.sanitize_args(args),
"kwargs": self.sanitize_kwargs(kwargs),
"error": f"{type(exc).__name__}: {exc}",
"exception_type": type(exc).__name__,
"retries": self._request_retries(),
"max_retries": self._max_retries(),
"queue": self._request_queue(),
"failed_at": utc_now().isoformat().replace("+00:00", "Z"),
}
Two settings control this behavior:
CELERY_DLQ_ENABLED: bool = _env_bool("CELERY_DLQ_ENABLED", True)
CELERY_DLQ_KEY_PREFIX: str = os.getenv("CELERY_DLQ_KEY_PREFIX", "dlq")
By default, DLQ writing is enabled and uses the dlq prefix. It can be disabled without changing the task implementation:
CELERY_DLQ_ENABLED=false
Markdown-specific DLQ Behavior
The generic DeadLetterTask is reusable, but the markdown workflow needs extra care because the task receives the full user context. This context can be large and can include private notes or uploaded content. The Labs task should not store the raw context in Redis.
The markdown-specific behavior lives in labs/tasks/celery_tasks.py, inside MarkdownDeadLetterTask:
class MarkdownDeadLetterTask(DeadLetterTask):
"""Dead-letter behavior for markdown processing tasks."""
def sanitize_args(self, args) -> list:
values = list(args or ())
if not values:
return values
return ["[omitted-context]", *values[1:]]
def sanitize_kwargs(self, kwargs) -> dict:
values = dict(kwargs or {})
if "context" in values:
values["context"] = "[omitted-context]"
return values
Instead of storing the full context, the task stores only a descriptor with length and SHA-256 hash:
return {
"omitted": True,
"length": len(text),
"sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
}
That gives enough information to debug whether two failed tasks had the same input without exposing the real content in Redis.
The final failure hook also updates the user-facing process status in MongoDB:
def on_final_failure(self, exc, task_id, args, kwargs, einfo) -> None:
anyio.run(_mark_process_failed_from_task_args, args, kwargs, exc)
Then _mark_process_failed_from_task_args(...) extracts the process_status_id from the task arguments and marks the process as failed:
await ProcessStatusService().mark_process_failed(
process_status_id=process_status_id,
result=f"{type(exc).__name__}: {exc}",
)
This matters because the DLQ is operational infrastructure, but MongoDB remains the source of truth for the frontend. The frontend should not need to inspect Redis to know whether a process failed.
Applying the DLQ to the Celery Task
The markdown processing task now uses MarkdownDeadLetterTask as its Celery base:
@celery_app.task(
bind=True,
base=MarkdownDeadLetterTask,
name="labs.process_markdown_job",
autoretry_for=(Exception,),
retry_backoff=True,
retry_kwargs={"max_retries": constants.CELERY_TASK_MAX_RETRIES},
max_retries=constants.CELERY_TASK_MAX_RETRIES,
)
def process_markdown_job(
self,
context: str,
output_path: str,
process_status_id: str,
simulate_failure: bool = False,
) -> dict[str, str]:
"""Process markdown in a Celery worker and store operational task metadata."""
if simulate_failure:
raise RuntimeError("Simulated DLQ failure")
anyio.run(
_process_markdown_job_async,
context,
output_path,
process_status_id,
)
return {
"process_status_id": process_status_id,
"output_path": output_path,
"status": "completed",
}
The important details are:
bind=Truegives the task access toself, which is necessary for retry and failure metadata.base=MarkdownDeadLetterTaskconnects the task to the DLQ behavior.autoretry_for=(Exception,)tells Celery to retry unexpected exceptions.retry_backoff=Trueprevents immediate retry loops by spacing retries out.max_retries=3means the DLQ is only written after the final failure.simulate_failureexists only to test the DLQ path by forcing a controlled exception.
The normal dispatcher still sends only the real job payload:
CeleryTaskSubmission(
task=process_markdown_job,
args=(
job.context,
str(job.output_path),
str(job.process_status_id),
),
)
So the public application flow does not expose simulate_failure. It can be used from tests or from a controlled internal/manual test path.
Testing the DLQ Behavior
To simulate a DLQ scenario, I enqueued the task with simulate_failure=True. The expected behavior was:
1. Task labs.process_markdown_job received
2. Task raises RuntimeError("Simulated DLQ failure")
3. Celery retries the task
4. Task keeps failing until max_retries is reached
5. DeadLetterTask writes the final failure to Redis
6. MarkdownDeadLetterTask marks the MongoDB process status as failed
Because my container was not using a bind mount for the worker code, I needed to rebuild and recreate the worker:
docker compose up -d --build --force-recreate worker
Then I watched the worker logs:
docker compose logs --tail=220 worker
The logs showed the expected simulated failure and retry flow:
After the final failure, I inspected the Redis DLQ. I checked retries and max_retries to confirm that the retry limit was respected. I also checked whether the job id and process id were stored correctly, because those values are needed to investigate or replay the failed process.
Replaying a DLQ Job
After confirming that the failed task was stored in Redis, the next question was whether I could add that same job back to the main queue.
In this implementation, the answer is "no", at least not directly. The DLQ record is not a Celery message that can be moved back into the celery queue. It is an application-level JSON record created for debugging and operational inspection. Its purpose is to help me understand what failed, when it failed, how many retries were attempted, which output path was affected, and which process_status_id should be investigated.
That means I should not use RPUSH or LPUSH to insert the DLQ JSON into the main Celery queue. A real Celery queue message has Celery's own format, including headers, body, retry metadata, task name, serializer information, and routing data. If I want to run the task again, I need to publish a new Celery task through the task API:
process_markdown_job.apply_async(
args=(
context,
output_path,
process_status_id,
),
kwargs={
"simulate_failure": False,
},
)
This creates a valid Celery message and sends it to Redis through the normal broker path.
There is one important limitation in my current design. The DLQ payload intentionally does not store the full context, because that content can be large and can include private user notes or uploaded file content. In the DLQ entry, the first argument is replaced with:
"[omitted-context]"
The real content is represented only by a descriptor:
{
"omitted": true,
"length": 12842,
"sha256": "context-content-sha256"
}
That means the DLQ entry alone is not enough to replay the job. It contains useful debugging metadata, but not the full input required to execute the task again.
A retry or replay option would be possible, but only if the system also had access to the original context. That could happen by storing the full context in the DLQ, or by saving it somewhere more appropriate, such as a persisted uploaded file, MongoDB, object storage, or another durable input source. Without that original input, the DLQ can identify the failure, but it cannot fully recreate the job.
For manual validation, the flow was:
1. Inspect the DLQ entry in Redis.
2. Copy the output_path and process_status_id.
3. Recover the original context from the test input.
4. Publish a new Celery task with simulate_failure=False.
5. Watch the worker consume the new task.
6. Remove the DLQ entry only after the new execution succeeds.
The Redis inspection command was:
docker compose exec redis redis-cli -n 0 LRANGE dlq:labs.process_markdown_job 0 -1
Then I could enqueue the job again from inside the application container:
docker compose exec api python -c "from labs.tasks.celery_tasks import process_markdown_job; process_markdown_job.apply_async(args=('<original-context>', 'public/markdown/example_reviewed.md', '<process-status-id>'), kwargs={'simulate_failure': False})"
In the manual test, I used simulate_failure=False because the previous task was intentionally failing only to validate the DLQ path. Without that, the requeued task would fail again and return to the DLQ after the retry limit.
After publishing the task again, the worker logs should show the normal consumption flow:
docker compose logs --tail=120 worker
The expected result is that the worker receives labs.process_markdown_job, processes the markdown again, writes the output file, and marks the process as completed in MongoDB.
Only after confirming the new execution succeeded should the old DLQ record be removed. Because this implementation stores the newest DLQ item at the beginning of the Redis list with LPUSH, the latest failed item can be removed with:
docker compose exec redis redis-cli -n 0 LPOP dlq:labs.process_markdown_job
For a manual lab, that is enough. For production, I would not keep this as a copy-and-paste process. If the application needed a real replay feature, I would create an internal admin-only tool that:
- lists DLQ records by task name;
- shows the error, retry count, failure time, output path, and process id;
- recovers the original input from the DLQ or another durable source;
- publishes a new task through
apply_async; - marks the old DLQ record as replayed or removes it after success.
The most important lesson is that a DLQ is not automatically a retry mechanism. In this version, the DLQ is mainly a debugging and investigation tool for final failures. Replaying is a separate feature, and it only becomes possible when the system also preserves or can recover the complete input needed to publish a new valid task.
Conclusion
This lab moved the backend from a single-process background execution model toward a more scalable task queue architecture with Celery + Redis.
The important part was not only adding Celery. It was keeping the application flexible during the transition. The dispatcher layer allows local development to continue using FastAPI BackgroundTasks, while production-like environments can use Celery workers without changing the API contract.
The DLQ implementation also made failure handling more explicit. Redis does not provide RabbitMQ-style broker-level dead lettering, so the application now records compact final-failure payloads, protects the user's raw context, and keeps MongoDB as the user-facing status source.
The next production step would be to make replay operationally safe with an internal admin tool and a durable source for the original input.