2010-02-13 01:34:53 +03:00
|
|
|
#!/usr/bin/env python
|
2008-05-06 13:04:26 +04:00
|
|
|
import os
|
|
|
|
import shutil
|
|
|
|
import tempfile
|
2008-05-05 11:33:44 +04:00
|
|
|
import unittest
|
2010-03-29 02:42:56 +04:00
|
|
|
import warnings
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
import psycopg2
|
|
|
|
import psycopg2.extensions
|
|
|
|
import tests
|
|
|
|
|
|
|
|
|
|
|
|
class LargeObjectTests(unittest.TestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
self.conn = psycopg2.connect(tests.dsn)
|
|
|
|
self.lo_oid = None
|
2008-05-06 13:04:26 +04:00
|
|
|
self.tmpdir = None
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
def tearDown(self):
|
2008-05-06 13:04:26 +04:00
|
|
|
if self.tmpdir:
|
|
|
|
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
2008-05-05 11:33:44 +04:00
|
|
|
if self.lo_oid is not None:
|
2008-05-06 13:04:26 +04:00
|
|
|
self.conn.rollback()
|
|
|
|
try:
|
|
|
|
lo = self.conn.lobject(self.lo_oid, "n")
|
|
|
|
except psycopg2.OperationalError:
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
lo.unlink()
|
2009-04-01 22:49:00 +04:00
|
|
|
self.conn.close()
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
def test_create(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.assertNotEqual(lo, None)
|
|
|
|
self.assertEqual(lo.mode, "w")
|
|
|
|
|
|
|
|
def test_open_non_existent(self):
|
2008-05-06 13:04:26 +04:00
|
|
|
# By creating then removing a large object, we get an Oid that
|
|
|
|
# should be unused.
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.unlink()
|
|
|
|
self.assertRaises(psycopg2.OperationalError, self.conn.lobject, lo.oid)
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
def test_open_existing(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo2 = self.conn.lobject(lo.oid)
|
|
|
|
self.assertNotEqual(lo2, None)
|
|
|
|
self.assertEqual(lo2.oid, lo.oid)
|
|
|
|
self.assertEqual(lo2.mode, "r")
|
|
|
|
|
2008-05-06 13:04:26 +04:00
|
|
|
def test_open_for_write(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo2 = self.conn.lobject(lo.oid, "w")
|
|
|
|
self.assertEqual(lo2.mode, "w")
|
|
|
|
lo2.write("some data")
|
|
|
|
|
|
|
|
def test_open_mode_n(self):
|
|
|
|
# Openning an object in mode "n" gives us a closed lobject.
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
lo2 = self.conn.lobject(lo.oid, "n")
|
|
|
|
self.assertEqual(lo2.oid, lo.oid)
|
|
|
|
self.assertEqual(lo2.closed, True)
|
|
|
|
|
|
|
|
def test_create_with_oid(self):
|
|
|
|
# Create and delete a large object to get an unused Oid.
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
oid = lo.oid
|
|
|
|
lo.unlink()
|
|
|
|
|
|
|
|
lo = self.conn.lobject(0, "w", oid)
|
|
|
|
self.assertEqual(lo.oid, oid)
|
|
|
|
|
|
|
|
def test_create_with_existing_oid(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
self.assertRaises(psycopg2.OperationalError,
|
|
|
|
self.conn.lobject, 0, "w", lo.oid)
|
|
|
|
|
|
|
|
def test_import(self):
|
|
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
|
|
filename = os.path.join(self.tmpdir, "data.txt")
|
|
|
|
fp = open(filename, "wb")
|
|
|
|
fp.write("some data")
|
|
|
|
fp.close()
|
|
|
|
|
|
|
|
lo = self.conn.lobject(0, "r", 0, filename)
|
|
|
|
self.assertEqual(lo.read(), "some data")
|
|
|
|
|
2008-05-05 11:33:44 +04:00
|
|
|
def test_close(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.assertEqual(lo.closed, False)
|
|
|
|
lo.close()
|
|
|
|
self.assertEqual(lo.closed, True)
|
|
|
|
|
|
|
|
def test_write(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.assertEqual(lo.write("some data"), len("some data"))
|
|
|
|
|
2009-08-09 19:05:16 +04:00
|
|
|
def test_write_large(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
data = "data" * 1000000
|
|
|
|
self.assertEqual(lo.write(data), len(data))
|
|
|
|
|
2008-05-06 13:04:26 +04:00
|
|
|
def test_read(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
length = lo.write("some data")
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
lo = self.conn.lobject(lo.oid)
|
|
|
|
self.assertEqual(lo.read(4), "some")
|
|
|
|
self.assertEqual(lo.read(), " data")
|
|
|
|
|
2009-08-09 19:05:16 +04:00
|
|
|
def test_read_large(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
data = "data" * 1000000
|
|
|
|
length = lo.write("some"+data)
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
lo = self.conn.lobject(lo.oid)
|
|
|
|
self.assertEqual(lo.read(4), "some")
|
|
|
|
self.assertEqual(lo.read(), data)
|
|
|
|
|
2008-05-05 11:33:44 +04:00
|
|
|
def test_seek_tell(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
length = lo.write("some data")
|
|
|
|
self.assertEqual(lo.tell(), length)
|
|
|
|
lo.close()
|
|
|
|
lo = self.conn.lobject(lo.oid)
|
|
|
|
|
|
|
|
self.assertEqual(lo.seek(5, 0), 5)
|
|
|
|
self.assertEqual(lo.tell(), 5)
|
2008-05-06 13:04:26 +04:00
|
|
|
self.assertEqual(lo.read(), "data")
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
# SEEK_CUR: relative current location
|
2008-05-06 13:04:26 +04:00
|
|
|
lo.seek(5)
|
2008-05-05 11:33:44 +04:00
|
|
|
self.assertEqual(lo.seek(2, 1), 7)
|
|
|
|
self.assertEqual(lo.tell(), 7)
|
2008-05-06 13:04:26 +04:00
|
|
|
self.assertEqual(lo.read(), "ta")
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
# SEEK_END: relative to end of file
|
|
|
|
self.assertEqual(lo.seek(-2, 2), length - 2)
|
2008-05-06 13:04:26 +04:00
|
|
|
self.assertEqual(lo.read(), "ta")
|
2008-05-05 11:33:44 +04:00
|
|
|
|
2008-05-06 13:04:26 +04:00
|
|
|
def test_unlink(self):
|
2008-05-05 11:33:44 +04:00
|
|
|
lo = self.conn.lobject()
|
2008-05-06 13:04:26 +04:00
|
|
|
lo.unlink()
|
|
|
|
|
|
|
|
# the object doesn't exist now, so we can't reopen it.
|
|
|
|
self.assertRaises(psycopg2.OperationalError, self.conn.lobject, lo.oid)
|
|
|
|
# And the object has been closed.
|
|
|
|
self.assertEquals(lo.closed, True)
|
|
|
|
|
|
|
|
def test_export(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.write("some data")
|
|
|
|
|
|
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
|
|
filename = os.path.join(self.tmpdir, "data.txt")
|
|
|
|
lo.export(filename)
|
|
|
|
self.assertTrue(os.path.exists(filename))
|
|
|
|
self.assertEqual(open(filename, "rb").read(), "some data")
|
|
|
|
|
|
|
|
def test_close_twice(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
2008-05-05 11:33:44 +04:00
|
|
|
lo.close()
|
|
|
|
|
2008-05-06 13:04:26 +04:00
|
|
|
def test_write_after_close(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
self.assertRaises(psycopg2.InterfaceError, lo.write, "some data")
|
2008-05-05 11:33:44 +04:00
|
|
|
|
2008-05-06 13:04:26 +04:00
|
|
|
def test_read_after_close(self):
|
2008-05-05 11:33:44 +04:00
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
2008-05-06 13:04:26 +04:00
|
|
|
self.assertRaises(psycopg2.InterfaceError, lo.read, 5)
|
|
|
|
|
|
|
|
def test_seek_after_close(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
self.assertRaises(psycopg2.InterfaceError, lo.seek, 0)
|
|
|
|
|
|
|
|
def test_tell_after_close(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
self.assertRaises(psycopg2.InterfaceError, lo.tell)
|
|
|
|
|
|
|
|
def test_unlink_after_close(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
# Unlink works on closed files.
|
2008-05-05 11:33:44 +04:00
|
|
|
lo.unlink()
|
|
|
|
|
2008-05-06 13:04:26 +04:00
|
|
|
def test_export_after_close(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.write("some data")
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
|
|
filename = os.path.join(self.tmpdir, "data.txt")
|
|
|
|
lo.export(filename)
|
|
|
|
self.assertTrue(os.path.exists(filename))
|
|
|
|
self.assertEqual(open(filename, "rb").read(), "some data")
|
|
|
|
|
|
|
|
def test_close_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
# Closing outside of the transaction is okay.
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
def test_write_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
self.assertRaises(psycopg2.ProgrammingError, lo.write, "some data")
|
|
|
|
|
|
|
|
def test_read_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
self.assertRaises(psycopg2.ProgrammingError, lo.read, 5)
|
|
|
|
|
|
|
|
def test_seek_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
self.assertRaises(psycopg2.ProgrammingError, lo.seek, 0)
|
|
|
|
|
|
|
|
def test_tell_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
self.assertRaises(psycopg2.ProgrammingError, lo.tell)
|
|
|
|
|
|
|
|
def test_unlink_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
# Unlink of stale lobject is okay
|
|
|
|
lo.unlink()
|
|
|
|
|
|
|
|
def test_export_after_commit(self):
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.write("some data")
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
|
|
filename = os.path.join(self.tmpdir, "data.txt")
|
|
|
|
lo.export(filename)
|
|
|
|
self.assertTrue(os.path.exists(filename))
|
|
|
|
self.assertEqual(open(filename, "rb").read(), "some data")
|
2008-05-05 11:33:44 +04:00
|
|
|
|
|
|
|
|
2010-03-29 02:42:56 +04:00
|
|
|
class LargeObjectTruncateTests(LargeObjectTests):
|
|
|
|
|
|
|
|
skip = None
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
LargeObjectTests.setUp(self)
|
|
|
|
|
|
|
|
if self.skip is None:
|
|
|
|
self.skip = False
|
|
|
|
if self.conn.server_version < 80300:
|
|
|
|
warnings.warn("Large object truncate tests skipped, "
|
|
|
|
"the server does not support them")
|
|
|
|
self.skip = True
|
|
|
|
|
|
|
|
if not hasattr(psycopg2.extensions.lobject, 'truncate'):
|
|
|
|
warnings.warn("Large object truncate tests skipped, "
|
|
|
|
"psycopg2 has been built against an old library")
|
|
|
|
self.skip = True
|
|
|
|
|
|
|
|
def test_truncate(self):
|
|
|
|
if self.skip:
|
|
|
|
return
|
|
|
|
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.write("some data")
|
|
|
|
lo.close()
|
|
|
|
|
|
|
|
lo = self.conn.lobject(lo.oid, "w")
|
|
|
|
lo.truncate(4)
|
|
|
|
|
|
|
|
# seek position unchanged
|
|
|
|
self.assertEqual(lo.tell(), 0)
|
|
|
|
# data truncated
|
|
|
|
self.assertEqual(lo.read(), "some")
|
|
|
|
|
|
|
|
lo.truncate(6)
|
|
|
|
lo.seek(0)
|
|
|
|
# large object extended with zeroes
|
|
|
|
self.assertEqual(lo.read(), "some\x00\x00")
|
|
|
|
|
|
|
|
lo.truncate()
|
|
|
|
lo.seek(0)
|
|
|
|
# large object empty
|
|
|
|
self.assertEqual(lo.read(), "")
|
|
|
|
|
|
|
|
def test_truncate_after_close(self):
|
|
|
|
if self.skip:
|
|
|
|
return
|
|
|
|
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
lo.close()
|
|
|
|
self.assertRaises(psycopg2.InterfaceError, lo.truncate)
|
|
|
|
|
|
|
|
def test_truncate_after_commit(self):
|
|
|
|
if self.skip:
|
|
|
|
return
|
|
|
|
|
|
|
|
lo = self.conn.lobject()
|
|
|
|
self.lo_oid = lo.oid
|
|
|
|
self.conn.commit()
|
|
|
|
|
|
|
|
self.assertRaises(psycopg2.ProgrammingError, lo.truncate)
|
|
|
|
|
2008-05-05 11:33:44 +04:00
|
|
|
def test_suite():
|
|
|
|
return unittest.TestLoader().loadTestsFromName(__name__)
|
2010-02-13 01:34:53 +03:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2010-03-29 02:42:56 +04:00
|
|
|
unittest.main()
|