#!/usr/bin/env python

# test_transaction - unit test on transaction behaviour
#
# Copyright (C) 2007-2019 Federico Di Gregorio
# Copyright (C) 2020-2021 The Psycopg Team
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
# License for more details.
import threading
import unittest
from .testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import skip_if_crdb

import psycopg2
from psycopg2.extensions import (
    ISOLATION_LEVEL_SERIALIZABLE, STATUS_BEGIN, STATUS_READY)


class TransactionTests(ConnectingTestCase):
    """Test commit/rollback behaviour and the connection status they leave."""

    def setUp(self):
        """Create two temporary tables linked by a deferrable foreign key."""
        ConnectingTestCase.setUp(self)
        skip_if_crdb("isolation level", self.conn)
        self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
        curs = self.conn.cursor()
        curs.execute('''
          CREATE TEMPORARY TABLE table1 (
            id int PRIMARY KEY
          )''')
        # The constraint is set to deferrable for the commit_failed test
        curs.execute('''
          CREATE TEMPORARY TABLE table2 (
            id int PRIMARY KEY,
            table1_id int,
            CONSTRAINT table2__table1_id__fk
              FOREIGN KEY (table1_id) REFERENCES table1(id) DEFERRABLE)''')
        curs.execute('INSERT INTO table1 VALUES (1)')
        curs.execute('INSERT INTO table2 VALUES (1, 1)')
        self.conn.commit()

    def test_rollback(self):
        # Test that rollback undoes changes
        curs = self.conn.cursor()
        curs.execute('INSERT INTO table2 VALUES (2, 1)')
        # Rollback takes us from BEGIN state to READY state
        self.assertEqual(self.conn.status, STATUS_BEGIN)
        self.conn.rollback()
        self.assertEqual(self.conn.status, STATUS_READY)
        curs.execute('SELECT id, table1_id FROM table2 WHERE id = 2')
        self.assertEqual(curs.fetchall(), [])

    def test_commit(self):
        # Test that commit stores changes
        curs = self.conn.cursor()
        curs.execute('INSERT INTO table2 VALUES (2, 1)')
        # Commit takes us from BEGIN state to READY state
        self.assertEqual(self.conn.status, STATUS_BEGIN)
        self.conn.commit()
        self.assertEqual(self.conn.status, STATUS_READY)
        # Now rollback and show that the new record is still there:
        self.conn.rollback()
        curs.execute('SELECT id, table1_id FROM table2 WHERE id = 2')
        self.assertEqual(curs.fetchall(), [(2, 1)])

    def test_failed_commit(self):
        # Test that we can recover from a failed commit.
        # We use a deferred constraint to cause a failure on commit.
        curs = self.conn.cursor()
        curs.execute('SET CONSTRAINTS table2__table1_id__fk DEFERRED')
        curs.execute('INSERT INTO table2 VALUES (2, 42)')
        # The commit should fail, and move the connection back to READY state
        self.assertEqual(self.conn.status, STATUS_BEGIN)
        self.assertRaises(psycopg2.IntegrityError, self.conn.commit)
        self.assertEqual(self.conn.status, STATUS_READY)
        # The connection should be ready to use for the next transaction:
        curs.execute('SELECT 1')
        self.assertEqual(curs.fetchone()[0], 1)


class DeadlockSerializationTests(ConnectingTestCase):
    """Test deadlock and serialization failure errors."""

    def connect(self):
        """Return a new connection set to the SERIALIZABLE isolation level."""
        conn = ConnectingTestCase.connect(self)
        conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
        return conn

    def setUp(self):
        """(Re)create the two regular tables shared by the worker threads."""
        ConnectingTestCase.setUp(self)
        skip_if_crdb("isolation level", self.conn)

        curs = self.conn.cursor()
        # Drop the tables if they already exist; a failed DROP is rolled
        # back so that the connection stays usable.
        for statement in ("DROP TABLE table1", "DROP TABLE table2"):
            try:
                curs.execute(statement)
                self.conn.commit()
            except psycopg2.DatabaseError:
                self.conn.rollback()

        # Create sample data
        curs.execute("""
            CREATE TABLE table1 (
                id int PRIMARY KEY,
                name text)
        """)
        curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
        curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
        self.conn.commit()

    def tearDown(self):
        """Drop the tables created by setUp."""
        curs = self.conn.cursor()
        curs.execute("DROP TABLE table1")
        curs.execute("DROP TABLE table2")
        self.conn.commit()

        ConnectingTestCase.tearDown(self)

    def _run_in_parallel(self, *tasks):
        """Run each callable in its own thread and wait for all to finish."""
        threads = [threading.Thread(target=task) for task in tasks]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def _assert_one_rollback_error(self):
        """Check that exactly one thread recorded a TransactionRollbackError."""
        self.assertFalse(self.thread1_error and self.thread2_error)
        error = self.thread1_error or self.thread2_error
        self.assertIsInstance(
            error, psycopg2.extensions.TransactionRollbackError)

    @slow
    def test_deadlock(self):
        self.thread1_error = self.thread2_error = None
        step1 = threading.Event()
        step2 = threading.Event()

        def task1():
            conn = None
            try:
                conn = self.connect()
                curs = conn.cursor()
                curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
                step1.set()
                step2.wait()
                curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
            except psycopg2.DatabaseError as exc:
                self.thread1_error = exc
            finally:
                # Always release the other thread, whatever went wrong here,
                # so that a failure cannot leave it blocked forever.
                step1.set()
                if conn is not None:
                    conn.close()

        def task2():
            conn = None
            try:
                conn = self.connect()
                curs = conn.cursor()
                step1.wait()
                curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
                step2.set()
                curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
            except psycopg2.DatabaseError as exc:
                self.thread2_error = exc
            finally:
                # Always release the other thread, whatever went wrong here,
                # so that a failure cannot leave it blocked forever.
                step2.set()
                if conn is not None:
                    conn.close()

        # Run the threads in parallel.  The "step1" and "step2" events
        # ensure that the two transactions overlap.
        self._run_in_parallel(task1, task2)

        # Exactly one of the threads should have failed with
        # TransactionRollbackError:
        self._assert_one_rollback_error()

    @slow
    def test_serialisation_failure(self):
        self.thread1_error = self.thread2_error = None
        step1 = threading.Event()
        step2 = threading.Event()

        def task1():
            conn = None
            try:
                conn = self.connect()
                curs = conn.cursor()
                curs.execute("SELECT name FROM table1 WHERE id = 1")
                curs.fetchall()
                step1.set()
                step2.wait()
                curs.execute("UPDATE table1 SET name='task1' WHERE id = 1")
                conn.commit()
            except psycopg2.DatabaseError as exc:
                self.thread1_error = exc
            finally:
                # Always release the other thread, whatever went wrong here,
                # so that a failure cannot leave it blocked forever.
                step1.set()
                if conn is not None:
                    conn.close()

        def task2():
            conn = None
            try:
                conn = self.connect()
                curs = conn.cursor()
                step1.wait()
                curs.execute("UPDATE table1 SET name='task2' WHERE id = 1")
                conn.commit()
            except psycopg2.DatabaseError as exc:
                self.thread2_error = exc
            finally:
                # task1 waits on step2 before updating, so it must be set
                # on every exit path of this task.
                step2.set()
                if conn is not None:
                    conn.close()

        # Run the threads in parallel.  The "step1" and "step2" events
        # ensure that the two transactions overlap.
        self._run_in_parallel(task1, task2)

        # Exactly one of the threads should have failed with
        # TransactionRollbackError:
        self._assert_one_rollback_error()


class QueryCancellationTests(ConnectingTestCase):
    """Tests for query cancellation."""

    def setUp(self):
        """Open the test connection at the SERIALIZABLE isolation level."""
        ConnectingTestCase.setUp(self)
        self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

    @skip_before_postgres(8, 2)
    def test_statement_timeout(self):
        curs = self.conn.cursor()
        # Set a low statement timeout, then sleep for a longer period.
        curs.execute('SET statement_timeout TO 10')
        self.assertRaises(psycopg2.extensions.QueryCanceledError,
                          curs.execute, 'SELECT pg_sleep(50)')


def test_suite():
    """Return a suite with every test defined in this module."""
    return unittest.TestLoader().loadTestsFromName(__name__)


if __name__ == "__main__":
    unittest.main()