psycopg2/tests/test_transaction.py

259 lines
9.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# test_transaction - unit test on transaction behaviour
#
# Copyright (C) 2007-2019 Federico Di Gregorio <fog@debian.org>
2020-01-18 00:10:44 +03:00
# Copyright (C) 2020 The Psycopg Team
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import threading
import unittest
from .testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import skip_if_crdb
import psycopg2
from psycopg2.extensions import (
ISOLATION_LEVEL_SERIALIZABLE, STATUS_BEGIN, STATUS_READY)
2016-10-11 02:10:53 +03:00
class TransactionTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
skip_if_crdb("isolation level", self.conn)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
curs = self.conn.cursor()
curs.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
# The constraint is set to deferrable for the commit_failed test
curs.execute('''
CREATE TEMPORARY TABLE table2 (
id int PRIMARY KEY,
table1_id int,
CONSTRAINT table2__table1_id__fk
FOREIGN KEY (table1_id) REFERENCES table1(id) DEFERRABLE)''')
curs.execute('INSERT INTO table1 VALUES (1)')
curs.execute('INSERT INTO table2 VALUES (1, 1)')
self.conn.commit()
def test_rollback(self):
# Test that rollback undoes changes
curs = self.conn.cursor()
curs.execute('INSERT INTO table2 VALUES (2, 1)')
# Rollback takes us from BEGIN state to READY state
self.assertEqual(self.conn.status, STATUS_BEGIN)
self.conn.rollback()
self.assertEqual(self.conn.status, STATUS_READY)
curs.execute('SELECT id, table1_id FROM table2 WHERE id = 2')
self.assertEqual(curs.fetchall(), [])
def test_commit(self):
# Test that commit stores changes
curs = self.conn.cursor()
curs.execute('INSERT INTO table2 VALUES (2, 1)')
# Rollback takes us from BEGIN state to READY state
self.assertEqual(self.conn.status, STATUS_BEGIN)
self.conn.commit()
self.assertEqual(self.conn.status, STATUS_READY)
# Now rollback and show that the new record is still there:
self.conn.rollback()
curs.execute('SELECT id, table1_id FROM table2 WHERE id = 2')
self.assertEqual(curs.fetchall(), [(2, 1)])
def test_failed_commit(self):
# Test that we can recover from a failed commit.
# We use a deferred constraint to cause a failure on commit.
curs = self.conn.cursor()
curs.execute('SET CONSTRAINTS table2__table1_id__fk DEFERRED')
curs.execute('INSERT INTO table2 VALUES (2, 42)')
# The commit should fail, and move the cursor back to READY state
self.assertEqual(self.conn.status, STATUS_BEGIN)
self.assertRaises(psycopg2.IntegrityError, self.conn.commit)
self.assertEqual(self.conn.status, STATUS_READY)
# The connection should be ready to use for the next transaction:
curs.execute('SELECT 1')
self.assertEqual(curs.fetchone()[0], 1)
class DeadlockSerializationTests(ConnectingTestCase):
"""Test deadlock and serialization failure errors."""
def connect(self):
conn = ConnectingTestCase.connect(self)
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
return conn
def setUp(self):
ConnectingTestCase.setUp(self)
skip_if_crdb("isolation level", self.conn)
curs = self.conn.cursor()
# Drop table if it already exists
try:
curs.execute("DROP TABLE table1")
self.conn.commit()
except psycopg2.DatabaseError:
self.conn.rollback()
try:
curs.execute("DROP TABLE table2")
self.conn.commit()
except psycopg2.DatabaseError:
self.conn.rollback()
# Create sample data
curs.execute("""
CREATE TABLE table1 (
id int PRIMARY KEY,
name text)
""")
curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
self.conn.commit()
def tearDown(self):
curs = self.conn.cursor()
curs.execute("DROP TABLE table1")
curs.execute("DROP TABLE table2")
self.conn.commit()
ConnectingTestCase.tearDown(self)
@slow
def test_deadlock(self):
self.thread1_error = self.thread2_error = None
step1 = threading.Event()
step2 = threading.Event()
def task1():
try:
conn = self.connect()
curs = conn.cursor()
curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
step1.set()
step2.wait()
curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
except psycopg2.DatabaseError as exc:
self.thread1_error = exc
step1.set()
conn.close()
2016-10-11 02:10:53 +03:00
def task2():
try:
conn = self.connect()
curs = conn.cursor()
step1.wait()
curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
step2.set()
curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
except psycopg2.DatabaseError as exc:
self.thread2_error = exc
step2.set()
conn.close()
# Run the threads in parallel. The "step1" and "step2" events
# ensure that the two transactions overlap.
thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# Exactly one of the threads should have failed with
# TransactionRollbackError:
self.assertFalse(self.thread1_error and self.thread2_error)
error = self.thread1_error or self.thread2_error
self.assertTrue(isinstance(
2016-10-11 02:10:53 +03:00
error, psycopg2.extensions.TransactionRollbackError))
@slow
def test_serialisation_failure(self):
self.thread1_error = self.thread2_error = None
step1 = threading.Event()
step2 = threading.Event()
def task1():
try:
conn = self.connect()
curs = conn.cursor()
curs.execute("SELECT name FROM table1 WHERE id = 1")
curs.fetchall()
step1.set()
step2.wait()
curs.execute("UPDATE table1 SET name='task1' WHERE id = 1")
conn.commit()
except psycopg2.DatabaseError as exc:
self.thread1_error = exc
step1.set()
conn.close()
2016-10-11 02:10:53 +03:00
def task2():
try:
conn = self.connect()
curs = conn.cursor()
step1.wait()
curs.execute("UPDATE table1 SET name='task2' WHERE id = 1")
conn.commit()
except psycopg2.DatabaseError as exc:
self.thread2_error = exc
step2.set()
conn.close()
# Run the threads in parallel. The "step1" and "step2" events
# ensure that the two transactions overlap.
thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# Exactly one of the threads should have failed with
# TransactionRollbackError:
self.assertFalse(self.thread1_error and self.thread2_error)
error = self.thread1_error or self.thread2_error
self.assertTrue(isinstance(
2016-10-11 02:10:53 +03:00
error, psycopg2.extensions.TransactionRollbackError))
class QueryCancellationTests(ConnectingTestCase):
"""Tests for query cancellation."""
def setUp(self):
ConnectingTestCase.setUp(self)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
@skip_before_postgres(8, 2)
def test_statement_timeout(self):
curs = self.conn.cursor()
# Set a low statement timeout, then sleep for a longer period.
curs.execute('SET statement_timeout TO 10')
self.assertRaises(psycopg2.extensions.QueryCanceledError,
curs.execute, 'SELECT pg_sleep(50)')
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
2018-10-23 02:39:14 +03:00
if __name__ == "__main__":
unittest.main()