When choosing a Python task queue, both Celery and ARQ are options, though other alternatives like RQ, Huey, and Dramatiq are also popular.
Python web applications need to run
background jobs or asynchronous tasks, then you are at right place. This post outline the arq and celery.
You might need to:
-
Send emails without blocking your API request.
-
Process large files in the background.
-
Run scheduled jobs like cleaning up old records.
-
Or offload tasks that depend on external APIs.
Two popular tools for doing this are Celery and ARQ.
Both solve the same core problem — task queuing — but in very different ways.
What is Celery?
Celery is a battle-tested distributed task queue for Python.
It’s been around for years, supports multiple brokers (RabbitMQ, Redis, SQS), and can scale to handle millions of tasks.
Key features:
-
Works with many brokers.
-
Has retry, result backend, monitoring (Flower).
-
Good ecosystem of extensions.
-
Supports periodic tasks (via Celery Beat).
One drawback of using Celery is that it may be overkill for smaller projects, as it requires dedicated worker processes, operates with multiple Python processes, and involves a more complex setup.
What is ARQ?
ARQ is a modern asyncio task queue built specifically for Python async apps — especially FastAPI, Starlette, or any other asyncio framework.
Key features:
-
Built for async/await.
-
Uses Redis as the only backend.
-
Tiny footprint — workers run inside your async event loop.
-
Simple config — no separate Beat for scheduling.
Downside: ARQ is great for modern async apps, but lacks some advanced Celery features (multiple brokers, older ecosystem, no Flower-like GUI).
When should you use them?
| Use Case |
ARQ |
Celery |
| Small to medium FastAPI app |
✔️ |
|
| You want native async |
✔️ |
|
| You use Django or Flask |
|
✔️ |
| You want AMQP (RabbitMQ) |
|
✔️ |
| Need advanced monitoring |
|
✔️ |
| Complex retry workflows |
|
✔️ |
| Simple Redis-only async tasks |
✔️ |
|
| Use Case |
ARQ |
Celery |
| Small to medium FastAPI app |
✔️ |
|
| You want native async |
✔️ |
|
| You use Django or Flask |
|
✔️ |
| You want AMQP (RabbitMQ) |
|
✔️ |
| Need advanced monitoring |
|
✔️ |
| Complex retry workflows |
|
✔️ |
| Simple Redis-only async tasks |
✔️ |
|
Example Use Cases
Common scenarios:
-
Send welcome emails after user signs up → Both Celery and ARQ work.
-
Process video encoding → Both work, but ARQ if your app is all async.
-
Run heavy data pipelines with 10+ workers → Celery often better.
-
Small FastAPI with async HTTP calls (APIs) → ARQ is perfect.
Example — Celery
pip install celery redis
tasks.py
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
celery -A tasks worker --loglevel=info
from tasks import add
result = add.delay(4, 6)
print("Task ID:", result.id)
print("Result:", result.get(timeout=10))
from tasks import add
result = add.delay(4, 6)
print("Task ID:", result.id)
print("Result:", result.get(timeout=10))
Celery uses Redis here, but you can swap in RabbitMQ or SQS easily.
You can also run Flower to monitor tasks:
pip install flower
celery -A tasks flower
Example — ARQ:
pip install arq
tasks.py
from arq import create_pool
from arq.connections import RedisSettings
from arq.worker import Worker, Function
import asyncio
async def add(ctx, x, y):
print(f"Adding {x} + {y}")
return x + y
class WorkerSettings:
functions = [add]
redis_settings = RedisSettings()
# To enqueue:
async def main():
redis = await create_pool(RedisSettings())
job = await redis.enqueue_job('add', 4, 6)
result = await job.result(timeout=5)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
from arq import create_pool
from arq.connections import RedisSettings
from arq.worker import Worker, Function
import asyncio
async def add(ctx, x, y):
print(f"Adding {x} + {y}")
return x + y
class WorkerSettings:
functions = [add]
redis_settings = RedisSettings()
# To enqueue:
async def main():
redis = await create_pool(RedisSettings())
job = await redis.enqueue_job('add', 4, 6)
result = await job.result(timeout=5)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
arq tasks.WorkerSettings
Sending Email with Celery
project/
├── app.py
├── tasks.py # Celery
└── worker.py # Worker entrypoint
tasks.py
# tasks.py
from celery import Celery
import smtplib
from email.message import EmailMessage
# Use Redis as the broker
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
@celery_app.task
def send_email(recipient, subject, body):
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = 'noreply@example.com'
msg['To'] = recipient
msg.set_content(body)
with smtplib.SMTP('smtp.yourprovider.com', 587) as smtp:
smtp.starttls()
smtp.login('your_smtp_user', 'your_smtp_password')
smtp.send_message(msg)
# tasks.py
from celery import Celery
import smtplib
from email.message import EmailMessage
# Use Redis as the broker
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
@celery_app.task
def send_email(recipient, subject, body):
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = 'noreply@example.com'
msg['To'] = recipient
msg.set_content(body)
with smtplib.SMTP('smtp.yourprovider.com', 587) as smtp:
smtp.starttls()
smtp.login('your_smtp_user', 'your_smtp_password')
smtp.send_message(msg)
app.py
# app.py
from flask import Flask, request, jsonify
from tasks import send_email
app = Flask(__name__)
@app.route('/register', methods=['POST'])
def register():
data = request.get_json()
email = data['email']
# Fire and forget
send_email.delay(
recipient=email,
subject="Welcome!",
body="Thank you for registering with us!"
)
return jsonify({"message": "User registered! Email will be sent in background."}), 200
if __name__ == '__main__':
app.run(debug=True)
# app.py
from flask import Flask, request, jsonify
from tasks import send_email
app = Flask(__name__)
@app.route('/register', methods=['POST'])
def register():
data = request.get_json()
email = data['email']
# Fire and forget
send_email.delay(
recipient=email,
subject="Welcome!",
body="Thank you for registering with us!"
)
return jsonify({"message": "User registered! Email will be sent in background."}), 200
if __name__ == '__main__':
app.run(debug=True)
Run the worker
celery -A tasks worker --loglevel=info
Sending Email with ARQ
pip install arq aiosmtplib fastapi uvicorn
project/
├── main.py
├── tasks.py
tasks.py
# tasks.py
from arq import create_pool
from arq.connections import RedisSettings
from arq.worker import Worker, Function
import aiosmtplib
from email.message import EmailMessage
async def send_email(ctx, recipient: str, subject: str, body: str):
msg = EmailMessage()
msg['From'] = 'noreply@example.com'
msg['To'] = recipient
msg['Subject'] = subject
msg.set_content(body)
await aiosmtplib.send(
msg,
hostname="smtp.yourprovider.com",
port=587,
start_tls=True,
username="your_smtp_user",
password="your_smtp_password"
)
class WorkerSettings:
functions = [send_email]
redis_settings = RedisSettings()
# tasks.py
from arq import create_pool
from arq.connections import RedisSettings
from arq.worker import Worker, Function
import aiosmtplib
from email.message import EmailMessage
async def send_email(ctx, recipient: str, subject: str, body: str):
msg = EmailMessage()
msg['From'] = 'noreply@example.com'
msg['To'] = recipient
msg['Subject'] = subject
msg.set_content(body)
await aiosmtplib.send(
msg,
hostname="smtp.yourprovider.com",
port=587,
start_tls=True,
username="your_smtp_user",
password="your_smtp_password"
)
class WorkerSettings:
functions = [send_email]
redis_settings = RedisSettings()
main.py
# main.py
from fastapi import FastAPI
from tasks import send_email
from arq import create_pool
from arq.connections import RedisSettings
app = FastAPI()
@app.post("/register")
async def register(data: dict):
email = data["email"]
redis = await create_pool(RedisSettings())
await redis.enqueue_job("send_email", email, "Welcome!", "Thanks for signing up!")
return {"message": "User registered, email will be sent in background!"}
# main.py
from fastapi import FastAPI
from tasks import send_email
from arq import create_pool
from arq.connections import RedisSettings
app = FastAPI()
@app.post("/register")
async def register(data: dict):
email = data["email"]
redis = await create_pool(RedisSettings())
await redis.enqueue_job("send_email", email, "Welcome!", "Thanks for signing up!")
return {"message": "User registered, email will be sent in background!"}
Run your ARQ worker
arq tasks.WorkerSettings
And your API:
uvicorn main:app --reload