Making Celery work nicely with Django transactions

June 25, 2019 • 5 min read

I’ve been using Celery in my Django projects for about 5 years now. Along the way, I made some mistakes, learned from them and picked up a few best practices. I will try to share a few of them here.

Avoid serialising complex objects

This is a pretty simple best practice, but it was easy to miss: before Celery v4, the default serializer was pickle; the default is now JSON, which enforces this. Serialising complex objects with pickle can cause hard-to-catch issues, such as when the pickle protocol changes. This happened to us when we migrated from Python 2 to 3, for instance. It's pretty much impossible to catch with your test suite, but it will happen for sure in production.
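For reference, this is how you could enforce JSON serialisation explicitly on Celery 4+. A minimal sketch using Celery's lowercase settings:

from celery import Celery

app = Celery('tasks')
# JSON is the default since Celery 4, but being explicit documents the intent
app.conf.task_serializer = 'json'
app.conf.result_serializer = 'json'
# Reject incoming payloads that aren't JSON, including pickle
app.conf.accept_content = ['json']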

Worker raising DoesNotExist

In a Django project, a complex object is very likely to be a model instance; that's where your data lives, after all. The usual way to follow the previous best practice is to pass the id of the model instance when submitting the task, and to fetch the instance again from the database when handling the task on the worker.

# Send the task with the primary key, not the instance
your_task.delay(user_id=user.id)

# ... and fetch the instance again on the worker
@app.task()
def your_task(user_id):
    user = User.objects.get(id=user_id)
    ...

However, when done naively, this can lead to DoesNotExist exceptions for instances that appear to be missing. It took me a while to figure this one out the first time it happened to me, especially given that the instance with the offending id was always there when I checked manually, and there were no issues in my unit tests.

Experienced readers probably know the answer: transactions. Basically, the task is picked up by the worker before the main transaction is committed to the database. We need to delay task submission until the transaction is committed. Luckily, Django provides a hook for exactly this type of thing: transaction.on_commit(). This is well documented in the Celery docs, and since Django 1.10, Django even uses Celery as the example in its own documentation.
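In practice, that means wrapping the task submission in a callback, along these lines (a minimal sketch; create_user is a hypothetical stand-in for whatever work your view does inside the transaction):

from django.db import transaction

def signup_view(request):
    # create_user is hypothetical: some work done inside the current transaction
    user = create_user(request)
    # Submit the task only once the surrounding transaction has committed,
    # so the worker is guaranteed to find the row in the database
    transaction.on_commit(lambda: your_task.delay(user_id=user.id))
    ...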

Problems with this solution

This hook is great, but it introduces a bit of boilerplate: each time you want to use it, you need to define a lambda where you call the task. It can get quite repetitive once you have many tasks. Not only is it longer to write and harder to read, it may also cause some weird bugs depending on how it's used. Here is a very simplified example:

# tasks.py
@app.task()
def print_value(val):
    print(val)


# views.py
from django.db import transaction

def my_view(request):
    for index in range(5):
        transaction.on_commit(
            lambda: print_value.delay(index)
        )

What would this print in the worker process? The answer is the number '4', five times. If you spotted it, well done! It's a well-known behaviour of Python closures: the lambda captures the variable index, not its value at the time the lambda is defined. In more realistic code with longer functions and more complex parameters, it would easily be missed.
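The usual Python workaround is to bind the loop variable as a default argument, though the rest of this post offers a nicer alternative:

for index in range(5):
    transaction.on_commit(
        # index=index freezes the current value when the lambda is defined
        lambda index=index: print_value.delay(index)
    )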

A better API

One idea that may come to mind is to wrap this in a decorator. This was suggested as an improvement to Django but was rejected after some discussion. I agree with the counterarguments: a decorator would hurt the readability of the code calling the decorated function.

What if we could wrap this boilerplate into a helper function to handle this nicely? Or better, attach this helper to our tasks? Ideally, we want to be able to call our tasks like this:

print_value.delay_on_commit(5)

That's what I'm talking about: a simple API that is easy to use and clear to read. This also addresses the drawbacks of the decorator idea above. The good news is that Celery provides a proper way to extend the default behaviour, making this not very hard to implement.

Under the hood, the @task decorator runs your task body as the run() method of Celery's base Task class. That class is also where the delay() method comes from. It's possible to override the base class either when defining the task function or, more globally, when creating the Celery app:

from celery import Celery


app = Celery(
    'tasks',
    # Path to the custom base class, as 'module:ClassName'
    task_cls='your_task:BetterTask',
)
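For the per-task variant, the @app.task decorator accepts a base argument pointing at the class defined below:

@app.task(base=BetterTask)
def print_value(val):
    print(val)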

And your base task would look like this:

from celery import Task
from django.db import transaction


class BetterTask(Task):

    def delay_on_commit(self, *args, **kwargs):
        # args and kwargs are bound here, at call time, so the closure
        # pitfall from the earlier loop example does not apply.
        # Note: on_commit() returns None, so no AsyncResult is available.
        return transaction.on_commit(
            lambda: self.delay(*args, **kwargs)
        )

And that's all you need! You now have a nicer, less error-prone API; you just need to remember to use it. If we modify our previous view, we get the following:


def my_view(request):
    for index in range(5):
        print_value.delay_on_commit(index)

If we run it, we can confirm that it now works as intended, printing the numbers 0 to 4. Nice!
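As an aside, this also explains why the original bug never showed up in my unit tests: Django's TestCase wraps each test in a transaction that is rolled back, so on_commit callbacks never fire. Since Django 3.2, TestCase.captureOnCommitCallbacks lets you capture (and optionally execute) them. A minimal sketch, assuming my_view is wired up to a hypothetical URL and returns a response:

from django.test import TestCase

class MyViewTests(TestCase):
    def test_tasks_submitted_on_commit(self):
        with self.captureOnCommitCallbacks() as callbacks:
            self.client.get('/my-view/')  # hypothetical URL for my_view
        # One on_commit callback was registered per loop iteration
        self.assertEqual(len(callbacks), 5)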

Closing words

As Django developers, we are generally less used to asynchronous thinking than developers in other languages, as Django does not require us to think about it (yet). Celery, on the other hand, is asynchronous by its very nature, and it's easy to introduce unusual bugs into your application if you're not careful. By wrapping a common pattern to expose a better API, we can make our code easier to write and read, and avoid bugs.
