Introduction
Three seconds is an eternity when you're staring at a loading spinner.
A user clicks "Send invoice." Your view generates a PDF, renders an email, connects to SMTP, waits... and only then returns a response. None of that work needed to block the user, but it did.
One request takes three seconds. Under load, it compounds. Your web layer becomes a bottleneck for work it shouldn't even be doing.
This is exactly what background tasks are for. Move the slow, non-essential work out of the request path. Return in 50ms. Let something else handle the rest.
Celery is still the default choice for background tasks in Django. It's been around forever and it isn't perfect, but when it's set up right, it quietly does its job at scale.
How Celery works
Three pieces work together: your Django application, a message broker, and one or more Celery workers.
When your view calls send_invoice.delay(order_id=42), Celery serialises the task name and its arguments into a message and pushes it onto a queue in the broker. The broker is just a message bus — Redis or RabbitMQ are the most common choices. Redis is simpler to operate and good enough for most applications.
A Celery worker process sits there watching the queue. When a message arrives, the worker picks it up, deserialises it, calls the actual Python function with the stored arguments, and marks the task as done. If it fails, the worker can retry according to whatever rules you configured.
Your Django app and the worker both import the same task functions, but they run as separate OS processes. The broker is the only thing connecting them.
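Concretely, .delay() is shorthand for .apply_async(), which also accepts routing and timing options:

# These two calls enqueue the same message
send_invoice.delay(order_id=42)
send_invoice.apply_async(kwargs={"order_id": 42})

# apply_async adds options: run in 30 seconds, on a specific queue
send_invoice.apply_async(kwargs={"order_id": 42}, countdown=30, queue="emails")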
Setting up Celery
Install Celery and Redis's Python client:
pip install celery redis

Create a celery.py file at the root of your Django project, next to your settings.py:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
app = Celery("myproject")
# Read configuration from Django settings, using the CELERY_ prefix
app.config_from_object("django.conf:settings", namespace="CELERY")
# Auto-discover tasks.py files in all installed apps
app.autodiscover_tasks()

Wire it up in your project's __init__.py so the app is loaded when Django starts:
from .celery import app as celery_app
__all__ = ("celery_app",)

Add the Celery configuration to settings.py:
# Broker — Redis running locally (use a proper URL in production)
CELERY_BROKER_URL = "redis://localhost:6379/0"
# Result backend — where task results are stored (optional)
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
# Always use JSON. The default (pickle) is a security risk.
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
# Store task results for 24 hours, then discard them
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# Timezone awareness
CELERY_TIMEZONE = "Africa/Nairobi"
CELERY_ENABLE_UTC = True

⚠ Gotcha
Never use pickle as the serializer
Celery's default serializer used to be pickle. Pickle can execute arbitrary code when deserialised, which means a compromised broker can run code on your workers. Always set CELERY_TASK_SERIALIZER to 'json' explicitly.
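Because a result backend is configured above, the handle returned by .delay() can be inspected later. A minimal sketch (reusing the send_invoice task from earlier):

from celery.result import AsyncResult

result = send_invoice.delay(order_id=42)
result.id               # the task ID; store it to look the task up later
result.status           # PENDING, SUCCESS, FAILURE, ...
result.get(timeout=10)  # block for the return value; avoid doing this in views

# Look the task up again later by ID
AsyncResult(result.id).status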
Your first task
Create a tasks.py file in any of your Django apps. Celery's autodiscover_tasks() will find it automatically.
import logging

from celery import shared_task
from django.core.mail import send_mail

logger = logging.getLogger(__name__)

@shared_task
def send_order_confirmation(order_id):
    from .models import Order  # import inside the function to avoid circular imports

    try:
        order = Order.objects.select_related("customer").get(id=order_id)
    except Order.DoesNotExist:
        logger.warning("send_order_confirmation: order not found", extra={"order_id": order_id})
        return

    send_mail(
        subject=f"Your order #{order.id} is confirmed",
        message=f"Hi {order.customer.name}, your order has been confirmed.",
        from_email="orders@example.com",
        recipient_list=[order.customer.email],
    )
    logger.info("order_confirmation_sent", extra={"order_id": order_id})

Call it from your view using .delay():
from django.http import JsonResponse

from .models import Order
from .tasks import send_order_confirmation

def create_order(request):
    order = Order.objects.create(customer=request.user.customer)
    # Queue the email. Returns immediately.
    send_order_confirmation.delay(order.id)
    return JsonResponse({"id": order.id, "status": "created"})

Start a worker in a separate terminal to process the queue:
celery -A myproject worker --loglevel=info

Retries and error handling
Networks fail. SMTP servers time out. External APIs return 500s. A task that doesn't retry is a task that silently drops work. Celery's retry mechanism handles this cleanly, and the best pattern is exponential backoff so you don't hammer a struggling service.
import logging

from celery import shared_task

logger = logging.getLogger(__name__)

@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,  # fallback delay when no countdown is given
)
def send_order_confirmation(self, order_id):
    from .models import Order

    try:
        order = Order.objects.select_related("customer").get(id=order_id)
        send_confirmation_email(order)
        logger.info("order_confirmation_sent", extra={"order_id": order_id})
    except Order.DoesNotExist:
        # No point retrying — the order doesn't exist
        logger.warning("order_not_found", extra={"order_id": order_id})
        return
    except Exception as exc:
        logger.warning(
            "order_confirmation_failed_retrying",
            extra={"order_id": order_id, "attempt": self.request.retries},
        )
        # Exponential backoff: 60s, 120s, 240s, 480s, 960s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

✦ Tip
bind=True gives you access to the task instance
When bind=True, the first argument to your task function is self — the task instance. This gives you access to self.retry(), self.request.retries (the current attempt number), and self.request.id (the task ID).
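Celery can also declare this retry policy directly on the decorator: autoretry_for retries on the listed exception types, and retry_backoff enables exponential backoff. A minimal sketch of the same task using those options (the exception types are assumptions; use whatever your email backend actually raises):

from celery import shared_task

# Same retry policy as above, declared instead of hand-rolled
@shared_task(
    autoretry_for=(ConnectionError, TimeoutError),  # assumed retryable errors
    retry_backoff=60,       # exponential backoff starting at 60 seconds
    retry_backoff_max=960,  # cap the delay between attempts
    retry_jitter=True,      # randomise delays so retries don't stampede
    max_retries=5,
)
def send_order_confirmation(order_id):
    ...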
For tasks that must not be dropped even after max retries are exceeded, catch MaxRetriesExceededError and persist the failure so a human can investigate:
from celery.exceptions import MaxRetriesExceededError

@shared_task(bind=True, max_retries=5)
def send_order_confirmation(self, order_id):
    try:
        ...
    except Exception as exc:
        try:
            raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
        except MaxRetriesExceededError:
            logger.error(
                "order_confirmation_max_retries_exceeded",
                extra={"order_id": order_id},
            )
            # Persist the failure so it can be handled manually
            # (FailedTask is an app-specific dead-letter model)
            FailedTask.objects.create(
                task_name="send_order_confirmation",
                args={"order_id": order_id},
                error=str(exc),
            )

Designing tasks well
Most Celery bugs come from tasks that were designed incorrectly, not from Celery itself. A few principles go a long way.
Pass IDs, not objects
When you call .delay(), the arguments are serialised into JSON and stored in the broker. A Django model instance doesn't survive that trip: JSON can't represent it, and even with pickle you'd be freezing a snapshot that may be stale by the time the worker runs. Pass the primary key and let the task fetch the current state from the database.
# Wrong: a model instance can't be serialised as JSON;
# with pickle it would be a stale, oversized snapshot
send_order_confirmation.delay(order=order)

# Right: pass the ID, fetch fresh from DB inside the task
send_order_confirmation.delay(order_id=order.id)

Make tasks idempotent
A task might run more than once — due to retries, a worker crash mid-execution, or an at-least-once delivery guarantee from the broker. Design tasks so that running them twice produces the same result as running them once.
@shared_task(bind=True)
def charge_customer(self, order_id):
    from .models import Order

    order = Order.objects.get(id=order_id)

    # Idempotency check: don't charge twice
    # (for strict guarantees, use select_for_update inside a transaction)
    if order.payment_status == "charged":
        logger.info("charge_skipped_already_charged", extra={"order_id": order_id})
        return

    charge = payment_gateway.charge(order.total, order.customer.payment_method)
    order.payment_status = "charged"
    order.charge_id = charge.id
    order.save(update_fields=["payment_status", "charge_id"])

Keep tasks small and focused
A task that does ten things is hard to retry, hard to monitor, and hard to debug when one of those ten things fails. One task, one responsibility. Chain tasks together using Celery's canvas primitives when you need a pipeline.
from celery import chain

# Instead of one giant task, compose a pipeline. Each task's return
# value is passed as the first argument of the next, so validate_order
# should return the order_id that charge_customer expects, and so on.
order_pipeline = chain(
    validate_order.s(order_id),
    charge_customer.s(),
    send_confirmation.s(),
    update_inventory.s(),
)
order_pipeline.delay()

Periodic tasks with Beat
Celery Beat is a scheduler that runs alongside your workers and fires tasks on a schedule — like a cron job that integrates directly with your Celery setup. Common use cases are sending daily digest emails, cleaning up expired sessions, syncing data from external APIs, and generating reports.
pip install django-celery-beat

Add it to INSTALLED_APPS in settings.py:

INSTALLED_APPS = [
    ...
    "django_celery_beat",
]
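Because django_celery_beat stores schedules in the database, periodic tasks can also be created and edited at runtime through the Django admin or the ORM, in addition to the static CELERY_BEAT_SCHEDULE shown below. A minimal sketch using the package's models (the task path matches the schedule below):

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# A database-backed schedule, editable later from the Django admin
every_15_min, _ = IntervalSchedule.objects.get_or_create(
    every=15,
    period=IntervalSchedule.MINUTES,
)
PeriodicTask.objects.get_or_create(
    name="sync-exchange-rates-db",
    task="finance.tasks.sync_exchange_rates",
    interval=every_15_min,
)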
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "send-daily-digest": {
        "task": "notifications.tasks.send_daily_digest",
        "schedule": crontab(hour=8, minute=0),  # every day at 8am
    },
    "cleanup-expired-sessions": {
        "task": "accounts.tasks.cleanup_expired_sessions",
        "schedule": crontab(hour=2, minute=0),  # every day at 2am
    },
    "sync-exchange-rates": {
        "task": "finance.tasks.sync_exchange_rates",
        "schedule": 60 * 15,  # every 15 minutes (plain seconds)
    },
}

Start the Beat scheduler as a separate process, and never run Beat inside a worker process: only one Beat instance should be active at a time (two schedulers means every periodic task fires twice), and keeping Beat separate makes that rule easy to enforce.
# In one terminal: the worker
celery -A myproject worker --loglevel=info
# In another: the beat scheduler
celery -A myproject beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Monitoring your workers
A Celery worker that silently stops processing is indistinguishable from a healthy one unless you're watching it. Two tools make this manageable.
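Before reaching for either, Celery's built-in inspect commands give a quick manual check from the terminal:

# Ask every worker to respond; silence means trouble
celery -A myproject inspect ping

# List the tasks each worker is currently executing
celery -A myproject inspect active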
Flower
Flower is a real-time web dashboard for Celery. It shows active workers, queued tasks, task history, failure rates, and lets you revoke or retry tasks from the browser.
pip install flower
# Start Flower alongside your workers
celery -A myproject flower --port=5555

Queue depth alerting
A growing queue depth means tasks are arriving faster than workers can process them. This is an early warning sign before users start noticing delays. Check queue depth from Redis and alert when it grows past a threshold.
import logging

import redis
from django.conf import settings

logger = logging.getLogger(__name__)

r = redis.Redis.from_url(settings.CELERY_BROKER_URL)

def get_queue_depth(queue_name="celery"):
    # Celery's default queue is a Redis list named "celery"
    return r.llen(queue_name)

# In a health check endpoint or a periodic monitoring task
depth = get_queue_depth()
if depth > 500:
    logger.warning("celery_queue_depth_high", extra={"depth": depth})

Common pitfalls
These are the mistakes that show up in almost every Celery codebase eventually.
⚠ Gotcha
Tasks hitting the real broker in tests
By default, calling .delay() in tests actually sends the task to a broker, which means your tests need a running Redis instance. Set CELERY_TASK_ALWAYS_EAGER = True in test settings to run tasks synchronously inline. Use CELERY_TASK_EAGER_PROPAGATES = True alongside it so exceptions surface instead of being silently swallowed.
# Run tasks synchronously during testing
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True

⚠ Gotcha
Importing models at module level in tasks.py
Importing models at the top of tasks.py can cause circular import errors because Django's app registry may not be fully loaded when Celery imports your task module. Import models inside the task function body instead.
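A minimal sketch of the wrong and right placement (model and task names reuse the earlier example):

from celery import shared_task

# Wrong: a module-level model import can run before Django's app
# registry is ready, or create a circular import with models.py
# from .models import Order

@shared_task
def send_order_confirmation(order_id):
    # Right: defer the model import until the task actually runs
    from .models import Order
    order = Order.objects.get(id=order_id)
    ...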
Note
Run multiple workers for parallelism
A single Celery worker manages a pool of child processes (the default prefork pool) or threads, depending on which pool you choose. For high-throughput workloads, run multiple worker processes. Use --concurrency to control how many tasks a single worker handles in parallel.
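The --queues flags below only deliver work if tasks are actually routed to those queues. A minimal sketch of routing in settings.py (the task path and queue name are assumptions chosen to match the commands below):

# Route email tasks to a dedicated "emails" queue
CELERY_TASK_ROUTES = {
    "notifications.tasks.send_order_confirmation": {"queue": "emails"},
}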
# 4 worker processes, each handling 4 concurrent tasks
celery -A myproject worker --concurrency=4 --loglevel=info
# Named workers make it easier to route specific task types
celery -A myproject worker --concurrency=2 --queues=emails --hostname=email-worker@%h
celery -A myproject worker --concurrency=8 --queues=default --hostname=default-worker@%h

Summary
Background tasks are one of the highest-leverage changes you can make to a Django application. Moving slow work out of the request cycle makes your API faster for users and makes your application more resilient to external failures.
- Use @shared_task so tasks don't depend on a specific Celery app instance.
- Always use JSON serialisation. Never use pickle.
- Pass primary keys into tasks, not model instances. Fetch fresh data inside the task.
- Use bind=True and exponential backoff for retries.
- Design tasks to be idempotent so running them twice is safe.
- Keep tasks small. Use Celery's canvas primitives to chain them into pipelines.
- Use Celery Beat for periodic tasks. Run Beat as a separate process, never mixed with workers.
- Set CELERY_TASK_ALWAYS_EAGER = True in test settings so tasks run synchronously during tests.
- Monitor queue depth. A queue that grows without bound means your workers are falling behind.