diff --git a/README.md b/README.md index 39b12d1..1975db0 100644 --- a/README.md +++ b/README.md @@ -62,12 +62,12 @@ - [Testing](#testing-2) * [Naming conventions](#naming-conventions) - [Celery](#celery) + * [The basics](#the-basics) + * [Error handling](#error-handling) + * [Configuration](#configuration) * [Structure](#structure) - + [Configuration](#configuration) - + [Tasks](#tasks) - + [Circular imports between tasks & services](#circular-imports-between-tasks--services) * [Periodic Tasks](#periodic-tasks) - * [Configuration](#configuration-1) + * [Beyond](#beyond) - [Cookbook](#cookbook) * [Handling updates with a service](#handling-updates-with-a-service) - [DX (Developer Experience)](#dx-developer-experience) @@ -2417,83 +2417,168 @@ We use [Celery](http://www.celeryproject.org/) for the following general cases: * Offloading heavier computational tasks outside the HTTP cycle. * Periodic tasks (using Celery beat) +### The basics + We try to treat Celery as if it's just another interface to our core logic - meaning - **don't put business logic there.** -An example task might look like this: +Let's look at an example of a **service** that sends emails (example taken from [`Django-Styleguide-Example`](https://github.com/HackSoftware/Django-Styleguide-Example)): + +```python +from django.db import transaction +from django.core.mail import EmailMultiAlternatives + +from styleguide_example.core.exceptions import ApplicationError +from styleguide_example.common.services import model_update +from styleguide_example.emails.models import Email + + +@transaction.atomic +def email_send(email: Email) -> Email: + if email.status != Email.Status.SENDING: + raise ApplicationError(f"Cannot send non-ready emails. 
Current status is {email.status}") + + subject = email.subject + from_email = "styleguide-example@hacksoft.io" + to = email.to + + html = email.html + plain_text = email.plain_text + + msg = EmailMultiAlternatives(subject, plain_text, from_email, [to]) + msg.attach_alternative(html, "text/html") + + msg.send() + + email, _ = model_update( + instance=email, + fields=["status", "sent_at"], + data={ + "status": Email.Status.SENT, + "sent_at": timezone.now() + } + ) + return email +``` + +Email sending has business logic around it, **but we still want to trigger this particular service from a task.** + +Our task looks like this: ```python from celery import shared_task -from project.app.services import some_service_name as service +from styleguide_example.emails.models import Email @shared_task -def some_service_name(*args, **kwargs): - service(*args, **kwargs) +def email_send(email_id): + email = Email.objects.get(id=email_id) + + from styleguide_example.emails.services import email_send + email_send(email) ``` -This is a task, having the same name as a service, which holds the actual business logic. -**Of course, we can have more complex situations**, like a chain or chord of tasks, each of them doing different domain related logic. In that case, it's hard to isolate everything in a service, because we now have dependencies between the tasks. +As you can see, **we treat the task as an API:** -If that happens, we try to expose an interface to our domain & let the tasks work with that interface. +1. Fetch the required data. +2. Call the appropriate service. -One can argue that having an ORM object is an interface by itself, and that's true. Sometimes, you can just update your object from a task & that's OK. +Now, imagine we have a different service that triggers the email sending. -But there are times where you need to be strict and don't let tasks do database calls straight from the ORM, but rather, via an exposed interface for that. 
- -**More complex scenarios depend on their context. Make sure you are aware of the architecture & the decisions you are making.** - -### Structure - -#### Configuration - -We put Celery configuration in a Django app called `tasks`. The [Celery config](https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html) itself is located in `apps.py`, in `TasksConfig.ready` method. - -This Django app also holds any additional utilities, related to Celery. - -Here's an example `project/tasks/apps.py` file: +It may look like this: ```python -import os +from django.db import transaction -from celery import Celery +# ... more imports here ... -from django.apps import apps, AppConfig -from django.conf import settings +from styleguide_example.emails.tasks import email_send as email_send_task -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') +@transaction.atomic +def user_complete_onboarding(user: User) -> User: + # ... some code here + email = email_get_onboarding_template(user=user) -app = Celery('project') + transaction.on_commit(lambda: email_send_task.delay(email.id)) - -class TasksConfig(AppConfig): - name = 'project.tasks' - verbose_name = 'Celery Config' - - def ready(self): - app.config_from_object('django.conf:settings', namespace="CELERY") - app.autodiscover_tasks() - - -@app.task(bind=True) -def debug_task(self): - from celery.utils.log import base_logger - base_logger = base_logger - - base_logger.debug('debug message') - base_logger.info('info message') - base_logger.warning('warning message') - base_logger.error('error message') - base_logger.critical('critical message') - - print('Request: {0!r}'.format(self.request)) - - return 42 + return user ``` -#### Tasks +Two important things to point out here: + +1. We import the task (which has the same name as the service) under an alias with a `_task` suffix. +2. We call the task only when the transaction commits. 
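To make the `transaction.on_commit` point concrete: the task receives only `email.id`, so the worker has to be able to read that row, which is only guaranteed once the transaction has committed. Django's `transaction.on_commit` runs the callback after a successful commit and discards it on rollback. The snippet below is a minimal, framework-free sketch of that behaviour in plain Python. `FakeTransaction`, `enqueue` and `complete_onboarding` are hypothetical stand-ins made up for illustration, not Django or Celery APIs.

```python
from typing import Callable, List


class FakeTransaction:
    """Hypothetical stand-in for a database transaction (not a Django API)."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []

    def on_commit(self, callback: Callable[[], None]) -> None:
        # Remember the side effect; do not run it yet.
        self._callbacks.append(callback)

    def __enter__(self) -> "FakeTransaction":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            # "Commit": run the side effects in registration order.
            for callback in self._callbacks:
                callback()
        # On an error ("rollback") the callbacks are dropped without running.
        self._callbacks.clear()
        return False  # never swallow the exception


# Stands in for the Celery queue in this sketch.
queued: List[int] = []


def enqueue(email_id: int) -> None:
    """Hypothetical stand-in for `email_send_task.delay(email_id)`."""
    queued.append(email_id)


def complete_onboarding(email_id: int, fail: bool = False) -> None:
    with FakeTransaction() as tx:
        tx.on_commit(lambda: enqueue(email_id))
        if fail:
            # Simulates the service raising before the transaction commits.
            raise ValueError("onboarding failed")
```

Calling `complete_onboarding(1)` queues the email, while `complete_onboarding(2, fail=True)` raises and queues nothing, so a worker never picks up an id for a row that was rolled back.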
+ +**So, in general, the way we use Celery can be described as:** + +1. Tasks call services. +2. We import the service in the function body of the task. +3. When we want to trigger a task, we import the task, at module level, giving it a `_task` suffix. +4. We execute tasks, as a side effect, whenever our transaction commits. + +This way of mixing tasks & services also **prevents circular imports**, which may occur often enough when using Celery. + +### Error handling + +Sometimes, our service can fail and we might want to handle the error on the task level. For example - we might want to retry the task. + +This error handling code needs to live in the task. + +Let's expand the `email_send` task example from above, by adding error handling: + +```python +from celery import shared_task +from celery.utils.log import get_task_logger + +from styleguide_example.emails.models import Email + + +logger = get_task_logger(__name__) + + +def _email_send_failure(self, exc, task_id, args, kwargs, einfo): + email_id = args[0] + email = Email.objects.get(id=email_id) + + from styleguide_example.emails.services import email_failed + + email_failed(email) + + +@shared_task(bind=True, on_failure=_email_send_failure) +def email_send(self, email_id): + email = Email.objects.get(id=email_id) + + from styleguide_example.emails.services import email_send + + try: + email_send(email) + except Exception as exc: + # https://docs.celeryq.dev/en/stable/userguide/tasks.html#retrying + logger.warning(f"Exception occurred while sending email: {exc}") + self.retry(exc=exc, countdown=5) +``` + +As you can see, we do a bunch of retries and if all of them fail, we handle this in the `on_failure` callback. + +The callback follows the naming pattern of `_{task_name}_failure` and it calls the service layer, just like an ordinary task. 
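The retry-then-`on_failure` flow described above can be sketched without Celery. The snippet below is a rough plain-Python model of the control flow only, not Celery's implementation: `run_with_retries`, `always_failing_send` and `mark_failed` are hypothetical names, the `countdown` delay is left out, and `max_retries=3` is an assumption meant to mirror Celery's default.

```python
from typing import Callable, List, Tuple


def run_with_retries(
    body: Callable[..., None],
    args: Tuple,
    on_failure: Callable[[Exception, Tuple], None],
    max_retries: int = 3,  # assumption: mirrors Celery's default
) -> int:
    """Call `body(*args)`, retry on error, then pass the last error to `on_failure`.

    Returns the number of attempts made.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            body(*args)
            return attempts
        except Exception as exc:
            if attempts > max_retries:
                # All retries are used up: let the failure callback handle it.
                on_failure(exc, args)
                return attempts


# Records which emails ended up in the failure callback.
failed_email_ids: List[int] = []


def always_failing_send(email_id: int) -> None:
    """Hypothetical stand-in for a service call that keeps failing."""
    raise ConnectionError(f"SMTP unavailable for email {email_id}")


def mark_failed(exc: Exception, args: Tuple) -> None:
    """Hypothetical stand-in for the `_email_send_failure` callback."""
    failed_email_ids.append(args[0])


attempts_made = run_with_retries(always_failing_send, (42,), mark_failed)
```

With these assumptions the body runs once plus three retries, and only then is the failure callback invoked with the original positional arguments, which is why the callback can read the email id from `args[0]`.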
+ +### Configuration + +We pretty much follow the official guidelines of integrating Celery with Django - + +For a full example, you can check the Celery configuration in the `Django-Styleguide-Example` project: + +- +- + +Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options. + +We constantly do that & find new things or find better approaches to our problems. + +### Structure Tasks are located in `tasks.py` modules in different apps. @@ -2503,76 +2588,13 @@ Meaning, you can end up with `tasks/domain_a.py` and `tasks/domain_b.py`. All yo The general rule of thumb is - split your tasks in a way that'll make sense to you. -#### Circular imports between tasks & services - -In some cases, you need to invoke a task from a service or vice-versa: - -```python -# project/app/services.py - -from project.app.tasks import task_function_1 - - -def service_function_1(): - print('I delay a task!') - task_function_1.delay() - - -def service_function_2(): - print('I do not delay a task!') -``` - -```python -# project/app/tasks.py - -from celery import shared_task - -from project.app.services import service_function_2 - - -@shared_task -def task_function_1(): - print('I do not call a service!') - - -@shared_task -def task_function_2(): - print('I call a service!') - service_function_2() -``` - -Unfortunately, this will result in a circular import. - -What we usually do is we import the service function **inside** the task function: - -```python -# project/app/tasks.py - -from celery import shared_task - - -@shared_task -def task_function_1(): - print('I do not call a service!') - - -@shared_task -def task_function_2(): - from project.app.services import service_function_2 # <-- - - print('I call a service!') - service_function_2() -``` - -* Note: Depending on the case, you may want to import the task function **inside** the service function. 
This is OK and will still prevent the circular import between service & task functions. - ### Periodic Tasks Managing periodic tasks is quite important, especially when you have tens or hundreds of them. We use [Celery Beat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) + `django_celery_beat.schedulers:DatabaseScheduler` + [`django-celery-beat`](https://github.com/celery/django-celery-beat) for our periodic tasks. -The extra thing that we do is to have a management command, called `setup_periodic_tasks`, which holds the definition of all periodic tasks within the system. This command is located in the `tasks` app, discussed above. +The extra thing that we do is to have a management command, called [`setup_periodic_tasks`](https://github.com/HackSoftware/Django-Styleguide-Example/blob/master/styleguide_example/tasks/management/commands/setup_periodic_tasks.py), which holds the definition of all periodic tasks within the system. This command is located in the `tasks` app, discussed above. Here's how `project.tasks.management.commands.setup_periodic_tasks.py` looks like: @@ -2640,11 +2662,15 @@ Few key things: * Everything is in one place. * ⚠️ We use, almost exclusively, a cron schedule. **If you plan on using the other schedule objects, provided by Celery, please read thru their documentation** & the important notes - - about pointing to the same schedule object. ⚠️ -### Configuration +### Beyond -Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options. +Celery has powerful tools to implement complex workflows - -We constantly do that & find new things or find better approaches to our problems. +If you decide to use them, the rules still apply. + +You may need to reorganize things a bit, but as long as you have a well-defined interface to your application core, you'll be able to mix and match tasks & services in more complex scenarios. 
+ +**More complex scenarios depend on their context. Make sure you are aware of the architecture & the decisions you are making.** ## Cookbook