fix test on windows

This commit is contained in:
Richard Bann 2019-06-18 22:26:00 +02:00
parent 52fc8bbeda
commit ff64a8b409

View File

@ -35,8 +35,8 @@ import subprocess as sp
from collections import deque
from operator import attrgetter
from weakref import ref
import multiprocessing
import signal
import platform
import psycopg2
import psycopg2.extras
@ -45,7 +45,7 @@ from psycopg2 import extensions as ext
from .testutils import (
PY2, unittest, skip_if_no_superuser, skip_before_postgres,
skip_after_postgres, skip_before_libpq, skip_after_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow, green)
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
from .testconfig import dbhost, dsn, dbname
@ -412,28 +412,31 @@ t.join()
@slow
def test_handles_keyboardinterrupt(self):
def conn(queue):
host = "10.255.255.1" # will timeout
queue.put(1)
try:
self.connect(host=host, password="x", connect_timeout=1)
except KeyboardInterrupt:
queue.put("KeyboardInterrupt")
except Exception as e:
queue.put(str(e))
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=conn, args=(queue,))
process.start()
queue.get()
time.sleep(0.5)
os.kill(process.pid, signal.SIGINT)
raised = queue.get()
process.join()
if green:
self.assertEqual(raised, "asynchronous connection attempt underway")
script = """\
import psycopg2
host = "10.255.255.1" # will timeout
try:
psycopg2.connect(host=host, password="x", connect_timeout=1)
except KeyboardInterrupt:
print("KeyboardInterrupt")
except Exception as e:
print(str(e))
"""
if platform.system() == 'Windows':
proc = sp.Popen([sys.executable, '-c', script], stdout=sp.PIPE,
universal_newlines=True,
creationflags=sp.CREATE_NEW_PROCESS_GROUP)
time.sleep(0.5)
proc.send_signal(signal.CTRL_C_EVENT)
else:
self.assertEqual(raised, "KeyboardInterrupt")
proc = sp.Popen([sys.executable, '-c', script], stdout=sp.PIPE,
universal_newlines=True)
time.sleep(0.5)
proc.send_signal(signal.SIGINT)
proc.wait()
out = proc.stdout.read().strip()
proc.stdout.close()
self.assertEqual(out, "KeyboardInterrupt")
class ParseDsnTestCase(ConnectingTestCase):