import pickle
import re
from collections import Counter
from math import inf

import numpy as np
import torch
from annoy import AnnoyIndex
from captum.attr import LayerIntegratedGradients
from catboost import CatBoostClassifier
from fastapi import FastAPI
from matplotlib.colors import LinearSegmentedColormap
from pydantic import BaseModel
from pymystem3 import Mystem
from sentence_transformers import SentenceTransformer
# Imported for the classes inside the unpickled vectorizer / classifier.
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoModelForSequenceClassification, AutoTokenizer, BertConfig

sentence_model = SentenceTransformer('sentence-transformers/LaBSE')
catboost = CatBoostClassifier().load_model('catboost')


def get_embs(text):
    """Return a LaBSE sentence embedding for the given text."""
    return sentence_model.encode(text)


# White-to-green colormap used for highlighting influential tokens.
cmap = LinearSegmentedColormap.from_list('rg', ["w", "g"], N=512)

mstm = Mystem()

with open('vectorizer.pickle', 'rb') as file:
    model_tfidf = pickle.load(file)
with open('tree.pickle', 'rb') as file:
    cls = pickle.load(file)


def resolve_text(tokens, text):
    """Map lemmatized TF-IDF tokens back onto the original words of `text`."""
    words = text.split()
    tokens_values = [tok[0] for tok in tokens]
    tokens_metrics = [tok[1] for tok in tokens]
    resolved = []
    for word in words:
        try:
            lemma = mstm.lemmatize(word)[0]
            if lemma in tokens_values:
                # The token's weight is available here; the colored-span markup
                # that used it was stripped, so the word passes through as-is.
                value = tokens_metrics[tokens_values.index(lemma)]
                resolved.append(f'{word}')
            else:
                resolved.append(word)
        except Exception:
            resolved.append(word)
    return ' '.join(resolved)


def process_classify(text):
    """Lemmatize, vectorize with TF-IDF, and classify a single batch of text."""
    if not len(text.replace(' ', '')):
        return {'ans': 0, 'text': ''}
    try:
        # Mystem appends a trailing newline token; drop it.
        normalized = ''.join(mstm.lemmatize(text)[:-1])
    except Exception:
        return {'ans': 0, 'text': ''}
    tf_idfed = model_tfidf.transform(np.array([normalized]))[0]
    ans = cls.predict(tf_idfed)[0]
    return {'ans': ans, 'text': ''}


def process_embedding(text):
    """Classify a batch and mark the TF-IDF tokens that drove the decision."""
    if not len(text.replace(' ', '')):
        return {'ans': 0, 'text': ''}
    try:
        normalized = ''.join(mstm.lemmatize(text)[:-1])
    except Exception:
        return {'ans': 0, 'text': ''}
    tf_idfed = model_tfidf.transform(np.array([normalized]))[0]
    # Iterate the full vocabulary instead of a hardcoded 5000.
    values = np.asarray(tf_idfed.todense())[0]
    important_tokens = [
        (val, i) for i, val in enumerate(values)
        if val > (np.min(values) + np.max(values)) / 3
    ]
    tokens = model_tfidf.get_feature_names_out()
    tokens = [(tokens[i], val) for val, i in sorted(important_tokens, reverse=True)]
    ans = cls.predict(tf_idfed)[0]
    text = resolve_text(tokens, text)
    return {'ans': ans, 'text': text}


label2id = {
    'AAA(RU)': 0, 'AA(RU)': 1, 'A+(RU)': 2, 'A(RU)': 3, 'A-(RU)': 4,
    'BBB+(RU)': 5, 'BBB(RU)': 6, 'AA+(RU)': 7, 'BBB-(RU)': 8, 'AA-(RU)': 9,
    'BB+(RU)': 10, 'BB-(RU)': 11, 'B+(RU)': 12, 'BB(RU)': 13, 'B(RU)': 14,
    'B-(RU)': 15, 'C(RU)': 16,
}
id2label = {v: k for k, v in label2id.items()}


def get_distance(emb1, emb2):
    """Inverse squared distance between two embeddings, each scaled by its
    squared norm. The inputs are copied rather than normalized in place, so
    the cached embeddings are not mutated across calls. Identical vectors
    divide by zero and come out as `inf` (with a numpy warning), which the
    caller treats as an exact match.
    """
    emb1 = emb1 / np.sum(emb1 ** 2)
    emb2 = emb2 / np.sum(emb2 ** 2)
    return 1 / abs(np.dot(emb2 - emb1, emb1 - emb2))


with open('new_embeddings.pickle', 'rb') as file:
    new_embeddings = pickle.load(file)
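
# A minimal sanity-check sketch for `get_distance` (illustration only, not part
# of the service; the toy vectors are made up): closer embeddings score higher,
# and an exact duplicate overflows to `inf`, which `get_nearest_value` below
# short-circuits as a 100%-confidence match.
def _demo_get_distance():
    a = np.array([1.0, 0.0, 0.0])
    b = np.array([0.9, 0.1, 0.0])  # near a
    c = np.array([0.0, 1.0, 0.0])  # far from a
    assert get_distance(a, b) > get_distance(a, c)  # closer -> larger score
    return get_distance(a, a)  # inf (division-by-zero warning from numpy)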
with open('annoy_labels.pickle', 'rb') as file:
    labels = pickle.load(file)
with open('n_labels.pickle', 'rb') as file:
    n_labels = pickle.load(file)

index = AnnoyIndex(768, 'angular')
index.load('nearest.annoy')


def get_nearest_value(embeddings):
    """Weighted vote over the 20 nearest press releases in the Annoy index."""
    items = [
        (labels[x], get_distance(embeddings, new_embeddings[x]), list(n_labels)[x])
        for x in index.get_nns_by_vector(embeddings, 20)
    ]
    # Float accumulator: an integer array here would silently truncate scores.
    weights = np.zeros(17)
    refs = [[] for _ in range(17)]
    s = 0
    for label, score, ref in items:
        if score == inf:
            # Exact match: short-circuit with full confidence.
            return id2label[label], 100, [ref]
        s += score
        weights[label] += score
        refs[label].append(ref)
    best = int(np.argmax(weights))
    return id2label[best], (weights[best] / s) * 100, refs[best]


def to_rgb(vals):
    return f'rgb({int(vals[0] * 255)}, {int(vals[1] * 255)}, {int(vals[2] * 255)})'


def from_abs_to_rgb(min_val, max_val, value):
    return to_rgb(cmap((value - min_val) / max_val))


def get_nns_tokens(encoding, attrs, predicted_id):
    """Return the ten highest-attribution positions with +-5 tokens of context."""
    current_array = map(
        lambda x: (
            tokenizer.convert_ids_to_tokens(encoding['input_ids'][0][x[0] - 5:x[0] + 5]),
            x[1],
        ),
        sorted(
            enumerate(attrs[0][predicted_id].numpy()),
            key=lambda x: x[1],
            reverse=True,
        )[0:10],
    )
    return list(current_array)


def get_description_interpreting_contrastive(attrs, predicted_id):
    """Contrastive variant: attributions for the predicted class versus the
    mean attribution over all other classes.

    NOTE: the service below uses the single-argument
    `get_description_interpreting`; this variant is kept for experiments and
    was renamed so it no longer shadows it.
    """
    attrs = attrs.detach().numpy()
    positive_weights = attrs[0][predicted_id]
    negative_weights = [0 for _ in range(len(positive_weights))]
    for i in range(len(attrs[0])):
        if i == predicted_id:
            continue
        for j in range(len(attrs[0][i])):
            negative_weights[j] += attrs[0][i][j]
    for i in range(len(negative_weights)):
        negative_weights[i] /= len(attrs[0]) - 1
    return {
        'positive_weights': (
            positive_weights,
            {'min': np.min(positive_weights), 'max': np.max(positive_weights)},
        ),
        'negative_weights': (
            negative_weights,
            {'min': min(negative_weights), 'max': max(negative_weights)},
        ),
    }


def transform_token_ids(func_data, token_ids, word):
    """Decide whether `word` should be highlighted, based on the mean
    normalized attribution of its WordPiece tokens."""
    # These sub-tokens fed the stripped colored-span markup; only the
    # threshold check on the weights remains.
    tokens = [
        tokenizer.convert_ids_to_tokens([x])[0].replace('##', '')
        for x in token({'text': clean(word)})['input_ids'][0]
    ]
    weights = [func_data['positive_weights'][0][i] for i in token_ids]
    wts = []
    for i in range(len(weights)):
        if weights[i] > 0:
            mn = max(func_data['positive_weights'][1]['min'], 0)
            mx = func_data['positive_weights'][1]['max']
            wts.append((weights[i] - mn) / mx)
    try:
        if sum(wts) / len(wts) >= 0.2:
            return f'{word}'
    except ZeroDivisionError:
        pass
    return word


def build_text(tokens, func_data, current_text):
    """Regroup WordPiece tokens into words and rewrite each word of
    `current_text` through `transform_token_ids`."""
    splitted_text = current_text.split()
    splitted_text_iterator = 0
    current_word = ''
    current_word_ids = []
    for i, tok in enumerate(tokens):
        decoded = tokenizer.convert_ids_to_tokens([tok])[0]
        if decoded == '[CLS]':
            continue
        if not len(current_word):
            current_word = decoded
            current_word_ids.append(i)
        elif decoded.startswith('##'):
            # Continuation piece: glue onto the current word.
            current_word += decoded[2:]
            current_word_ids.append(i)
        else:
            # New word starts: flush the finished word onto the source text.
            while clean(splitted_text[splitted_text_iterator]) != current_word:
                splitted_text_iterator += 1
            splitted_text[splitted_text_iterator] = transform_token_ids(
                func_data, current_word_ids, splitted_text[splitted_text_iterator]
            )
            current_word = decoded
            current_word_ids = [i]  # record the new word's first piece
    return ' '.join(splitted_text)


def squad_pos_forward_func(inputs, token_type_ids=None, attention_mask=None, position=0):
    """Forward function for Captum: the max logit of the selected output."""
    pred = predict(inputs.to(torch.long), token_type_ids.to(torch.long), attention_mask.to(torch.long))
    pred = pred[position]
    return pred.max(1).values
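
# A minimal sketch of the regrouping rule `build_text` relies on (the token
# stream here is made up, not real tokenizer output): '##'-prefixed pieces
# continue the previous word, anything else starts a new one.
def _demo_wordpiece_regroup(pieces=('[CLS]', 'кредит', '##ный', 'рейтинг', '[SEP]')):
    words, current = [], ''
    for piece in pieces:
        if piece == '[CLS]':
            continue
        if piece.startswith('##'):
            current += piece[2:]  # continuation: glue onto the current word
        else:
            if current:
                words.append(current)
            current = piece  # new word starts here
    if current:
        words.append(current)
    return words  # -> ['кредитный', 'рейтинг', '[SEP]'] modulo special tokens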
def predict(input_ids, token_type_ids, attention_mask):
    """Run the fine-tuned BERT classifier on pre-tokenized inputs."""
    encoding = {
        'input_ids': input_ids.to(model.device),
        'token_type_ids': token_type_ids.to(model.device),
        'attention_mask': attention_mask.to(model.device),
    }
    return model(**encoding)


# Historical alias: the endpoints call the same forward pass under this name.
predict_press_release = predict


def clean(text):
    """Keep only Cyrillic letters and single spaces, lowercased."""
    text = re.sub('[^а-яё ]', ' ', str(text).lower())
    text = re.sub(r' +', ' ', text).strip()
    return text


def get_description_interpreting(attrs):
    positive_weights = attrs
    return {
        'positive_weights': (
            positive_weights,
            {'min': np.min(positive_weights), 'max': np.max(positive_weights)},
        ),
    }


def batch_tokenize(text):
    """Greedily pack words into batches that stay under BERT's 512-token limit."""
    splitted_text = text.split()
    current_batch = splitted_text[0]
    batches = []
    for word in splitted_text[1:]:
        if len(tokenizer(current_batch + ' ' + word)['input_ids']) < 512:
            current_batch += ' ' + word
        else:
            batches.append({'text': current_batch})
            current_batch = word
    return batches + [{'text': current_batch}]


def token(text):
    return tokenizer(text['text'], padding=True, truncation=True, max_length=512, return_tensors='pt')


def tfidf_classify(data):
    """Majority vote of the TF-IDF classifier over 512-token batches."""
    if not len(data.data):
        return ''
    data = [x['text'] for x in batch_tokenize(clean(data.data))]
    predicted_labels = []
    predicted_text = ''
    for item in data:
        predicted_labels.append(process_classify(item)['ans'])
    ans = Counter(predicted_labels).most_common()[0][0]
    # Metric: share of batches that agree with the majority label.
    score = len([x for x in predicted_labels if x == ans]) / len(predicted_labels)
    return {
        'answer': id2label[ans],
        'text': predicted_text,
        'metric': score,
        'extendingLabels': [id2label[x] for x in predicted_labels],
    }


def tfidf_embeddings(data):
    """Like `tfidf_classify`, but also returns the token-resolved text."""
    if not len(data.data):
        return ''
    data = [x['text'] for x in batch_tokenize(clean(data.data))]
    predicted_labels = []
    predicted_text = ''
    for item in data:
        ans = process_embedding(item)
        predicted_labels.append(ans['ans'])
        predicted_text += ans['text'] + ' '
    ans = Counter(predicted_labels).most_common()[0][0]
    return {'answer': id2label[ans], 'text': predicted_text}


def bert_classify(data):
    """Classify each batch with BERT, then pick the label with the highest
    mean softmax score (ties broken by batch count)."""
    data = clean(data.data)
    predicted = []
    text = ''
    for b in batch_tokenize(data):
        embs = token(b)
        answer = predict_press_release(
            embs['input_ids'], embs['token_type_ids'], embs['attention_mask']
        ).logits[0]
        answer = torch.softmax(answer, dim=-1).detach().numpy()
        predicted.append([id2label[int(np.argmax(answer))], float(np.max(answer))])
    ans = {}
    for label, batch_score in predicted:
        ans.setdefault(label, []).append(batch_score)
    selected = 'AA(RU)'  # fallback if no candidate beats a zero score
    score = 0
    for candidate in ans:
        mean_score = sum(ans[candidate]) / len(ans[candidate])
        if mean_score > score:
            score = mean_score
            selected = candidate
        elif mean_score == score and len(ans[candidate]) > len(ans.get(selected, [])):
            selected = candidate
    return {'answer': selected, 'text': text, 'longAnswer': predicted, 'metric': score}
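
# A hypothetical usage sketch for `batch_tokenize` (illustration only; assumes
# the tokenizer has been loaded and no single word exceeds the window): every
# produced batch must fit in BERT's 512-token limit, which is what `token(...)`
# relies on when it truncates.
def _demo_batch_tokenize(long_text):
    batches = batch_tokenize(clean(long_text))
    for b in batches:
        n_tokens = len(tokenizer(b['text'])['input_ids'])
        assert n_tokens <= 512, n_tokens
    return len(batches)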
def bert_embeddings(data):
    """Classify each batch with BERT and highlight influential tokens via
    Layer Integrated Gradients attributions."""
    data = clean(data.data)
    predicted = []
    text = ''
    for b in batch_tokenize(data):
        embs = token(b)
        predicted.append(int(np.argmax(
            predict_press_release(
                embs['input_ids'], embs['token_type_ids'], embs['attention_mask']
            ).logits.detach().numpy()[0]
        )))
        attrs = lig.attribute(
            embs['input_ids'],
            additional_forward_args=(embs['attention_mask'], embs['token_type_ids'], 0),
        )
        # Collapse the embedding dimension: one attribution score per token.
        attrs = attrs[0].sum(dim=-1).detach().numpy()
        descr = get_description_interpreting(attrs)
        text += build_text(embs['input_ids'][0], descr, b['text']) + ' '
    return {'answer': id2label[Counter(predicted).most_common()[0][0]], 'text': text}


config = BertConfig.from_json_file('./akra_model/checkpoint/config.json')
model = AutoModelForSequenceClassification.from_pretrained('./akra_model/checkpoint', config=config)
tokenizer = AutoTokenizer.from_pretrained('cointegrated/rubert-tiny')
lig = LayerIntegratedGradients(squad_pos_forward_func, model.bert.embeddings)

app = FastAPI()


class Predict(BaseModel):
    data: str


class ListPredict(BaseModel):
    data: list


@app.post('/predict')
def predict_(data: Predict):
    return bert_classify(data)


@app.post('/bert/process')
def predict_f(data: Predict):
    return bert_classify(data)


@app.get('/interpret')
def interpret():
    pass


@app.post('/tfidf/process')
def tfidf_res(data: Predict):
    return tfidf_classify(data)


@app.post('/tfidf/batch')
def tfidf_batch(data: ListPredict):
    return [tfidf_classify(Predict(data=item)) for item in data.data]


@app.post('/bert/batch')
def bert_batch(data: ListPredict):
    # Iterate the payload list and wrap each item, mirroring /tfidf/batch.
    return [bert_classify(Predict(data=item)) for item in data.data]


@app.post('/bert/describe')
def bert_describe(data: Predict):
    return bert_embeddings(data)


@app.post('/tfidf/describe')
def tfidf_describe(data: Predict):
    return tfidf_embeddings(data)


def get_nearest_service(data: Predict):
    """Nearest-neighbour classification over LaBSE embeddings with a
    per-label mean-confidence vote."""
    data = clean(data.data)
    res = []
    scores = {key: [] for key in id2label.values()}
    for batch in batch_tokenize(data):
        features = list(get_nearest_value(get_embs(batch['text'])))
        features[1] /= 100  # confidence back to the [0, 1] range
        # Snap near-certain matches to 1.0; the original comparison still used
        # the pre-division 0-100 scale.
        scores[features[0]].append(features[1] if features[1] < 0.95 else 1.0)
        res.append({'text': batch['text'], 'features': features})
    mx = 0
    label = 'AA(RU)'
    for key, vals in scores.items():
        if not vals:
            continue
        mean_score = sum(vals) / len(vals)
        if mean_score > mx:
            label = key
            mx = mean_score
        elif mean_score == mx and len(vals) > len(scores[label]):
            label = key
    return {'detailed': res, 'metric': mx, 'answer': label}


@app.post('/nearest/nearest')
def process_text(data: Predict):
    return get_nearest_service(data)


@app.post('/catboost')
def catboost_process(data: Predict):
    """Ensemble endpoint: feed the three models' answers and confidences to CatBoost."""
    tfidf = tfidf_classify(data)
    bert = bert_classify(data)
    nearest = get_nearest_service(data)
    inputs = [
        label2id[tfidf['answer']], tfidf['metric'],
        bert['metric'], label2id[bert['answer']],
        nearest['metric'], label2id[nearest['answer']],
    ]
    catboost_answer = id2label[catboost.predict([inputs])[0][0]]
    return {'bert': bert, 'tfidf': tfidf, 'nearest': nearest, 'total': catboost_answer}
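

# A minimal run sketch (not part of the original file): serve the app with
# uvicorn, assumed installed; host and port are placeholders. Example request:
#   curl -X POST http://localhost:8000/catboost \
#        -H 'Content-Type: application/json' \
#        -d '{"data": "текст пресс-релиза ..."}'
if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8000)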