From ff64a8b4094acd48cdf1cf5deef3ba7509266a02 Mon Sep 17 00:00:00 2001 From: Richard Bann Date: Tue, 18 Jun 2019 22:26:00 +0200 Subject: [PATCH] fix test on windows --- tests/test_connection.py | 49 +++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/tests/test_connection.py b/tests/test_connection.py index 85459e4b..04c7293c 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -35,8 +35,8 @@ import subprocess as sp from collections import deque from operator import attrgetter from weakref import ref -import multiprocessing import signal +import platform import psycopg2 import psycopg2.extras @@ -45,7 +45,7 @@ from psycopg2 import extensions as ext from .testutils import ( PY2, unittest, skip_if_no_superuser, skip_before_postgres, skip_after_postgres, skip_before_libpq, skip_after_libpq, - ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow, green) + ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) from .testconfig import dbhost, dsn, dbname @@ -412,28 +412,31 @@ t.join() @slow def test_handles_keyboardinterrupt(self): - def conn(queue): - host = "10.255.255.1" # will timeout - queue.put(1) - try: - self.connect(host=host, password="x", connect_timeout=1) - except KeyboardInterrupt: - queue.put("KeyboardInterrupt") - except Exception as e: - queue.put(str(e)) - - queue = multiprocessing.Queue() - process = multiprocessing.Process(target=conn, args=(queue,)) - process.start() - queue.get() - time.sleep(0.5) - os.kill(process.pid, signal.SIGINT) - raised = queue.get() - process.join() - if green: - self.assertEqual(raised, "asynchronous connection attempt underway") + script = """\ +import psycopg2 +host = "10.255.255.1" # will timeout +try: + psycopg2.connect(host=host, password="x", connect_timeout=1) +except KeyboardInterrupt: + print("KeyboardInterrupt") +except Exception as e: + print(str(e)) +""" + if platform.system() == 'Windows': + proc = sp.Popen([sys.executable, '-c', script], stdout=sp.PIPE, + universal_newlines=True, + creationflags=sp.CREATE_NEW_PROCESS_GROUP) + time.sleep(0.5) + proc.send_signal(signal.CTRL_C_EVENT) else: - self.assertEqual(raised, "KeyboardInterrupt") + proc = sp.Popen([sys.executable, '-c', script], stdout=sp.PIPE, + universal_newlines=True) + time.sleep(0.5) + proc.send_signal(signal.SIGINT) + proc.wait() + out = proc.stdout.read().strip() + proc.stdout.close() + self.assertEqual(out, "KeyboardInterrupt") class ParseDsnTestCase(ConnectingTestCase):