Reformed project, improved text search

This commit is contained in:
Alexander Karpov 2022-10-23 12:11:53 +03:00
parent d74c515f4e
commit 08a442d8f3
9 changed files with 274 additions and 251 deletions

View File

@ -15,7 +15,7 @@ from search.api.serializers import (
) )
from search.models import Product from search.models import Product
from search.services.colors import group from search.services.colors import group
from search.services.search import process_search from search.services.search.main import process_search
from search.services.autocomplete_schema import autocomplete_schema from search.services.autocomplete_schema import autocomplete_schema
from search.services.hints import get_hints from search.services.hints import get_hints

View File

@ -8,6 +8,17 @@ def autocomplete_schema(val: str, exclude: List[Dict]):
name_exclude = [x["value"] for x in exclude if x["type"] == "Name"] name_exclude = [x["value"] for x in exclude if x["type"] == "Name"]
category_exclude = [x["value"] for x in exclude if x["type"] == "Category"] category_exclude = [x["value"] for x in exclude if x["type"] == "Category"]
schema = [] schema = []
schema.extend(
[
{
"coordinate": char["name"].lower().replace("ё", "е").index(val.lower()),
"value": {"type": char["name"] + "_numeric", "value": char["name"]},
}
for char in UnitCharacteristic.objects.filter(
name__unaccent__icontains=val
)[:20].values("name", "value")
]
)
if not category_exclude: if not category_exclude:
schema.extend( schema.extend(
[ [
@ -64,15 +75,4 @@ def autocomplete_schema(val: str, exclude: List[Dict]):
.values("name", "value") .values("name", "value")
] ]
) )
schema.extend(
[
{
"coordinate": char["name"].lower().replace("ё", "е").index(val.lower()),
"value": {"type": char["name"] + "_numeric", "value": char["name"]},
}
for char in UnitCharacteristic.objects.filter(
name__unaccent__icontains=val
)[:20].values("name", "value")
]
)
return schema return schema

View File

@ -1,236 +0,0 @@
from search.models import (
Product,
Characteristic,
ProductCharacteristic,
ProductUnitCharacteristic,
UnitCharacteristic,
Category,
)
from typing import List
from search.services.hints import get_hints
from search.services.spell_check import spell_check_ru as spell_check, lemmatize
def process_unit_operation(unit: ProductUnitCharacteristic.objects, operation: str):
if operation.startswith("<=") or operation.startswith("=<"):
return unit.filter(
characteristic__numeric_value_max__lte=int(float(operation[2:]))
)
elif operation.startswith("=>") or operation.startswith(">="):
return unit.filter(
characteristic__numeric_value_min__gte=int(float(operation[2:]))
)
elif operation.startswith(">"):
return unit.filter(
characteristic__numeric_value_min__gt=int(float(operation[1:]))
)
elif operation.startswith("<"):
return unit.filter(
characteristic__numeric_value_max__lt=int(float(operation[1:]))
)
elif operation.startswith("="):
return unit.filter(
characteristic__numeric_value_min__gte=int(float(operation[1:])),
characteristic__numeric_value_max__lte=int(float(operation[1:])),
)
return unit
def _clean_text(text: str) -> List[str]:
for st in [".", ",", "!", "?"]:
text = text.replace(st, " ")
text = text.split()
re = []
for word in text:
re.append(word)
return re
def apply_qs_search(text: str):
text = _clean_text(text)
products = Product.objects.none()
for word in text:
products = (
products
| Product.objects.filter(name__unaccent__trigram_similar=word)
| Product.objects.filter(name__unaccent__icontains=word)
)
products = products.order_by("-score")
return products
def apply_all_qs_search(orig_qs, text: str):
# words
text = _clean_text(text)
u_qs = None
# try to find Unit characteristics
if any(x.isnumeric() for x in text):
u_qs = ProductUnitCharacteristic.objects.filter()
for i in range(len(text)):
el = text[i]
if el.isnumeric():
if i == len(text) - 1:
if ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i - 1]
).exists():
unit = ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i - 1]
)
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i]
del text[i - 1]
break
elif len(text) - 1 > i >= 1:
if ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i - 1]
).exists():
unit = ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i - 1]
)[0]
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i]
del text[i - 1]
break
elif ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i + 1]
).exists():
unit = UnitCharacteristic.objects.filter(
ProductUnitCharacteristic=text[i + 1]
)[0]
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i]
del text[i + 1]
break
else:
if ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i + 1]
).exists():
unit = ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i + 1]
)[0]
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i]
del text[i + 1]
break
prod = Product.objects.filter()
for word in text:
car = ProductCharacteristic.objects.filter(
characteristic__value__icontains=word,
)
qs = (
Product.objects.filter(name__icontains=word)
| Product.objects.filter(name__unaccent__trigram_similar=word)
| Product.objects.filter(category__name__icontains=word)
| Product.objects.filter(characteristics__in=car)
)
prod = prod & qs
if u_qs:
prod = prod & Product.objects.filter(unit_characteristics__in=u_qs)
return prod
def process_search(data: List[dict], limit=5, offset=0) -> List[dict]:
prep_data = []
prep_dict = {}
prep_dict_char_type = {}
# --------------------------------------- prepare filters -------------------------------------------------------- #
for x in data:
dat = dict(x)
if x["type"] in ["Name", "Category", "Characteristic", "All"]:
prep_data.append(
{
"type": dat["type"],
"value": spell_check(
dat["value"],
),
}
)
elif x["type"] == "Unknown":
type = get_hints(dat["value"])
prep_data.append(
{
"type": type,
"value": spell_check(
dat["value"],
),
}
)
else:
val = spell_check(
dat["value"],
)
if x["type"] in list(prep_dict.keys()):
if x["type"].startswith("*"):
unit = ProductUnitCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
)
prep_dict[x["type"]] = prep_dict[
x["type"]
] | process_unit_operation(unit, x["value"])
else:
prep_dict[x["type"]] = (
prep_dict[x["type"]]
| ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__unaccent__trigram_similar=val,
)
| ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__icontains=val,
)
)
else:
if x["type"].startswith("*"):
prep_dict_char_type[x["type"]] = UnitCharacteristic.objects.filter(
name__unaccent__trigram_similar=x["type"]
) | UnitCharacteristic.objects.filter(name__icontains=x["type"])
unit = ProductUnitCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
)
prep_dict[x["type"]] = process_unit_operation(unit, x["value"])
else:
prep_dict_char_type[x["type"]] = Characteristic.objects.filter(
name__unaccent__trigram_similar=x["type"]
) | Characteristic.objects.filter(name__icontains=x["type"])
prep_dict[x["type"]] = ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__unaccent__trigram_similar=val,
) | ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__icontains=val,
)
for el, val in prep_dict.items():
prep_data.append({"type": el, "value": val})
# ----------------------------------- apply filters on QuerySet -------------------------------------------------- #
qs = Product.objects.filter()
for x in prep_data:
typ = x["type"]
val = x["value"]
if typ == "Name":
qs = qs & apply_qs_search(val)
qs = qs.order_by("-score")
elif typ == "All":
qs = apply_all_qs_search(qs, val) & qs
elif typ == "Category":
qs = qs.filter(category__name__icontains=val)
qs = qs.order_by("-score")
elif typ == "Characteristic":
char = ProductCharacteristic.objects.filter(product__in=qs)
char = char.filter(characteristic__value__icontains=val) | char.filter(
characteristic__value__unaccent__trigram_similar=val
)
qs = qs.filter(characteristics__in=char)
qs = qs.order_by("-score")
elif typ == "Unknown":
continue
else:
if typ.startswith("*"):
qs = qs.filter(unit_characteristics__in=val)
else:
qs = qs.filter(characteristics__in=val)
return [x.serialize_self() for x in qs.distinct()[offset : offset + limit]]

View File

View File

@ -0,0 +1,38 @@
from typing import List
from search.services.search.methods import (
apply_qs_search,
apply_all_qs_search,
apply_qs_category,
appy_qs_characteristic,
)
from search.services.search.prepare import apply_union
from search.models import Product
def process_search(data: List[dict], limit=5, offset=0) -> List[dict]:
prep_data = apply_union(data)
# ----------------------------------- apply filters on QuerySet -------------------------------------------------- #
qs = Product.objects.filter()
for x in prep_data:
typ = x["type"]
val = x["value"]
if typ == "Name":
qs = qs & apply_qs_search(val)
qs = qs.order_by("-score")
elif typ == "All":
qs = apply_all_qs_search(qs, val) & qs
elif typ == "Category":
qs = apply_qs_category(qs, val)
qs = qs.order_by("-score")
elif typ == "Characteristic":
qs = appy_qs_characteristic(qs, val)
qs = qs.order_by("-score")
elif typ == "Unknown":
continue
else:
if typ.startswith("*"):
qs = qs.filter(unit_characteristics__in=val)
else:
qs = qs.filter(characteristics__in=val)
return [x.serialize_self() for x in qs.distinct()[offset: offset + limit]]

View File

@ -0,0 +1,130 @@
from typing import List
from search.models import (
Product,
ProductCharacteristic,
ProductUnitCharacteristic,
)
from search.services.spell_check import pos
def _clean_text(text: str) -> List[str]:
for st in [".", ",", "!", "?"]:
text = text.replace(st, " ")
text = text.split()
functors_pos = {"INTJ", "PRCL", "CONJ", "PREP"} # function words
return [word for word in text if pos(word) not in functors_pos]
def process_unit_operation(unit: ProductUnitCharacteristic.objects, operation: str):
if operation.startswith("<=") or operation.startswith("=<"):
return unit.filter(
characteristic__numeric_value_max__lte=int(float(operation[2:]))
)
elif operation.startswith("=>") or operation.startswith(">="):
return unit.filter(
characteristic__numeric_value_min__gte=int(float(operation[2:]))
)
elif operation.startswith(">"):
return unit.filter(
characteristic__numeric_value_min__gt=int(float(operation[1:]))
)
elif operation.startswith("<"):
return unit.filter(
characteristic__numeric_value_max__lt=int(float(operation[1:]))
)
elif operation.startswith("="):
return unit.filter(
characteristic__numeric_value_min__gte=int(float(operation[1:])),
characteristic__numeric_value_max__lte=int(float(operation[1:])),
)
return unit
def apply_qs_search(text: str):
text = _clean_text(text)
products = Product.objects.none()
for word in text:
products = (
products
| Product.objects.filter(name__unaccent__icontains=word)
| Product.objects.filter(name__unaccent__trigram_similar=word)
)
products = products.order_by("-score")
return products
def apply_all_qs_search(orig_qs, text: str):
# words
text = _clean_text(text)
u_qs = None
# try to find Unit characteristics
if any(x.isnumeric() for x in text):
u_qs = ProductUnitCharacteristic.objects.filter()
for i in range(len(text)):
el = text[i]
if el.isnumeric():
if i == len(text) - 1:
if unit := ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i - 1]
):
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i - 1]
del text[i - 1]
break
elif len(text) - 1 > i >= 1:
if unit := ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i + 1]
):
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i]
del text[i]
break
elif unit := ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i - 1]
):
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i - 1]
del text[i - 1]
break
else:
if unit := ProductUnitCharacteristic.objects.filter(
characteristic__name__icontains=text[i + 1]
):
u_qs = u_qs & process_unit_operation(unit, f"={text[i]}")
del text[i]
del text[i]
break
prod = Product.objects.filter()
for word in text:
car = ProductCharacteristic.objects.filter(
characteristic__value__icontains=word,
)
qs = (
Product.objects.filter(name__icontains=word)
| Product.objects.filter(category__name__icontains=word)
| Product.objects.filter(characteristics__in=car)
)
prod = prod & qs
if u_qs:
prod = prod & Product.objects.filter(unit_characteristics__in=u_qs)
return prod
def apply_qs_category(qs, name: str):
qs = qs.filter(category__name__icontains=name)
return qs
def appy_qs_characteristic(qs, name: str):
char = ProductCharacteristic.objects.filter(product__in=qs)
char = char.filter(characteristic__value__icontains=name) | char.filter(
characteristic__value__unaccent__trigram_similar=name
)
qs = qs.filter(characteristics__in=char)
return qs

View File

@ -0,0 +1,87 @@
from typing import List, Dict
from rest_framework.exceptions import ValidationError
from search.models import Characteristic, ProductCharacteristic, ProductUnitCharacteristic, UnitCharacteristic
from search.services.hints import get_hints
from search.services.search.methods import process_unit_operation
)
from search.services.spell_check import spell_check_ru as spell_check
def apply_union(data: List[Dict]) -> List[Dict]:
prep_data = []
prep_dict = {}
prep_dict_char_type = {}
# --------------------------------------- prepare filters -------------------------------------------------------- #
for x in data:
dat = dict(x)
if "type" not in dat or "value" not in dat:
raise ValidationError("Improper body structure")
if x["type"] in ["Name", "Category", "Characteristic", "All"]:
prep_data.append(
{
"type": dat["type"],
"value": spell_check(
dat["value"],
),
}
)
elif x["type"] == "Unknown":
type = get_hints(dat["value"])
prep_data.append(
{
"type": type,
"value": spell_check(
dat["value"],
),
}
)
else:
val = spell_check(
dat["value"],
)
if x["type"] in list(prep_dict.keys()):
if x["type"].startswith("*"):
unit = ProductUnitCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
)
prep_dict[x["type"]] = prep_dict[
x["type"]
] | process_unit_operation(unit, x["value"])
else:
prep_dict[x["type"]] = (
prep_dict[x["type"]]
| ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__unaccent__trigram_similar=val,
)
| ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__icontains=val,
)
)
else:
if x["type"].startswith("*"):
prep_dict_char_type[x["type"]] = UnitCharacteristic.objects.filter(
name__unaccent__trigram_similar=x["type"]
) | UnitCharacteristic.objects.filter(name__icontains=x["type"])
unit = ProductUnitCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
)
prep_dict[x["type"]] = process_unit_operation(unit, x["value"])
else:
prep_dict_char_type[x["type"]] = Characteristic.objects.filter(
name__unaccent__trigram_similar=x["type"]
) | Characteristic.objects.filter(name__icontains=x["type"])
prep_dict[x["type"]] = ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__unaccent__trigram_similar=val,
) | ProductCharacteristic.objects.filter(
characteristic__in=prep_dict_char_type[x["type"]],
characteristic__value__icontains=val,
)
for el, val in prep_dict.items():
prep_data.append({"type": el, "value": val})
return prep_data

View File

@ -23,5 +23,8 @@ morph = pymorphy2.MorphAnalyzer()
def lemmatize(word): def lemmatize(word):
p = morph.parse(word)[0] return morph.parse(word)[0].normal_form
return p.normal_form
def pos(word):
return morph.parse(word)[0].tag.POS

View File

@ -12,3 +12,4 @@ psycopg2-binary==2.9.4
celery==5.2.7 celery==5.2.7
pyspellchecker==0.7.0 pyspellchecker==0.7.0
pymorphy2