Mirror of https://github.com/HackSoftware/Django-Styleguide.git
Merge pull request #100 from HackSoftware/celery/improvements
Improve Celery section
This commit is contained in: ec20b3f0f7 (README.md)
@@ -62,12 +62,12 @@

- [Testing](#testing-2)
  * [Naming conventions](#naming-conventions)
- [Celery](#celery)
  * [The basics](#the-basics)
  * [Error handling](#error-handling)
  * [Configuration](#configuration)
  * [Structure](#structure)
    + [Circular imports between tasks & services](#circular-imports-between-tasks--services)
  * [Periodic Tasks](#periodic-tasks)
  * [Beyond](#beyond)
- [Cookbook](#cookbook)
  * [Handling updates with a service](#handling-updates-with-a-service)
- [DX (Developer Experience)](#dx-developer-experience)
@@ -2417,83 +2417,168 @@ We use [Celery](http://www.celeryproject.org/) for the following general cases:

* Offloading heavier computational tasks outside the HTTP cycle.
* Periodic tasks (using Celery beat)

### The basics

We try to treat Celery as if it's just another interface to our core logic - meaning - **don't put business logic there.**

Let's look at an example of a **service** that sends emails (example taken from [`Django-Styleguide-Example`](https://github.com/HackSoftware/Django-Styleguide-Example)):
```python
from django.db import transaction
from django.core.mail import EmailMultiAlternatives
from django.utils import timezone

from styleguide_example.core.exceptions import ApplicationError
from styleguide_example.common.services import model_update
from styleguide_example.emails.models import Email


@transaction.atomic
def email_send(email: Email) -> Email:
    if email.status != Email.Status.SENDING:
        raise ApplicationError(f"Cannot send non-ready emails. Current status is {email.status}")

    subject = email.subject
    from_email = "styleguide-example@hacksoft.io"
    to = email.to

    html = email.html
    plain_text = email.plain_text

    msg = EmailMultiAlternatives(subject, plain_text, from_email, [to])
    msg.attach_alternative(html, "text/html")

    msg.send()

    email, _ = model_update(
        instance=email,
        fields=["status", "sent_at"],
        data={
            "status": Email.Status.SENT,
            "sent_at": timezone.now()
        }
    )

    return email
```

Email sending has business logic around it, **but we still want to trigger this particular service from a task.**

Our task looks like that:

```python
from celery import shared_task

from styleguide_example.emails.models import Email


@shared_task
def email_send(email_id):
    email = Email.objects.get(id=email_id)

    from styleguide_example.emails.services import email_send

    email_send(email)
```

As you can see, **we treat the task as an API:**

1. Fetch the required data.
2. Call the appropriate service.

Now, imagine we have a different service that triggers the email sending.
It may look like that:

```python
from django.db import transaction

# ... more imports here ...

from styleguide_example.emails.tasks import email_send as email_send_task


@transaction.atomic
def user_complete_onboarding(user: User) -> User:
    # ... some code here

    email = email_get_onboarding_template(user=user)

    transaction.on_commit(lambda: email_send_task.delay(email.id))

    return user
```

Two important things to point out here:

1. We are importing the task (which has the same name as the service), but we are giving it a `_task` suffix.
2. And when the transaction commits, we'll call the task.

**So, in general, the way we use Celery can be described as:**

1. Tasks call services.
2. We import the service in the function body of the task.
3. When we want to trigger a task, we import the task, at module level, giving it the `_task` suffix.
4. We execute tasks, as a side effect, whenever our transaction commits.

This way of mixing tasks & services also **prevents circular imports**, which may occur often enough when using Celery.

### Error handling

Sometimes, our service can fail and we might want to handle the error on the task level. For example - we might want to retry the task.

This error handling code needs to live in the task.

Let's expand the `email_send` task example from above, by adding error handling:

```python
from celery import shared_task
from celery.utils.log import get_task_logger

from styleguide_example.emails.models import Email


logger = get_task_logger(__name__)


def _email_send_failure(self, exc, task_id, args, kwargs, einfo):
    email_id = args[0]
    email = Email.objects.get(id=email_id)

    from styleguide_example.emails.services import email_failed

    email_failed(email)


@shared_task(bind=True, on_failure=_email_send_failure)
def email_send(self, email_id):
    email = Email.objects.get(id=email_id)

    from styleguide_example.emails.services import email_send

    try:
        email_send(email)
    except Exception as exc:
        # https://docs.celeryq.dev/en/stable/userguide/tasks.html#retrying
        logger.warning(f"Exception occurred while sending email: {exc}")
        self.retry(exc=exc, countdown=5)
```

As you can see, we do a bunch of retries and if all of them fail, we handle this in the `on_failure` callback.

The callback follows the naming pattern of `_{task_name}_failure` and it calls the service layer, just like an ordinary task.

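The `email_failed` service itself isn't shown here. A minimal sketch of what it might do, reusing `model_update` from the example above - the `FAILED` status is an assumption about the `Email` model:

```python
from django.db import transaction

from styleguide_example.common.services import model_update
from styleguide_example.emails.models import Email


@transaction.atomic
def email_failed(email: Email) -> Email:
    # Assumption: the Email model defines a FAILED status.
    # Mark the email as failed, so it can be inspected or resent later.
    email, _ = model_update(
        instance=email,
        fields=["status"],
        data={"status": Email.Status.FAILED}
    )

    return email
```
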
### Configuration

We pretty much follow the official guidelines for integrating Celery with Django - <https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html>

For a full example, you can check the Celery configuration in the `Django-Styleguide-Example` project:

- <https://github.com/HackSoftware/Django-Styleguide-Example/tree/master/styleguide_example/tasks>
- <https://github.com/HackSoftware/Django-Styleguide-Example/blob/master/styleguide_example/tasks/celery.py>

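In our projects, this configuration lives in a Django app called `tasks`, with the Celery app created in `apps.py` and wired up in `TasksConfig.ready`. A condensed sketch - the module paths and settings module below are assumptions, adjust them to your project:

```python
# project/tasks/apps.py
import os

from celery import Celery
from django.apps import AppConfig

# Celery needs to know about the Django settings module
# before the app object is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

app = Celery('project')


class TasksConfig(AppConfig):
    name = 'project.tasks'
    verbose_name = 'Celery Config'

    def ready(self):
        # With namespace="CELERY", settings such as CELERY_BROKER_URL in
        # Django settings map to Celery's broker_url, and so on.
        app.config_from_object('django.conf:settings', namespace="CELERY")
        app.autodiscover_tasks()
```
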
Celery is a complex topic, so it's a good idea to invest time reading the documentation & understanding the different configuration options.

We constantly do that & find new things or find better approaches to our problems.

### Structure

Tasks are located in `tasks.py` modules in different apps.

@@ -2503,76 +2588,13 @@ Meaning, you can end up with `tasks/domain_a.py` and `tasks/domain_b.py`.

The general rule of thumb is - split your tasks in a way that'll make sense to you.

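For illustration, a sketch of how such a split can stay discoverable - the app and module names here are placeholders:

```python
# project/some_app/tasks/__init__.py
#
# When `tasks` is a package rather than a single module, importing the
# domain modules here ensures their @shared_task definitions get
# registered when Celery autodiscovers `some_app.tasks`.
from project.some_app.tasks import domain_a, domain_b  # noqa
```
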
#### Circular imports between tasks & services

In some cases, you need to invoke a task from a service or vice-versa:

```python
# project/app/services.py

from project.app.tasks import task_function_1


def service_function_1():
    print('I delay a task!')
    task_function_1.delay()


def service_function_2():
    print('I do not delay a task!')
```

```python
# project/app/tasks.py

from celery import shared_task

from project.app.services import service_function_2


@shared_task
def task_function_1():
    print('I do not call a service!')


@shared_task
def task_function_2():
    print('I call a service!')
    service_function_2()
```

Unfortunately, this will result in a circular import.

What we usually do is import the service function **inside** the task function:

```python
# project/app/tasks.py

from celery import shared_task


@shared_task
def task_function_1():
    print('I do not call a service!')


@shared_task
def task_function_2():
    from project.app.services import service_function_2  # <--

    print('I call a service!')
    service_function_2()
```

* Note: Depending on the case, you may want to import the task function **inside** the service function instead. This is OK and will still prevent the circular import between service & task functions.

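A sketch of that reverse direction, reusing the placeholder names from above:

```python
# project/app/services.py


def service_function_1():
    # Importing the task inside the function body avoids the
    # circular import between services.py and tasks.py.
    from project.app.tasks import task_function_1

    print('I delay a task!')
    task_function_1.delay()
```
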
### Periodic Tasks

Managing periodic tasks is quite important, especially when you have tens or hundreds of them.

We use [Celery Beat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) + `django_celery_beat.schedulers:DatabaseScheduler` + [`django-celery-beat`](https://github.com/celery/django-celery-beat) for our periodic tasks.

The extra thing that we do is to have a management command, called [`setup_periodic_tasks`](https://github.com/HackSoftware/Django-Styleguide-Example/blob/master/styleguide_example/tasks/management/commands/setup_periodic_tasks.py), which holds the definition of all periodic tasks within the system. This command is located in the `tasks` app, discussed above.

Here's what `project/tasks/management/commands/setup_periodic_tasks.py` looks like:

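The actual command is elided from this diff - check the link above for the full version. A condensed sketch of the pattern, assuming `django-celery-beat`'s `CrontabSchedule` and `PeriodicTask` models and the `email_send` task from earlier; the schedule itself is made up for illustration:

```python
from django.core.management.base import BaseCommand
from django.db import transaction

from django_celery_beat.models import CrontabSchedule, PeriodicTask

from styleguide_example.emails.tasks import email_send


class Command(BaseCommand):
    help = "Setup celery beat periodic tasks."

    @transaction.atomic
    def handle(self, *args, **kwargs):
        print("Deleting all periodic tasks and schedules...\n")

        # Rebuild everything from scratch, so this command stays the
        # single source of truth for the periodic task definitions.
        CrontabSchedule.objects.all().delete()
        PeriodicTask.objects.all().delete()

        periodic_tasks_data = [
            {
                "task": email_send,
                "name": "email_send",
                # Example schedule: every 5 minutes.
                "cron": {"minute": "*/5"},
                "enabled": True,
            },
        ]

        for periodic_task in periodic_tasks_data:
            print(f"Setting up {periodic_task['task'].name}")

            cron = CrontabSchedule.objects.create(**periodic_task["cron"])

            PeriodicTask.objects.create(
                name=periodic_task["name"],
                task=periodic_task["task"].name,
                crontab=cron,
                enabled=periodic_task["enabled"],
            )
```

Celery beat then needs to run with the `django_celery_beat.schedulers:DatabaseScheduler` mentioned above, so it picks the schedules up from the database.
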
@@ -2640,11 +2662,15 @@ Few key things:

* Everything is in one place.
* ⚠️ We use, almost exclusively, a cron schedule. **If you plan on using the other schedule objects, provided by `django-celery-beat`, please read through their documentation** & the important notes - <https://django-celery-beat.readthedocs.io/en/latest/#example-creating-interval-based-periodic-task> - about pointing to the same schedule object. ⚠️

### Beyond

Celery has powerful tools to implement complex workflows - <https://docs.celeryq.dev/en/stable/userguide/canvas.html>

If you decide to use them, the rules still apply.
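
For instance, a simple chain still keeps each step as a thin task. A sketch - the tasks below are placeholders rather than real ones from the example project:

```python
from celery import chain, shared_task


# Placeholder tasks - in practice each one would fetch its data
# & call a service, as described in "The basics".
@shared_task
def task_a(x):
    return x + 1


@shared_task
def task_b(x):
    return x * 2


# task_a runs first; its result is passed to task_b.
workflow = chain(task_a.s(1), task_b.s())
workflow.delay()
```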

Of course, we can have more complex situations, like a chain or chord of tasks, each of them doing different domain-related logic. In that case, it's hard to isolate everything in a service, because we now have dependencies between the tasks.

If that happens, we try to expose an interface to our domain & let the tasks work with that interface.

One can argue that having an ORM object is an interface by itself, and that's true. Sometimes, you can just update your object from a task & that's OK.

But there are times when you need to be strict and not let tasks do database calls straight from the ORM, but rather go through an exposed interface.

You may need to reorganize things a bit, but as long as you have a well-defined interface to your application core, you'll be able to mix and match tasks & services in more complex scenarios.

**More complex scenarios depend on their context. Make sure you are aware of the architecture & the decisions you are making.**

## Cookbook