Added psycopg2da, the zope3 database adapter for psycopg2.

This commit is contained in:
Fabio Tranchitella 2006-09-19 03:50:35 +00:00
parent 31189ef0df
commit 146f64e71d
9 changed files with 980 additions and 0 deletions

View File

@ -0,0 +1,2 @@
psycopg2
zope.app

24
psycopg2da/PACKAGE.cfg Normal file
View File

@ -0,0 +1,24 @@
# Load the license from an external source, so we don't have to keep a
# copy of it sitting around:
<load>
LICENSE.txt http://svn.zope.org/*checkout*/Zope3/trunk/ZopePublicLicense.txt?rev=25177
</load>
# Add a few things to the distribution root.
<distribution>
README.txt
</distribution>
# Specify what is included in the component.
<collection>
# Documentation files of the package:
*.txt
# Python modules from the package:
*.py
# Configuration files of the package:
*.zcml
</collection>

View File

@ -0,0 +1,9 @@
Metadata-Version: 1.0
Name: psycopg2da
Summary: Psycopg2 Database Adapter for Zope 3
Author: Fabio Tranchitella
Author-email: kobold@debian.org
License: ZPL 2.1
Description:
This package allows Zope 3 to connect to any PostGreSQL database via
the common Zope 3 RDB connection facilities.

79
psycopg2da/README.txt Normal file
View File

@ -0,0 +1,79 @@
==========
psycopg2da
==========
This file outlines the basics of using Zope3 with PostgreSQL via PsycopgDA.
Installing PsycopgDA
--------------------
1. Check out the psycopg2da package into a directory in your
PYTHONPATH. INSTANCE_HOME/lib/python or Zope3/src is usually the
most convenient place:
svn co svn://svn.zope.org/repos/main/psycopg2da/trunk psycopg2da
2. Copy `psycopg2-configure.zcml` to the `package-includes` directory
of your Zope instance.
Creating Database Connections
------------------------------
It is time to add some connections. A connection in Zope 3 is
registered as a utility.
3. Open a web browser on your Zope root folder (http://localhost:8080/
if you use the default settings in zope.conf.in).
4. Click on the 'Manage Site' action on the right side of the
screen. You should see a screen which reads 'Common Site Management
Tasks'
5. Around the middle of that page, you should see a link named 'Add
Utility'. Click on it.
6. Select 'Psycopg DA' and type in a name at the bottom of the page.
7. Enter the database connection string. It looks like this:
dbi://username:password@host:port/databasename
8. Click on the 'Add' button.
9. You should be on a page which reads 'Add Database Connection
Registration'. There you can configure the permission needed to use
the database connection, the name of the registration and the
registration status. You can use any name for 'Register As' field,
as long as it doesn't clash with an existing one. Choose a
permission. Choose between 'Registered' and 'Active' for the
'Registration Status'. Only one component of a kind can be 'Active'
at a time, so be careful.
10. You should be redirected to the 'Edit' screen of the connection
utility.
11. If you want to, you can go to the Test page and execute arbitrary
SQL queries to see whether the connection is working as expected.
Using SQL Scripts
-----------------
You can create SQL Scripts in the content space. For example:
12. Go to Zope root.
13. Add an SQL script (you can use the Common Tasks box on the left,
or the Add action on the right).
14. Click on the name of your new SQL script.
15. Choose a connection name (the one you entered in step 29) from the
drop-down.
16. Enter your query and click on the 'Save Changes' button.
17. You can test the script in the -- surprise! -- Test page.

0
psycopg2da/__init__.py Normal file
View File

425
psycopg2da/adapter.py Normal file
View File

@ -0,0 +1,425 @@
# psycopg2da
# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
#
# Based on psycopgda:
#
# Copyright (c) 2002-2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# If you prefer you can use this package using the ZPL license as
# published on the Zope web site, http://www.zope.org/Resources/ZPL.
#
"""PostgreSQL Database Adapter for Zope 3"""
from zope.interface import implements
from zope.rdb import ZopeDatabaseAdapter, parseDSN, ZopeConnection, ZopeCursor
from zope.rdb.interfaces import DatabaseException, IZopeConnection
from zope.publisher.interfaces import Retry
from datetime import date, time, datetime, timedelta
import psycopg2
import psycopg2.extensions
import re
import sys
# OIDs from psycopg/pgtypes.h
DATE_OID = 1082
TIME_OID = 1083
TIMETZ_OID = 1266
TIMESTAMP_OID = 1114
TIMESTAMPTZ_OID = 1184
INTERVAL_OID = 1186
CHAR_OID = 18
TEXT_OID = 25
BPCHAR_OID = 1042
VARCHAR_OID = 1043
# date/time parsing functions
_dateFmt = re.compile(r"^(\d\d\d\d)-?([01]\d)-?([0-3]\d)$")
def parse_date(s):
"""Parses ISO-8601 compliant dates and returns a tuple (year, month,
day).
The following formats are accepted:
YYYY-MM-DD (extended format)
YYYYMMDD (basic format)
"""
m = _dateFmt.match(s)
if m is None:
raise ValueError, 'invalid date string: %s' % s
year, month, day = m.groups()
return int(year), int(month), int(day)
_timeFmt = re.compile(
r"^([0-2]\d)(?::?([0-5]\d)(?::?([0-5]\d)(?:[.,](\d+))?)?)?$")
def parse_time(s):
"""Parses ISO-8601 compliant times and returns a tuple (hour, minute,
second).
The following formats are accepted:
HH:MM:SS.ssss or HHMMSS.ssss
HH:MM:SS,ssss or HHMMSS,ssss
HH:MM:SS or HHMMSS
HH:MM or HHMM
HH
"""
m = _timeFmt.match(s)
if m is None:
raise ValueError, 'invalid time string: %s' % s
hr, mn, sc, msc = m.groups(0)
if msc != 0:
sc = float("%s.%s" % (sc, msc))
else:
sc = int(sc)
return int(hr), int(mn), sc
_tzFmt = re.compile(r"^([+-])([0-2]\d)(?::?([0-5]\d))?$")
def parse_tz(s):
"""Parses ISO-8601 timezones and returns the offset east of UTC in
minutes.
The following formats are accepted:
+/-HH:MM
+/-HHMM
+/-HH
Z (equivalent to +0000)
"""
if s == 'Z':
return 0
m = _tzFmt.match(s)
if m is None:
raise ValueError, 'invalid time zone: %s' % s
d, hoff, moff = m.groups(0)
if d == "-":
return - int(hoff) * 60 - int(moff)
return int(hoff) * 60 + int(moff)
_tzPos = re.compile(r"[Z+-]")
def parse_timetz(s):
"""Parses ISO-8601 compliant times that may include timezone information
and returns a tuple (hour, minute, second, tzoffset).
tzoffset is the offset east of UTC in minutes. It will be None if s does
not include time zone information.
Formats accepted are those listed in the descriptions of parse_time() and
parse_tz(). Time zone should immediatelly follow time without intervening
spaces.
"""
m = _tzPos.search(s)
if m is None:
return parse_time(s) + (None,)
pos = m.start()
return parse_time(s[:pos]) + (parse_tz(s[pos:]),)
_datetimeFmt = re.compile(r"[T ]")
def _split_datetime(s):
"""Split date and time parts of ISO-8601 compliant timestamp and
return a tuple (date, time).
' ' or 'T' used to separate date and time parts.
"""
m = _datetimeFmt.search(s)
if m is None:
raise ValueError, 'time part of datetime missing: %s' % s
pos = m.start()
return s[:pos], s[pos + 1:]
def parse_datetime(s):
"""Parses ISO-8601 compliant timestamp and returns a tuple (year, month,
day, hour, minute, second).
Formats accepted are those listed in the descriptions of parse_date() and
parse_time() with ' ' or 'T' used to separate date and time parts.
"""
dt, tm = _split_datetime(s)
return parse_date(dt) + parse_time(tm)
def parse_datetimetz(s):
"""Parses ISO-8601 compliant timestamp that may include timezone
information and returns a tuple (year, month, day, hour, minute, second,
tzoffset).
tzoffset is the offset east of UTC in minutes. It will be None if s does
not include time zone information.
Formats accepted are those listed in the descriptions of parse_date() and
parse_timetz() with ' ' or 'T' used to separate date and time parts.
"""
dt, tm = _split_datetime(s)
return parse_date(dt) + parse_timetz(tm)
def parse_interval(s):
"""Parses PostgreSQL interval notation and returns a tuple (years, months,
days, hours, minutes, seconds).
Values accepted:
interval ::= date
| time
| date time
date ::= date_comp
| date date_comp
date_comp ::= 1 'day'
| number 'days'
| 1 'month'
| 1 'mon'
| number 'months'
| number 'mons'
| 1 'year'
| number 'years'
time ::= number ':' number
| number ':' number ':' number
| number ':' number ':' number '.' fraction
"""
years = months = days = 0
hours = minutes = seconds = 0
elements = s.split()
# Tests with 7.4.6 on Ubuntu 5.4 interval output returns 'mon' and 'mons'
# and not 'month' or 'months' as expected. I've fixed this and left
# the original matches there too in case this is dependant on
# OS or PostgreSQL release.
for i in range(0, len(elements) - 1, 2):
count, unit = elements[i:i+2]
if unit == 'day' and count == '1':
days += 1
elif unit == 'days':
days += int(count)
elif unit == 'month' and count == '1':
months += 1
elif unit == 'mon' and count == '1':
months += 1
elif unit == 'months':
months += int(count)
elif unit == 'mons':
months += int(count)
elif unit == 'year' and count == '1':
years += 1
elif unit == 'years':
years += int(count)
else:
raise ValueError, 'unknown time interval %s %s' % (count, unit)
if len(elements) % 2 == 1:
hours, minutes, seconds = parse_time(elements[-1])
return (years, months, days, hours, minutes, seconds)
# Type conversions
def _conv_date(s, cursor):
if s:
return date(*parse_date(s))
def _conv_time(s, cursor):
if s:
hr, mn, sc = parse_time(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
return time(hr, mn, int(sc), int(micro))
def _conv_timetz(s, cursor):
if s:
from zope.datetime import tzinfo
hr, mn, sc, tz = parse_timetz(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
if tz: tz = tzinfo(tz)
return time(hr, mn, int(sc), int(micro), tz)
def _conv_timestamp(s, cursor):
if s:
y, m, d, hr, mn, sc = parse_datetime(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
return datetime(y, m, d, hr, mn, int(sc), int(micro))
def _conv_timestamptz(s, cursor):
if s:
from zope.datetime import tzinfo
y, m, d, hr, mn, sc, tz = parse_datetimetz(s)
sc, micro = divmod(sc, 1.0)
micro = round(micro * 1000000)
if tz: tz = tzinfo(tz)
return datetime(y, m, d, hr, mn, int(sc), int(micro), tz)
def _conv_interval(s, cursor):
if s:
y, m, d, hr, mn, sc = parse_interval(s)
if (y, m) != (0, 0):
# XXX: Currently there's no way to represent years and months as
# timedeltas
return s
else:
return timedelta(days=d, hours=hr, minutes=mn, seconds=sc)
def _get_string_conv(encoding):
def _conv_string(s, cursor):
if s is not None:
s = s.decode(encoding)
return s
return _conv_string
# User-defined types
DATE = psycopg2.extensions.new_type((DATE_OID,), "ZDATE", _conv_date)
TIME = psycopg2.extensions.new_type((TIME_OID,), "ZTIME", _conv_time)
TIMETZ = psycopg2.extensions.new_type((TIMETZ_OID,), "ZTIMETZ", _conv_timetz)
TIMESTAMP = psycopg2.extensions.new_type((TIMESTAMP_OID,), "ZTIMESTAMP", _conv_timestamp)
TIMESTAMPTZ = psycopg2.extensions.new_type((TIMESTAMPTZ_OID,), "ZTIMESTAMPTZ", _conv_timestamptz)
INTERVAL = psycopg2.extensions.new_type((INTERVAL_OID,), "ZINTERVAL", _conv_interval)
def registerTypes(encoding):
"""Register type conversions for psycopg"""
psycopg2.extensions.register_type(DATE)
psycopg2.extensions.register_type(TIME)
psycopg2.extensions.register_type(TIMETZ)
psycopg2.extensions.register_type(TIMESTAMP)
psycopg2.extensions.register_type(TIMESTAMPTZ)
psycopg2.extensions.register_type(INTERVAL)
STRING = psycopg2.extensions.new_type((CHAR_OID, TEXT_OID, BPCHAR_OID, VARCHAR_OID), "ZSTRING", _get_string_conv(encoding))
psycopg2.extensions.register_type(STRING)
dsn2option_mapping = {'host': 'host',
'port': 'port',
'dbname': 'dbname',
'username': 'user',
'password': 'password'}
class Psycopg2Adapter(ZopeDatabaseAdapter):
"""A psycopg2 adapter for Zope3.
The following type conversions are performed:
DATE -> datetime.date
TIME -> datetime.time
TIMETZ -> datetime.time
TIMESTAMP -> datetime.datetime
TIMESTAMPTZ -> datetime.datetime
XXX: INTERVAL cannot be represented exactly as datetime.timedelta since
it might be something like '1 month', which is a variable number of days.
"""
def connect(self):
if not self.isConnected():
try:
self._v_connection = Psycopg2Connection(
self._connection_factory(), self
)
except psycopg2.Error, error:
raise DatabaseException, str(error)
def registerTypes(self):
registerTypes(self.getEncoding())
def _connection_factory(self):
"""Create a psycopg2 DBI connection based on the DSN"""
self.registerTypes()
conn_info = parseDSN(self.dsn)
conn_list = []
for dsnname, optname in dsn2option_mapping.iteritems():
if conn_info[dsnname]:
conn_list.append('%s=%s' % (optname, conn_info[dsnname]))
conn_str = ' '.join(conn_list)
connection = psycopg2.connect(conn_str)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
return connection
def disconnect(self):
if self.isConnected():
try:
self._v_connection.close()
except psycopg2.InterfaceError:
pass
self._v_connection = None
def _handle_psycopg_exception(error):
"""Called from a exception handler for psycopg2.Error.
If we have a serialization exception or a deadlock, we should retry the
transaction by raising a Retry exception. Otherwise, we reraise.
"""
if not error.args:
raise
msg = error.args[0]
# These messages are from PostgreSQL 8.0. They may change between
# PostgreSQL releases - if so, the different messages should be added
# rather than the existing ones changed so this logic works with
# different versions.
if msg.startswith(
'ERROR: could not serialize access due to concurrent update'
):
raise Retry(sys.exc_info())
if msg.startswith('ERROR: deadlock detected'):
raise Retry(sys.exc_info())
raise
class IPsycopg2ZopeConnection(IZopeConnection):
"""A marker interface stating that this connection uses PostgreSQL."""
class Psycopg2Connection(ZopeConnection):
implements(IPsycopg2ZopeConnection)
def cursor(self):
"""See IZopeConnection"""
return Psycopg2Cursor(self.conn.cursor(), self)
def commit(self):
try:
ZopeConnection.commit(self)
except psycopg2.Error, error:
_handle_psycopg_exception(error)
class Psycopg2Cursor(ZopeCursor):
def execute(self, operation, parameters=None):
"""See IZopeCursor"""
try:
return ZopeCursor.execute(self, operation, parameters)
except psycopg2.Error, error:
_handle_psycopg_exception(error)
def executemany(operation, seq_of_parameters=None):
"""See IZopeCursor"""
raise RuntimeError, 'Oos'
try:
return ZopeCursor.execute(self, operation, seq_of_parameters)
except psycopg2.Error, error:
_handle_psycopg_exception(error)

51
psycopg2da/configure.zcml Normal file
View File

@ -0,0 +1,51 @@
<configure
xmlns="http://namespaces.zope.org/zope"
xmlns:browser="http://namespaces.zope.org/browser"
i18n_domain="psycopg2da">
<class class=".adapter.Psycopg2Adapter">
<factory id="zope.da.Psycopg2DA" />
<require
permission="zope.rdb.Use"
interface="zope.rdb.interfaces.IZopeDatabaseAdapter"
/>
<require
permission="zope.ManageServices"
interface="zope.rdb.interfaces.IZopeDatabaseAdapterManagement"
/>
</class>
<class class=".adapter.Psycopg2Connection">
<require
permission="zope.rdb.Use"
interface="zope.rdb.interfaces.IZopeConnection"
/>
</class>
<class class=".adapter.Psycopg2Cursor">
<require
permission="zope.rdb.Use"
interface="zope.rdb.interfaces.IZopeCursor"
/>
</class>
<browser:addform
name="AddPsycopg2DA"
schema="zope.rdb.interfaces.IManageableZopeDatabaseAdapter"
label="Add Psycopg2 (PostGreSQL) Database Adapter"
content_factory=".adapter.Psycopg2Adapter"
arguments="dsn"
fields="dsn"
permission="zope.ManageContent"
/>
<!-- Menu entry for "add utility" menu -->
<browser:addMenuItem
class=".adapter.Psycopg2Adapter"
title="Psycopg2 DA"
description="A PostgreSQL Database Adapter using the Psycopg2 driver"
permission="zope.ManageApplication"
view="AddPsycopg2DA"
/>
</configure>

View File

@ -0,0 +1 @@
<include package="psycopg2da" />

389
psycopg2da/tests.py Normal file
View File

@ -0,0 +1,389 @@
# psycopg2da
# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Unit tests for Psycopg2DA."""
from unittest import TestCase, TestSuite, main, makeSuite
from datetime import tzinfo, timedelta
import psycopg2
import psycopg2.extensions
class Stub(object):
def __init__(self, **kw):
self.__dict__.update(kw)
class TZStub(tzinfo):
def __init__(self, h, m):
self.offset = h * 60 + m
def utcoffset(self, dt):
return timedelta(minutes=self.offset)
def dst(self, dt):
return 0
def tzname(self, dt):
return ''
def __repr__(self):
return 'tzinfo(%d)' % self.offset
def __reduce__(self):
return type(self), (), self.__dict__
class ConnectionStub(object):
def set_isolation_level(self, level):
pass
class Psycopg2Stub(object):
__shared_state = {} # 'Borg' design pattern
DATE = psycopg2.extensions.DATE
TIME = psycopg2.extensions.TIME
DATETIME = psycopg2.DATETIME
INTERVAL = psycopg2.extensions.INTERVAL
ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
def __init__(self):
self.__dict__ = self.__shared_state
self.types = {}
def connect(self, connection_string):
self.last_connection_string = connection_string
return ConnectionStub()
def new_type(self, values, name, converter):
return Stub(name=name, values=values)
def register_type(self, type):
for typeid in type.values:
self.types[typeid] = type
def getExtensions(self):
return self
extensions = property(getExtensions)
class TestPsycopg2TypeConversion(TestCase):
def test_conv_date(self):
from psycopg2da.adapter import _conv_date
from datetime import date
def c(s):
return _conv_date(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('2001-03-02'), date(2001, 3, 2))
def test_conv_time(self):
from psycopg2da.adapter import _conv_time
from datetime import time
def c(s):
return _conv_time(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('23:17:57'), time(23, 17, 57))
self.assertEquals(c('23:17:57.037'), time(23, 17, 57, 37000))
def test_conv_timetz(self):
from psycopg2da.adapter import _conv_timetz
from datetime import time
def c(s):
return _conv_timetz(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('12:44:01+01:00'), time(12, 44, 01, 0, TZStub(1,0)))
self.assertEquals(c('12:44:01.037-00:30'), time(12, 44, 01, 37000, TZStub(0,-30)))
def test_conv_timestamp(self):
from psycopg2da.adapter import _conv_timestamp
from datetime import datetime
def c(s):
return _conv_timestamp(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('2001-03-02 12:44:01'),
datetime(2001, 3, 2, 12, 44, 01))
self.assertEquals(c('2001-03-02 12:44:01.037'),
datetime(2001, 3, 2, 12, 44, 01, 37000))
self.assertEquals(c('2001-03-02 12:44:01.000001'),
datetime(2001, 3, 2, 12, 44, 01, 1))
def test_conv_timestamptz(self):
from psycopg2da.adapter import _conv_timestamptz
from datetime import datetime
def c(s):
return _conv_timestamptz(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('2001-03-02 12:44:01+01:00'),
datetime(2001, 3, 2, 12, 44, 01, 0, TZStub(1,0)))
self.assertEquals(c('2001-03-02 12:44:01.037-00:30'),
datetime(2001, 3, 2, 12, 44, 01, 37000, TZStub(0,-30)))
self.assertEquals(c('2001-03-02 12:44:01.000001+12:00'),
datetime(2001, 3, 2, 12, 44, 01, 1, TZStub(12,0)))
self.assertEquals(c('2001-06-25 12:14:00-07'),
datetime(2001, 6, 25, 12, 14, 00, 0, TZStub(-7,0)))
def test_conv_interval(self):
from psycopg2da.adapter import _conv_interval
from datetime import timedelta
def c(s):
return _conv_interval(s, None)
self.assertEquals(c(''), None)
self.assertEquals(c('01:00'), timedelta(hours=1))
self.assertEquals(c('00:15'), timedelta(minutes=15))
self.assertEquals(c('00:00:47'), timedelta(seconds=47))
self.assertEquals(c('00:00:00.037'), timedelta(microseconds=37000))
self.assertEquals(c('00:00:00.111111'), timedelta(microseconds=111111))
self.assertEquals(c('1 day'), timedelta(days=1))
self.assertEquals(c('2 days'), timedelta(days=2))
self.assertEquals(c('374 days'), timedelta(days=374))
self.assertEquals(c('2 days 03:20:15.123456'),
timedelta(days=2, hours=3, minutes=20,
seconds=15, microseconds=123456))
# XXX There's a problem with years and months. Currently timedelta
# cannot represent them accurately
self.assertEquals(c('1 month'), '1 month')
self.assertEquals(c('2 months'), '2 months')
self.assertEquals(c('1 mon'), '1 mon')
self.assertEquals(c('2 mons'), '2 mons')
self.assertEquals(c('1 year'), '1 year')
self.assertEquals(c('3 years'), '3 years')
# Later we might be able to use
## self.assertEquals(c('1 month'), timedelta(months=1))
## self.assertEquals(c('2 months'), timedelta(months=2))
## self.assertEquals(c('1 year'), timedelta(years=1))
## self.assertEquals(c('3 years'), timedelta(years=3))
self.assertRaises(ValueError, c, '2 day')
self.assertRaises(ValueError, c, '2days')
self.assertRaises(ValueError, c, '123')
def test_conv_string(self):
from psycopg2da.adapter import _get_string_conv
_conv_string = _get_string_conv("utf-8")
def c(s):
return _conv_string(s, None)
self.assertEquals(c(None), None)
self.assertEquals(c(''), u'')
self.assertEquals(c('c'), u'c')
self.assertEquals(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2')
self.assertEquals(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2')
class TestPsycopg2Adapter(TestCase):
def setUp(self):
import psycopg2da.adapter as adapter
self.real_psycopg2 = adapter.psycopg2
adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub()
def tearDown(self):
import psycopg2da.adapter as adapter
adapter.psycopg2 = self.real_psycopg2
def test_connection_factory(self):
from psycopg2da.adapter import Psycopg2Adapter
a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored')
c = a._connection_factory()
args = self.psycopg2_stub.last_connection_string.split()
args.sort()
self.assertEquals(args, ['dbname=dbname', 'host=hostname',
'password=password', 'port=port',
'user=username'])
def test_registerTypes(self):
import psycopg2da.adapter as adapter
from psycopg2da.adapter import Psycopg2Adapter
a = Psycopg2Adapter('dbi://')
a.registerTypes()
for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP',
'TIMESTAMPTZ', 'INTERVAL'):
typeid = getattr(adapter, '%s_OID' % typename)
result = self.psycopg2_stub.types.get(typeid, None)
if not result:
# comparing None with psycopg2.type object segfaults
self.fail("did not register %s (%d): got None, not Z%s"
% (typename, typeid, typename))
else:
result_name = getattr(result, 'name', 'None')
self.assertEquals(result, getattr(adapter, typename),
"did not register %s (%d): got %s, not Z%s"
% (typename, typeid, result_name, typename))
class TestISODateTime(TestCase):
# Test if date/time parsing functions accept a sensible subset of ISO-8601
# compliant date/time strings.
#
# Resources:
# http://www.cl.cam.ac.uk/~mgk25/iso-time.html
# http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html
# http://www.w3.org/TR/NOTE-datetime
# http://www.ietf.org/rfc/rfc3339.txt
basic_dates = (('20020304', (2002, 03, 04)),
('20000229', (2000, 02, 29)))
extended_dates = (('2002-03-04', (2002, 03, 04)),
('2000-02-29', (2000, 02, 29)))
basic_times = (('12', (12, 0, 0)),
('1234', (12, 34, 0)),
('123417', (12, 34, 17)),
('123417.5', (12, 34, 17.5)),
('123417,5', (12, 34, 17.5)))
extended_times = (('12', (12, 0, 0)),
('12:34', (12, 34, 0)),
('12:34:17', (12, 34, 17)),
('12:34:17.5', (12, 34, 17.5)),
('12:34:17,5', (12, 34, 17.5)))
basic_tzs = (('Z', 0),
('+02', 2*60),
('+1130', 11*60+30),
('-05', -5*60),
('-0030', -30))
extended_tzs = (('Z', 0),
('+02', 2*60),
('+11:30', 11*60+30),
('-05', -5*60),
('-00:30', -30))
time_separators = (' ', 'T')
bad_dates = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29',
'1990/13/14')
bad_times = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,')
bad_timetzs = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q')
bad_datetimes = ('', 'foo', '2002-03-0412:33')
bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200')
exception_type = ValueError
# We need the following funcions:
# parse_date -> (year, month, day)
# parse_time -> (hour, minute, second)
# parse_timetz -> (hour, minute, second, tzoffset)
# parse_datetime -> (year, month, day, hour, minute, second)
# parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset)
# second can be a float, all other values are ints
# tzoffset is offset in minutes east of UTC
def setUp(self):
from psycopg2da.adapter import parse_date, parse_time, \
parse_timetz, parse_datetime, parse_datetimetz
self.parse_date = parse_date
self.parse_time = parse_time
self.parse_timetz = parse_timetz
self.parse_datetime = parse_datetime
self.parse_datetimetz = parse_datetimetz
def test_basic_date(self):
for s, d in self.basic_dates:
self.assertEqual(self.parse_date(s), d)
def test_extended_date(self):
for s, d in self.extended_dates:
self.assertEqual(self.parse_date(s), d)
def test_bad_date(self):
for s in self.bad_dates:
self.assertRaises(self.exception_type, self.parse_date, s)
def test_basic_time(self):
for s, t in self.basic_times:
self.assertEqual(self.parse_time(s), t)
def test_extended_time(self):
for s, t in self.extended_times:
self.assertEqual(self.parse_time(s), t)
def test_bad_time(self):
for s in self.bad_times:
self.assertRaises(self.exception_type, self.parse_time, s)
def test_basic_timetz(self):
for s, t in self.basic_times:
for tz, off in self.basic_tzs:
self.assertEqual(self.parse_timetz(s+tz), t + (off,))
def test_extended_timetz(self):
for s, t in self.extended_times:
for tz, off in self.extended_tzs:
self.assertEqual(self.parse_timetz(s+tz), t + (off,))
def test_bad_timetzs(self):
for s in self.bad_timetzs:
self.assertRaises(self.exception_type, self.parse_timetz, s)
def test_basic_datetime(self):
for ds, d in self.basic_dates:
for ts, t in self.basic_times:
for sep in self.time_separators:
self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
def test_extended_datetime(self):
for ds, d in self.extended_dates:
for ts, t in self.extended_times:
for sep in self.time_separators:
self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
def test_bad_datetimes(self):
for s in self.bad_datetimes:
self.assertRaises(self.exception_type, self.parse_datetime, s)
def test_basic_datetimetz(self):
for ds, d in self.basic_dates:
for ts, t in self.basic_times:
for tz, off in self.basic_tzs:
for sep in self.time_separators:
self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
d + t + (off,))
def test_extended_datetimetz(self):
for ds, d in self.extended_dates:
for ts, t in self.extended_times:
for tz, off in self.extended_tzs:
for sep in self.time_separators:
self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
d + t + (off,))
def test_bad_datetimetzs(self):
for s in self.bad_datetimetzs:
self.assertRaises(self.exception_type, self.parse_datetimetz, s)
def test_suite():
return TestSuite((
makeSuite(TestPsycopg2TypeConversion),
makeSuite(TestPsycopg2Adapter),
makeSuite(TestISODateTime),
))
if __name__=='__main__':
main(defaultTest='test_suite')