mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-08-04 12:20:09 +03:00
Merge 5920d4f25d
into 7a1d1791d3
This commit is contained in:
commit
0ea07491cf
|
@ -107,17 +107,17 @@ class DB(TM, dbi_db.DB):
|
||||||
def make_mappings(self):
|
def make_mappings(self):
|
||||||
"""Generate the mappings used later by self.convert_description()."""
|
"""Generate the mappings used later by self.convert_description()."""
|
||||||
self.type_mappings = {}
|
self.type_mappings = {}
|
||||||
for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'),
|
for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'),
|
||||||
(BOOLEAN,'n'), (ROWID, 'i'),
|
(BOOLEAN,'n'), (ROWID, 'i'),
|
||||||
(DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]:
|
(DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]:
|
||||||
for v in t.values:
|
for v in t.values:
|
||||||
self.type_mappings[v] = (t, s)
|
self.type_mappings[v] = (t, s)
|
||||||
|
|
||||||
def convert_description(self, desc, use_psycopg_types=False):
|
def convert_description(self, desc, use_psycopg_types=False):
|
||||||
"""Convert DBAPI-2.0 description field to Zope format."""
|
"""Convert DBAPI-2.0 description field to Zope format."""
|
||||||
items = []
|
items = []
|
||||||
for name, typ, width, ds, p, scale, null_ok in desc:
|
for name, typ, width, ds, p, scale, null_ok in desc:
|
||||||
m = self.type_mappings.get(typ, (STRING, 's'))
|
m = self.type_mappings.get(typ, (STRING, 's'))
|
||||||
items.append({
|
items.append({
|
||||||
'name': name,
|
'name': name,
|
||||||
'type': use_psycopg_types and m[0] or m[1],
|
'type': use_psycopg_types and m[0] or m[1],
|
||||||
|
|
42
ZPsycopgDA/test_da.py
Normal file
42
ZPsycopgDA/test_da.py
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
# zopectl run script to test the DA/threading behavior
|
||||||
|
#
|
||||||
|
# Usage: bin/zopectl run test_da.py "dbname=xxx"
|
||||||
|
#
|
||||||
|
from Products.ZPsycopgDA.DA import ZDATETIME
|
||||||
|
from Products.ZPsycopgDA.db import DB
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
dsn = sys.argv[1]
|
||||||
|
|
||||||
|
|
||||||
|
typecasts = [ZDATETIME]
|
||||||
|
|
||||||
|
|
||||||
|
def DA_connect():
|
||||||
|
db = DB(dsn, tilevel=2, typecasts=typecasts)
|
||||||
|
db.open()
|
||||||
|
return db
|
||||||
|
|
||||||
|
|
||||||
|
def assert_casts(conn, name):
|
||||||
|
connection = conn.getcursor().connection
|
||||||
|
if (connection.string_types ==
|
||||||
|
{1114: ZDATETIME, 1184: ZDATETIME}):
|
||||||
|
print '%s pass\n' % name
|
||||||
|
else:
|
||||||
|
print '%s fail (%s)\n' % (name, connection.string_types)
|
||||||
|
|
||||||
|
|
||||||
|
def test_connect(name):
|
||||||
|
assert_casts(conn1, name)
|
||||||
|
|
||||||
|
|
||||||
|
conn1 = DA_connect()
|
||||||
|
t1 = threading.Thread(target=test_connect, args=('t1',))
|
||||||
|
t1.start()
|
||||||
|
t2 = threading.Thread(target=test_connect, args=('t2',))
|
||||||
|
t2.start()
|
||||||
|
t1.join()
|
||||||
|
t2.join()
|
Loading…
Reference in New Issue
Block a user