mirror of
https://github.com/HackSoftware/Django-Styleguide.git
synced 2024-11-25 02:53:45 +03:00
Improve Celery section
This commit is contained in:
parent
fb1850a84d
commit
6e9f0ac6f7
261
README.md
261
README.md
|
@ -62,12 +62,12 @@
|
||||||
- [Testing](#testing-2)
|
- [Testing](#testing-2)
|
||||||
* [Naming conventions](#naming-conventions)
|
* [Naming conventions](#naming-conventions)
|
||||||
- [Celery](#celery)
|
- [Celery](#celery)
|
||||||
|
* [The basics](#the-basics)
|
||||||
|
* [Error handling](#error-handling)
|
||||||
|
* [Configuration](#configuration)
|
||||||
* [Structure](#structure)
|
* [Structure](#structure)
|
||||||
+ [Configuration](#configuration)
|
|
||||||
+ [Tasks](#tasks)
|
|
||||||
+ [Circular imports between tasks & services](#circular-imports-between-tasks--services)
|
|
||||||
* [Periodic Tasks](#periodic-tasks)
|
* [Periodic Tasks](#periodic-tasks)
|
||||||
* [Configuration](#configuration-1)
|
* [Beyond](#beyond)
|
||||||
- [Cookbook](#cookbook)
|
- [Cookbook](#cookbook)
|
||||||
* [Handling updates with a service](#handling-updates-with-a-service)
|
* [Handling updates with a service](#handling-updates-with-a-service)
|
||||||
- [DX (Developer Experience)](#dx-developer-experience)
|
- [DX (Developer Experience)](#dx-developer-experience)
|
||||||
|
@ -2417,83 +2417,159 @@ We use [Celery](http://www.celeryproject.org/) for the following general cases:
|
||||||
* Offloading heavier computational tasks outside the HTTP cycle.
|
* Offloading heavier computational tasks outside the HTTP cycle.
|
||||||
* Periodic tasks (using Celery beat)
|
* Periodic tasks (using Celery beat)
|
||||||
|
|
||||||
|
### The basics
|
||||||
|
|
||||||
We try to treat Celery as if it's just another interface to our core logic - meaning - **don't put business logic there.**
|
We try to treat Celery as if it's just another interface to our core logic - meaning - **don't put business logic there.**
|
||||||
|
|
||||||
An example task might look like this:
|
Lets look at an example of a **service** that sends emails (example taken from `Django-Styleguide-Example`)
|
||||||
|
|
||||||
|
```python
|
||||||
|
@transaction.atomic
|
||||||
|
def email_send(email: Email) -> Email:
|
||||||
|
if email.status != Email.Status.SENDING:
|
||||||
|
raise ApplicationError(f"Cannot send non-ready emails. Current status is {email.status}")
|
||||||
|
|
||||||
|
subject = email.subject
|
||||||
|
from_email = "styleguide-example@hacksoft.io"
|
||||||
|
to = email.to
|
||||||
|
|
||||||
|
html = email.html
|
||||||
|
plain_text = email.plain_text
|
||||||
|
|
||||||
|
msg = EmailMultiAlternatives(subject, plain_text, from_email, [to])
|
||||||
|
msg.attach_alternative(html, "text/html")
|
||||||
|
|
||||||
|
msg.send()
|
||||||
|
|
||||||
|
email, _ = model_update(
|
||||||
|
instance=email,
|
||||||
|
fields=["status", "sent_at"],
|
||||||
|
data={
|
||||||
|
"status": Email.Status.SENT,
|
||||||
|
"sent_at": timezone.now()
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return email
|
||||||
|
```
|
||||||
|
|
||||||
|
Email sending has business logic around it, but we still want to trigger this particular service from a task.
|
||||||
|
|
||||||
|
Our task would look like that:
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from celery import shared_task
|
from celery import shared_task
|
||||||
|
|
||||||
from project.app.services import some_service_name as service
|
from styleguide_example.emails.models import Email
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def some_service_name(*args, **kwargs):
|
def email_send(email_id):
|
||||||
service(*args, **kwargs)
|
email = Email.objects.get(id=email_id)
|
||||||
|
|
||||||
|
from styleguide_example.emails.services import email_send
|
||||||
|
email_send(email)
|
||||||
```
|
```
|
||||||
This is a task, having the same name as a service, which holds the actual business logic.
|
|
||||||
|
|
||||||
**Of course, we can have more complex situations**, like a chain or chord of tasks, each of them doing different domain related logic. In that case, it's hard to isolate everything in a service, because we now have dependencies between the tasks.
|
As you can see, we treat the task as an API:
|
||||||
|
|
||||||
If that happens, we try to expose an interface to our domain & let the tasks work with that interface.
|
1. Fetch the required data.
|
||||||
|
2. Call the appropriate service.
|
||||||
|
|
||||||
One can argue that having an ORM object is an interface by itself, and that's true. Sometimes, you can just update your object from a task & that's OK.
|
Now, imagine we have a different service, that triggers the email sending.
|
||||||
|
|
||||||
But there are times where you need to be strict and don't let tasks do database calls straight from the ORM, but rather, via an exposed interface for that.
|
It may look like that:
|
||||||
|
|
||||||
**More complex scenarios depend on their context. Make sure you are aware of the architecture & the decisions you are making.**
|
|
||||||
|
|
||||||
### Structure
|
|
||||||
|
|
||||||
#### Configuration
|
|
||||||
|
|
||||||
We put Celery configuration in a Django app called `tasks`. The [Celery config](https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html) itself is located in `apps.py`, in `TasksConfig.ready` method.
|
|
||||||
|
|
||||||
This Django app also holds any additional utilities, related to Celery.
|
|
||||||
|
|
||||||
Here's an example `project/tasks/apps.py` file:
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import os
|
from django.db import transaction
|
||||||
|
|
||||||
from celery import Celery
|
# ... more imports here ...
|
||||||
|
|
||||||
from django.apps import apps, AppConfig
|
from styleguide_example.emails.tasks import email_send as email_send_task
|
||||||
from django.conf import settings
|
|
||||||
|
|
||||||
|
|
||||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')
|
@transaction.atomic
|
||||||
|
def user_complete_onboarding(user: User) -> User:
|
||||||
|
# ... some code here
|
||||||
|
|
||||||
|
email = email_get_onboarding_template(user=user)
|
||||||
|
|
||||||
app = Celery('project')
|
transaction.on_commit(lambda: email_send_task.delay(email.id))
|
||||||
|
|
||||||
|
return user
|
||||||
class TasksConfig(AppConfig):
|
|
||||||
name = 'project.tasks'
|
|
||||||
verbose_name = 'Celery Config'
|
|
||||||
|
|
||||||
def ready(self):
|
|
||||||
app.config_from_object('django.conf:settings', namespace="CELERY")
|
|
||||||
app.autodiscover_tasks()
|
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True)
|
|
||||||
def debug_task(self):
|
|
||||||
from celery.utils.log import base_logger
|
|
||||||
base_logger = base_logger
|
|
||||||
|
|
||||||
base_logger.debug('debug message')
|
|
||||||
base_logger.info('info message')
|
|
||||||
base_logger.warning('warning message')
|
|
||||||
base_logger.error('error message')
|
|
||||||
base_logger.critical('critical message')
|
|
||||||
|
|
||||||
print('Request: {0!r}'.format(self.request))
|
|
||||||
|
|
||||||
return 42
|
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Tasks
|
As you can see, we are importing the task (which has the same name as the service), but we are giving it the `_task` suffix.
|
||||||
|
|
||||||
|
And when the transaction commits, we'll call the task.
|
||||||
|
|
||||||
|
**So, in general, the way we use Celery can be described as:**
|
||||||
|
|
||||||
|
1. Tasks call services.
|
||||||
|
2. We import the service in the function body of the task.
|
||||||
|
3. When we want to trigger a task, we import the task, giving the `_task` suffix.
|
||||||
|
4. We execute tasks, as a side effect, whenever our transaction commits.
|
||||||
|
|
||||||
|
This way of mixing tasks & services also **prevents circular imports**, which may occurr often enough when using Celery.
|
||||||
|
|
||||||
|
### Error handling
|
||||||
|
|
||||||
|
Sometimes, our service can fail and we might want to handle the error on the task level. For example - we might want to retry the task again.
|
||||||
|
|
||||||
|
This error handling code needs to live in the task, while also following our rules, for calling the service layer, whenever we need to interact with the core of our application.
|
||||||
|
|
||||||
|
Lets expand the `email_send` task example from above, by adding error handling:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from celery import shared_task
|
||||||
|
from celery.utils.log import get_task_logger
|
||||||
|
|
||||||
|
from styleguide_example.emails.models import Email
|
||||||
|
|
||||||
|
|
||||||
|
logger = get_task_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _email_send_failure(self, exc, task_id, args, kwargs, einfo):
|
||||||
|
email_id = args[0]
|
||||||
|
email = Email.objects.get(id=email_id)
|
||||||
|
|
||||||
|
from styleguide_example.emails.services import email_failed
|
||||||
|
|
||||||
|
email_failed(email)
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(bind=True, on_failure=_email_send_failure)
|
||||||
|
def email_send(self, email_id):
|
||||||
|
email = Email.objects.get(id=email_id)
|
||||||
|
|
||||||
|
from styleguide_example.emails.services import email_send
|
||||||
|
|
||||||
|
try:
|
||||||
|
email_send(email)
|
||||||
|
except Exception as exc:
|
||||||
|
# https://docs.celeryq.dev/en/stable/userguide/tasks.html#retrying
|
||||||
|
logger.warning(f"Exception occurred while sending email: {exc}")
|
||||||
|
self.retry(exc=exc, countdown=5)
|
||||||
|
```
|
||||||
|
|
||||||
|
As you can see, we do a bunch of retries and if all of them fail, we handle this in the `on_failure` callback.
|
||||||
|
|
||||||
|
The callback follows the naming pattern of `_{task_name}_failuire` and it calls the service layer.
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
We pretty much follow the official guidelines of integrating Celery with Django - <https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html>
|
||||||
|
|
||||||
|
For a full example, you can check the Celery configuration in the `Django-Styleguide-Example` project:
|
||||||
|
|
||||||
|
- <https://github.com/HackSoftware/Django-Styleguide-Example/tree/master/styleguide_example/tasks>
|
||||||
|
- <https://github.com/HackSoftware/Django-Styleguide-Example/blob/master/styleguide_example/tasks/celery.py>
|
||||||
|
|
||||||
|
Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options.
|
||||||
|
|
||||||
|
We constantly do that & find new things or find better approaches to our problems.
|
||||||
|
|
||||||
|
### Structure
|
||||||
|
|
||||||
Tasks are located in `tasks.py` modules in different apps.
|
Tasks are located in `tasks.py` modules in different apps.
|
||||||
|
|
||||||
|
@ -2503,69 +2579,6 @@ Meaning, you can end up with `tasks/domain_a.py` and `tasks/domain_b.py`. All yo
|
||||||
|
|
||||||
The general rule of thumb is - split your tasks in a way that'll make sense to you.
|
The general rule of thumb is - split your tasks in a way that'll make sense to you.
|
||||||
|
|
||||||
#### Circular imports between tasks & services
|
|
||||||
|
|
||||||
In some cases, you need to invoke a task from a service or vice-versa:
|
|
||||||
|
|
||||||
```python
|
|
||||||
# project/app/services.py
|
|
||||||
|
|
||||||
from project.app.tasks import task_function_1
|
|
||||||
|
|
||||||
|
|
||||||
def service_function_1():
|
|
||||||
print('I delay a task!')
|
|
||||||
task_function_1.delay()
|
|
||||||
|
|
||||||
|
|
||||||
def service_function_2():
|
|
||||||
print('I do not delay a task!')
|
|
||||||
```
|
|
||||||
|
|
||||||
```python
|
|
||||||
# project/app/tasks.py
|
|
||||||
|
|
||||||
from celery import shared_task
|
|
||||||
|
|
||||||
from project.app.services import service_function_2
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
|
||||||
def task_function_1():
|
|
||||||
print('I do not call a service!')
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
|
||||||
def task_function_2():
|
|
||||||
print('I call a service!')
|
|
||||||
service_function_2()
|
|
||||||
```
|
|
||||||
|
|
||||||
Unfortunately, this will result in a circular import.
|
|
||||||
|
|
||||||
What we usually do is we import the service function **inside** the task function:
|
|
||||||
|
|
||||||
```python
|
|
||||||
# project/app/tasks.py
|
|
||||||
|
|
||||||
from celery import shared_task
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
|
||||||
def task_function_1():
|
|
||||||
print('I do not call a service!')
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
|
||||||
def task_function_2():
|
|
||||||
from project.app.services import service_function_2 # <--
|
|
||||||
|
|
||||||
print('I call a service!')
|
|
||||||
service_function_2()
|
|
||||||
```
|
|
||||||
|
|
||||||
* Note: Depending on the case, you may want to import the task function **inside** the service function. This is OK and will still prevent the circular import between service & task functions.
|
|
||||||
|
|
||||||
### Periodic Tasks
|
### Periodic Tasks
|
||||||
|
|
||||||
Managing periodic tasks is quite important, especially when you have tens or hundreds of them.
|
Managing periodic tasks is quite important, especially when you have tens or hundreds of them.
|
||||||
|
@ -2640,11 +2653,15 @@ Few key things:
|
||||||
* Everything is in one place.
|
* Everything is in one place.
|
||||||
* ⚠️ We use, almost exclusively, a cron schedule. **If you plan on using the other schedule objects, provided by Celery, please read thru their documentation** & the important notes - <https://django-celery-beat.readthedocs.io/en/latest/#example-creating-interval-based-periodic-task> - about pointing to the same schedule object. ⚠️
|
* ⚠️ We use, almost exclusively, a cron schedule. **If you plan on using the other schedule objects, provided by Celery, please read thru their documentation** & the important notes - <https://django-celery-beat.readthedocs.io/en/latest/#example-creating-interval-based-periodic-task> - about pointing to the same schedule object. ⚠️
|
||||||
|
|
||||||
### Configuration
|
### Beyond
|
||||||
|
|
||||||
Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options.
|
Celery has powerful tools to implement complex workflows ( <https://docs.celeryq.dev/en/stable/userguide/canvas.html> ).
|
||||||
|
|
||||||
We constantly do that & find new things or find better approaches to our problems.
|
If you decide to use them, the rules still apply.
|
||||||
|
|
||||||
|
You may need to reorganize things a bit, but as long as you have a well-defined interface to your application core, you'll be able to mix and match tasks & services in more complex scenarios.
|
||||||
|
|
||||||
|
**More complex scenarios depend on their context. Make sure you are aware of the architecture & the decisions you are making.**
|
||||||
|
|
||||||
## Cookbook
|
## Cookbook
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user