Sep 15, 2021
5 tips for writing production-ready Celery tasks
Celery is the go-to distributed task queue solution for most Pythonistas. It’s mature, feature-rich, and properly documented. It’s well suited for scalable Python backend services due to its distributed nature.
At Wolt, we have been running Celery in production for years. The use cases vary from workloads running on a fixed schedule (cron) to “fire-and-forget” tasks. In this blog post, we’ll share 5 key learnings from developing production-ready Celery tasks.
1. Short > long
As a rule of thumb, short tasks are better than long ones. The longer a task can take, the longer it can occupy a worker process and thus block potentially more important work waiting in the queue. See also considerations about deployments in the desired semantics and retry behaviour section below.
The Celery primitives provide a toolbox for splitting the work into smaller chunks. As a practical example, consider a use case in which data is collected from multiple sources and then some report is generated at the end. A naive approach could be to collect the data from each source in a sequence and finally create the report. All in a single task. A better approach would be to use
chord which would allow running the data collection in parallel and report generation after all the data collection has finished. All the individual pieces of work could be implemented as separate tasks. This means both shorter tasks and most likely significantly shorter execution time for the whole operation due to parallelism in data collection. Additionally, it would allow retrying pieces of work individually without affecting the rest of the workload. For example, it’d be wasted work if we’d need to fetch the data again from all the sources if fetching from the last source fails due to some intermittent issue. On the other hand, be aware of potential resource exhaustion while running work in parallel.
Consider configuring global task execution time limit and using task specific hard / soft limits when needed. For example, if the global time limit is 3 minutes but some specific task should never take more than 10 seconds, it’s a good practice to define 10 seconds limit for the task. It both protects the system from the “should never happen but happened anyway” situations and nicely documents the expectation about the time requirement of that specific task for other developers. If you need to do some cleanup when the time runs out, configuring a soft time limit could be helpful.
2. Idempotent & atomic
While implementing Celery tasks, keep idempotence in mind. Simply put, ideally, it should not matter (no harmful side effects) how many times a task is executed with the same arguments. The main motivation for idempotent tasks is that then the tasks can be safely configured to ack late. Acknowledging late (i.e. after task execution) ensures that the tasks are executed til completion at least once – also if a worker process crashes in the middle of execution of a task.
Atomicity is another important feature of production-ready Celery tasks. For example, if the database operations are not atomic, it’s just a matter of time until you find yourself debugging some race condition or data integration issue. Use transactions for database operations and/or guarantee atomicity by other means when needed.
3. Desired semantics & retry behaviour
While implementing a new (or modifying an existing) celery task, think about what can go south. If a task is calling some third-party API, a call will most certainly timeout or otherwise fail one day. What should be done when it happens? It depends on the use case.
For the majority of use cases, “at-least-once” semantics are preferred, especially if the tasks are idempotent. However, there are also use cases in which the desired semantics are “at-most-once” or even something like “now or never”.
Familiarise yourself with the built-in retry capabilities. For example, considering the third-party API call failure example, an effective retry strategy could be something like:
1 2 3 4 5 6 7 8
@celery.task( autoretry_for=(MyThirdPartyApiCallException,), retry_backoff=5, max_retries=7, retry_jitter=False, ) def my_task_which_calls_some_third_party_api(): ...
which would retry after 5, 10, 20, 40, 80, 160, and 320 seconds (up to 7 retries in total) in case of
retry_jitter=True in real world use cases in order to randomise the retry delays.
In addition to the things that can go sideways in the code logic, there are deployments. It’s extremely important to understand what happens during deployments. Ideally, the old workers should receive TERM signal and have enough time to finish whatever they were doing before stopping them more brutally (e.g. with KILL signal). How much is enough time? Ideally, it takes at least as long to execute as the longest task, which should execute at least once but doesn’t ack late. Otherwise there’s a risk of losing work during deployments.
For the “now or never” case, defining
expiration while sending a task can be helpful. As a practical example, consider a celery task which sends a push message to the end user about the order status. It doesn’t make sense to send “order will be delivered in 2 minutes” an hour too late.
As a final tip regarding semantics and retrying, read the Should I use retry or acks_late? section from the Celery docs. If you use
acks_late=True, you’ll most likely want to combine it with
reject_on_worker_lost=True. Additionally, note that
visibility_timeout (supported by Redis and SQS transports) affects in
4. Be careful with signature changes
If you have been involved in a project which uses Celery tasks with ETAs or countdowns, there’s a high probability that you are familiar with the signature change related issues.
In short, this is what can happen in practice:
There’s a changeset which changes the signature of a celery task (e.g. adds argument, changes argument names, moves celery task from a module to another, etc)
While the old code is still running, there are a number of said tasks scheduled with ETA / countdown
The changeset gets deployed: old workers are killed and fresh workers are started
A fresh worker fetches or starts executing an old ETA / countdown task from the queue
The Celery worker logs show something like
[ERROR/MainProcess] Received unregistered task of type old_module.my_task
[ERROR/ForkPoolWorker-1] Task my_task[<task_id>] raised unexpected: TypeError("my_task() missing 1 required positional argument: 'some_new_argument'")
depending on what kind of signature change was made.
Note that the same issue can also happen with tasks which are sent without ETA or countdown as it requires some serious effort to have a deployment setup which would deploy changes to both the Celery workers and the Celery client-side processes at the exact same time (or otherwise make sure that old vs new stays in sync). For example, if the client processes get deployed just before, they can send tasks which the old worker processes don’t understand. A similar phenomena can of course also happen the other way around.
If a task is moved from a module to another, one can add a “proxy task” to the old location which will make sure that the transition goes smoothly. Another way is to give explicit names for tasks and trigger them via
send_task. If you introduce a new argument for a task, provide a default value. If possible, build automated checks which help in spotting such issues. Even if there are great code review practices in place, human eyes are not perfect, no matter how experienced they are.
In conclusion, when you plan to make changes in the signature of a celery task, consider the backwards compatibility as if the task would be part of the public interface of the most downloaded Python package. That should guarantee food for thought.
5. Use multiple queues
By default there’s one queue which is called
celery. If you are using Celery for something more serious than a Saturday evening hobby project, you’ll most likely want to split the work into multiple queues.
For example, if there are some tasks which should execute close to realtime and tasks which are ok to be delayed for some minutes (or even hours), these deserve separate queues; there’s a risk of lower priority tasks keeping the workers otherwise busy, which in turn can result in not meeting the realtime requirements of the realtime tasks. Another benefit from the multi-queue setup is that it enables scaling the amount of workers for each queue independently. Note that there’s also the possibility to define priorities for tasks running in the same queue but even Celery documentation suggests to prefer multi-queue setup over task priorities in the real world use cases.
One can explicitly specify which queues are allowed:
1 2 3 4 5 6
# celeryconfig.py from kombu import Exchange, Queue _ALLOWED_QUEUES = ('hiprio', 'lowprio') task_create_missing_queues = False # default is True task_queues = tuple(Queue(name=q, exchange=Exchange(q), routing_key=q) for q in _ALLOWED_QUEUES)
This kind of configuration, for example, will raise
celery.exceptions.QueueNotFound in case a task is sent into a misspelled queue. Without such configuration, the Celery client-side logic could send tasks into a queue that doesn’t have a Celery worker consuming it, which would be potentially hard to spot in a deployed environment.
In a nutshell
Celery is a powerful beast. It requires notable effort to understand how to effectively utilise it in the use case at hand. Luckily, it’s well documented and mature.
Everything starts from understanding the real world use case: how to split the work, what are the desired semantics (per task), etc. As a final practical tip, make sure you have a proper local Celery playground which you’re confident to use for trying things out. For example, understanding how different retry strategies or primitives actually work, it’s often easiest to try them in a simple playground project before introducing them into a complex, full-fledged web app.