refactored pipeliner

This commit is contained in:
Alexander Karpov 2023-08-01 03:20:16 +03:00
parent 76457b27b7
commit ffb327a8d9
17 changed files with 132 additions and 390 deletions

View File

@ -0,0 +1,9 @@
from pydantic import BaseModel
class Input(BaseModel):
...
class Run:
...

View File

@ -1,8 +1,11 @@
# Generated by Django 4.0.8 on 2022-12-06 10:13 # Generated by Django 4.2.3 on 2023-07-09 19:20
from django.conf import settings from django.conf import settings
from django.db import migrations, models from django.db import migrations, models
import django.db.models.deletion import django.db.models.deletion
import django.utils.timezone
import model_utils.fields
import uuid
class Migration(migrations.Migration): class Migration(migrations.Migration):
@ -10,13 +13,13 @@ class Migration(migrations.Migration):
initial = True initial = True
dependencies = [ dependencies = [
("shortener", "0001_initial"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("contenttypes", "0002_remove_content_type_name"),
] ]
operations = [ operations = [
migrations.CreateModel( migrations.CreateModel(
name="Workspace", name="WorkSpace",
fields=[ fields=[
( (
"id", "id",
@ -27,38 +30,83 @@ class Migration(migrations.Migration):
verbose_name="ID", verbose_name="ID",
), ),
), ),
("name", models.CharField(blank=True, max_length=50)), ("slug", models.SlugField(blank=True, max_length=20, unique=True)),
("slug", models.SlugField(max_length=8)),
],
),
migrations.CreateModel(
name="BaseBlock",
fields=[
( (
"id", "created",
models.BigAutoField( model_utils.fields.AutoCreatedField(
auto_created=True, default=django.utils.timezone.now,
primary_key=True, editable=False,
serialize=False, verbose_name="created",
verbose_name="ID",
), ),
), ),
(
"modified",
model_utils.fields.AutoLastModifiedField(
default=django.utils.timezone.now,
editable=False,
verbose_name="modified",
),
),
("name", models.CharField(default="WorkSpace", max_length=200)),
("private", models.BooleanField(default=True)),
( (
"creator", "creator",
models.ForeignKey( models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, on_delete=django.db.models.deletion.CASCADE,
related_name="pipeline_blocks", related_name="workspaces",
to=settings.AUTH_USER_MODEL, to=settings.AUTH_USER_MODEL,
), ),
), ),
( (
"polymorphic_ctype", "short_link",
models.ForeignKey( models.ForeignKey(
editable=False, blank=True,
null=True, null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="shortener.link",
),
),
],
options={
"abstract": False,
},
),
migrations.CreateModel(
name="Block",
fields=[
(
"created",
model_utils.fields.AutoCreatedField(
default=django.utils.timezone.now,
editable=False,
verbose_name="created",
),
),
(
"modified",
model_utils.fields.AutoLastModifiedField(
default=django.utils.timezone.now,
editable=False,
verbose_name="modified",
),
),
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
("type", models.CharField(db_index=True, max_length=250)),
("context", models.JSONField(blank=True, null=True)),
(
"parents",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, on_delete=django.db.models.deletion.CASCADE,
related_name="polymorphic_%(app_label)s.%(class)s_set+", related_name="children",
to="contenttypes.contenttype", to="pipeliner.block",
), ),
), ),
( (
@ -72,7 +120,6 @@ class Migration(migrations.Migration):
], ],
options={ options={
"abstract": False, "abstract": False,
"base_manager_name": "objects",
}, },
), ),
] ]

View File

@ -1,209 +0,0 @@
# Generated by Django 4.1.5 on 2023-01-11 10:30
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
import uuid
class Migration(migrations.Migration):
dependencies = [
("contenttypes", "0002_remove_content_type_name"),
("pipeliner", "0001_initial"),
]
operations = [
migrations.CreateModel(
name="BaseStorage",
fields=[
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"polymorphic_ctype",
models.ForeignKey(
editable=False,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="polymorphic_%(app_label)s.%(class)s_set+",
to="contenttypes.contenttype",
),
),
],
options={
"abstract": False,
"base_manager_name": "objects",
},
),
migrations.CreateModel(
name="ConstantNumberBlock",
fields=[
(
"baseblock_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="pipeliner.baseblock",
),
),
("number", models.DecimalField(decimal_places=2, max_digits=5)),
],
options={
"abstract": False,
},
bases=("pipeliner.baseblock",),
),
migrations.CreateModel(
name="ConstantStringBlock",
fields=[
(
"baseblock_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="pipeliner.baseblock",
),
),
("string", models.TextField()),
],
options={
"abstract": False,
},
bases=("pipeliner.baseblock",),
),
migrations.CreateModel(
name="MultiplicationBlock",
fields=[
(
"baseblock_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="pipeliner.baseblock",
),
),
("by", models.DecimalField(decimal_places=2, max_digits=5)),
],
options={
"abstract": False,
"base_manager_name": "objects",
},
bases=("pipeliner.baseblock",),
),
migrations.AddField(
model_name="baseblock",
name="created",
field=models.DateTimeField(
auto_now_add=True, default=django.utils.timezone.now
),
preserve_default=False,
),
migrations.AddField(
model_name="baseblock",
name="name",
field=models.CharField(blank=True, max_length=100),
),
migrations.AddField(
model_name="baseblock",
name="parent",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="children",
to="pipeliner.baseblock",
),
),
migrations.AddField(
model_name="baseblock",
name="updated",
field=models.DateTimeField(auto_now=True),
),
migrations.CreateModel(
name="RunnerStorage",
fields=[
(
"basestorage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="pipeliner.basestorage",
),
),
("data", models.JSONField(default=dict)),
],
options={
"abstract": False,
"base_manager_name": "objects",
},
bases=("pipeliner.basestorage",),
),
migrations.CreateModel(
name="Storage",
fields=[
(
"basestorage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="pipeliner.basestorage",
),
),
("data", models.JSONField(default=dict)),
],
options={
"abstract": False,
"base_manager_name": "objects",
},
bases=("pipeliner.basestorage",),
),
migrations.CreateModel(
name="TrashBlock",
fields=[
(
"baseblock_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="pipeliner.baseblock",
),
),
(
"storage",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="pipeliner.storage",
),
),
],
options={
"abstract": False,
"base_manager_name": "objects",
},
bases=("pipeliner.baseblock",),
),
]

View File

@ -0,0 +1,31 @@
import uuid
from django.db import models
from model_utils.models import TimeStampedModel
from akarpov.tools.shortener.models import ShortLinkModel
from akarpov.users.services.history import UserHistoryModel
class WorkSpace(TimeStampedModel, ShortLinkModel, UserHistoryModel):
name = models.CharField(default="WorkSpace", max_length=200)
creator = models.ForeignKey(
"users.User", related_name="workspaces", on_delete=models.CASCADE
)
private = models.BooleanField(default=True)
class Block(TimeStampedModel):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
workspace = models.ForeignKey(
"WorkSpace", related_name="blocks", on_delete=models.CASCADE
)
parents = models.ForeignKey(
"self", related_name="children", on_delete=models.CASCADE
)
type = models.CharField(max_length=250, db_index=True)
context = models.JSONField(null=True, blank=True)
def __str__(self):
return f"block {self.id} - {self.type}"

View File

@ -1,3 +0,0 @@
from .base import * # noqa
from .basic import * # noqa
from .manage import * # noqa

View File

@ -1,68 +0,0 @@
import uuid
from django.db import models
from polymorphic.models import PolymorphicModel
class BaseBlock(PolymorphicModel):
"""Base block for pipelines for further explanation check examples"""
TYPE = "Base"
name: models.CharField
created: models.DateTimeField
updated: models.DateTimeField
creator: models.ForeignKey
workspace: models.ForeignKey
parent: models.ForeignKey
name = models.CharField(max_length=100, blank=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
creator = models.ForeignKey(
"users.User", related_name="pipeline_blocks", on_delete=models.CASCADE
)
workspace = models.ForeignKey(
"Workspace", related_name="blocks", on_delete=models.CASCADE
)
parent = models.ForeignKey(
"self", null=True, related_name="children", on_delete=models.SET_NULL
)
def __str__(self):
return f"{self.TYPE} block"
class ProviderBlock(BaseBlock):
TYPE = "Provider"
parent = None
children: list[BaseBlock]
class Meta:
abstract = True
class EndBlock(BaseBlock):
TYPE = "End"
parent = BaseBlock
children: None
class Meta:
abstract = True
class BaseStorage(PolymorphicModel):
id: uuid.UUID = models.UUIDField(
primary_key=True, default=uuid.uuid4, editable=False
)
class Storage(BaseStorage):
data = models.JSONField(default=dict)
class RunnerStorage(BaseStorage):
# TODO: move to cacheops
data = models.JSONField(default=dict)

View File

@ -1,3 +0,0 @@
from .modifiers import * # noqa
from .output import * # noqa
from .provides import * # noqa

View File

@ -1,9 +0,0 @@
from django.db import models
from akarpov.pipeliner.models import BaseBlock
class MultiplicationBlock(BaseBlock):
TYPE = "Multiply"
by = models.DecimalField(max_digits=5, decimal_places=2)

View File

@ -1,9 +0,0 @@
from django.db import models
from akarpov.pipeliner.models import BaseBlock
class TrashBlock(BaseBlock):
TYPE = "Trash"
storage = models.ForeignKey("Storage", on_delete=models.CASCADE)

View File

@ -1,15 +0,0 @@
from django.db import models
from akarpov.pipeliner.models import ProviderBlock
class ConstantNumberBlock(ProviderBlock):
TYPE = "Number"
number = models.DecimalField(max_digits=5, decimal_places=2)
class ConstantStringBlock(ProviderBlock):
TYPE = "String"
string = models.TextField()

View File

@ -1,13 +0,0 @@
from django.db import models
from akarpov.pipeliner.models import BaseBlock
class Workspace(models.Model):
blocks: list[BaseBlock]
name = models.CharField(max_length=50, blank=True)
slug = models.SlugField(max_length=8)
def __str__(self):
return self.name

View File

@ -0,0 +1,23 @@
from abc import ABC
from akarpov.pipeliner.models import Block
class BlockRunner(ABC):
def __init__(self, block: Block, data: dict):
self.block = block
self.data = data
self.context = self._get_context_data()
def _get_context_data(self):
context = self.block.context # type: dict
if context:
for key, val in context.items():
if val.strarswith("$"):
if key not in self.data:
raise KeyError(
f"No context data found for {key} in block {self.block.id}"
)
context[key] = self.data[key]
return context
return {}

View File

@ -1,33 +0,0 @@
from akarpov.pipeliner.models import BaseBlock
class BlockRunner:
"""iterates over block in tree order"""
def __init__(self, parent_block: BaseBlock):
self.parent_block = parent_block
self.root = self.get_root()
self.order = self.get_order(self.root)
def __iter__(self):
self.block = self.parent_block.get_root()
return self
def __str__(self):
return f"block runner for {self.root}, currently running {self.block}"
def get_order(self, block: BaseBlock) -> list[BaseBlock]:
order = []
for children in block.children.all():
order.extend(self.get_order(children))
return order
def get_root(self):
root = self.parent_block
if root.parent:
while root.parent:
root = root.parent
return root
def __next__(self):
yield from self.order

View File

@ -1,6 +0,0 @@
from celery import shared_task
@shared_task()
def run_pipe_thread(pk: int):
return pk