mirror of
				https://github.com/psycopg/psycopg2.git
				synced 2025-10-31 07:47:30 +03:00 
			
		
		
		
	Upgrade f-strings with flynt
This commit is contained in:
		
							parent
							
								
									d956eaa3b1
								
							
						
					
					
						commit
						8d7f660309
					
				|  | @ -25,8 +25,8 @@ def main(): | |||
|     for k in sorted(sqlstate_errors): | ||||
|         exc = sqlstate_errors[k] | ||||
|         lines.append(Line( | ||||
|             "``%s``" % k, "`!%s`" % exc.__name__, | ||||
|             "`!%s`" % get_base_exception(exc).__name__, k)) | ||||
|             f"``{k}``", f"`!{exc.__name__}`", | ||||
|             f"`!{get_base_exception(exc).__name__}`", k)) | ||||
| 
 | ||||
|     widths = [max(len(l[c]) for l in lines) for c in range(3)] | ||||
|     h = Line(*(['=' * w for w in widths] + [None])) | ||||
|  | @ -39,7 +39,7 @@ def main(): | |||
|     for l in lines: | ||||
|         cls = l.sqlstate[:2] if l.sqlstate else None | ||||
|         if cls and cls != sqlclass: | ||||
|             print("**Class {}**: {}".format(cls, sqlclasses[cls])) | ||||
|             print(f"**Class {cls}**: {sqlclasses[cls]}") | ||||
|             print(h1) | ||||
|             sqlclass = cls | ||||
| 
 | ||||
|  |  | |||
|  | @ -163,7 +163,7 @@ def _create_json_typecasters(oid, array_oid, loads=None, name='JSON'): | |||
| 
 | ||||
|     JSON = new_type((oid, ), name, typecast_json) | ||||
|     if array_oid is not None: | ||||
|         JSONARRAY = new_array_type((array_oid, ), "%sARRAY" % name, JSON) | ||||
|         JSONARRAY = new_array_type((array_oid, ), f"{name}ARRAY", JSON) | ||||
|     else: | ||||
|         JSONARRAY = None | ||||
| 
 | ||||
|  | @ -194,6 +194,6 @@ def _get_json_oids(conn_or_curs, name='json'): | |||
|         conn.rollback() | ||||
| 
 | ||||
|     if not r: | ||||
|         raise conn.ProgrammingError("%s data type not found" % name) | ||||
|         raise conn.ProgrammingError(f"{name} data type not found") | ||||
| 
 | ||||
|     return r | ||||
|  |  | |||
|  | @ -47,7 +47,7 @@ class Range: | |||
|     def __init__(self, lower=None, upper=None, bounds='[)', empty=False): | ||||
|         if not empty: | ||||
|             if bounds not in ('[)', '(]', '()', '[]'): | ||||
|                 raise ValueError("bound flags not valid: %r" % bounds) | ||||
|                 raise ValueError(f"bound flags not valid: {bounds!r}") | ||||
| 
 | ||||
|             self._lower = lower | ||||
|             self._upper = upper | ||||
|  | @ -57,7 +57,7 @@ class Range: | |||
| 
 | ||||
|     def __repr__(self): | ||||
|         if self._bounds is None: | ||||
|             return "%s(empty=True)" % self.__class__.__name__ | ||||
|             return f"{self.__class__.__name__}(empty=True)" | ||||
|         else: | ||||
|             return "{}({!r}, {!r}, {!r})".format(self.__class__.__name__, | ||||
|                 self._lower, self._upper, self._bounds) | ||||
|  | @ -391,7 +391,7 @@ where typname = %s and ns.nspname = %s; | |||
| 
 | ||||
|         if not rec: | ||||
|             raise ProgrammingError( | ||||
|                 "PostgreSQL type '%s' not found" % name) | ||||
|                 f"PostgreSQL type '{name}' not found") | ||||
| 
 | ||||
|         type, subtype, array = rec | ||||
| 
 | ||||
|  | @ -423,7 +423,7 @@ where typname = %s and ns.nspname = %s; | |||
| 
 | ||||
|         m = self._re_range.match(s) | ||||
|         if m is None: | ||||
|             raise InterfaceError("failed to parse range: '%s'" % s) | ||||
|             raise InterfaceError(f"failed to parse range: '{s}'") | ||||
| 
 | ||||
|         lower = m.group(3) | ||||
|         if lower is None: | ||||
|  | @ -503,8 +503,7 @@ class NumberRangeAdapter(RangeAdapter): | |||
|         else: | ||||
|             upper = '' | ||||
| 
 | ||||
|         return ("'{}{},{}{}'".format( | ||||
|             r._bounds[0], lower, upper, r._bounds[1])).encode('ascii') | ||||
|         return (f"'{r._bounds[0]}{lower},{upper}{r._bounds[1]}'").encode('ascii') | ||||
| 
 | ||||
| 
 | ||||
| # TODO: probably won't work with infs, nans and other tricky cases. | ||||
|  |  | |||
|  | @ -529,7 +529,7 @@ class ReplicationCursor(_replicationCursor): | |||
|     def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): | ||||
|         """Create streaming replication slot.""" | ||||
| 
 | ||||
|         command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self) | ||||
|         command = f"CREATE_REPLICATION_SLOT {quote_ident(slot_name, self)} " | ||||
| 
 | ||||
|         if slot_type is None: | ||||
|             slot_type = self.connection.replication_type | ||||
|  | @ -540,7 +540,7 @@ class ReplicationCursor(_replicationCursor): | |||
|                     "output plugin name is required to create " | ||||
|                     "logical replication slot") | ||||
| 
 | ||||
|             command += "LOGICAL %s" % quote_ident(output_plugin, self) | ||||
|             command += f"LOGICAL {quote_ident(output_plugin, self)}" | ||||
| 
 | ||||
|         elif slot_type == REPLICATION_PHYSICAL: | ||||
|             if output_plugin is not None: | ||||
|  | @ -552,14 +552,14 @@ class ReplicationCursor(_replicationCursor): | |||
| 
 | ||||
|         else: | ||||
|             raise psycopg2.ProgrammingError( | ||||
|                 "unrecognized replication type: %s" % repr(slot_type)) | ||||
|                 f"unrecognized replication type: {repr(slot_type)}") | ||||
| 
 | ||||
|         self.execute(command) | ||||
| 
 | ||||
|     def drop_replication_slot(self, slot_name): | ||||
|         """Drop streaming replication slot.""" | ||||
| 
 | ||||
|         command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self) | ||||
|         command = f"DROP_REPLICATION_SLOT {quote_ident(slot_name, self)}" | ||||
|         self.execute(command) | ||||
| 
 | ||||
|     def start_replication( | ||||
|  | @ -574,7 +574,7 @@ class ReplicationCursor(_replicationCursor): | |||
| 
 | ||||
|         if slot_type == REPLICATION_LOGICAL: | ||||
|             if slot_name: | ||||
|                 command += "SLOT %s " % quote_ident(slot_name, self) | ||||
|                 command += f"SLOT {quote_ident(slot_name, self)} " | ||||
|             else: | ||||
|                 raise psycopg2.ProgrammingError( | ||||
|                     "slot name is required for logical replication") | ||||
|  | @ -583,19 +583,18 @@ class ReplicationCursor(_replicationCursor): | |||
| 
 | ||||
|         elif slot_type == REPLICATION_PHYSICAL: | ||||
|             if slot_name: | ||||
|                 command += "SLOT %s " % quote_ident(slot_name, self) | ||||
|                 command += f"SLOT {quote_ident(slot_name, self)} " | ||||
|             # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX | ||||
| 
 | ||||
|         else: | ||||
|             raise psycopg2.ProgrammingError( | ||||
|                 "unrecognized replication type: %s" % repr(slot_type)) | ||||
|                 f"unrecognized replication type: {repr(slot_type)}") | ||||
| 
 | ||||
|         if type(start_lsn) is str: | ||||
|             lsn = start_lsn.split('/') | ||||
|             lsn = "{:X}/{:08X}".format(int(lsn[0], 16), int(lsn[1], 16)) | ||||
|             lsn = f"{int(lsn[0], 16):X}/{int(lsn[1], 16):08X}" | ||||
|         else: | ||||
|             lsn = "{:X}/{:08X}".format((start_lsn >> 32) & 0xFFFFFFFF, | ||||
|                                start_lsn & 0xFFFFFFFF) | ||||
|             lsn = f"{start_lsn >> 32 & 4294967295:X}/{start_lsn & 4294967295:08X}" | ||||
| 
 | ||||
|         command += lsn | ||||
| 
 | ||||
|  | @ -615,7 +614,7 @@ class ReplicationCursor(_replicationCursor): | |||
|             for k, v in options.items(): | ||||
|                 if not command.endswith('('): | ||||
|                     command += ", " | ||||
|                 command += "{} {}".format(quote_ident(k, self), _A(str(v))) | ||||
|                 command += f"{quote_ident(k, self)} {_A(str(v))}" | ||||
|             command += ")" | ||||
| 
 | ||||
|         self.start_replication_expert( | ||||
|  | @ -643,10 +642,10 @@ class UUID_adapter: | |||
|             return self | ||||
| 
 | ||||
|     def getquoted(self): | ||||
|         return ("'%s'::uuid" % self._uuid).encode('utf8') | ||||
|         return (f"'{self._uuid}'::uuid").encode('utf8') | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "'%s'::uuid" % self._uuid | ||||
|         return f"'{self._uuid}'::uuid" | ||||
| 
 | ||||
| 
 | ||||
| def register_uuid(oids=None, conn_or_curs=None): | ||||
|  | @ -768,7 +767,7 @@ def wait_select(conn): | |||
|             elif state == POLL_WRITE: | ||||
|                 select.select([], [conn.fileno()], []) | ||||
|             else: | ||||
|                 raise conn.OperationalError("bad state from poll: %s" % state) | ||||
|                 raise conn.OperationalError(f"bad state from poll: {state}") | ||||
|         except KeyboardInterrupt: | ||||
|             conn.cancel() | ||||
|             # the loop will be broken by a server error | ||||
|  | @ -909,12 +908,11 @@ class HstoreAdapter: | |||
|         rv0, rv1 = [], [] | ||||
| 
 | ||||
|         # get the oid for the hstore | ||||
|         curs.execute("""\ | ||||
| SELECT t.oid, %s | ||||
|         curs.execute(f"""SELECT t.oid, {typarray} | ||||
| FROM pg_type t JOIN pg_namespace ns | ||||
|     ON typnamespace = ns.oid | ||||
| WHERE typname = 'hstore'; | ||||
| """ % typarray) | ||||
| """) | ||||
|         for oids in curs: | ||||
|             rv0.append(oids[0]) | ||||
|             rv1.append(oids[1]) | ||||
|  | @ -1008,7 +1006,7 @@ class CompositeCaster: | |||
|         self.typecaster = _ext.new_type((oid,), name, self.parse) | ||||
|         if array_oid: | ||||
|             self.array_typecaster = _ext.new_array_type( | ||||
|                 (array_oid,), "%sARRAY" % name, self.typecaster) | ||||
|                 (array_oid,), f"{name}ARRAY", self.typecaster) | ||||
|         else: | ||||
|             self.array_typecaster = None | ||||
| 
 | ||||
|  | @ -1052,7 +1050,7 @@ class CompositeCaster: | |||
|         rv = [] | ||||
|         for m in self._re_tokenize.finditer(s): | ||||
|             if m is None: | ||||
|                 raise psycopg2.InterfaceError("can't parse type: %r" % s) | ||||
|                 raise psycopg2.InterfaceError(f"can't parse type: {s!r}") | ||||
|             if m.group(1) is not None: | ||||
|                 rv.append(None) | ||||
|             elif m.group(2) is not None: | ||||
|  | @ -1107,7 +1105,7 @@ ORDER BY attnum; | |||
| 
 | ||||
|         if not recs: | ||||
|             raise psycopg2.ProgrammingError( | ||||
|                 "PostgreSQL type '%s' not found" % name) | ||||
|                 f"PostgreSQL type '{name}' not found") | ||||
| 
 | ||||
|         type_oid = recs[0][0] | ||||
|         array_oid = recs[0][1] | ||||
|  |  | |||
							
								
								
									
										15
									
								
								lib/sql.py
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								lib/sql.py
									
									
									
									
									
								
							|  | @ -106,7 +106,7 @@ class Composed(Composable): | |||
|         for i in seq: | ||||
|             if not isinstance(i, Composable): | ||||
|                 raise TypeError( | ||||
|                     "Composed elements must be Composable, got %r instead" % i) | ||||
|                     f"Composed elements must be Composable, got {i!r} instead") | ||||
|             wrapped.append(i) | ||||
| 
 | ||||
|         super().__init__(wrapped) | ||||
|  | @ -344,9 +344,7 @@ class Identifier(Composable): | |||
|                 "the Identifier wraps more than one than one string") | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         return "{}({})".format( | ||||
|             self.__class__.__name__, | ||||
|             ', '.join(map(repr, self._wrapped))) | ||||
|         return f"{self.__class__.__name__}({', '.join(map(repr, self._wrapped))})" | ||||
| 
 | ||||
|     def as_string(self, context): | ||||
|         return '.'.join(ext.quote_ident(s, context) for s in self._wrapped) | ||||
|  | @ -427,10 +425,10 @@ class Placeholder(Composable): | |||
|     def __init__(self, name=None): | ||||
|         if isinstance(name, str): | ||||
|             if ')' in name: | ||||
|                 raise ValueError("invalid name: %r" % name) | ||||
|                 raise ValueError(f"invalid name: {name!r}") | ||||
| 
 | ||||
|         elif name is not None: | ||||
|             raise TypeError("expected string or None as name, got %r" % name) | ||||
|             raise TypeError(f"expected string or None as name, got {name!r}") | ||||
| 
 | ||||
|         super().__init__(name) | ||||
| 
 | ||||
|  | @ -440,12 +438,11 @@ class Placeholder(Composable): | |||
|         return self._wrapped | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         return "Placeholder({!r})".format( | ||||
|             self._wrapped if self._wrapped is not None else '') | ||||
|         return f"Placeholder({self._wrapped if self._wrapped is not None else ''!r})" | ||||
| 
 | ||||
|     def as_string(self, context): | ||||
|         if self._wrapped is not None: | ||||
|             return "%%(%s)s" % self._wrapped | ||||
|             return f"%{self._wrapped['%s']}" | ||||
|         else: | ||||
|             return "%s" | ||||
| 
 | ||||
|  |  | |||
|  | @ -439,10 +439,7 @@ def check_libpq_version(): | |||
|         .decode('ascii') | ||||
|         .rstrip() | ||||
|     ) | ||||
|     assert want_ver == got_ver, "libpq version mismatch: {!r} != {!r}".format( | ||||
|         want_ver, | ||||
|         got_ver, | ||||
|     ) | ||||
|     assert want_ver == got_ver, f"libpq version mismatch: {want_ver!r} != {got_ver!r}" | ||||
| 
 | ||||
| 
 | ||||
| def run_test_suite(): | ||||
|  | @ -684,7 +681,7 @@ def which(name): | |||
|             if os.path.isfile(fn): | ||||
|                 return fn | ||||
| 
 | ||||
|     raise Exception("couldn't find program on path: %s" % name) | ||||
|     raise Exception(f"couldn't find program on path: {name}") | ||||
| 
 | ||||
| 
 | ||||
| class Options: | ||||
|  | @ -846,7 +843,7 @@ class Options: | |||
|     def dist_dir(self): | ||||
|         """The directory where to build packages to distribute.""" | ||||
|         return ( | ||||
|             self.package_dir / 'dist' / ('psycopg2-%s' % self.package_version) | ||||
|             self.package_dir / 'dist' / (f'psycopg2-{self.package_version}') | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -25,7 +25,7 @@ from collections import defaultdict | |||
| 
 | ||||
| def main(): | ||||
|     if len(sys.argv) != 2: | ||||
|         print("usage: %s /path/to/errorcodes.py" % sys.argv[0], file=sys.stderr) | ||||
|         print(f"usage: {sys.argv[0]} /path/to/errorcodes.py", file=sys.stderr) | ||||
|         return 2 | ||||
| 
 | ||||
|     filename = sys.argv[1] | ||||
|  | @ -84,7 +84,7 @@ def parse_errors_txt(url): | |||
|             continue | ||||
| 
 | ||||
|         # We don't expect anything else | ||||
|         raise ValueError("unexpected line:\n%s" % line) | ||||
|         raise ValueError(f"unexpected line:\n{line}") | ||||
| 
 | ||||
|     return classes, errors | ||||
| 
 | ||||
|  | @ -101,9 +101,7 @@ def fetch_errors(versions): | |||
|     for version in versions: | ||||
|         print(version, file=sys.stderr) | ||||
|         tver = tuple(map(int, version.split()[0].split('.'))) | ||||
|         tag = '{}{}_STABLE'.format( | ||||
|             (tver[0] >= 10 and 'REL_' or 'REL'), | ||||
|             version.replace('.', '_')) | ||||
|         tag = f"{tver[0] >= 10 and 'REL_' or 'REL'}{version.replace('.', '_')}_STABLE" | ||||
|         c1, e1 = parse_errors_txt(errors_txt_url % tag) | ||||
|         classes.update(c1) | ||||
| 
 | ||||
|  | @ -141,11 +139,11 @@ def generate_module_data(classes, errors): | |||
| 
 | ||||
|     for clscode, clslabel in sorted(classes.items()): | ||||
|         yield "" | ||||
|         yield "# %s" % clslabel | ||||
|         yield f"# {clslabel}" | ||||
| 
 | ||||
|         for errcode, errlabel in sorted(errors[clscode].items()): | ||||
|             if errlabel in seen: | ||||
|                 raise Exception("error label already seen: %s" % errlabel) | ||||
|                 raise Exception(f"error label already seen: {errlabel}") | ||||
|             seen.add(errlabel) | ||||
|             yield f"{errlabel} = {errcode!r}" | ||||
| 
 | ||||
|  |  | |||
|  | @ -68,7 +68,7 @@ def parse_errors_txt(url): | |||
|             continue | ||||
| 
 | ||||
|         # We don't expect anything else | ||||
|         raise ValueError("unexpected line:\n%s" % line) | ||||
|         raise ValueError(f"unexpected line:\n{line}") | ||||
| 
 | ||||
|     return classes, errors | ||||
| 
 | ||||
|  | @ -85,9 +85,7 @@ def fetch_errors(versions): | |||
|     for version in versions: | ||||
|         print(version, file=sys.stderr) | ||||
|         tver = tuple(map(int, version.split()[0].split('.'))) | ||||
|         tag = '{}{}_STABLE'.format( | ||||
|             (tver[0] >= 10 and 'REL_' or 'REL'), | ||||
|             version.replace('.', '_')) | ||||
|         tag = f"{tver[0] >= 10 and 'REL_' or 'REL'}{version.replace('.', '_')}_STABLE" | ||||
|         c1, e1 = parse_errors_txt(errors_txt_url % tag) | ||||
|         classes.update(c1) | ||||
| 
 | ||||
|  | @ -118,7 +116,7 @@ def generate_module_data(classes, errors): | |||
|             # success and warning - never raised | ||||
|             continue | ||||
| 
 | ||||
|         yield "\n/* %s */" % clslabel | ||||
|         yield f"\n/* {clslabel} */" | ||||
| 
 | ||||
|         for errcode, errlabel in sorted(errors[clscode].items()): | ||||
|             if errcode in specific: | ||||
|  | @ -126,7 +124,7 @@ def generate_module_data(classes, errors): | |||
|             else: | ||||
|                 clsname = errlabel.title().replace('_', '') | ||||
|             if clsname in seen: | ||||
|                 raise Exception("class already existing: %s" % clsname) | ||||
|                 raise Exception(f"class already existing: {clsname}") | ||||
|             seen.add(clsname) | ||||
| 
 | ||||
|             yield tmpl % { | ||||
|  |  | |||
|  | @ -36,7 +36,7 @@ def main(): | |||
|     if opt.suite: | ||||
|         test = getattr(test, opt.suite) | ||||
| 
 | ||||
|     sys.stdout.write("test suite %s\n" % test.__name__) | ||||
|     sys.stdout.write(f"test suite {test.__name__}\n") | ||||
| 
 | ||||
|     for i in range(1, opt.nruns + 1): | ||||
|         sys.stdout.write("test suite run %d of %d\n" % (i, opt.nruns)) | ||||
|  |  | |||
							
								
								
									
										10
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								setup.py
									
									
									
									
									
								
							|  | @ -114,8 +114,7 @@ For further information please check the 'doc/src/install.rst' file (also at | |||
|                 stdout=subprocess.PIPE, | ||||
|                 stderr=subprocess.PIPE) | ||||
|         except OSError: | ||||
|             raise Warning("Unable to find 'pg_config' file in '%s'" % | ||||
|                           self.pg_config_exe) | ||||
|             raise Warning(f"Unable to find 'pg_config' file in '{self.pg_config_exe}'") | ||||
|         pg_config_process.stdin.close() | ||||
|         result = pg_config_process.stdout.readline().strip() | ||||
|         if not result: | ||||
|  | @ -396,8 +395,7 @@ For further information please check the 'doc/src/install.rst' file (also at | |||
|                 pgpatch = int(pgpatch) | ||||
|             else: | ||||
|                 sys.stderr.write( | ||||
|                     "Error: could not determine PostgreSQL version from '%s'" | ||||
|                     % pgversion) | ||||
|                     f"Error: could not determine PostgreSQL version from '{pgversion}'") | ||||
|                 sys.exit(1) | ||||
| 
 | ||||
|             define_macros.append(("PG_VERSION_NUM", "%d%02d%02d" % | ||||
|  | @ -419,7 +417,7 @@ For further information please check the 'doc/src/install.rst' file (also at | |||
| 
 | ||||
|         except Warning: | ||||
|             w = sys.exc_info()[1]  # work around py 2/3 different syntax | ||||
|             sys.stderr.write("Error: %s\n" % w) | ||||
|             sys.stderr.write(f"Error: {w}\n") | ||||
|             sys.exit(1) | ||||
| 
 | ||||
|         if hasattr(self, "finalize_" + sys.platform): | ||||
|  | @ -512,7 +510,7 @@ version_flags.append('pq3')     # no more a choice | |||
| version_flags.append('ext')     # no more a choice | ||||
| 
 | ||||
| if version_flags: | ||||
|     PSYCOPG_VERSION_EX = PSYCOPG_VERSION + " (%s)" % ' '.join(version_flags) | ||||
|     PSYCOPG_VERSION_EX = PSYCOPG_VERSION + f" ({' '.join(version_flags)})" | ||||
| else: | ||||
|     PSYCOPG_VERSION_EX = PSYCOPG_VERSION | ||||
| 
 | ||||
|  |  | |||
|  | @ -101,10 +101,10 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|     connect_kw_args = {} # Keyword arguments for connect | ||||
|     table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables | ||||
| 
 | ||||
|     ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix | ||||
|     ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix | ||||
|     xddl1 = 'drop table %sbooze' % table_prefix | ||||
|     xddl2 = 'drop table %sbarflys' % table_prefix | ||||
|     ddl1 = f'create table {table_prefix}booze (name varchar(20))' | ||||
|     ddl2 = f'create table {table_prefix}barflys (name varchar(20))' | ||||
|     xddl1 = f'drop table {table_prefix}booze' | ||||
|     xddl2 = f'drop table {table_prefix}barflys' | ||||
| 
 | ||||
|     lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase | ||||
| 
 | ||||
|  | @ -265,7 +265,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             cur1.execute("insert into %sbooze values ('Victoria Bitter')" % ( | ||||
|                 self.table_prefix | ||||
|                 )) | ||||
|             cur2.execute("select name from %sbooze" % self.table_prefix) | ||||
|             cur2.execute(f"select name from {self.table_prefix}booze") | ||||
|             booze = cur2.fetchall() | ||||
|             self.assertEqual(len(booze),1) | ||||
|             self.assertEqual(len(booze[0]),1) | ||||
|  | @ -282,7 +282,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|                 'cursor.description should be none after executing a ' | ||||
|                 'statement that can return no rows (such as DDL)' | ||||
|                 ) | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             self.assertEqual(len(cur.description),1, | ||||
|                 'cursor.description describes too many columns' | ||||
|                 ) | ||||
|  | @ -322,7 +322,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|                 'cursor.rowcount should == number or rows inserted, or ' | ||||
|                 'set to -1 after executing an insert statement' | ||||
|                 ) | ||||
|             cur.execute("select name from %sbooze" % self.table_prefix) | ||||
|             cur.execute(f"select name from {self.table_prefix}booze") | ||||
|             self.failUnless(cur.rowcount in (-1,1), | ||||
|                 'cursor.rowcount should == number of rows returned, or ' | ||||
|                 'set to -1 after executing a select statement' | ||||
|  | @ -385,24 +385,22 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
| 
 | ||||
|     def _paraminsert(self,cur): | ||||
|         self.executeDDL1(cur) | ||||
|         cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( | ||||
|             self.table_prefix | ||||
|             )) | ||||
|         cur.execute(f"insert into {self.table_prefix}booze values ('Victoria Bitter')") | ||||
|         self.failUnless(cur.rowcount in (-1,1)) | ||||
| 
 | ||||
|         if self.driver.paramstyle == 'qmark': | ||||
|             cur.execute( | ||||
|                 'insert into %sbooze values (?)' % self.table_prefix, | ||||
|                 f'insert into {self.table_prefix}booze values (?)', | ||||
|                 ("Cooper's",) | ||||
|                 ) | ||||
|         elif self.driver.paramstyle == 'numeric': | ||||
|             cur.execute( | ||||
|                 'insert into %sbooze values (:1)' % self.table_prefix, | ||||
|                 f'insert into {self.table_prefix}booze values (:1)', | ||||
|                 ("Cooper's",) | ||||
|                 ) | ||||
|         elif self.driver.paramstyle == 'named': | ||||
|             cur.execute( | ||||
|                 'insert into %sbooze values (:beer)' % self.table_prefix, | ||||
|                 f'insert into {self.table_prefix}booze values (:beer)', | ||||
|                 {'beer':"Cooper's"} | ||||
|                 ) | ||||
|         elif self.driver.paramstyle == 'format': | ||||
|  | @ -412,14 +410,14 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|                 ) | ||||
|         elif self.driver.paramstyle == 'pyformat': | ||||
|             cur.execute( | ||||
|                 'insert into %sbooze values (%%(beer)s)' % self.table_prefix, | ||||
|                 f"insert into %sbooze values (%{self.table_prefix['beer']})", | ||||
|                 {'beer':"Cooper's"} | ||||
|                 ) | ||||
|         else: | ||||
|             self.fail('Invalid paramstyle') | ||||
|         self.failUnless(cur.rowcount in (-1,1)) | ||||
| 
 | ||||
|         cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|         cur.execute(f'select name from {self.table_prefix}booze') | ||||
|         res = cur.fetchall() | ||||
|         self.assertEqual(len(res),2,'cursor.fetchall returned too few rows') | ||||
|         beers = [res[0][0],res[1][0]] | ||||
|  | @ -442,17 +440,17 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ] | ||||
|             if self.driver.paramstyle == 'qmark': | ||||
|                 cur.executemany( | ||||
|                     'insert into %sbooze values (?)' % self.table_prefix, | ||||
|                     f'insert into {self.table_prefix}booze values (?)', | ||||
|                     largs | ||||
|                     ) | ||||
|             elif self.driver.paramstyle == 'numeric': | ||||
|                 cur.executemany( | ||||
|                     'insert into %sbooze values (:1)' % self.table_prefix, | ||||
|                     f'insert into {self.table_prefix}booze values (:1)', | ||||
|                     largs | ||||
|                     ) | ||||
|             elif self.driver.paramstyle == 'named': | ||||
|                 cur.executemany( | ||||
|                     'insert into %sbooze values (:beer)' % self.table_prefix, | ||||
|                     f'insert into {self.table_prefix}booze values (:beer)', | ||||
|                     margs | ||||
|                     ) | ||||
|             elif self.driver.paramstyle == 'format': | ||||
|  | @ -462,9 +460,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|                     ) | ||||
|             elif self.driver.paramstyle == 'pyformat': | ||||
|                 cur.executemany( | ||||
|                     'insert into %sbooze values (%%(beer)s)' % ( | ||||
|                         self.table_prefix | ||||
|                         ), | ||||
|                     f"insert into %sbooze values (%{self.table_prefix['beer']})", | ||||
|                     margs | ||||
|                     ) | ||||
|             else: | ||||
|  | @ -473,7 +469,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|                 'insert using cursor.executemany set cursor.rowcount to ' | ||||
|                 'incorrect value %r' % cur.rowcount | ||||
|                 ) | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             res = cur.fetchall() | ||||
|             self.assertEqual(len(res),2, | ||||
|                 'cursor.fetchall retrieved incorrect number of rows' | ||||
|  | @ -499,7 +495,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             self.executeDDL1(cur) | ||||
|             self.assertRaises(self.driver.Error,cur.fetchone) | ||||
| 
 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             self.assertEqual(cur.fetchone(),None, | ||||
|                 'cursor.fetchone should return None if a query retrieves ' | ||||
|                 'no rows' | ||||
|  | @ -513,7 +509,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|                 )) | ||||
|             self.assertRaises(self.driver.Error,cur.fetchone) | ||||
| 
 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             r = cur.fetchone() | ||||
|             self.assertEqual(len(r),1, | ||||
|                 'cursor.fetchone should have retrieved a single row' | ||||
|  | @ -560,7 +556,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             for sql in self._populate(): | ||||
|                 cur.execute(sql) | ||||
| 
 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             r = cur.fetchmany() | ||||
|             self.assertEqual(len(r),1, | ||||
|                 'cursor.fetchmany retrieved incorrect number of rows, ' | ||||
|  | @ -584,7 +580,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
| 
 | ||||
|             # Same as above, using cursor.arraysize | ||||
|             cur.arraysize=4 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             r = cur.fetchmany() # Should get 4 rows | ||||
|             self.assertEqual(len(r),4, | ||||
|                 'cursor.arraysize not being honoured by fetchmany' | ||||
|  | @ -596,7 +592,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             self.failUnless(cur.rowcount in (-1,6)) | ||||
| 
 | ||||
|             cur.arraysize=6 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             rows = cur.fetchmany() # Should get all rows | ||||
|             self.failUnless(cur.rowcount in (-1,6)) | ||||
|             self.assertEqual(len(rows),6) | ||||
|  | @ -618,7 +614,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             self.failUnless(cur.rowcount in (-1,6)) | ||||
| 
 | ||||
|             self.executeDDL2(cur) | ||||
|             cur.execute('select name from %sbarflys' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}barflys') | ||||
|             r = cur.fetchmany() # Should get empty sequence | ||||
|             self.assertEqual(len(r),0, | ||||
|                 'cursor.fetchmany should return an empty sequence if ' | ||||
|  | @ -646,7 +642,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             # after executing a a statement that cannot return rows | ||||
|             self.assertRaises(self.driver.Error,cur.fetchall) | ||||
| 
 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             rows = cur.fetchall() | ||||
|             self.failUnless(cur.rowcount in (-1,len(self.samples))) | ||||
|             self.assertEqual(len(rows),len(self.samples), | ||||
|  | @ -667,7 +663,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             self.failUnless(cur.rowcount in (-1,len(self.samples))) | ||||
| 
 | ||||
|             self.executeDDL2(cur) | ||||
|             cur.execute('select name from %sbarflys' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}barflys') | ||||
|             rows = cur.fetchall() | ||||
|             self.failUnless(cur.rowcount in (-1,0)) | ||||
|             self.assertEqual(len(rows),0, | ||||
|  | @ -686,7 +682,7 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|             for sql in self._populate(): | ||||
|                 cur.execute(sql) | ||||
| 
 | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             rows1  = cur.fetchone() | ||||
|             rows23 = cur.fetchmany(2) | ||||
|             rows4  = cur.fetchone() | ||||
|  | @ -803,8 +799,8 @@ class DatabaseAPI20Test(unittest.TestCase): | |||
|         try: | ||||
|             cur = con.cursor() | ||||
|             self.executeDDL1(cur) | ||||
|             cur.execute('insert into %sbooze values (NULL)' % self.table_prefix) | ||||
|             cur.execute('select name from %sbooze' % self.table_prefix) | ||||
|             cur.execute(f'insert into {self.table_prefix}booze values (NULL)') | ||||
|             cur.execute(f'select name from {self.table_prefix}booze') | ||||
|             r = cur.fetchall() | ||||
|             self.assertEqual(len(r),1) | ||||
|             self.assertEqual(len(r[0]),1) | ||||
|  |  | |||
|  | @ -383,8 +383,7 @@ class ConnectionTests(ConnectingTestCase): | |||
|         dir = tempfile.mkdtemp() | ||||
|         try: | ||||
|             with open(os.path.join(dir, "mptest.py"), 'w') as f: | ||||
|                 f.write("""\ | ||||
| import time | ||||
|                 f.write(f"""import time | ||||
| import psycopg2 | ||||
| 
 | ||||
| def thread(): | ||||
|  | @ -396,7 +395,7 @@ def thread(): | |||
| 
 | ||||
| def process(): | ||||
|     time.sleep(0.2) | ||||
| """.format(dsn=dsn)) | ||||
| """) | ||||
| 
 | ||||
|             script = ("""\ | ||||
| import sys | ||||
|  | @ -1732,8 +1731,7 @@ class SignalTestCase(ConnectingTestCase): | |||
|             """) | ||||
| 
 | ||||
|     def _test_bug_551(self, query): | ||||
|         script = ("""\ | ||||
| import os | ||||
|         script = f"""import os | ||||
| import sys | ||||
| import time | ||||
| import signal | ||||
|  | @ -1766,7 +1764,7 @@ t.start() | |||
| 
 | ||||
| while True: | ||||
|     cur.execute({query!r}, ("Hello, world!",)) | ||||
| """.format(dsn=dsn, query=query)) | ||||
| """ | ||||
| 
 | ||||
|         proc = sp.Popen([sys.executable, '-c', script], | ||||
|             stdout=sp.PIPE, stderr=sp.PIPE) | ||||
|  |  | |||
|  | @ -317,8 +317,7 @@ class CopyTests(ConnectingTestCase): | |||
|     @slow | ||||
|     def test_copy_from_segfault(self): | ||||
|         # issue #219 | ||||
|         script = ("""\ | ||||
| import psycopg2 | ||||
|         script = f"""import psycopg2 | ||||
| conn = psycopg2.connect({dsn!r}) | ||||
| curs = conn.cursor() | ||||
| curs.execute("create table copy_segf (id int)") | ||||
|  | @ -327,7 +326,7 @@ try: | |||
| except psycopg2.ProgrammingError: | ||||
|     pass | ||||
| conn.close() | ||||
| """.format(dsn=dsn)) | ||||
| """ | ||||
| 
 | ||||
|         proc = Popen([sys.executable, '-c', script]) | ||||
|         proc.communicate() | ||||
|  | @ -336,8 +335,7 @@ conn.close() | |||
|     @slow | ||||
|     def test_copy_to_segfault(self): | ||||
|         # issue #219 | ||||
|         script = ("""\ | ||||
| import psycopg2 | ||||
|         script = f"""import psycopg2 | ||||
| conn = psycopg2.connect({dsn!r}) | ||||
| curs = conn.cursor() | ||||
| curs.execute("create table copy_segf (id int)") | ||||
|  | @ -346,7 +344,7 @@ try: | |||
| except psycopg2.ProgrammingError: | ||||
|     pass | ||||
| conn.close() | ||||
| """.format(dsn=dsn)) | ||||
| """ | ||||
| 
 | ||||
|         proc = Popen([sys.executable, '-c', script], stdout=PIPE) | ||||
|         proc.communicate() | ||||
|  |  | |||
|  | @ -88,21 +88,21 @@ class CursorTests(ConnectingTestCase): | |||
|                 return s | ||||
| 
 | ||||
|         # unicode query with non-ascii data | ||||
|         cur.execute("SELECT '%s';" % snowman) | ||||
|         cur.execute(f"SELECT '{snowman}';") | ||||
|         self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0])) | ||||
|         self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'), | ||||
|             cur.mogrify("SELECT '%s';" % snowman)) | ||||
|         self.assertQuotedEqual(f"SELECT '{snowman}';".encode('utf8'), | ||||
|             cur.mogrify(f"SELECT '{snowman}';")) | ||||
| 
 | ||||
|         # unicode args | ||||
|         cur.execute("SELECT %s;", (snowman,)) | ||||
|         self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0])) | ||||
|         self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'), | ||||
|         self.assertQuotedEqual(f"SELECT '{snowman}';".encode('utf8'), | ||||
|             cur.mogrify("SELECT %s;", (snowman,))) | ||||
| 
 | ||||
|         # unicode query and args | ||||
|         cur.execute("SELECT %s;", (snowman,)) | ||||
|         self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0])) | ||||
|         self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'), | ||||
|         self.assertQuotedEqual(f"SELECT '{snowman}';".encode('utf8'), | ||||
|             cur.mogrify("SELECT %s;", (snowman,))) | ||||
| 
 | ||||
|     def test_mogrify_decimal_explodes(self): | ||||
|  | @ -282,12 +282,12 @@ class CursorTests(ConnectingTestCase): | |||
|         cur = self.conn.cursor() | ||||
| 
 | ||||
|         # Set up the temporary function | ||||
|         cur.execute(''' | ||||
|             CREATE FUNCTION {}({} INT) | ||||
|         cur.execute(f''' | ||||
|             CREATE FUNCTION {procname}({escaped_paramname} INT) | ||||
|             RETURNS INT AS | ||||
|                 'SELECT $1 * $1' | ||||
|             LANGUAGE SQL | ||||
|         '''.format(procname, escaped_paramname)) | ||||
|         ''') | ||||
| 
 | ||||
|         # Make sure callproc works right | ||||
|         cur.callproc(procname, {paramname: 2}) | ||||
|  | @ -573,8 +573,7 @@ class NamedCursorTests(ConnectingTestCase): | |||
|         time.sleep(0.2) | ||||
|         t2 = next(i)[0] | ||||
|         self.assert_((t2 - t1).microseconds * 1e-6 < 0.1, | ||||
|             "named cursor records fetched in 2 roundtrips (delta: %s)" | ||||
|             % (t2 - t1)) | ||||
|             f"named cursor records fetched in 2 roundtrips (delta: {t2 - t1})") | ||||
| 
 | ||||
|     @skip_before_postgres(8, 0) | ||||
|     def test_iter_named_cursor_default_itersize(self): | ||||
|  |  | |||
|  | @ -379,7 +379,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): | |||
|             cur) | ||||
| 
 | ||||
|         def f(val): | ||||
|             cur.execute("select '%s'::text" % val) | ||||
|             cur.execute(f"select '{val}'::text") | ||||
|             return cur.fetchone()[0] | ||||
| 
 | ||||
|         self.assertRaises(OverflowError, f, '100000000000000000:00:00') | ||||
|  |  | |||
|  | @ -621,7 +621,7 @@ class NamedTupleCursorTest(ConnectingTestCase): | |||
|             recs = [] | ||||
|             curs = self.conn.cursor() | ||||
|             for i in range(10): | ||||
|                 curs.execute("select 1 as f%s" % i) | ||||
|                 curs.execute(f"select 1 as f{i}") | ||||
|                 recs.append(curs.fetchone()) | ||||
| 
 | ||||
|             # Still in cache | ||||
|  |  | |||
|  | @ -137,7 +137,7 @@ class GreenTestCase(ConnectingTestCase): | |||
|                 elif state == POLL_WRITE: | ||||
|                     select.select([], [conn.fileno()], [], 0.1) | ||||
|                 else: | ||||
|                     raise conn.OperationalError("bad state from poll: %s" % state) | ||||
|                     raise conn.OperationalError(f"bad state from poll: {state}") | ||||
| 
 | ||||
|         stub = self.set_stub_wait_callback(self.conn, wait) | ||||
|         cur = self.conn.cursor() | ||||
|  | @ -182,7 +182,7 @@ class CallbackErrorTestCase(ConnectingTestCase): | |||
|                 elif state == POLL_WRITE: | ||||
|                     select.select([], [conn.fileno()], []) | ||||
|                 else: | ||||
|                     raise conn.OperationalError("bad state from poll: %s" % state) | ||||
|                     raise conn.OperationalError(f"bad state from poll: {state}") | ||||
|             except KeyboardInterrupt: | ||||
|                 conn.cancel() | ||||
|                 # the loop will be broken by a server error | ||||
|  |  | |||
|  | @ -207,7 +207,7 @@ class LargeObjectTests(LargeObjectTestCase): | |||
|         data1 = lo.read() | ||||
|         # avoid dumping megacraps in the console in case of error | ||||
|         self.assert_(data == data1, | ||||
|             "{!r}... != {!r}...".format(data[:100], data1[:100])) | ||||
|             f"{data[:100]!r}... != {data1[:100]!r}...") | ||||
| 
 | ||||
|     def test_seek_tell(self): | ||||
|         lo = self.conn.lobject() | ||||
|  |  | |||
|  | @ -120,7 +120,7 @@ class ConnectTestCase(unittest.TestCase): | |||
| 
 | ||||
|     def test_int_port_param(self): | ||||
|         psycopg2.connect(database='sony', port=6543) | ||||
|         dsn = " %s " % self.args[0] | ||||
|         dsn = f" {self.args[0]} " | ||||
|         self.assert_(" dbname=sony " in dsn, dsn) | ||||
|         self.assert_(" port=6543 " in dsn, dsn) | ||||
| 
 | ||||
|  | @ -327,12 +327,12 @@ class TestExtensionModule(unittest.TestCase): | |||
|         pkgdir = os.path.dirname(psycopg2.__file__) | ||||
|         pardir = os.path.dirname(pkgdir) | ||||
|         self.assert_(pardir in sys.path) | ||||
|         script = (""" | ||||
|         script = f""" | ||||
| import sys | ||||
| sys.path.remove({!r}) | ||||
| sys.path.insert(0, {!r}) | ||||
| sys.path.remove({pardir!r}) | ||||
| sys.path.insert(0, {pkgdir!r}) | ||||
| import _psycopg | ||||
| """.format(pardir, pkgdir)) | ||||
| """ | ||||
| 
 | ||||
|         proc = Popen([sys.executable, '-c', script]) | ||||
|         proc.communicate() | ||||
|  |  | |||
|  | @ -56,7 +56,7 @@ class NotifiesTests(ConnectingTestCase): | |||
|         if payload is None: | ||||
|             payload = '' | ||||
|         else: | ||||
|             payload = ", %r" % payload | ||||
|             payload = f", {payload!r}" | ||||
| 
 | ||||
|         script = ("""\ | ||||
| import time | ||||
|  |  | |||
|  | @ -98,8 +98,7 @@ class QuotingTestCase(ConnectingTestCase): | |||
|         server_encoding = curs.fetchone()[0] | ||||
|         if server_encoding != "UTF8": | ||||
|             return self.skipTest( | ||||
|                 "Unicode test skipped since server encoding is %s" | ||||
|                 % server_encoding) | ||||
|                 f"Unicode test skipped since server encoding is {server_encoding}") | ||||
| 
 | ||||
|         data = """some data with \t chars | ||||
|         to escape into, 'quotes', \u20ac euro sign and \\ a backslash too. | ||||
|  |  | |||
|  | @ -244,9 +244,9 @@ class AsyncReplicationTest(ReplicationTestCase): | |||
| 
 | ||||
|         def consume(msg): | ||||
|             # just check the methods | ||||
|             "{}: {}".format(cur.io_timestamp, repr(msg)) | ||||
|             "{}: {}".format(cur.feedback_timestamp, repr(msg)) | ||||
|             "{}: {}".format(cur.wal_end, repr(msg)) | ||||
|             f"{cur.io_timestamp}: {repr(msg)}" | ||||
|             f"{cur.feedback_timestamp}: {repr(msg)}" | ||||
|             f"{cur.wal_end}: {repr(msg)}" | ||||
| 
 | ||||
|             self.msg_count += 1 | ||||
|             if self.msg_count > 3: | ||||
|  |  | |||
|  | @ -711,7 +711,7 @@ class AdaptTypeTestCase(ConnectingTestCase): | |||
|     def _create_type(self, name, fields): | ||||
|         curs = self.conn.cursor() | ||||
|         try: | ||||
|             curs.execute("drop type %s cascade;" % name) | ||||
|             curs.execute(f"drop type {name} cascade;") | ||||
|         except psycopg2.ProgrammingError: | ||||
|             self.conn.rollback() | ||||
| 
 | ||||
|  | @ -1300,14 +1300,14 @@ class RangeCasterTestCase(ConnectingTestCase): | |||
|     def test_cast_null(self): | ||||
|         cur = self.conn.cursor() | ||||
|         for type in self.builtin_ranges: | ||||
|             cur.execute("select NULL::%s" % type) | ||||
|             cur.execute(f"select NULL::{type}") | ||||
|             r = cur.fetchone()[0] | ||||
|             self.assertEqual(r, None) | ||||
| 
 | ||||
|     def test_cast_empty(self): | ||||
|         cur = self.conn.cursor() | ||||
|         for type in self.builtin_ranges: | ||||
|             cur.execute("select 'empty'::%s" % type) | ||||
|             cur.execute(f"select 'empty'::{type}") | ||||
|             r = cur.fetchone()[0] | ||||
|             self.assert_(isinstance(r, Range), type) | ||||
|             self.assert_(r.isempty) | ||||
|  | @ -1315,7 +1315,7 @@ class RangeCasterTestCase(ConnectingTestCase): | |||
|     def test_cast_inf(self): | ||||
|         cur = self.conn.cursor() | ||||
|         for type in self.builtin_ranges: | ||||
|             cur.execute("select '(,)'::%s" % type) | ||||
|             cur.execute(f"select '(,)'::{type}") | ||||
|             r = cur.fetchone()[0] | ||||
|             self.assert_(isinstance(r, Range), type) | ||||
|             self.assert_(not r.isempty) | ||||
|  | @ -1325,7 +1325,7 @@ class RangeCasterTestCase(ConnectingTestCase): | |||
|     def test_cast_numbers(self): | ||||
|         cur = self.conn.cursor() | ||||
|         for type in ('int4range', 'int8range'): | ||||
|             cur.execute("select '(10,20)'::%s" % type) | ||||
|             cur.execute(f"select '(10,20)'::{type}") | ||||
|             r = cur.fetchone()[0] | ||||
|             self.assert_(isinstance(r, NumericRange)) | ||||
|             self.assert_(not r.isempty) | ||||
|  |  | |||
|  | @ -23,15 +23,15 @@ if green: | |||
|     psycopg2.extensions.set_wait_callback(wait_callback) | ||||
| 
 | ||||
| # Construct a DSN to connect to the test database: | ||||
| dsn = 'dbname=%s' % dbname | ||||
| dsn = f'dbname={dbname}' | ||||
| if dbhost is not None: | ||||
|     dsn += ' host=%s' % dbhost | ||||
|     dsn += f' host={dbhost}' | ||||
| if dbport is not None: | ||||
|     dsn += ' port=%s' % dbport | ||||
|     dsn += f' port={dbport}' | ||||
| if dbuser is not None: | ||||
|     dsn += ' user=%s' % dbuser | ||||
|     dsn += f' user={dbuser}' | ||||
| if dbpass is not None: | ||||
|     dsn += ' password=%s' % dbpass | ||||
|     dsn += f' password={dbpass}' | ||||
| 
 | ||||
| # Don't run replication tests if REPL_DSN is not set, default to normal DSN if | ||||
| # set to empty string. | ||||
|  |  | |||
|  | @ -100,8 +100,7 @@ class ConnectingTestCase(unittest.TestCase): | |||
|             self._conns | ||||
|         except AttributeError as e: | ||||
|             raise AttributeError( | ||||
|                 "%s (did you forget to call ConnectingTestCase.setUp()?)" | ||||
|                 % e) | ||||
|                 f"{e} (did you forget to call ConnectingTestCase.setUp()?)") | ||||
| 
 | ||||
|         if 'dsn' in kwargs: | ||||
|             conninfo = kwargs.pop('dsn') | ||||
|  | @ -134,7 +133,7 @@ class ConnectingTestCase(unittest.TestCase): | |||
|             # Otherwise we tried to run some bad operation in the connection | ||||
|             # (e.g. bug #482) and we'd rather know that. | ||||
|             if e.pgcode is None: | ||||
|                 return self.skipTest("replication db not configured: %s" % e) | ||||
|                 return self.skipTest(f"replication db not configured: {e}") | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|  | @ -324,7 +323,7 @@ def skip_after_libpq(*ver): | |||
|         v = libpq_version() | ||||
|         decorator = unittest.skipIf( | ||||
|             v >= int("%d%02d%02d" % ver), | ||||
|             "skipped because libpq %s" % v, | ||||
|             f"skipped because libpq {v}", | ||||
|         ) | ||||
|         return decorator(cls) | ||||
|     return skip_after_libpq_ | ||||
|  | @ -335,8 +334,7 @@ def skip_before_python(*ver): | |||
|     def skip_before_python_(cls): | ||||
|         decorator = unittest.skipIf( | ||||
|             sys.version_info[:len(ver)] < ver, | ||||
|             "skipped because Python %s" | ||||
|             % ".".join(map(str, sys.version_info[:len(ver)])), | ||||
|             f"skipped because Python {'.'.join(map(str, sys.version_info[:len(ver)]))}", | ||||
|         ) | ||||
|         return decorator(cls) | ||||
|     return skip_before_python_ | ||||
|  | @ -347,8 +345,7 @@ def skip_from_python(*ver): | |||
|     def skip_from_python_(cls): | ||||
|         decorator = unittest.skipIf( | ||||
|             sys.version_info[:len(ver)] >= ver, | ||||
|             "skipped because Python %s" | ||||
|             % ".".join(map(str, sys.version_info[:len(ver)])), | ||||
|             f"skipped because Python {'.'.join(map(str, sys.version_info[:len(ver)]))}", | ||||
|         ) | ||||
|         return decorator(cls) | ||||
|     return skip_from_python_ | ||||
|  | @ -415,7 +412,7 @@ def crdb_version(conn, __crdb_version=[]): | |||
|         m = re.search(r"\bv(\d+)\.(\d+)\.(\d+)", sver) | ||||
|         if not m: | ||||
|             raise ValueError( | ||||
|                 "can't parse CockroachDB version from %s" % sver) | ||||
|                 f"can't parse CockroachDB version from {sver}") | ||||
| 
 | ||||
|         ver = int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3)) | ||||
|         __crdb_version.append(ver) | ||||
|  | @ -439,7 +436,7 @@ def skip_if_crdb(reason, conn=None, version=None): | |||
| 
 | ||||
|     """ | ||||
|     if not isinstance(reason, str): | ||||
|         raise TypeError("reason should be a string, got %r instead" % reason) | ||||
|         raise TypeError(f"reason should be a string, got {reason!r} instead") | ||||
| 
 | ||||
|     if conn is not None: | ||||
|         ver = crdb_version(conn) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	Block a user