mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-01-31 09:24:07 +03:00
Several tests ported to Python 3.
This commit is contained in:
parent
2fa9117835
commit
b78ff4a273
|
@ -3,6 +3,7 @@
|
|||
import unittest
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
from psycopg2.extensions import b
|
||||
from testconfig import dsn
|
||||
|
||||
class CursorTests(unittest.TestCase):
|
||||
|
@ -32,28 +33,28 @@ class CursorTests(unittest.TestCase):
|
|||
# unicode query containing only ascii data
|
||||
cur.execute(u"SELECT 'foo';")
|
||||
self.assertEqual('foo', cur.fetchone()[0])
|
||||
self.assertEqual("SELECT 'foo';", cur.mogrify(u"SELECT 'foo';"))
|
||||
self.assertEqual(b("SELECT 'foo';"), cur.mogrify(u"SELECT 'foo';"))
|
||||
|
||||
conn.set_client_encoding('UTF8')
|
||||
snowman = u"\u2603"
|
||||
|
||||
# unicode query with non-ascii data
|
||||
cur.execute(u"SELECT '%s';" % snowman)
|
||||
self.assertEqual(snowman.encode('utf8'), cur.fetchone()[0])
|
||||
self.assertEqual("SELECT '%s';" % snowman.encode('utf8'),
|
||||
cur.mogrify(u"SELECT '%s';" % snowman).replace("E'", "'"))
|
||||
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
|
||||
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
|
||||
cur.mogrify(u"SELECT '%s';" % snowman).replace(b("E'"), b("'")))
|
||||
|
||||
# unicode args
|
||||
cur.execute("SELECT %s;", (snowman,))
|
||||
self.assertEqual(snowman.encode("utf-8"), cur.fetchone()[0])
|
||||
self.assertEqual("SELECT '%s';" % snowman.encode('utf8'),
|
||||
cur.mogrify("SELECT %s;", (snowman,)).replace("E'", "'"))
|
||||
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
|
||||
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
|
||||
cur.mogrify("SELECT %s;", (snowman,)).replace(b("E'"), b("'")))
|
||||
|
||||
# unicode query and args
|
||||
cur.execute(u"SELECT %s;", (snowman,))
|
||||
self.assertEqual(snowman.encode("utf-8"), cur.fetchone()[0])
|
||||
self.assertEqual("SELECT '%s';" % snowman.encode('utf8'),
|
||||
cur.mogrify(u"SELECT %s;", (snowman,)).replace("E'", "'"))
|
||||
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
|
||||
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
|
||||
cur.mogrify(u"SELECT %s;", (snowman,)).replace(b("E'"), b("'")))
|
||||
|
||||
def test_mogrify_decimal_explodes(self):
|
||||
# issue #7: explodes on windows with python 2.5 and psycopg 2.2.2
|
||||
|
@ -64,7 +65,7 @@ class CursorTests(unittest.TestCase):
|
|||
|
||||
conn = self.conn
|
||||
cur = conn.cursor()
|
||||
self.assertEqual('SELECT 10.3;',
|
||||
self.assertEqual(b('SELECT 10.3;'),
|
||||
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
|
||||
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import tempfile
|
|||
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
from psycopg2.extensions import b
|
||||
from testconfig import dsn, green
|
||||
from testutils import unittest, decorate_all_tests
|
||||
|
||||
|
@ -72,7 +73,7 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
lo = self.conn.lobject()
|
||||
lo2 = self.conn.lobject(lo.oid, "w")
|
||||
self.assertEqual(lo2.mode, "w")
|
||||
lo2.write("some data")
|
||||
lo2.write(b("some data"))
|
||||
|
||||
def test_open_mode_n(self):
|
||||
# Openning an object in mode "n" gives us a closed lobject.
|
||||
|
@ -103,11 +104,11 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
self.tmpdir = tempfile.mkdtemp()
|
||||
filename = os.path.join(self.tmpdir, "data.txt")
|
||||
fp = open(filename, "wb")
|
||||
fp.write("some data")
|
||||
fp.write(b("some data"))
|
||||
fp.close()
|
||||
|
||||
lo = self.conn.lobject(0, "r", 0, filename)
|
||||
self.assertEqual(lo.read(), "some data")
|
||||
self.assertEqual(lo.read(), b("some data"))
|
||||
|
||||
def test_close(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -117,7 +118,7 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
|
||||
def test_write(self):
|
||||
lo = self.conn.lobject()
|
||||
self.assertEqual(lo.write("some data"), len("some data"))
|
||||
self.assertEqual(lo.write(b("some data")), len("some data"))
|
||||
|
||||
def test_write_large(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -126,43 +127,43 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
|
||||
def test_read(self):
|
||||
lo = self.conn.lobject()
|
||||
length = lo.write("some data")
|
||||
length = lo.write(b("some data"))
|
||||
lo.close()
|
||||
|
||||
lo = self.conn.lobject(lo.oid)
|
||||
self.assertEqual(lo.read(4), "some")
|
||||
self.assertEqual(lo.read(), " data")
|
||||
self.assertEqual(lo.read(4), b("some"))
|
||||
self.assertEqual(lo.read(), b(" data"))
|
||||
|
||||
def test_read_large(self):
|
||||
lo = self.conn.lobject()
|
||||
data = "data" * 1000000
|
||||
length = lo.write("some"+data)
|
||||
data = b("data") * 1000000
|
||||
length = lo.write(b("some") + data)
|
||||
lo.close()
|
||||
|
||||
lo = self.conn.lobject(lo.oid)
|
||||
self.assertEqual(lo.read(4), "some")
|
||||
self.assertEqual(lo.read(4), b("some"))
|
||||
self.assertEqual(lo.read(), data)
|
||||
|
||||
def test_seek_tell(self):
|
||||
lo = self.conn.lobject()
|
||||
length = lo.write("some data")
|
||||
length = lo.write(b("some data"))
|
||||
self.assertEqual(lo.tell(), length)
|
||||
lo.close()
|
||||
lo = self.conn.lobject(lo.oid)
|
||||
|
||||
self.assertEqual(lo.seek(5, 0), 5)
|
||||
self.assertEqual(lo.tell(), 5)
|
||||
self.assertEqual(lo.read(), "data")
|
||||
self.assertEqual(lo.read(), b("data"))
|
||||
|
||||
# SEEK_CUR: relative current location
|
||||
lo.seek(5)
|
||||
self.assertEqual(lo.seek(2, 1), 7)
|
||||
self.assertEqual(lo.tell(), 7)
|
||||
self.assertEqual(lo.read(), "ta")
|
||||
self.assertEqual(lo.read(), b("ta"))
|
||||
|
||||
# SEEK_END: relative to end of file
|
||||
self.assertEqual(lo.seek(-2, 2), length - 2)
|
||||
self.assertEqual(lo.read(), "ta")
|
||||
self.assertEqual(lo.read(), b("ta"))
|
||||
|
||||
def test_unlink(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -175,13 +176,13 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
|
||||
def test_export(self):
|
||||
lo = self.conn.lobject()
|
||||
lo.write("some data")
|
||||
lo.write(b("some data"))
|
||||
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
filename = os.path.join(self.tmpdir, "data.txt")
|
||||
lo.export(filename)
|
||||
self.assertTrue(os.path.exists(filename))
|
||||
self.assertEqual(open(filename, "rb").read(), "some data")
|
||||
self.assertEqual(open(filename, "rb").read(), b("some data"))
|
||||
|
||||
def test_close_twice(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -191,7 +192,7 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
def test_write_after_close(self):
|
||||
lo = self.conn.lobject()
|
||||
lo.close()
|
||||
self.assertRaises(psycopg2.InterfaceError, lo.write, "some data")
|
||||
self.assertRaises(psycopg2.InterfaceError, lo.write, b("some data"))
|
||||
|
||||
def test_read_after_close(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -216,14 +217,14 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
|
||||
def test_export_after_close(self):
|
||||
lo = self.conn.lobject()
|
||||
lo.write("some data")
|
||||
lo.write(b("some data"))
|
||||
lo.close()
|
||||
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
filename = os.path.join(self.tmpdir, "data.txt")
|
||||
lo.export(filename)
|
||||
self.assertTrue(os.path.exists(filename))
|
||||
self.assertEqual(open(filename, "rb").read(), "some data")
|
||||
self.assertEqual(open(filename, "rb").read(), b("some data"))
|
||||
|
||||
def test_close_after_commit(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -238,7 +239,7 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
self.lo_oid = lo.oid
|
||||
self.conn.commit()
|
||||
|
||||
self.assertRaises(psycopg2.ProgrammingError, lo.write, "some data")
|
||||
self.assertRaises(psycopg2.ProgrammingError, lo.write, b("some data"))
|
||||
|
||||
def test_read_after_commit(self):
|
||||
lo = self.conn.lobject()
|
||||
|
@ -271,14 +272,14 @@ class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
|
|||
|
||||
def test_export_after_commit(self):
|
||||
lo = self.conn.lobject()
|
||||
lo.write("some data")
|
||||
lo.write(b("some data"))
|
||||
self.conn.commit()
|
||||
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
filename = os.path.join(self.tmpdir, "data.txt")
|
||||
lo.export(filename)
|
||||
self.assertTrue(os.path.exists(filename))
|
||||
self.assertEqual(open(filename, "rb").read(), "some data")
|
||||
self.assertEqual(open(filename, "rb").read(), b("some data"))
|
||||
|
||||
decorate_all_tests(LargeObjectTests, skip_if_no_lo)
|
||||
decorate_all_tests(LargeObjectTests, skip_if_green)
|
||||
|
@ -300,7 +301,7 @@ def skip_if_no_truncate(f):
|
|||
class LargeObjectTruncateTests(LargeObjectMixin, unittest.TestCase):
|
||||
def test_truncate(self):
|
||||
lo = self.conn.lobject()
|
||||
lo.write("some data")
|
||||
lo.write(b("some data"))
|
||||
lo.close()
|
||||
|
||||
lo = self.conn.lobject(lo.oid, "w")
|
||||
|
@ -309,17 +310,17 @@ class LargeObjectTruncateTests(LargeObjectMixin, unittest.TestCase):
|
|||
# seek position unchanged
|
||||
self.assertEqual(lo.tell(), 0)
|
||||
# data truncated
|
||||
self.assertEqual(lo.read(), "some")
|
||||
self.assertEqual(lo.read(), b("some"))
|
||||
|
||||
lo.truncate(6)
|
||||
lo.seek(0)
|
||||
# large object extended with zeroes
|
||||
self.assertEqual(lo.read(), "some\x00\x00")
|
||||
self.assertEqual(lo.read(), b("some\x00\x00"))
|
||||
|
||||
lo.truncate()
|
||||
lo.seek(0)
|
||||
# large object empty
|
||||
self.assertEqual(lo.read(), "")
|
||||
self.assertEqual(lo.read(), b(""))
|
||||
|
||||
def test_truncate_after_close(self):
|
||||
lo = self.conn.lobject()
|
||||
|
|
|
@ -5,6 +5,7 @@ from testconfig import dsn
|
|||
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
from psycopg2.extensions import b
|
||||
|
||||
class QuotingTestCase(unittest.TestCase):
|
||||
r"""Checks the correct quoting of strings and binary objects.
|
||||
|
@ -44,14 +45,20 @@ class QuotingTestCase(unittest.TestCase):
|
|||
self.assert_(not self.conn.notices)
|
||||
|
||||
def test_binary(self):
|
||||
data = """some data with \000\013 binary
|
||||
data = b("""some data with \000\013 binary
|
||||
stuff into, 'quotes' and \\ a backslash too.
|
||||
"""
|
||||
""")
|
||||
if sys.version_info[0] < 3:
|
||||
data += "".join(map(chr, range(256)))
|
||||
else:
|
||||
data += bytes(range(256))
|
||||
|
||||
curs = self.conn.cursor()
|
||||
curs.execute("SELECT %s::bytea;", (psycopg2.Binary(data),))
|
||||
if sys.version_info[0] < 3:
|
||||
res = str(curs.fetchone()[0])
|
||||
else:
|
||||
res = curs.fetchone()[0].tobytes()
|
||||
|
||||
self.assertEqual(res, data)
|
||||
self.assert_(not self.conn.notices)
|
||||
|
|
|
@ -29,9 +29,10 @@ except:
|
|||
import sys
|
||||
import testutils
|
||||
from testutils import unittest
|
||||
from testconfig import dsn
|
||||
|
||||
import psycopg2
|
||||
from testconfig import dsn
|
||||
from psycopg2.extensions import b
|
||||
|
||||
|
||||
class TypesBasicTests(unittest.TestCase):
|
||||
|
@ -231,7 +232,7 @@ class AdaptSubclassTest(unittest.TestCase):
|
|||
|
||||
register_adapter(A, lambda a: AsIs("a"))
|
||||
register_adapter(B, lambda b: AsIs("b"))
|
||||
self.assertEqual('b', adapt(C()).getquoted())
|
||||
self.assertEqual(b('b'), adapt(C()).getquoted())
|
||||
|
||||
@testutils.skip_on_python3
|
||||
def test_no_mro_no_joy(self):
|
||||
|
@ -251,7 +252,7 @@ class AdaptSubclassTest(unittest.TestCase):
|
|||
class B(A): pass
|
||||
|
||||
register_adapter(A, lambda a: AsIs("a"))
|
||||
self.assertEqual("a", adapt(B()).getquoted())
|
||||
self.assertEqual(b("a"), adapt(B()).getquoted())
|
||||
|
||||
|
||||
def test_suite():
|
||||
|
|
Loading…
Reference in New Issue
Block a user