From 6e9f0ac6f7d602500e4d78e798a0ec2355990e2a Mon Sep 17 00:00:00 2001 From: Radoslav Georgiev Date: Tue, 17 May 2022 09:44:13 +0300 Subject: [PATCH 1/2] Improve Celery section --- README.md | 261 +++++++++++++++++++++++++++++------------------------- 1 file changed, 139 insertions(+), 122 deletions(-) diff --git a/README.md b/README.md index 39b12d1..01124d6 100644 --- a/README.md +++ b/README.md @@ -62,12 +62,12 @@ - [Testing](#testing-2) * [Naming conventions](#naming-conventions) - [Celery](#celery) + * [The basics](#the-basics) + * [Error handling](#error-handling) + * [Configuration](#configuration) * [Structure](#structure) - + [Configuration](#configuration) - + [Tasks](#tasks) - + [Circular imports between tasks & services](#circular-imports-between-tasks--services) * [Periodic Tasks](#periodic-tasks) - * [Configuration](#configuration-1) + * [Beyond](#beyond) - [Cookbook](#cookbook) * [Handling updates with a service](#handling-updates-with-a-service) - [DX (Developer Experience)](#dx-developer-experience) @@ -2417,83 +2417,159 @@ We use [Celery](http://www.celeryproject.org/) for the following general cases: * Offloading heavier computational tasks outside the HTTP cycle. * Periodic tasks (using Celery beat) +### The basics + We try to treat Celery as if it's just another interface to our core logic - meaning - **don't put business logic there.** -An example task might look like this: +Lets look at an example of a **service** that sends emails (example taken from `Django-Styleguide-Example`) + +```python +@transaction.atomic +def email_send(email: Email) -> Email: + if email.status != Email.Status.SENDING: + raise ApplicationError(f"Cannot send non-ready emails. 
Current status is {email.status}") + + subject = email.subject + from_email = "styleguide-example@hacksoft.io" + to = email.to + + html = email.html + plain_text = email.plain_text + + msg = EmailMultiAlternatives(subject, plain_text, from_email, [to]) + msg.attach_alternative(html, "text/html") + + msg.send() + + email, _ = model_update( + instance=email, + fields=["status", "sent_at"], + data={ + "status": Email.Status.SENT, + "sent_at": timezone.now() + } + ) + return email +``` + +Email sending has business logic around it, but we still want to trigger this particular service from a task. + +Our task would look like that: ```python from celery import shared_task -from project.app.services import some_service_name as service +from styleguide_example.emails.models import Email @shared_task -def some_service_name(*args, **kwargs): - service(*args, **kwargs) +def email_send(email_id): + email = Email.objects.get(id=email_id) + + from styleguide_example.emails.services import email_send + email_send(email) ``` -This is a task, having the same name as a service, which holds the actual business logic. -**Of course, we can have more complex situations**, like a chain or chord of tasks, each of them doing different domain related logic. In that case, it's hard to isolate everything in a service, because we now have dependencies between the tasks. +As you can see, we treat the task as an API: -If that happens, we try to expose an interface to our domain & let the tasks work with that interface. +1. Fetch the required data. +2. Call the appropriate service. -One can argue that having an ORM object is an interface by itself, and that's true. Sometimes, you can just update your object from a task & that's OK. +Now, imagine we have a different service, that triggers the email sending. -But there are times where you need to be strict and don't let tasks do database calls straight from the ORM, but rather, via an exposed interface for that. 
- -**More complex scenarios depend on their context. Make sure you are aware of the architecture & the decisions you are making.** - -### Structure - -#### Configuration - -We put Celery configuration in a Django app called `tasks`. The [Celery config](https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html) itself is located in `apps.py`, in `TasksConfig.ready` method. - -This Django app also holds any additional utilities, related to Celery. - -Here's an example `project/tasks/apps.py` file: +It may look like that: ```python -import os +from django.db import transaction -from celery import Celery +# ... more imports here ... -from django.apps import apps, AppConfig -from django.conf import settings +from styleguide_example.emails.tasks import email_send as email_send_task -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') +@transaction.atomic +def user_complete_onboarding(user: User) -> User: + # ... some code here + email = email_get_onboarding_template(user=user) -app = Celery('project') + transaction.on_commit(lambda: email_send_task.delay(email.id)) - -class TasksConfig(AppConfig): - name = 'project.tasks' - verbose_name = 'Celery Config' - - def ready(self): - app.config_from_object('django.conf:settings', namespace="CELERY") - app.autodiscover_tasks() - - -@app.task(bind=True) -def debug_task(self): - from celery.utils.log import base_logger - base_logger = base_logger - - base_logger.debug('debug message') - base_logger.info('info message') - base_logger.warning('warning message') - base_logger.error('error message') - base_logger.critical('critical message') - - print('Request: {0!r}'.format(self.request)) - - return 42 + return user ``` -#### Tasks +As you can see, we are importing the task (which has the same name as the service), but we are giving it the `_task` suffix. + +And when the transaction commits, we'll call the task. + +**So, in general, the way we use Celery can be described as:** + +1. 
Tasks call services. +2. We import the service in the function body of the task. +3. When we want to trigger a task, we import the task, giving the `_task` suffix. +4. We execute tasks, as a side effect, whenever our transaction commits. + +This way of mixing tasks & services also **prevents circular imports**, which may occurr often enough when using Celery. + +### Error handling + +Sometimes, our service can fail and we might want to handle the error on the task level. For example - we might want to retry the task again. + +This error handling code needs to live in the task, while also following our rules, for calling the service layer, whenever we need to interact with the core of our application. + +Lets expand the `email_send` task example from above, by adding error handling: + +```python +from celery import shared_task +from celery.utils.log import get_task_logger + +from styleguide_example.emails.models import Email + + +logger = get_task_logger(__name__) + + +def _email_send_failure(self, exc, task_id, args, kwargs, einfo): + email_id = args[0] + email = Email.objects.get(id=email_id) + + from styleguide_example.emails.services import email_failed + + email_failed(email) + + +@shared_task(bind=True, on_failure=_email_send_failure) +def email_send(self, email_id): + email = Email.objects.get(id=email_id) + + from styleguide_example.emails.services import email_send + + try: + email_send(email) + except Exception as exc: + # https://docs.celeryq.dev/en/stable/userguide/tasks.html#retrying + logger.warning(f"Exception occurred while sending email: {exc}") + self.retry(exc=exc, countdown=5) +``` + +As you can see, we do a bunch of retries and if all of them fail, we handle this in the `on_failure` callback. + +The callback follows the naming pattern of `_{task_name}_failuire` and it calls the service layer. 
+ +### Configuration + +We pretty much follow the official guidelines of integrating Celery with Django - + +For a full example, you can check the Celery configuration in the `Django-Styleguide-Example` project: + +- +- + +Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options. + +We constantly do that & find new things or find better approaches to our problems. + +### Structure Tasks are located in `tasks.py` modules in different apps. @@ -2503,69 +2579,6 @@ Meaning, you can end up with `tasks/domain_a.py` and `tasks/domain_b.py`. All yo The general rule of thumb is - split your tasks in a way that'll make sense to you. -#### Circular imports between tasks & services - -In some cases, you need to invoke a task from a service or vice-versa: - -```python -# project/app/services.py - -from project.app.tasks import task_function_1 - - -def service_function_1(): - print('I delay a task!') - task_function_1.delay() - - -def service_function_2(): - print('I do not delay a task!') -``` - -```python -# project/app/tasks.py - -from celery import shared_task - -from project.app.services import service_function_2 - - -@shared_task -def task_function_1(): - print('I do not call a service!') - - -@shared_task -def task_function_2(): - print('I call a service!') - service_function_2() -``` - -Unfortunately, this will result in a circular import. - -What we usually do is we import the service function **inside** the task function: - -```python -# project/app/tasks.py - -from celery import shared_task - - -@shared_task -def task_function_1(): - print('I do not call a service!') - - -@shared_task -def task_function_2(): - from project.app.services import service_function_2 # <-- - - print('I call a service!') - service_function_2() -``` - -* Note: Depending on the case, you may want to import the task function **inside** the service function. 
This is OK and will still prevent the circular import between service & task functions. - ### Periodic Tasks Managing periodic tasks is quite important, especially when you have tens or hundreds of them. @@ -2640,11 +2653,15 @@ Few key things: * Everything is in one place. * ⚠️ We use, almost exclusively, a cron schedule. **If you plan on using the other schedule objects, provided by Celery, please read thru their documentation** & the important notes - - about pointing to the same schedule object. ⚠️ -### Configuration +### Beyond -Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options. +Celery has powerful tools to implement complex workflows ( ). -We constantly do that & find new things or find better approaches to our problems. +If you decide to use them, the rules still apply. + +You may need to reorganize things a bit, but as long as you have a well-defined interface to your application core, you'll be able to mix and match tasks & services in more complex scenarios. + +**More complex scenarios depend on their context. 
Make sure you are aware of the architecture & the decisions you are making.** ## Cookbook From d12d6004936b442344ba5063db810d4d93795f51 Mon Sep 17 00:00:00 2001 From: Radoslav Georgiev Date: Tue, 17 May 2022 09:54:05 +0300 Subject: [PATCH 2/2] A bit of rewording --- README.md | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 01124d6..1975db0 100644 --- a/README.md +++ b/README.md @@ -2421,9 +2421,17 @@ We use [Celery](http://www.celeryproject.org/) for the following general cases: We try to treat Celery as if it's just another interface to our core logic - meaning - **don't put business logic there.** -Lets look at an example of a **service** that sends emails (example taken from `Django-Styleguide-Example`) +Let's look at an example of a **service** that sends emails (example taken from [`Django-Styleguide-Example`](https://github.com/HackSoftware/Django-Styleguide-Example)) ```python +from django.db import transaction +from django.core.mail import EmailMultiAlternatives + +from styleguide_example.core.exceptions import ApplicationError +from styleguide_example.common.services import model_update +from styleguide_example.emails.models import Email + + @transaction.atomic def email_send(email: Email) -> Email: if email.status != Email.Status.SENDING: @@ -2452,9 +2460,9 @@ def email_send(email: Email) -> Email: return email ``` -Email sending has business logic around it, but we still want to trigger this particular service from a task. +Email sending has business logic around it, **but we still want to trigger this particular service from a task.** -Our task would look like that: +Our task looks like this: ```python from celery import shared_task @@ -2470,7 +2478,7 @@ def email_send(email_id): email_send(email) ``` -As you can see, we treat the task as an API: +As you can see, **we treat the task as an API:** 1. Fetch the required data. 2. Call the appropriate service. 
@@ -2498,24 +2506,25 @@ def user_complete_onboarding(user: User) -> User: return user ``` -As you can see, we are importing the task (which has the same name as the service), but we are giving it the `_task` suffix. +2 important things to point out here: -And when the transaction commits, we'll call the task. +1. We are importing the task (which has the same name as the service), but we are giving it a `_task` suffix. +1. And when the transaction commits, we'll call the task. **So, in general, the way we use Celery can be described as:** 1. Tasks call services. 2. We import the service in the function body of the task. -3. When we want to trigger a task, we import the task, giving the `_task` suffix. +3. When we want to trigger a task, we import the task, at module level, giving the `_task` suffix. 4. We execute tasks, as a side effect, whenever our transaction commits. This way of mixing tasks & services also **prevents circular imports**, which may occurr often enough when using Celery. ### Error handling -Sometimes, our service can fail and we might want to handle the error on the task level. For example - we might want to retry the task again. +Sometimes, our service can fail and we might want to handle the error on the task level. For example - we might want to retry the task. -This error handling code needs to live in the task, while also following our rules, for calling the service layer, whenever we need to interact with the core of our application. +This error handling code needs to live in the task. Lets expand the `email_send` task example from above, by adding error handling: @@ -2554,7 +2563,7 @@ def email_send(self, email_id): As you can see, we do a bunch of retries and if all of them fail, we handle this in the `on_failure` callback. -The callback follows the naming pattern of `_{task_name}_failuire` and it calls the service layer. 
+The callback follows the naming pattern of `_{task_name}_failure` and it calls the service layer, just like an ordinary task. ### Configuration @@ -2585,7 +2594,7 @@ Managing periodic tasks is quite important, especially when you have tens or hun We use [Celery Beat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) + `django_celery_beat.schedulers:DatabaseScheduler` + [`django-celery-beat`](https://github.com/celery/django-celery-beat) for our periodic tasks. -The extra thing that we do is to have a management command, called `setup_periodic_tasks`, which holds the definition of all periodic tasks within the system. This command is located in the `tasks` app, discussed above. +The extra thing that we do is to have a management command, called [`setup_periodic_tasks`](https://github.com/HackSoftware/Django-Styleguide-Example/blob/master/styleguide_example/tasks/management/commands/setup_periodic_tasks.py), which holds the definition of all periodic tasks within the system. This command is located in the `tasks` app, discussed above. Here's how `project.tasks.management.commands.setup_periodic_tasks.py` looks like: @@ -2655,7 +2664,7 @@ Few key things: ### Beyond -Celery has powerful tools to implement complex workflows ( ). +Celery has powerful tools to implement complex workflows - <https://docs.celeryq.dev/en/stable/userguide/canvas.html> If you decide to use them, the rules still apply.