Dropped Zope support

This commit is contained in:
Daniele Varrazzo 2012-12-02 16:00:31 +00:00
parent dcbbaa76d6
commit 8e08aeb690
33 changed files with 2 additions and 1821 deletions

1
NEWS
View File

@ -11,6 +11,7 @@ What's new in psycopg 2.5
- connection.reset() implemented using DISCARD ALL on server versions - connection.reset() implemented using DISCARD ALL on server versions
supporting it. supporting it.
- 'errorcodes' map updated to PostgreSQL 9.2. - 'errorcodes' map updated to PostgreSQL 9.2.
- Dropped support for Zope. There is now an external ZPsycopgDA project.
What's new in psycopg 2.4.5 What's new in psycopg 2.4.5

View File

@ -1,360 +0,0 @@
# ZPsycopgDA/DA.py - ZPsycopgDA Zope product: Database Connection
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# Import modules needed by _psycopg to allow tools like py2exe to do
# their work without bothering about the module dependencies.
ALLOWED_PSYCOPG_VERSIONS = ('2.4-beta1', '2.4-beta2', '2.4', '2.4.1', '2.4.2', '2.4.3', '2.4.4', '2.4.5')
import sys
import time
import db
import re
import Acquisition
import Shared.DC.ZRDB.Connection
from db import DB
from Globals import HTMLFile
from ExtensionClass import Base
from App.Dialogs import MessageDialog
from DateTime import DateTime
# ImageFile is deprecated in Zope >= 2.9
try:
from App.ImageFile import ImageFile
except ImportError:
# Zope < 2.9. If PIL's installed with a .pth file, we're probably
# hosed.
from ImageFile import ImageFile
# import psycopg and functions/singletons needed for date/time conversions
import psycopg2
from psycopg2 import NUMBER, STRING, ROWID, DATETIME
from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE
from psycopg2.extensions import TIME, INTERVAL
from psycopg2.extensions import new_type, register_type
# add a new connection to a folder
manage_addZPsycopgConnectionForm = HTMLFile('dtml/add',globals())
def manage_addZPsycopgConnection(self, id, title, connection_string,
zdatetime=None, tilevel=2,
encoding='', check=None, REQUEST=None):
"""Add a DB connection to a folder."""
self._setObject(id, Connection(id, title, connection_string,
zdatetime, check, tilevel, encoding))
if REQUEST is not None: return self.manage_main(self, REQUEST)
# the connection object
class Connection(Shared.DC.ZRDB.Connection.Connection):
"""ZPsycopg Connection."""
_isAnSQLConnection = 1
id = 'Psycopg2_database_connection'
database_type = 'Psycopg2'
meta_type = title = 'Z Psycopg 2 Database Connection'
icon = 'misc_/conn'
def __init__(self, id, title, connection_string,
zdatetime, check=None, tilevel=2, encoding='UTF-8'):
self.zdatetime = zdatetime
self.id = str(id)
self.edit(title, connection_string, zdatetime,
check=check, tilevel=tilevel, encoding=encoding)
def factory(self):
return DB
## connection parameters editing ##
def edit(self, title, connection_string,
zdatetime, check=None, tilevel=2, encoding='UTF-8'):
self.title = title
self.connection_string = connection_string
self.zdatetime = zdatetime
self.tilevel = tilevel
self.encoding = encoding
if check: self.connect(self.connection_string)
manage_properties = HTMLFile('dtml/edit', globals())
def manage_edit(self, title, connection_string,
zdatetime=None, check=None, tilevel=2, encoding='UTF-8',
REQUEST=None):
"""Edit the DB connection."""
self.edit(title, connection_string, zdatetime,
check=check, tilevel=tilevel, encoding=encoding)
if REQUEST is not None:
msg = "Connection edited."
return self.manage_main(self,REQUEST,manage_tabs_message=msg)
def connect(self, s):
try:
self._v_database_connection.close()
except:
pass
# check psycopg version and raise exception if does not match
if psycopg2.__version__.split(' ')[0] not in ALLOWED_PSYCOPG_VERSIONS:
raise ImportError("psycopg version mismatch (imported %s)" %
psycopg2.__version__)
self._v_connected = ''
dbf = self.factory()
# TODO: let the psycopg exception propagate, or not?
self._v_database_connection = dbf(
self.connection_string, self.tilevel, self.get_type_casts(), self.encoding)
self._v_database_connection.open()
self._v_connected = DateTime()
return self
def get_type_casts(self):
# note that in both cases order *is* important
if self.zdatetime:
return ZDATETIME, ZDATE, ZTIME
else:
return DATETIME, DATE, TIME
## browsing and table/column management ##
manage_options = Shared.DC.ZRDB.Connection.Connection.manage_options
# + (
# {'label': 'Browse', 'action':'manage_browse'},)
#manage_tables = HTMLFile('dtml/tables', globals())
#manage_browse = HTMLFile('dtml/browse', globals())
info = None
def table_info(self):
return self._v_database_connection.table_info()
def __getitem__(self, name):
if name == 'tableNamed':
if not hasattr(self, '_v_tables'): self.tpValues()
return self._v_tables.__of__(self)
raise KeyError, name
def tpValues(self):
res = []
conn = self._v_database_connection
for d in conn.tables(rdb=0):
try:
name = d['TABLE_NAME']
b = TableBrowser()
b.__name__ = name
b._d = d
b._c = c
try:
b.icon = table_icons[d['TABLE_TYPE']]
except:
pass
r.append(b)
except:
pass
return res
## database connection registration data ##
classes = (Connection,)
meta_types = ({'name':'Z Psycopg 2 Database Connection',
'action':'manage_addZPsycopgConnectionForm'},)
folder_methods = {
'manage_addZPsycopgConnection': manage_addZPsycopgConnection,
'manage_addZPsycopgConnectionForm': manage_addZPsycopgConnectionForm}
__ac_permissions__ = (
('Add Z Psycopg Database Connections',
('manage_addZPsycopgConnectionForm', 'manage_addZPsycopgConnection')),)
# add icons
misc_={'conn': ImageFile('icons/DBAdapterFolder_icon.gif', globals())}
for icon in ('table', 'view', 'stable', 'what', 'field', 'text', 'bin',
'int', 'float', 'date', 'time', 'datetime'):
misc_[icon] = ImageFile('icons/%s.gif' % icon, globals())
## zope-specific psycopg typecasters ##
# convert an ISO timestamp string from postgres to a Zope DateTime object
def _cast_DateTime(iso, curs):
if iso:
if iso in ['-infinity', 'infinity']:
return iso
else:
return DateTime(iso)
# convert an ISO date string from postgres to a Zope DateTime object
def _cast_Date(iso, curs):
if iso:
if iso in ['-infinity', 'infinity']:
return iso
else:
return DateTime(iso)
# Convert a time string from postgres to a Zope DateTime object.
# NOTE: we set the day as today before feeding to DateTime so
# that it has the same DST settings.
def _cast_Time(iso, curs):
if iso:
if iso in ['-infinity', 'infinity']:
return iso
else:
return DateTime(time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())[:3]+
time.strptime(iso[:8], "%H:%M:%S")[3:]))
# NOTE: we don't cast intervals anymore because they are passed
# untouched to Zope.
def _cast_Interval(iso, curs):
return iso
ZDATETIME = new_type((1184, 1114), "ZDATETIME", _cast_DateTime)
ZINTERVAL = new_type((1186,), "ZINTERVAL", _cast_Interval)
ZDATE = new_type((1082,), "ZDATE", _cast_Date)
ZTIME = new_type((1083,), "ZTIME", _cast_Time)
## table browsing helpers ##
class TableBrowserCollection(Acquisition.Implicit):
pass
class Browser(Base):
def __getattr__(self, name):
try:
return self._d[name]
except KeyError:
raise AttributeError, name
class values:
def len(self):
return 1
def __getitem__(self, i):
try:
return self._d[i]
except AttributeError:
pass
self._d = self._f()
return self._d[i]
class TableBrowser(Browser, Acquisition.Implicit):
icon = 'what'
Description = check = ''
info = HTMLFile('table_info', globals())
menu = HTMLFile('table_menu', globals())
def tpValues(self):
v = values()
v._f = self.tpValues_
return v
def tpValues_(self):
r=[]
tname=self.__name__
for d in self._c.columns(tname):
b=ColumnBrowser()
b._d=d
try: b.icon=field_icons[d['Type']]
except: pass
b.TABLE_NAME=tname
r.append(b)
return r
def tpId(self): return self._d['TABLE_NAME']
def tpURL(self): return "Table/%s" % self._d['TABLE_NAME']
def Name(self): return self._d['TABLE_NAME']
def Type(self): return self._d['TABLE_TYPE']
manage_designInput=HTMLFile('designInput',globals())
def manage_buildInput(self, id, source, default, REQUEST=None):
"Create a database method for an input form"
args=[]
values=[]
names=[]
columns=self._columns
for i in range(len(source)):
s=source[i]
if s=='Null': continue
c=columns[i]
d=default[i]
t=c['Type']
n=c['Name']
names.append(n)
if s=='Argument':
values.append("<dtml-sqlvar %s type=%s>'" %
(n, vartype(t)))
a='%s%s' % (n, boboType(t))
if d: a="%s=%s" % (a,d)
args.append(a)
elif s=='Property':
values.append("<dtml-sqlvar %s type=%s>'" %
(n, vartype(t)))
else:
if isStringType(t):
if find(d,"\'") >= 0: d=join(split(d,"\'"),"''")
values.append("'%s'" % d)
elif d:
values.append(str(d))
else:
raise ValueError, (
'no default was given for <em>%s</em>' % n)
class ColumnBrowser(Browser):
icon='field'
def check(self):
return ('\t<input type=checkbox name="%s.%s">' %
(self.TABLE_NAME, self._d['Name']))
def tpId(self): return self._d['Name']
def tpURL(self): return "Column/%s" % self._d['Name']
def Description(self):
d=self._d
if d['Scale']:
return " %(Type)s(%(Precision)s,%(Scale)s) %(Nullable)s" % d
else:
return " %(Type)s(%(Precision)s) %(Nullable)s" % d
table_icons={
'TABLE': 'table',
'VIEW':'view',
'SYSTEM_TABLE': 'stable',
}
field_icons={
NUMBER.name: 'i',
STRING.name: 'text',
DATETIME.name: 'date',
INTEGER.name: 'int',
FLOAT.name: 'float',
BOOLEAN.name: 'bin',
ROWID.name: 'int'
}

View File

@ -1,29 +0,0 @@
# ZPsycopgDA/__init__.py - ZPsycopgDA Zope product
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# Import modules needed by _psycopg to allow tools like py2exe to do
# their work without bothering about the module dependencies.
__doc__ = "ZPsycopg Database Adapter Registration."
__version__ = '2.0'
import DA
def initialize(context):
context.registerClass(
DA.Connection,
permission = 'Add Z Psycopg 2 Database Connections',
constructors = (DA.manage_addZPsycopgConnectionForm,
DA.manage_addZPsycopgConnection),
icon = 'icons/DBAdapterFolder_icon.gif')

View File

@ -1,204 +0,0 @@
# ZPsycopgDA/db.py - query execution
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# Import modules needed by _psycopg to allow tools like py2exe to do
# their work without bothering about the module dependencies.
from Shared.DC.ZRDB.TM import TM
from Shared.DC.ZRDB import dbi_db
from ZODB.POSException import ConflictError
import site
import pool
import psycopg2
from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE, TIME
from psycopg2.extensions import TransactionRollbackError, register_type
from psycopg2 import NUMBER, STRING, ROWID, DATETIME
# the DB object, managing all the real query work
class DB(TM, dbi_db.DB):
_p_oid = _p_changed = _registered = None
def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
self.dsn = dsn
self.tilevel = tilevel
self.typecasts = typecasts
if enc is None or enc == "":
self.encoding = "utf-8"
else:
self.encoding = enc
self.failures = 0
self.calls = 0
self.make_mappings()
def getconn(self, init=True):
# if init is False we are trying to get hold on an already existing
# connection, so we avoid to (re)initialize it risking errors.
conn = pool.getconn(self.dsn)
if init:
conn.set_isolation_level(int(self.tilevel))
conn.set_client_encoding(self.encoding)
for tc in self.typecasts:
register_type(tc, conn)
return conn
def putconn(self, close=False):
try:
conn = pool.getconn(self.dsn, False)
except AttributeError:
pass
pool.putconn(self.dsn, conn, close)
def getcursor(self):
conn = self.getconn()
return conn.cursor()
def _finish(self, *ignored):
try:
conn = self.getconn(False)
conn.commit()
self.putconn()
except AttributeError:
pass
def _abort(self, *ignored):
try:
conn = self.getconn(False)
conn.rollback()
self.putconn()
except AttributeError:
pass
def open(self):
# this will create a new pool for our DSN if not already existing,
# then get and immediately release a connection
self.getconn()
self.putconn()
def close(self):
# FIXME: if this connection is closed we flush all the pool associated
# with the current DSN; does this makes sense?
pool.flushpool(self.dsn)
def sortKey(self):
return 1
def make_mappings(self):
"""Generate the mappings used later by self.convert_description()."""
self.type_mappings = {}
for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'),
(BOOLEAN,'n'), (ROWID, 'i'),
(DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]:
for v in t.values:
self.type_mappings[v] = (t, s)
def convert_description(self, desc, use_psycopg_types=False):
"""Convert DBAPI-2.0 description field to Zope format."""
items = []
for name, typ, width, ds, p, scale, null_ok in desc:
m = self.type_mappings.get(typ, (STRING, 's'))
items.append({
'name': name,
'type': use_psycopg_types and m[0] or m[1],
'width': width,
'precision': p,
'scale': scale,
'null': null_ok,
})
return items
## tables and rows ##
def tables(self, rdb=0, _care=('TABLE', 'VIEW')):
self._register()
c = self.getcursor()
c.execute(
"SELECT t.tablename AS NAME, 'TABLE' AS TYPE "
" FROM pg_tables t WHERE tableowner <> 'postgres' "
"UNION SELECT v.viewname AS NAME, 'VIEW' AS TYPE "
" FROM pg_views v WHERE viewowner <> 'postgres' "
"UNION SELECT t.tablename AS NAME, 'SYSTEM_TABLE\' AS TYPE "
" FROM pg_tables t WHERE tableowner = 'postgres' "
"UNION SELECT v.viewname AS NAME, 'SYSTEM_TABLE' AS TYPE "
"FROM pg_views v WHERE viewowner = 'postgres'")
res = []
for name, typ in c.fetchall():
if typ in _care:
res.append({'TABLE_NAME': name, 'TABLE_TYPE': typ})
self.putconn()
return res
def columns(self, table_name):
self._register()
c = self.getcursor()
try:
r = c.execute('SELECT * FROM "%s" WHERE 1=0' % table_name)
except:
return ()
self.putconn()
return self.convert_description(c.description, True)
## query execution ##
def query(self, query_string, max_rows=None, query_data=None):
self._register()
self.calls = self.calls+1
desc = ()
res = []
nselects = 0
c = self.getcursor()
try:
for qs in [x for x in query_string.split('\0') if x]:
try:
if query_data:
c.execute(qs, query_data)
else:
c.execute(qs)
except TransactionRollbackError:
# Ha, here we have to look like we are the ZODB raising conflict errrors, raising ZPublisher.Publish.Retry just doesn't work
#logging.debug("Serialization Error, retrying transaction", exc_info=True)
raise ConflictError("TransactionRollbackError from psycopg2")
except psycopg2.OperationalError:
#logging.exception("Operational error on connection, closing it.")
try:
# Only close our connection
self.putconn(True)
except:
#logging.debug("Something went wrong when we tried to close the pool", exc_info=True)
pass
if c.description is not None:
nselects += 1
if c.description != desc and nselects > 1:
raise psycopg2.ProgrammingError(
'multiple selects in single query not allowed')
if max_rows:
res = c.fetchmany(max_rows)
else:
res = c.fetchall()
desc = c.description
self.failures = 0
except StandardError, err:
self._abort()
raise err
return self.convert_description(desc), res

View File

@ -1,106 +0,0 @@
<dtml-var manage_page_header>
<dtml-var "manage_form_title(this(), _,
form_title='Add Z Psycopg 2 Database Connection',
help_product='ZPsycopgDA',
help_topic='ZPsycopgDA-Method-Add.stx'
)">
<p class="form-help">
A Zope Psycopg 2 Database Connection is used to connect and execute
queries on a PostgreSQL database.
</p>
<p class="form-help">
In the form below <em>Connection String</em> (also called the Data Source Name
or DSN for short) is a string... (TODO: finish docs)
</p>
<form action="manage_addZPsycopgConnection" method="POST">
<table cellspacing="0" cellpadding="2" border="0">
<tr>
<td align="left" valign="top">
<div class="form-label">
Id
</div>
</td>
<td align="left" valign="top">
<input type="text" name="id" size="40"
value="Psycopg2_database_connection" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Title
</div>
</td>
<td align="left" valign="top">
<input type="text" name="title" size="40"
value="Z Psycopg 2 Database Connection"/>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connection string
</div>
</td>
<td align="left" valign="top">
<input type="text" name="connection_string" size="40" value="" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connect immediately
</div>
</td>
<td align="left" valign="top">
<input type="checkbox" name="check" value="YES" checked="YES" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Use Zope's internal DateTime
</div>
</td>
<td align="left" valign="top">
<input type="checkbox" name="zdatetime" value="YES" checked="YES" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Transaction isolation level
</div>
</td>
<td align="left" valign="top">
<select name="tilevel:int">
<option value="1">Read committed</option>
<option value="2" selected="YES">Serializable</option>
</select>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Encoding
</div>
</td>
<td align="left" valign="top">
<input type="text" name="encoding" size="40" value="" />
</td>
</tr>
<tr>
<td align="left" valign="top" colspan="2">
<div class="form-element">
<input class="form-element" type="submit" name="submit" value=" Add " />
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>

View File

@ -1,11 +0,0 @@
<html>
<head><title><dtml-var title_or_id >tables</title></head>
<body bgcolor="#FFFFFF" link="#000099" vlink="#555555" alink="#77003B">
<dtml-var manage_tabs>
<dtml-tree header="info">
<IMG SRC="<dtml-var SCRIPT_NAME >/misc_/ZPsycopgDA/<dtml-var icon>"
ALT="<dtml-var Type>" BORDER="0">
<dtml-var Name><dtml-var Description>
</dtml-tree>
</body>
</html>

View File

@ -1,78 +0,0 @@
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<form action="manage_edit" method="POST">
<table cellspacing="0" cellpadding="2" border="0">
<tr>
<td align="left" valign="top">
<div class="form-optional">
Title
</div>
</td>
<td align="left" valign="top">
<input type="text" name="title" size="40"
value="&dtml-title;"/>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connection string
</div>
</td>
<td align="left" valign="top">
<input type="text" name="connection_string" size="40"
value="&dtml-connection_string;" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Use Zope's internal DateTime
</div>
</td>
<td align="left" valign="top">
<input type="checkbox" name="zdatetime" value="YES"
<dtml-if expr="zdatetime">checked="YES"</dtml-if> />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Transaction isolation level
</div>
</td>
<td align="left" valign="top">
<select name="tilevel:int">
<option value="1"
<dtml-if expr="tilevel==1">selected="YES"</dtml-if>>
Read committed</option>
<option value="2"
<dtml-if expr="tilevel==2">selected="YES"</dtml-if>>
Serializable</option>
</select>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Encoding
</div>
</td>
<td align="left" valign="top">
<input type="text" name="encoding" size="40"
value="&dtml-encoding;" />
</td>
</tr>
<tr>
<td align="left" valign="top" colspan="2">
<div class="form-element">
<input class="form-element" type="submit" name="submit"
value=" Save Changes " />
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>

View File

@ -1,7 +0,0 @@
<dtml-var standard_html_header>
<dtml-var TABLE_TYPE><dtml-if TABLE_OWNER>
owned by <dtml-var TABLE_OWNER></dtml-if>
<dtml-if REMARKS><br><dtml-var REMARKS></dtml-if>
<dtml-var standard_html_footer>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 897 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 924 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 930 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 925 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 915 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 929 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 918 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 884 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 878 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 918 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 926 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 893 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 894 B

View File

@ -1,49 +0,0 @@
# ZPsycopgDA/pool.py - ZPsycopgDA Zope product: connection pooling
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# Import modules needed by _psycopg to allow tools like py2exe to do
# their work without bothering about the module dependencies.
# All the connections are held in a pool of pools, directly accessible by the
# ZPsycopgDA code in db.py.
import threading
import psycopg2.pool
_connections_pool = {}
_connections_lock = threading.Lock()
def getpool(dsn, create=True):
_connections_lock.acquire()
try:
if not _connections_pool.has_key(dsn) and create:
_connections_pool[dsn] = \
psycopg2.pool.PersistentConnectionPool(4, 200, dsn)
finally:
_connections_lock.release()
return _connections_pool[dsn]
def flushpool(dsn):
_connections_lock.acquire()
try:
_connections_pool[dsn].closeall()
del _connections_pool[dsn]
finally:
_connections_lock.release()
def getconn(dsn, create=True):
return getpool(dsn, create=create).getconn()
def putconn(dsn, conn, close=False):
getpool(dsn).putconn(conn, close=close)

View File

@ -9,8 +9,7 @@ programming language. Its main features are the complete implementation of
the Python |DBAPI|_ specification and the thread safety (several threads can the Python |DBAPI|_ specification and the thread safety (several threads can
share the same connection). It was designed for heavily multi-threaded share the same connection). It was designed for heavily multi-threaded
applications that create and destroy lots of cursors and make a large number applications that create and destroy lots of cursors and make a large number
of concurrent :sql:`INSERT`\s or :sql:`UPDATE`\s. The Psycopg distribution of concurrent :sql:`INSERT`\s or :sql:`UPDATE`\s.
includes ZPsycopgDA, a Zope_ Database Adapter.
Psycopg 2 is mostly implemented in C as a libpq_ wrapper, resulting in being Psycopg 2 is mostly implemented in C as a libpq_ wrapper, resulting in being
both efficient and secure. It features client-side and :ref:`server-side both efficient and secure. It features client-side and :ref:`server-side
@ -33,7 +32,6 @@ Psycopg 2 is both Unicode and Python 3 friendly.
.. _Psycopg: http://initd.org/psycopg/ .. _Psycopg: http://initd.org/psycopg/
.. _PostgreSQL: http://www.postgresql.org/ .. _PostgreSQL: http://www.postgresql.org/
.. _Python: http://www.python.org/ .. _Python: http://www.python.org/
.. _Zope: http://www.zope.org/
.. _libpq: http://www.postgresql.org/docs/current/static/libpq.html .. _libpq: http://www.postgresql.org/docs/current/static/libpq.html
.. |COPY-TO-FROM| replace:: :sql:`COPY TO/COPY FROM` .. |COPY-TO-FROM| replace:: :sql:`COPY TO/COPY FROM`
.. __: http://www.postgresql.org/docs/current/static/sql-copy.html .. __: http://www.postgresql.org/docs/current/static/sql-copy.html

View File

@ -1,2 +0,0 @@
psycopg2
zope.app

View File

@ -1,24 +0,0 @@
# Load the license from an external source, so we don't have to keep a
# copy of it sitting around:
<load>
LICENSE.txt http://svn.zope.org/*checkout*/Zope3/trunk/ZopePublicLicense.txt?rev=25177
</load>
# Add a few things to the distribution root.
<distribution>
README.txt
</distribution>
# Specify what is included in the component.
<collection>
# Documentation files of the package:
*.txt
# Python modules from the package:
*.py
# Configuration files of the package:
*.zcml
</collection>

View File

@ -1,9 +0,0 @@
Metadata-Version: 1.0
Name: psycopg2da
Summary: Psycopg2 Database Adapter for Zope 3
Author: Fabio Tranchitella
Author-email: kobold@debian.org
License: ZPL 2.1
Description:
This package allows Zope 3 to connect to any PostGreSQL database via
the common Zope 3 RDB connection facilities.

View File

@ -1,79 +0,0 @@
==========
psycopg2da
==========
This file outlines the basics of using Zope3 with PostgreSQL via PsycopgDA.
Installing PsycopgDA
--------------------
1. Check out the psycopg2da package into a directory in your
PYTHONPATH. INSTANCE_HOME/lib/python or Zope3/src is usually the
most convenient place:
svn co svn://svn.zope.org/repos/main/psycopg2da/trunk psycopg2da
2. Copy `psycopg2-configure.zcml` to the `package-includes` directory
of your Zope instance.
Creating Database Connections
------------------------------
It is time to add some connections. A connection in Zope 3 is
registered as a utility.
3. Open a web browser on your Zope root folder (http://localhost:8080/
if you use the default settings in zope.conf.in).
4. Click on the 'Manage Site' action on the right side of the
screen. You should see a screen which reads 'Common Site Management
Tasks'
5. Around the middle of that page, you should see a link named 'Add
Utility'. Click on it.
6. Select 'Psycopg DA' and type in a name at the bottom of the page.
7. Enter the database connection string. It looks like this:
dbi://username:password@host:port/databasename
8. Click on the 'Add' button.
9. You should be on a page which reads 'Add Database Connection
Registration'. There you can configure the permission needed to use
the database connection, the name of the registration and the
registration status. You can use any name for 'Register As' field,
as long as it doesn't clash with an existing one. Choose a
permission. Choose between 'Registered' and 'Active' for the
'Registration Status'. Only one component of a kind can be 'Active'
at a time, so be careful.
10. You should be redirected to the 'Edit' screen of the connection
utility.
11. If you want to, you can go to the Test page and execute arbitrary
SQL queries to see whether the connection is working as expected.
Using SQL Scripts
-----------------
You can create SQL Scripts in the content space. For example:
12. Go to Zope root.
13. Add an SQL script (you can use the Common Tasks box on the left,
or the Add action on the right).
14. Click on the name of your new SQL script.
15. Choose a connection name (the one you entered in step 29) from the
drop-down.
16. Enter your query and click on the 'Save Changes' button.
17. You can test the script in the -- surprise! -- Test page.

View File

@ -1 +0,0 @@
# empty file

View File

@ -1,408 +0,0 @@
# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
#
# psycopg2da is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2da is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# Based on ZPsycopgDA.
#
# If you prefer you can use this package using the ZPL license as
# published on the Zope web site, http://www.zope.org/Resources/ZPL.
"""PostgreSQL Database Adapter for Zope 3"""
from zope.interface import implements
from zope.rdb import ZopeDatabaseAdapter, parseDSN, ZopeConnection, ZopeCursor
from zope.rdb.interfaces import DatabaseException, IZopeConnection
from zope.publisher.interfaces import Retry
from datetime import date, time, datetime, timedelta
import psycopg2
import psycopg2.extensions
import re
import sys
# OIDs from psycopg/pgtypes.h
DATE_OID = 1082
TIME_OID = 1083
TIMETZ_OID = 1266
TIMESTAMP_OID = 1114
TIMESTAMPTZ_OID = 1184
INTERVAL_OID = 1186
CHAR_OID = 18
TEXT_OID = 25
BPCHAR_OID = 1042
VARCHAR_OID = 1043
# date/time parsing functions
_dateFmt = re.compile(r"^(\d\d\d\d)-?([01]\d)-?([0-3]\d)$")
def parse_date(s):
"""Parses ISO-8601 compliant dates and returns a tuple (year, month,
day).
The following formats are accepted:
YYYY-MM-DD (extended format)
YYYYMMDD (basic format)
"""
m = _dateFmt.match(s)
if m is None:
raise ValueError, 'invalid date string: %s' % s
year, month, day = m.groups()
return int(year), int(month), int(day)
_timeFmt = re.compile(
r"^([0-2]\d)(?::?([0-5]\d)(?::?([0-5]\d)(?:[.,](\d+))?)?)?$")
def parse_time(s):
"""Parses ISO-8601 compliant times and returns a tuple (hour, minute,
second).
The following formats are accepted:
HH:MM:SS.ssss or HHMMSS.ssss
HH:MM:SS,ssss or HHMMSS,ssss
HH:MM:SS or HHMMSS
HH:MM or HHMM
HH
"""
m = _timeFmt.match(s)
if m is None:
raise ValueError, 'invalid time string: %s' % s
hr, mn, sc, msc = m.groups(0)
if msc != 0:
sc = float("%s.%s" % (sc, msc))
else:
sc = int(sc)
return int(hr), int(mn), sc
_tzFmt = re.compile(r"^([+-])([0-2]\d)(?::?([0-5]\d))?$")
def parse_tz(s):
"""Parses ISO-8601 timezones and returns the offset east of UTC in
minutes.
The following formats are accepted:
+/-HH:MM
+/-HHMM
+/-HH
Z (equivalent to +0000)
"""
if s == 'Z':
return 0
m = _tzFmt.match(s)
if m is None:
raise ValueError, 'invalid time zone: %s' % s
d, hoff, moff = m.groups(0)
if d == "-":
return - int(hoff) * 60 - int(moff)
return int(hoff) * 60 + int(moff)
_tzPos = re.compile(r"[Z+-]")
def parse_timetz(s):
"""Parses ISO-8601 compliant times that may include timezone information
and returns a tuple (hour, minute, second, tzoffset).
tzoffset is the offset east of UTC in minutes. It will be None if s does
not include time zone information.
Formats accepted are those listed in the descriptions of parse_time() and
parse_tz(). Time zone should immediatelly follow time without intervening
spaces.
"""
m = _tzPos.search(s)
if m is None:
return parse_time(s) + (None,)
pos = m.start()
return parse_time(s[:pos]) + (parse_tz(s[pos:]),)
_datetimeFmt = re.compile(r"[T ]")
def _split_datetime(s):
"""Split date and time parts of ISO-8601 compliant timestamp and
return a tuple (date, time).
' ' or 'T' used to separate date and time parts.
"""
m = _datetimeFmt.search(s)
if m is None:
raise ValueError, 'time part of datetime missing: %s' % s
pos = m.start()
return s[:pos], s[pos + 1:]
def parse_datetime(s):
"""Parses ISO-8601 compliant timestamp and returns a tuple (year, month,
day, hour, minute, second).
Formats accepted are those listed in the descriptions of parse_date() and
parse_time() with ' ' or 'T' used to separate date and time parts.
"""
dt, tm = _split_datetime(s)
return parse_date(dt) + parse_time(tm)
def parse_datetimetz(s):
"""Parses ISO-8601 compliant timestamp that may include timezone
information and returns a tuple (year, month, day, hour, minute, second,
tzoffset).
tzoffset is the offset east of UTC in minutes. It will be None if s does
not include time zone information.
Formats accepted are those listed in the descriptions of parse_date() and
parse_timetz() with ' ' or 'T' used to separate date and time parts.
"""
dt, tm = _split_datetime(s)
return parse_date(dt) + parse_timetz(tm)
def parse_interval(s):
"""Parses PostgreSQL interval notation and returns a tuple (years, months,
days, hours, minutes, seconds).
Values accepted:
interval ::= date
| time
| date time
date ::= date_comp
| date date_comp
date_comp ::= 1 'day'
| number 'days'
| 1 'month'
| 1 'mon'
| number 'months'
| number 'mons'
| 1 'year'
| number 'years'
time ::= number ':' number
| number ':' number ':' number
| number ':' number ':' number '.' fraction
"""
years = months = days = 0
hours = minutes = seconds = 0
elements = s.split()
# Tests with 7.4.6 on Ubuntu 5.4 interval output returns 'mon' and 'mons'
# and not 'month' or 'months' as expected. I've fixed this and left
# the original matches there too in case this is dependant on
# OS or PostgreSQL release.
for i in range(0, len(elements) - 1, 2):
count, unit = elements[i:i+2]
if unit == 'day' and count == '1':
days += 1
elif unit == 'days':
days += int(count)
elif unit == 'month' and count == '1':
months += 1
elif unit == 'mon' and count == '1':
months += 1
elif unit == 'months':
months += int(count)
elif unit == 'mons':
months += int(count)
elif unit == 'year' and count == '1':
years += 1
elif unit == 'years':
years += int(count)
else:
raise ValueError, 'unknown time interval %s %s' % (count, unit)
if len(elements) % 2 == 1:
hours, minutes, seconds = parse_time(elements[-1])
return (years, months, days, hours, minutes, seconds)
# Type conversions
def _conv_date(s, cursor):
if s:
return date(*parse_date(s))
def _conv_time(s, cursor):
if s:
hr, mn, sc = parse_time(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
return time(hr, mn, int(sc), int(micro))
def _conv_timetz(s, cursor):
if s:
from zope.datetime import tzinfo
hr, mn, sc, tz = parse_timetz(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
if tz: tz = tzinfo(tz)
return time(hr, mn, int(sc), int(micro), tz)
def _conv_timestamp(s, cursor):
if s:
y, m, d, hr, mn, sc = parse_datetime(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
return datetime(y, m, d, hr, mn, int(sc), int(micro))
def _conv_timestamptz(s, cursor):
if s:
from zope.datetime import tzinfo
y, m, d, hr, mn, sc, tz = parse_datetimetz(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
if tz: tz = tzinfo(tz)
return datetime(y, m, d, hr, mn, int(sc), int(micro), tz)
def _conv_interval(s, cursor):
if s:
y, m, d, hr, mn, sc = parse_interval(s)
if (y, m) != (0, 0):
# XXX: Currently there's no way to represent years and months as
# timedeltas
return s
else:
return timedelta(days=d, hours=hr, minutes=mn, seconds=sc)
def _get_string_conv(encoding):
def _conv_string(s, cursor):
if s is not None:
s = s.decode(encoding)
return s
return _conv_string
# User-defined types
DATE = psycopg2.extensions.new_type((DATE_OID,), "ZDATE", _conv_date)
TIME = psycopg2.extensions.new_type((TIME_OID,), "ZTIME", _conv_time)
TIMETZ = psycopg2.extensions.new_type((TIMETZ_OID,), "ZTIMETZ", _conv_timetz)
TIMESTAMP = psycopg2.extensions.new_type((TIMESTAMP_OID,), "ZTIMESTAMP", _conv_timestamp)
TIMESTAMPTZ = psycopg2.extensions.new_type((TIMESTAMPTZ_OID,), "ZTIMESTAMPTZ", _conv_timestamptz)
INTERVAL = psycopg2.extensions.new_type((INTERVAL_OID,), "ZINTERVAL", _conv_interval)
def registerTypes(encoding):
"""Register type conversions for psycopg"""
psycopg2.extensions.register_type(DATE)
psycopg2.extensions.register_type(TIME)
psycopg2.extensions.register_type(TIMETZ)
psycopg2.extensions.register_type(TIMESTAMP)
psycopg2.extensions.register_type(TIMESTAMPTZ)
psycopg2.extensions.register_type(INTERVAL)
STRING = psycopg2.extensions.new_type((CHAR_OID, TEXT_OID, BPCHAR_OID, VARCHAR_OID), "ZSTRING", _get_string_conv(encoding))
psycopg2.extensions.register_type(STRING)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
dsn2option_mapping = {'host': 'host',
'port': 'port',
'dbname': 'dbname',
'username': 'user',
'password': 'password'}
class Psycopg2Adapter(ZopeDatabaseAdapter):
"""A psycopg2 adapter for Zope3.
The following type conversions are performed:
DATE -> datetime.date
TIME -> datetime.time
TIMETZ -> datetime.time
TIMESTAMP -> datetime.datetime
TIMESTAMPTZ -> datetime.datetime
XXX: INTERVAL cannot be represented exactly as datetime.timedelta since
it might be something like '1 month', which is a variable number of days.
"""
def connect(self):
if not self.isConnected():
try:
self._v_connection = Psycopg2Connection(
self._connection_factory(), self
)
except psycopg2.Error, error:
raise DatabaseException, str(error)
def registerTypes(self):
registerTypes(self.getEncoding())
def _connection_factory(self):
"""Create a psycopg2 DBI connection based on the DSN"""
self.registerTypes()
conn_info = parseDSN(self.dsn)
conn_list = []
for dsnname, optname in dsn2option_mapping.iteritems():
if conn_info[dsnname]:
conn_list.append('%s=%s' % (optname, conn_info[dsnname]))
conn_str = ' '.join(conn_list)
connection = psycopg2.connect(conn_str)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
return connection
def disconnect(self):
if self.isConnected():
try:
self._v_connection.close()
except psycopg2.InterfaceError:
pass
self._v_connection = None
def _handle_psycopg_exception(error):
"""Called from a exception handler for psycopg2.Error.
If we have a serialization exception or a deadlock, we should retry the
transaction by raising a Retry exception. Otherwise, we reraise.
"""
if isinstance(error, psycopg2.extensions.TransactionRollbackError):
raise Retry(sys.exc_info())
raise
class IPsycopg2ZopeConnection(IZopeConnection):
"""A marker interface stating that this connection uses PostgreSQL."""
class Psycopg2Connection(ZopeConnection):
implements(IPsycopg2ZopeConnection)
def cursor(self):
"""See IZopeConnection"""
return Psycopg2Cursor(self.conn.cursor(), self)
def commit(self):
try:
ZopeConnection.commit(self)
except psycopg2.Error, error:
_handle_psycopg_exception(error)
class Psycopg2Cursor(ZopeCursor):
def execute(self, operation, parameters=None):
"""See IZopeCursor"""
try:
return ZopeCursor.execute(self, operation, parameters)
except psycopg2.Error, error:
_handle_psycopg_exception(error)
def executemany(operation, seq_of_parameters=None):
"""See IZopeCursor"""
raise RuntimeError, 'Oos'
try:
return ZopeCursor.execute(self, operation, seq_of_parameters)
except psycopg2.Error, error:
_handle_psycopg_exception(error)

View File

@ -1,51 +0,0 @@
<configure
xmlns="http://namespaces.zope.org/zope"
xmlns:browser="http://namespaces.zope.org/browser"
i18n_domain="psycopg2da">
<class class=".adapter.Psycopg2Adapter">
<factory id="zope.da.Psycopg2DA" />
<require
permission="zope.rdb.Use"
interface="zope.rdb.interfaces.IZopeDatabaseAdapter"
/>
<require
permission="zope.ManageServices"
interface="zope.rdb.interfaces.IZopeDatabaseAdapterManagement"
/>
</class>
<class class=".adapter.Psycopg2Connection">
<require
permission="zope.rdb.Use"
interface="zope.rdb.interfaces.IZopeConnection"
/>
</class>
<class class=".adapter.Psycopg2Cursor">
<require
permission="zope.rdb.Use"
interface="zope.rdb.interfaces.IZopeCursor"
/>
</class>
<browser:addform
name="AddPsycopg2DA"
schema="zope.rdb.interfaces.IManageableZopeDatabaseAdapter"
label="Add Psycopg2 (PostGreSQL) Database Adapter"
content_factory=".adapter.Psycopg2Adapter"
arguments="dsn"
fields="dsn"
permission="zope.ManageContent"
/>
<!-- Menu entry for "add utility" menu -->
<browser:addMenuItem
class=".adapter.Psycopg2Adapter"
title="Psycopg2 DA"
description="A PostgreSQL Database Adapter using the Psycopg2 driver"
permission="zope.ManageApplication"
view="AddPsycopg2DA"
/>
</configure>

View File

@ -1 +0,0 @@
<include package="psycopg2da" />

View File

@ -1,395 +0,0 @@
# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
#
# psycopg2da is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2da is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# If you prefer you can use this package using the ZPL license as
# published on the Zope web site, http://www.zope.org/Resources/ZPL.
"""Unit tests for Psycopg2DA."""
from unittest import TestCase, TestSuite, main, makeSuite
from datetime import tzinfo, timedelta
import psycopg2
import psycopg2.extensions
class Stub(object):
def __init__(self, **kw):
self.__dict__.update(kw)
class TZStub(tzinfo):
def __init__(self, h, m):
self.offset = h * 60 + m
def utcoffset(self, dt):
return timedelta(minutes=self.offset)
def dst(self, dt):
return 0
def tzname(self, dt):
return ''
def __repr__(self):
return 'tzinfo(%d)' % self.offset
def __reduce__(self):
return type(self), (), self.__dict__
class ConnectionStub(object):
def set_isolation_level(self, level):
pass
class Psycopg2Stub(object):
__shared_state = {} # 'Borg' design pattern
DATE = psycopg2.extensions.DATE
TIME = psycopg2.extensions.TIME
DATETIME = psycopg2.DATETIME
INTERVAL = psycopg2.extensions.INTERVAL
ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
def __init__(self):
self.__dict__ = self.__shared_state
self.types = {}
def connect(self, connection_string):
self.last_connection_string = connection_string
return ConnectionStub()
def new_type(self, values, name, converter):
return Stub(name=name, values=values)
def register_type(self, type):
for typeid in type.values:
self.types[typeid] = type
def getExtensions(self):
return self
extensions = property(getExtensions)
class TestPsycopg2TypeConversion(TestCase):
def test_conv_date(self):
from psycopg2da.adapter import _conv_date
from datetime import date
def c(s):
return _conv_date(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('2001-03-02'), date(2001, 3, 2))
def test_conv_time(self):
from psycopg2da.adapter import _conv_time
from datetime import time
def c(s):
return _conv_time(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('23:17:57'), time(23, 17, 57))
self.assertEquals(c('23:17:57.037'), time(23, 17, 57, 37000))
def test_conv_timetz(self):
from psycopg2da.adapter import _conv_timetz
from datetime import time
def c(s):
return _conv_timetz(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('12:44:01+01:00'), time(12, 44, 01, 0, TZStub(1,0)))
self.assertEquals(c('12:44:01.037-00:30'), time(12, 44, 01, 37000, TZStub(0,-30)))
def test_conv_timestamp(self):
from psycopg2da.adapter import _conv_timestamp
from datetime import datetime
def c(s):
return _conv_timestamp(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('2001-03-02 12:44:01'),
datetime(2001, 3, 2, 12, 44, 01))
self.assertEquals(c('2001-03-02 12:44:01.037'),
datetime(2001, 3, 2, 12, 44, 01, 37000))
self.assertEquals(c('2001-03-02 12:44:01.000001'),
datetime(2001, 3, 2, 12, 44, 01, 1))
def test_conv_timestamptz(self):
from psycopg2da.adapter import _conv_timestamptz
from datetime import datetime
def c(s):
return _conv_timestamptz(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('2001-03-02 12:44:01+01:00'),
datetime(2001, 3, 2, 12, 44, 01, 0, TZStub(1,0)))
self.assertEquals(c('2001-03-02 12:44:01.037-00:30'),
datetime(2001, 3, 2, 12, 44, 01, 37000, TZStub(0,-30)))
self.assertEquals(c('2001-03-02 12:44:01.000001+12:00'),
datetime(2001, 3, 2, 12, 44, 01, 1, TZStub(12,0)))
self.assertEquals(c('2001-06-25 12:14:00-07'),
datetime(2001, 6, 25, 12, 14, 00, 0, TZStub(-7,0)))
def test_conv_interval(self):
from psycopg2da.adapter import _conv_interval
from datetime import timedelta
def c(s):
return _conv_interval(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('01:00'), timedelta(hours=1))
self.assertEquals(c('00:15'), timedelta(minutes=15))
self.assertEquals(c('00:00:47'), timedelta(seconds=47))
self.assertEquals(c('00:00:00.037'), timedelta(microseconds=37000))
self.assertEquals(c('00:00:00.111111'), timedelta(microseconds=111111))
self.assertEquals(c('1 day'), timedelta(days=1))
self.assertEquals(c('2 days'), timedelta(days=2))
self.assertEquals(c('374 days'), timedelta(days=374))
self.assertEquals(c('2 days 03:20:15.123456'),
timedelta(days=2, hours=3, minutes=20,
seconds=15, microseconds=123456))
# XXX There's a problem with years and months. Currently timedelta
# cannot represent them accurately
self.assertEquals(c('1 month'), '1 month')
self.assertEquals(c('2 months'), '2 months')
self.assertEquals(c('1 mon'), '1 mon')
self.assertEquals(c('2 mons'), '2 mons')
self.assertEquals(c('1 year'), '1 year')
self.assertEquals(c('3 years'), '3 years')
# Later we might be able to use
## self.assertEquals(c('1 month'), timedelta(months=1))
## self.assertEquals(c('2 months'), timedelta(months=2))
## self.assertEquals(c('1 year'), timedelta(years=1))
## self.assertEquals(c('3 years'), timedelta(years=3))
self.assertRaises(ValueError, c, '2 day')
self.assertRaises(ValueError, c, '2days')
self.assertRaises(ValueError, c, '123')
def test_conv_string(self):
from psycopg2da.adapter import _get_string_conv
_conv_string = _get_string_conv("utf-8")
def c(s):
return _conv_string(s, None)
self.assertEquals(c(None), None)
self.assertEquals(c(''), u'')
self.assertEquals(c('c'), u'c')
self.assertEquals(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2')
self.assertEquals(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2')
class TestPsycopg2Adapter(TestCase):
def setUp(self):
import psycopg2da.adapter as adapter
self.real_psycopg2 = adapter.psycopg2
adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub()
def tearDown(self):
import psycopg2da.adapter as adapter
adapter.psycopg2 = self.real_psycopg2
def test_connection_factory(self):
from psycopg2da.adapter import Psycopg2Adapter
a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored')
c = a._connection_factory()
args = self.psycopg2_stub.last_connection_string.split()
args.sort()
self.assertEquals(args, ['dbname=dbname', 'host=hostname',
'password=password', 'port=port',
'user=username'])
def test_registerTypes(self):
import psycopg2da.adapter as adapter
from psycopg2da.adapter import Psycopg2Adapter
a = Psycopg2Adapter('dbi://')
a.registerTypes()
for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP',
'TIMESTAMPTZ', 'INTERVAL'):
typeid = getattr(adapter, '%s_OID' % typename)
result = self.psycopg2_stub.types.get(typeid, None)
if not result:
# comparing None with psycopg2.type object segfaults
self.fail("did not register %s (%d): got None, not Z%s"
% (typename, typeid, typename))
else:
result_name = getattr(result, 'name', 'None')
self.assertEquals(result, getattr(adapter, typename),
"did not register %s (%d): got %s, not Z%s"
% (typename, typeid, result_name, typename))
class TestISODateTime(TestCase):
# Test if date/time parsing functions accept a sensible subset of ISO-8601
# compliant date/time strings.
#
# Resources:
# http://www.cl.cam.ac.uk/~mgk25/iso-time.html
# http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html
# http://www.w3.org/TR/NOTE-datetime
# http://www.ietf.org/rfc/rfc3339.txt
basic_dates = (('20020304', (2002, 03, 04)),
('20000229', (2000, 02, 29)))
extended_dates = (('2002-03-04', (2002, 03, 04)),
('2000-02-29', (2000, 02, 29)))
basic_times = (('12', (12, 0, 0)),
('1234', (12, 34, 0)),
('123417', (12, 34, 17)),
('123417.5', (12, 34, 17.5)),
('123417,5', (12, 34, 17.5)))
extended_times = (('12', (12, 0, 0)),
('12:34', (12, 34, 0)),
('12:34:17', (12, 34, 17)),
('12:34:17.5', (12, 34, 17.5)),
('12:34:17,5', (12, 34, 17.5)))
basic_tzs = (('Z', 0),
('+02', 2*60),
('+1130', 11*60+30),
('-05', -5*60),
('-0030', -30))
extended_tzs = (('Z', 0),
('+02', 2*60),
('+11:30', 11*60+30),
('-05', -5*60),
('-00:30', -30))
time_separators = (' ', 'T')
bad_dates = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29',
'1990/13/14')
bad_times = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,')
bad_timetzs = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q')
bad_datetimes = ('', 'foo', '2002-03-0412:33')
bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200')
exception_type = ValueError
# We need the following funcions:
# parse_date -> (year, month, day)
# parse_time -> (hour, minute, second)
# parse_timetz -> (hour, minute, second, tzoffset)
# parse_datetime -> (year, month, day, hour, minute, second)
# parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset)
# second can be a float, all other values are ints
# tzoffset is offset in minutes east of UTC
def setUp(self):
from psycopg2da.adapter import parse_date, parse_time, \
parse_timetz, parse_datetime, parse_datetimetz
self.parse_date = parse_date
self.parse_time = parse_time
self.parse_timetz = parse_timetz
self.parse_datetime = parse_datetime
self.parse_datetimetz = parse_datetimetz
def test_basic_date(self):
for s, d in self.basic_dates:
self.assertEqual(self.parse_date(s), d)
def test_extended_date(self):
for s, d in self.extended_dates:
self.assertEqual(self.parse_date(s), d)
def test_bad_date(self):
for s in self.bad_dates:
self.assertRaises(self.exception_type, self.parse_date, s)
def test_basic_time(self):
for s, t in self.basic_times:
self.assertEqual(self.parse_time(s), t)
def test_extended_time(self):
for s, t in self.extended_times:
self.assertEqual(self.parse_time(s), t)
def test_bad_time(self):
for s in self.bad_times:
self.assertRaises(self.exception_type, self.parse_time, s)
def test_basic_timetz(self):
for s, t in self.basic_times:
for tz, off in self.basic_tzs:
self.assertEqual(self.parse_timetz(s+tz), t + (off,))
def test_extended_timetz(self):
for s, t in self.extended_times:
for tz, off in self.extended_tzs:
self.assertEqual(self.parse_timetz(s+tz), t + (off,))
def test_bad_timetzs(self):
for s in self.bad_timetzs:
self.assertRaises(self.exception_type, self.parse_timetz, s)
def test_basic_datetime(self):
for ds, d in self.basic_dates:
for ts, t in self.basic_times:
for sep in self.time_separators:
self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
def test_extended_datetime(self):
for ds, d in self.extended_dates:
for ts, t in self.extended_times:
for sep in self.time_separators:
self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
def test_bad_datetimes(self):
for s in self.bad_datetimes:
self.assertRaises(self.exception_type, self.parse_datetime, s)
def test_basic_datetimetz(self):
for ds, d in self.basic_dates:
for ts, t in self.basic_times:
for tz, off in self.basic_tzs:
for sep in self.time_separators:
self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
d + t + (off,))
def test_extended_datetimetz(self):
for ds, d in self.extended_dates:
for ts, t in self.extended_times:
for tz, off in self.extended_tzs:
for sep in self.time_separators:
self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
d + t + (off,))
def test_bad_datetimetzs(self):
for s in self.bad_datetimetzs:
self.assertRaises(self.exception_type, self.parse_datetimetz, s)
def test_suite():
return TestSuite((
makeSuite(TestPsycopg2TypeConversion),
makeSuite(TestPsycopg2Adapter),
makeSuite(TestISODateTime),
))
if __name__=='__main__':
main(defaultTest='test_suite')

View File

@ -8,7 +8,3 @@ envlist = py26, py27
[testenv] [testenv]
commands = make check commands = make check
deps =
zope.interface
zope.rdb
zope.datetime