mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-25 02:13:44 +03:00
5283a835dc
Url to fetch source changed from the official Postgres one to the Github mirror because the former throttled us.
137 lines
4.1 KiB
Python
Executable File
137 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Generate the errors module from PostgreSQL source code.
|
|
|
|
The script can be run at a new PostgreSQL release to refresh the module.
|
|
"""
|
|
|
|
# Copyright (C) 2018-2019 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
|
# Copyright (C) 2020-2021 The Psycopg Team
|
|
#
|
|
# psycopg2 is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Lesser General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
|
# License for more details.
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
from urllib.request import urlopen
|
|
from collections import defaultdict
|
|
|
|
|
|
def main():
|
|
filename = os.path.join(
|
|
os.path.dirname(__file__), "../psycopg/sqlstate_errors.h")
|
|
|
|
# If you add a version to the list fix the docs (in errors.rst)
|
|
classes, errors = fetch_errors("11 12 13 14 15 16 17".split())
|
|
|
|
f = open(filename, "w")
|
|
print("/*\n * Autogenerated by 'scripts/make_errors.py'.\n */\n", file=f)
|
|
for line in generate_module_data(classes, errors):
|
|
print(line, file=f)
|
|
|
|
|
|
def parse_errors_txt(url):
|
|
classes = {}
|
|
errors = defaultdict(dict)
|
|
|
|
page = urlopen(url)
|
|
for line in page:
|
|
# Strip comments and skip blanks
|
|
line = line.decode('ascii').split('#')[0].strip()
|
|
if not line:
|
|
continue
|
|
|
|
# Parse a section
|
|
m = re.match(r"Section: (Class (..) - .+)", line)
|
|
if m:
|
|
label, class_ = m.groups()
|
|
classes[class_] = label
|
|
continue
|
|
|
|
# Parse an error
|
|
m = re.match(r"(.....)\s+(?:E|W|S)\s+ERRCODE_(\S+)(?:\s+(\S+))?$", line)
|
|
if m:
|
|
errcode, macro, spec = m.groups()
|
|
# skip errcodes without specs as they are not publicly visible
|
|
if not spec:
|
|
continue
|
|
errlabel = spec.upper()
|
|
errors[class_][errcode] = errlabel
|
|
continue
|
|
|
|
# We don't expect anything else
|
|
raise ValueError(f"unexpected line:\n{line}")
|
|
|
|
return classes, errors
|
|
|
|
|
|
errors_txt_url = \
|
|
"https://raw.githubusercontent.com/postgres/postgres/refs/heads/%s" \
|
|
"/src/backend/utils/errcodes.txt"
|
|
|
|
|
|
def fetch_errors(versions):
|
|
classes = {}
|
|
errors = defaultdict(dict)
|
|
|
|
for version in versions:
|
|
print(version, file=sys.stderr)
|
|
tver = tuple(map(int, version.split()[0].split('.')))
|
|
tag = f"{tver[0] >= 10 and 'REL_' or 'REL'}{version.replace('.', '_')}_STABLE"
|
|
c1, e1 = parse_errors_txt(errors_txt_url % tag)
|
|
classes.update(c1)
|
|
|
|
for c, cerrs in e1.items():
|
|
errors[c].update(cerrs)
|
|
|
|
return classes, errors
|
|
|
|
|
|
def generate_module_data(classes, errors):
|
|
tmpl = '{"%(errcode)s", "%(cls)s"},'
|
|
specific = {
|
|
'38002': 'ModifyingSqlDataNotPermittedExt',
|
|
'38003': 'ProhibitedSqlStatementAttemptedExt',
|
|
'38004': 'ReadingSqlDataNotPermittedExt',
|
|
'39004': 'NullValueNotAllowedExt',
|
|
'XX000': 'InternalError_',
|
|
}
|
|
|
|
seen = set("""
|
|
Error Warning InterfaceError DataError DatabaseError ProgrammingError
|
|
IntegrityError InternalError NotSupportedError OperationalError
|
|
QueryCanceledError TransactionRollbackError
|
|
""".split())
|
|
|
|
for clscode, clslabel in sorted(classes.items()):
|
|
if clscode in ('00', '01'):
|
|
# success and warning - never raised
|
|
continue
|
|
|
|
yield f"\n/* {clslabel} */"
|
|
|
|
for errcode, errlabel in sorted(errors[clscode].items()):
|
|
if errcode in specific:
|
|
clsname = specific[errcode]
|
|
else:
|
|
clsname = errlabel.title().replace('_', '')
|
|
if clsname in seen:
|
|
raise Exception(f"class already existing: {clsname}")
|
|
seen.add(clsname)
|
|
|
|
yield tmpl % {
|
|
'cls': clsname,
|
|
'errcode': errcode
|
|
}
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|