Celery
Celery is a framework for creating and managing asynchronous tasks. It supports running tasks at a specific time or periodically.
It is composed of three actors:
The beat scheduler, which submits each task at the defined time or at the end of the configured interval
The worker, which executes the scheduled tasks
The message broker, an external application used to exchange messages between the two. It can be Redis, RabbitMQ, MongoDB, Kafka, etc.
Setting up Celery
To use Celery, you must already have a message broker configured.
First install Celery
pip install celery
Create a new file app.py with the following code to set up a basic Celery application:
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
Worker execution
celery -A app.celery_app worker --loglevel=info
Beat execution
celery -A app.celery_app beat --loglevel=info
Remove all pending tasks
This purges the task messages waiting in the broker queue; it does not stop tasks that are already running.
celery -A app.celery_app purge
Asynchronous task execution
Celery allows you to execute tasks asynchronously.
Once a task is submitted, the worker processes it in the background, freeing the main process to handle other requests.
from celery.schedules import crontab

@celery_app.task(name='scheduled_task')
def scheduled_task(text):
    return text

@celery_app.task(name='periodic_task')
def periodic_task():
    return 'Task completed'

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run every Thursday at 06:00
    sender.add_periodic_task(crontab(minute=0, hour=6, day_of_week=4), scheduled_task.s('hello world'))
    # run every 10 seconds
    sender.add_periodic_task(10.0, periodic_task.s())
Task retries and failure handling
Celery supports automatic retries for tasks.
You can configure the number of retries and backoff intervals in case of task failure.
@celery_app.task(bind=True, max_retries=3)
def some_task_with_retry(self):
    try:
        pass  # do the actual work here
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # retry after 60 seconds
Using Redis as message broker
The configuration is set through the broker URL in Celery's setup:
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
Using RabbitMQ as message broker
To configure RabbitMQ, change the broker URL:
celery = Celery(
    'tasks',
    broker='pyamqp://guest@localhost//',
    backend='redis://localhost:6379/0'
)
Using Kafka as message broker
While not as commonly used as Redis or RabbitMQ, Celery can integrate with Kafka as a message broker.
You will need the celery[kafka] extra dependency installed (plus celery[redis] for the Redis result backend used below):
pip install "celery[redis,kafka]"
Then, configure the broker as follows:
celery = Celery(
    'tasks',
    broker='kafka://localhost:9092',
    backend='redis://localhost:6379/0'
)
Integrating Celery with Flask
You can integrate Celery with Flask by configuring the Celery object with the Flask app and initializing it.
Example:
from celery import Celery
from flask import Flask
app = Flask(__name__)
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
    return celery

app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379/0',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
celery = make_celery(app)
Integrating Celery with FastAPI
FastAPI integrates with Celery similarly. You need to configure Celery as a background task manager and start the worker to handle tasks.
Example:
from celery import Celery
from fastapi import FastAPI
app = FastAPI()
celery = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.post("/background_task/")
def background_task():
    # 'tasks.some_long_task' must be defined and registered on a running worker
    task = celery.send_task('tasks.some_long_task')
    return {"task_id": task.id}
Last modified: 17 March 2025