diff --git a/mask_pii_nlp.py b/mask_pii_nlp.py new file mode 100644 index 0000000..f26641e --- /dev/null +++ b/mask_pii_nlp.py @@ -0,0 +1,184 @@ +import warnings +warnings.filterwarnings("ignore", category=UserWarning, module="pymorphy2") + +import re +import sys +from natasha import ( + Segmenter, MorphVocab, NewsEmbedding, NewsNERTagger, + NamesExtractor, Doc +) + + +class RuPIIMasker: + def __init__(self): + self.segmenter = Segmenter() + self.morph_vocab = MorphVocab() + self.emb = NewsEmbedding() + self.ner_tagger = NewsNERTagger(self.emb) + self.names_extractor = NamesExtractor(self.morph_vocab) + + self.regex_patterns = [ + ('EMAIL', re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')), + ('PHONE', re.compile( + r'(?:\+7|8)[\s\-]?\(?\d{3,4}\)?[\s\-]?\d{2,3}[\s\-]?\d{2}[\s\-]?\d{2}(?:\s*,?\s*доб\.?\s*\d+(?:-\d+)?)?')), + ('PHONE', re.compile(r'(?:тел\.?|т\.)\s*8[-\s]?\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2}')), + ('SNILS', re.compile(r'\b\d{3}[-\s]?\d{3}[-\s]?\d{3}[-\s]?\d{2}\b')), + ('INN', re.compile(r'(?i)\b(?:инн\s*[:\-]?\s*)(\d{10}|\d{12})\b')), + ('PASSPORT', re.compile(r'(?i)(?:паспорт|серия)\s*[:\-]?\s*\d{2}\s?\d{2}\s*(?:№|номер)?\s*\d{6}')), + ('DATE', re.compile(r'\b\d{1,2}[./]\d{1,2}[./]\d{2,4}\s*г?\.?\b')), + ('DOC_NUM', re.compile(r'(?:№|N|#)\s*[A-Za-zА-Яа-яЁё]{0,5}[-\s]?\d{3,}')), + ('DOC_REF', re.compile(r'(?i)(?:в/х|исх|вх|рег)\s*[.№#]\s*\d+')), + ('CONTRACT_12', re.compile(r'\b\d{12}\b')), + ('FIO', re.compile( + r'(?i)(?:уважаем(?:ый|ая|ое|ые)|дорог(?:ой|ая|ое|ие))\s+([А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+){0,2})(?=[!\s,.])')), + ('FIO', re.compile(r'(?i)\bот\s+([А-ЯЁ][а-яё]{2,}\s+[А-ЯЁ]\.?\s*[А-ЯЁ]?\.?)(?=\s|$|[,.])')), + ('FIO', re.compile( + r'(?i)(?:начальник[аеуом]*|директор[ауе]*|менеджер[ауе]*|специалист[ауе]*|инженер[ауе]*|мастер[ауе]*)[^–\-]{0,50}[–\-]\s*([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+(?:\s+[А-ЯЁ][а-яё]+)?)')), + ('FIO', re.compile(r'\b([А-ЯЁ][а-яё]{2,})\s+([А-ЯЁ])\.?\s*([А-ЯЁ])\.?(?=\s|$|[,.\"\'])')), + ('FIO', 
re.compile(r'\b([А-ЯЁ])\.?\s*([А-ЯЁ])\.?\s+([А-ЯЁ][а-яё]{2,})(?=\s|$|[,.\"\'])')), + ('FIO', re.compile(r'(?i)\bя\s*,\s*([А-ЯЁ][а-яё]{2,}\s+[А-ЯЁ][а-яё]{2,}\s+[А-ЯЁ][а-яё]{2,})\s*,')), + ('FIO', re.compile(r'(?<=[–\-]\s)([А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+\s+[А-ЯЁ][а-яё]+)(?=\s*[,.]|\s*$)')), + ('FIO_FULL', re.compile( + r'(? list: + spans = [] + for pii_type, pattern in self.regex_patterns: + for m in pattern.finditer(text): + spans.append((m.start(), m.end(), pii_type)) + + for m in self.address_markers.finditer(text): + spans.append((m.start(), m.end(), 'ADDRESS')) + + for m in self.address_pattern.finditer(text): + if len(m.group()) > 15: + spans.append((m.start(), m.end(), 'ADDRESS')) + + return spans + + def _find_ner_spans(self, text: str) -> list: + doc = Doc(text) + doc.segment(self.segmenter) + doc.tag_ner(self.ner_tagger) + + spans = [] + for span in doc.spans: + if span.type in ('PER', 'LOC', 'ORG'): + spans.append((span.start, span.stop, span.type)) + return spans + + def _merge_spans(self, spans: list, text_len: int) -> list: + if not spans: + return [] + + spans = sorted(spans, key=lambda x: (x[0], -x[1])) + merged = [] + + for start, end, pii_type in spans: + if merged and start < merged[-1][1]: + prev_start, prev_end, prev_type = merged[-1] + if end > prev_end: + merged[-1] = (prev_start, end, prev_type) + else: + merged.append((start, end, pii_type)) + + return merged + + def mask(self, text: str) -> str: + if not text or not isinstance(text, str): + return text + + regex_spans = self._find_regex_spans(text) + merged_regex = self._merge_spans(regex_spans, len(text)) + + temp_text = text + offset = 0 + placeholder_map = {} + + for i, (start, end, pii_type) in enumerate(sorted(merged_regex, key=lambda x: x[0])): + placeholder = f"__PH{i}__" + placeholder_map[placeholder] = f"[{pii_type}]" + adj_start = start + offset + adj_end = end + offset + temp_text = temp_text[:adj_start] + placeholder + temp_text[adj_end:] + offset += len(placeholder) - (end - 
def main():
    """Run the command-line interface for PII masking.

    The first argument selects the mode:

    * ``-`` -- read lines from stdin and print each one masked;
    * a path ending in ``.xlsx`` (any letter case) -- mask a workbook via
      ``process_xlsx``; the second argument is the output path, optionally
      followed by a sheet name and then column letters (default ``A``, ``B``);
    * anything else -- treat the argument itself as the text to mask and
      print the result.

    Exits with status 1 when no argument is given, or when workbook mode is
    selected without an output path. Diagnostics are written to stderr so
    that stdout carries only masked text.
    """
    if len(sys.argv) < 2:
        print("Usage:", file=sys.stderr)
        print(" python mask_pii_nlp.py 'text to mask'", file=sys.stderr)
        print(" python mask_pii_nlp.py input.xlsx output.xlsx [sheet] [col1 col2 ...]", file=sys.stderr)
        print(" echo 'text' | python mask_pii_nlp.py -", file=sys.stderr)
        sys.exit(1)

    arg = sys.argv[1]

    if arg == "-":
        masker = RuPIIMasker()
        for line in sys.stdin:
            print(masker.mask(line.rstrip('\n')))
    elif arg.lower().endswith('.xlsx'):
        # Case-insensitive match: "REPORT.XLSX" is a workbook, not text to mask.
        if len(sys.argv) < 3:
            print("Need output.xlsx path", file=sys.stderr)
            sys.exit(1)
        out_path = sys.argv[2]
        sheet = sys.argv[3] if len(sys.argv) >= 4 else None
        cols = [c.upper() for c in sys.argv[4:]] if len(sys.argv) >= 5 else ['A', 'B']
        process_xlsx(arg, out_path, sheet, cols)
    else:
        masker = RuPIIMasker()
        print(masker.mask(arg))