2011-11-18 00:18:36 +04:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
|
|
# test_module.py - unit test for the module interface
|
|
|
|
#
|
2019-02-17 04:34:52 +03:00
|
|
|
# Copyright (C) 2011-2019 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
2020-01-18 00:10:44 +03:00
|
|
|
# Copyright (C) 2020 The Psycopg Team
|
2011-11-18 00:18:36 +04:00
|
|
|
#
|
|
|
|
# psycopg2 is free software: you can redistribute it and/or modify it
|
|
|
|
# under the terms of the GNU Lesser General Public License as published
|
|
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# In addition, as a special exception, the copyright holders give
|
|
|
|
# permission to link this program with the OpenSSL library (or with
|
|
|
|
# modified versions of OpenSSL that use the same license as OpenSSL),
|
|
|
|
# and distribute linked combinations including the two.
|
|
|
|
#
|
|
|
|
# You must obey the GNU Lesser General Public License in all respects for
|
|
|
|
# all of the code used other than OpenSSL.
|
|
|
|
#
|
|
|
|
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
|
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
|
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
|
|
|
# License for more details.
|
|
|
|
|
2019-03-16 18:30:15 +03:00
|
|
|
import gc
|
2014-08-26 06:42:34 +04:00
|
|
|
import os
|
|
|
|
import sys
|
2019-03-16 21:56:56 +03:00
|
|
|
import pickle
|
2014-08-26 06:42:34 +04:00
|
|
|
from subprocess import Popen
|
2019-03-16 18:30:15 +03:00
|
|
|
from weakref import ref
|
2014-08-26 06:42:34 +04:00
|
|
|
|
2017-12-02 04:59:53 +03:00
|
|
|
import unittest
|
2017-12-04 05:47:19 +03:00
|
|
|
from .testutils import (skip_before_postgres,
|
2020-07-22 04:43:19 +03:00
|
|
|
ConnectingTestCase, skip_copy_if_green, skip_if_crdb, slow, StringIO)
|
2011-11-18 00:18:36 +04:00
|
|
|
|
|
|
|
import psycopg2
|
|
|
|
|
2016-03-03 18:07:38 +03:00
|
|
|
|
2011-11-18 00:18:36 +04:00
|
|
|
class ConnectTestCase(unittest.TestCase):
    """Check how ``psycopg2.connect()`` forwards its arguments.

    ``psycopg2._connect`` is replaced by a stub that records the arguments
    it receives in ``self.args`` as ``(dsn, connection_factory, async_)``,
    so every test can inspect what ``connect()`` passed down.
    """

    def setUp(self):
        """Install the recording stub in place of ``psycopg2._connect``."""
        self.args = None

        def connect_stub(dsn, connection_factory=None, async_=False):
            # Only record the call: the tests inspect self.args afterwards.
            self.args = (dsn, connection_factory, async_)

        self._connect_orig = psycopg2._connect
        psycopg2._connect = connect_stub

    def tearDown(self):
        """Restore the original ``psycopg2._connect``."""
        psycopg2._connect = self._connect_orig

    def test_there_might_be_nothing(self):
        """connect() without dsn or connection keywords passes an empty dsn."""
        psycopg2.connect()
        self.assertEqual(self.args[0], '')
        self.assertEqual(self.args[1], None)
        self.assertEqual(self.args[2], False)

        psycopg2.connect(
            connection_factory=lambda dsn, async_=False: None)
        self.assertEqual(self.args[0], '')
        self.assertNotEqual(self.args[1], None)
        self.assertEqual(self.args[2], False)

        psycopg2.connect(async_=True)
        self.assertEqual(self.args[0], '')
        self.assertEqual(self.args[1], None)
        self.assertEqual(self.args[2], True)

    def test_no_keywords(self):
        """An explicit empty dsn is passed through unchanged."""
        psycopg2.connect('')
        self.assertEqual(self.args[0], '')
        self.assertEqual(self.args[1], None)
        self.assertEqual(self.args[2], False)

    def test_dsn(self):
        """A dsn string given positionally is passed through unchanged."""
        psycopg2.connect('dbname=blah host=y')
        self.assertEqual(self.args[0], 'dbname=blah host=y')
        self.assertEqual(self.args[1], None)
        self.assertEqual(self.args[2], False)

    def test_supported_keywords(self):
        """Connection keywords are rendered as ``key=value`` dsn items."""
        psycopg2.connect(database='foo')
        self.assertEqual(self.args[0], 'dbname=foo')
        psycopg2.connect(user='postgres')
        self.assertEqual(self.args[0], 'user=postgres')
        psycopg2.connect(password='secret')
        self.assertEqual(self.args[0], 'password=secret')
        psycopg2.connect(port=5432)
        self.assertEqual(self.args[0], 'port=5432')
        psycopg2.connect(sslmode='require')
        self.assertEqual(self.args[0], 'sslmode=require')

        # With several keywords the item order is not asserted: check that
        # each item is there and that nothing else was added.
        psycopg2.connect(database='foo',
            user='postgres', password='secret', port=5432)
        self.assertIn('dbname=foo', self.args[0])
        self.assertIn('user=postgres', self.args[0])
        self.assertIn('password=secret', self.args[0])
        self.assertIn('port=5432', self.args[0])
        self.assertEqual(len(self.args[0].split()), 4)

    def test_generic_keywords(self):
        """A keyword such as ``options`` ends up in the dsn as well."""
        psycopg2.connect(options='stuff')
        self.assertEqual(self.args[0], 'options=stuff')

    def test_factory(self):
        """``connection_factory`` is forwarded and kept out of the dsn."""
        def f(dsn, async_=False):
            pass

        psycopg2.connect(database='foo', host='baz', connection_factory=f)
        self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
        self.assertEqual(self.args[1], f)
        self.assertEqual(self.args[2], False)

        psycopg2.connect("dbname=foo host=baz", connection_factory=f)
        self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
        self.assertEqual(self.args[1], f)
        self.assertEqual(self.args[2], False)

    def test_async(self):
        """``async_`` is forwarded as a true value and kept out of the dsn."""
        psycopg2.connect(database='foo', host='baz', async_=1)
        self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
        self.assertEqual(self.args[1], None)
        self.assertTrue(self.args[2])

        psycopg2.connect("dbname=foo host=baz", async_=True)
        self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
        self.assertEqual(self.args[1], None)
        self.assertTrue(self.args[2])

    def test_int_port_param(self):
        """An integer port is accepted and rendered in the dsn."""
        psycopg2.connect(database='sony', port=6543)
        # Pad with spaces so that each item can be matched as a whole word.
        dsn = " %s " % self.args[0]
        self.assertIn(" dbname=sony ", dsn)
        self.assertIn(" port=6543 ", dsn)

    def test_empty_param(self):
        """An empty string value is rendered as a quoted empty string."""
        psycopg2.connect(database='sony', password='')
        self.assertDsnEqual(self.args[0], "dbname=sony password=''")

    def test_escape(self):
        """Values containing spaces, backslashes or quotes are escaped."""
        psycopg2.connect(database='hello world')
        self.assertEqual(self.args[0], "dbname='hello world'")

        psycopg2.connect(database=r'back\slash')
        self.assertEqual(self.args[0], r"dbname=back\\slash")

        psycopg2.connect(database="quo'te")
        self.assertEqual(self.args[0], r"dbname=quo\'te")

        psycopg2.connect(database="with\ttab")
        self.assertEqual(self.args[0], "dbname='with\ttab'")

        psycopg2.connect(database=r"\every thing'")
        self.assertEqual(self.args[0], r"dbname='\\every thing\''")

    def test_params_merging(self):
        """Keywords are merged into the dsn, overriding items already there."""
        psycopg2.connect('dbname=foo', database='bar')
        self.assertEqual(self.args[0], 'dbname=bar')

        psycopg2.connect('dbname=foo', user='postgres')
        self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
|
2012-09-26 02:46:46 +04:00
|
|
|
|
2011-11-18 00:18:36 +04:00
|
|
|
|
2013-04-07 03:23:30 +04:00
|
|
|
class ExceptionsTestCase(ConnectingTestCase):
    """Check the attributes exposed by the exceptions psycopg2 raises.

    Most tests provoke an error through ``self.conn`` (typically by
    selecting from a table that doesn't exist) and inspect the exception.
    """

    def test_attributes(self):
        """The error exposes pgcode, pgerror and the cursor that raised it."""
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("select * from nonexist")

        e = cm.exception
        self.assertEqual(e.pgcode, '42P01')
        self.assertTrue(e.pgerror)
        self.assertIs(e.cursor, cur)

    def test_diagnostics_attributes(self):
        """Every diagnostics attribute is either None or a string."""
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("select * from nonexist")

        diag = cm.exception.diag
        self.assertIsInstance(diag, psycopg2.extensions.Diagnostics)
        for attr in [
                'column_name', 'constraint_name', 'context', 'datatype_name',
                'internal_position', 'internal_query', 'message_detail',
                'message_hint', 'message_primary', 'schema_name', 'severity',
                'severity_nonlocalized', 'source_file', 'source_function',
                'source_line', 'sqlstate', 'statement_position', 'table_name', ]:
            v = getattr(diag, attr)
            if v is not None:
                self.assertIsInstance(v, str)

    def test_diagnostics_values(self):
        """sqlstate and severity report the expected values."""
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("select * from nonexist")

        e = cm.exception
        self.assertEqual(e.diag.sqlstate, '42P01')
        self.assertEqual(e.diag.severity, 'ERROR')

    def test_diagnostics_life(self):
        """The diag object alone keeps the failing cursor alive."""
        # This test depends on exactly which references are alive, so the
        # exception is captured with a plain try/except rather than with
        # assertRaises.
        def tmp():
            cur = self.conn.cursor()
            try:
                cur.execute("select * from nonexist")
            except psycopg2.Error as exc:
                return cur, exc

        cur, e = tmp()
        diag = e.diag
        w = ref(cur)

        # With only diag left, the cursor must survive a collection.
        del e, cur
        gc.collect()
        self.assertIsNotNone(w())

        self.assertEqual(diag.sqlstate, '42P01')

        # Once diag is gone too, the cursor must be collected.
        del diag
        gc.collect()
        gc.collect()
        self.assertIsNone(w())

    @skip_if_crdb("copy")
    @skip_copy_if_green
    def test_diagnostics_copy(self):
        """Errors raised by copy_to() carry diagnostics too."""
        f = StringIO()
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.copy_to(f, 'nonexist')

        diag = cm.exception.diag
        self.assertEqual(diag.sqlstate, '42P01')

    def test_diagnostics_independent(self):
        """The diagnostics of an earlier error survive a later error."""
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("l'acqua e' poca e 'a papera nun galleggia")
        diag1 = cm.exception.diag

        self.conn.rollback()

        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("select level from water where ducks > 1")
        diag2 = cm.exception.diag

        self.assertEqual(diag1.sqlstate, '42601')
        self.assertEqual(diag2.sqlstate, '42P01')

    @skip_if_crdb("deferrable")
    def test_diagnostics_from_commit(self):
        """An error raised by commit() carries diagnostics."""
        cur = self.conn.cursor()
        cur.execute("""
            create temp table test_deferred (
               data int primary key,
               ref int references test_deferred (data)
                   deferrable initially deferred)
        """)
        # The constraint is deferred, so the violation is only reported
        # when the transaction is committed.
        cur.execute("insert into test_deferred values (1,2)")
        with self.assertRaises(psycopg2.Error) as cm:
            self.conn.commit()

        self.assertEqual(cm.exception.diag.sqlstate, '23503')

    @skip_if_crdb("diagnostic")
    @skip_before_postgres(9, 3)
    def test_9_3_diagnostics(self):
        """Schema, table and constraint names are reported on a violation."""
        cur = self.conn.cursor()
        cur.execute("""
            create temp table test_exc (
                data int constraint chk_eq1 check (data = 1)
            )""")
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("insert into test_exc values(2)")

        e = cm.exception
        self.assertEqual(e.pgcode, '23514')
        self.assertEqual(e.diag.schema_name[:7], "pg_temp")
        self.assertEqual(e.diag.table_name, "test_exc")
        self.assertEqual(e.diag.column_name, None)
        self.assertEqual(e.diag.constraint_name, "chk_eq1")
        self.assertEqual(e.diag.datatype_name, None)

    @skip_before_postgres(9, 6)
    def test_9_6_diagnostics(self):
        """severity_nonlocalized is reported."""
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("select 1 from nosuchtable")

        self.assertEqual(cm.exception.diag.severity_nonlocalized, 'ERROR')

    def test_pickle(self):
        """A pickled error keeps pgerror and pgcode but not the cursor."""
        cur = self.conn.cursor()
        with self.assertRaises(psycopg2.Error) as cm:
            cur.execute("select * from nonexist")

        e = cm.exception
        e1 = pickle.loads(pickle.dumps(e))

        self.assertEqual(e.pgerror, e1.pgerror)
        self.assertEqual(e.pgcode, e1.pgcode)
        self.assertIsNone(e1.cursor)

    @skip_if_crdb("connect any db")
    def test_pickle_connection_error(self):
        """An error raised by connect() can be pickled."""
        # segfaults on psycopg 2.5.1 - see ticket #170
        with self.assertRaises(psycopg2.Error) as cm:
            psycopg2.connect('dbname=nosuchdatabasemate')

        e = cm.exception
        e1 = pickle.loads(pickle.dumps(e))

        self.assertEqual(e.pgerror, e1.pgerror)
        self.assertEqual(e.pgcode, e1.pgcode)
        self.assertIsNone(e1.cursor)
|
|
|
|
|
2012-01-14 21:34:09 +04:00
|
|
|
|
2014-08-26 06:42:34 +04:00
|
|
|
class TestExtensionModule(unittest.TestCase):
    """Check properties of the ``_psycopg`` extension module."""

    @slow
    def test_import_internal(self):
        """``_psycopg`` can be imported from within the package directory."""
        # check that the internal package can be imported "naked"
        # we may break this property if there is a compelling reason to do so,
        # however having it allows for some import juggling such as the one
        # required in ticket #201.
        pkgdir = os.path.dirname(psycopg2.__file__)
        pardir = os.path.dirname(pkgdir)
        self.assertIn(pardir, sys.path)

        # The child process swaps the package's parent directory for the
        # package directory itself in sys.path, then imports the extension
        # as a top-level module.
        script = ("""
import sys
sys.path.remove(%r)
sys.path.insert(0, %r)
import _psycopg
""" % (pardir, pkgdir))

        proc = Popen([sys.executable, '-c', script])
        proc.communicate()
        self.assertEqual(0, proc.returncode)
|
|
|
|
|
|
|
|
|
2015-06-02 12:12:16 +03:00
|
|
|
class TestVersionDiscovery(unittest.TestCase):
    """Check the libpq version numbers exposed by psycopg2."""

    def test_libpq_version(self):
        """Both the module attribute and the function report an integer."""
        self.assertIsInstance(psycopg2.__libpq_version__, int)

        # Only the call itself is guarded, so a failing assertion can never
        # be confused with the "not supported" case.
        try:
            runtime_version = psycopg2.extensions.libpq_version()
        except psycopg2.NotSupportedError:
            # The function is allowed to be unsupported only when
            # __libpq_version__ is older than 90100.
            self.assertLess(psycopg2.__libpq_version__, 90100)
        else:
            self.assertIsInstance(runtime_version, int)
|
|
|
|
|
|
|
|
|
2011-11-18 00:18:36 +04:00
|
|
|
def test_suite():
    """Return a suite with all the tests defined in this module."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)
|
|
|
|
|
2018-10-23 02:39:14 +03:00
|
|
|
|
2011-11-18 00:18:36 +04:00
|
|
|
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|