diff --git a/psycopg2da/DEPENDENCIES.cfg b/psycopg2da/DEPENDENCIES.cfg new file mode 100644 index 00000000..f4edbac9 --- /dev/null +++ b/psycopg2da/DEPENDENCIES.cfg @@ -0,0 +1,2 @@ +psycopg2 +zope.app diff --git a/psycopg2da/PACKAGE.cfg b/psycopg2da/PACKAGE.cfg new file mode 100644 index 00000000..7e1bbfc1 --- /dev/null +++ b/psycopg2da/PACKAGE.cfg @@ -0,0 +1,24 @@ +# Load the license from an external source, so we don't have to keep a +# copy of it sitting around: + + LICENSE.txt http://svn.zope.org/*checkout*/Zope3/trunk/ZopePublicLicense.txt?rev=25177 + + +# Add a few things to the distribution root. + + README.txt + + +# Specify what is included in the component. + + + # Documentation files of the package: + *.txt + + # Python modules from the package: + *.py + + # Configuration files of the package: + *.zcml + + diff --git a/psycopg2da/PUBLICATION.cfg b/psycopg2da/PUBLICATION.cfg new file mode 100644 index 00000000..d4083144 --- /dev/null +++ b/psycopg2da/PUBLICATION.cfg @@ -0,0 +1,9 @@ +Metadata-Version: 1.0 +Name: psycopg2da +Summary: Psycopg2 Database Adapter for Zope 3 +Author: Fabio Tranchitella +Author-email: kobold@debian.org +License: ZPL 2.1 +Description: + This package allows Zope 3 to connect to any PostGreSQL database via + the common Zope 3 RDB connection facilities. diff --git a/psycopg2da/README.txt b/psycopg2da/README.txt new file mode 100644 index 00000000..48d2fdda --- /dev/null +++ b/psycopg2da/README.txt @@ -0,0 +1,79 @@ +========== +psycopg2da +========== + +This file outlines the basics of using Zope3 with PostgreSQL via PsycopgDA. + +Installing PsycopgDA +-------------------- + +1. Check out the psycopg2da package into a directory in your + PYTHONPATH. INSTANCE_HOME/lib/python or Zope3/src is usually the + most convenient place: + + + svn co svn://svn.zope.org/repos/main/psycopg2da/trunk psycopg2da + + +2. Copy `psycopg2-configure.zcml` to the `package-includes` directory + of your Zope instance. + + +Creating Database Connections +------------------------------ + +It is time to add some connections. A connection in Zope 3 is +registered as a utility. + +3. Open a web browser on your Zope root folder (http://localhost:8080/ + if you use the default settings in zope.conf.in). + +4. Click on the 'Manage Site' action on the right side of the + screen. You should see a screen which reads 'Common Site Management + Tasks' + +5. Around the middle of that page, you should see a link named 'Add + Utility'. Click on it. + +6. Select 'Psycopg DA' and type in a name at the bottom of the page. + +7. Enter the database connection string. It looks like this: + + dbi://username:password@host:port/databasename + +8. Click on the 'Add' button. + +9. You should be on a page which reads 'Add Database Connection + Registration'. There you can configure the permission needed to use + the database connection, the name of the registration and the + registration status. You can use any name for 'Register As' field, + as long as it doesn't clash with an existing one. Choose a + permission. Choose between 'Registered' and 'Active' for the + 'Registration Status'. Only one component of a kind can be 'Active' + at a time, so be careful. + +10. You should be redirected to the 'Edit' screen of the connection + utility. + +11. If you want to, you can go to the Test page and execute arbitrary + SQL queries to see whether the connection is working as expected. + + +Using SQL Scripts +----------------- + +You can create SQL Scripts in the content space. For example: + +12. Go to Zope root. + +13. Add an SQL script (you can use the Common Tasks box on the left, + or the Add action on the right). + +14. Click on the name of your new SQL script. + +15. Choose a connection name (the one you entered in step 29) from the + drop-down. + +16. Enter your query and click on the 'Save Changes' button. + +17. You can test the script in the -- surprise! -- Test page. diff --git a/psycopg2da/__init__.py b/psycopg2da/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/psycopg2da/adapter.py b/psycopg2da/adapter.py new file mode 100644 index 00000000..352afc2a --- /dev/null +++ b/psycopg2da/adapter.py @@ -0,0 +1,425 @@ +# psycopg2da +# Copyright (C) 2006 Fabio Tranchitella +# +# Based on psycopgda: +# +# Copyright (c) 2002-2006 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# If you prefer you can use this package using the ZPL license as +# published on the Zope web site, http://www.zope.org/Resources/ZPL. +# +"""PostgreSQL Database Adapter for Zope 3""" + +from zope.interface import implements +from zope.rdb import ZopeDatabaseAdapter, parseDSN, ZopeConnection, ZopeCursor +from zope.rdb.interfaces import DatabaseException, IZopeConnection +from zope.publisher.interfaces import Retry + +from datetime import date, time, datetime, timedelta + +import psycopg2 +import psycopg2.extensions +import re +import sys + +# OIDs from psycopg/pgtypes.h +DATE_OID = 1082 +TIME_OID = 1083 +TIMETZ_OID = 1266 +TIMESTAMP_OID = 1114 +TIMESTAMPTZ_OID = 1184 +INTERVAL_OID = 1186 +CHAR_OID = 18 +TEXT_OID = 25 +BPCHAR_OID = 1042 +VARCHAR_OID = 1043 + +# date/time parsing functions +_dateFmt = re.compile(r"^(\d\d\d\d)-?([01]\d)-?([0-3]\d)$") + +def parse_date(s): + """Parses ISO-8601 compliant dates and returns a tuple (year, month, + day). + + The following formats are accepted: + YYYY-MM-DD (extended format) + YYYYMMDD (basic format) + """ + m = _dateFmt.match(s) + if m is None: + raise ValueError, 'invalid date string: %s' % s + year, month, day = m.groups() + return int(year), int(month), int(day) + + +_timeFmt = re.compile( + r"^([0-2]\d)(?::?([0-5]\d)(?::?([0-5]\d)(?:[.,](\d+))?)?)?$") + +def parse_time(s): + """Parses ISO-8601 compliant times and returns a tuple (hour, minute, + second). + + The following formats are accepted: + HH:MM:SS.ssss or HHMMSS.ssss + HH:MM:SS,ssss or HHMMSS,ssss + HH:MM:SS or HHMMSS + HH:MM or HHMM + HH + """ + m = _timeFmt.match(s) + if m is None: + raise ValueError, 'invalid time string: %s' % s + hr, mn, sc, msc = m.groups(0) + if msc != 0: + sc = float("%s.%s" % (sc, msc)) + else: + sc = int(sc) + return int(hr), int(mn), sc + + +_tzFmt = re.compile(r"^([+-])([0-2]\d)(?::?([0-5]\d))?$") + +def parse_tz(s): + """Parses ISO-8601 timezones and returns the offset east of UTC in + minutes. + + The following formats are accepted: + +/-HH:MM + +/-HHMM + +/-HH + Z (equivalent to +0000) + """ + if s == 'Z': + return 0 + m = _tzFmt.match(s) + if m is None: + raise ValueError, 'invalid time zone: %s' % s + d, hoff, moff = m.groups(0) + if d == "-": + return - int(hoff) * 60 - int(moff) + return int(hoff) * 60 + int(moff) + + +_tzPos = re.compile(r"[Z+-]") + +def parse_timetz(s): + """Parses ISO-8601 compliant times that may include timezone information + and returns a tuple (hour, minute, second, tzoffset). + + tzoffset is the offset east of UTC in minutes. It will be None if s does + not include time zone information. + + Formats accepted are those listed in the descriptions of parse_time() and + parse_tz(). Time zone should immediatelly follow time without intervening + spaces. + """ + m = _tzPos.search(s) + if m is None: + return parse_time(s) + (None,) + pos = m.start() + return parse_time(s[:pos]) + (parse_tz(s[pos:]),) + + +_datetimeFmt = re.compile(r"[T ]") + +def _split_datetime(s): + """Split date and time parts of ISO-8601 compliant timestamp and + return a tuple (date, time). + + ' ' or 'T' used to separate date and time parts. + """ + m = _datetimeFmt.search(s) + if m is None: + raise ValueError, 'time part of datetime missing: %s' % s + pos = m.start() + return s[:pos], s[pos + 1:] + + +def parse_datetime(s): + """Parses ISO-8601 compliant timestamp and returns a tuple (year, month, + day, hour, minute, second). + + Formats accepted are those listed in the descriptions of parse_date() and + parse_time() with ' ' or 'T' used to separate date and time parts. + """ + dt, tm = _split_datetime(s) + return parse_date(dt) + parse_time(tm) + + +def parse_datetimetz(s): + """Parses ISO-8601 compliant timestamp that may include timezone + information and returns a tuple (year, month, day, hour, minute, second, + tzoffset). + + tzoffset is the offset east of UTC in minutes. It will be None if s does + not include time zone information. + + Formats accepted are those listed in the descriptions of parse_date() and + parse_timetz() with ' ' or 'T' used to separate date and time parts. + """ + dt, tm = _split_datetime(s) + return parse_date(dt) + parse_timetz(tm) + + +def parse_interval(s): + """Parses PostgreSQL interval notation and returns a tuple (years, months, + days, hours, minutes, seconds). + + Values accepted: + interval ::= date + | time + | date time + date ::= date_comp + | date date_comp + date_comp ::= 1 'day' + | number 'days' + | 1 'month' + | 1 'mon' + | number 'months' + | number 'mons' + | 1 'year' + | number 'years' + time ::= number ':' number + | number ':' number ':' number + | number ':' number ':' number '.' fraction + """ + years = months = days = 0 + hours = minutes = seconds = 0 + elements = s.split() + # Tests with 7.4.6 on Ubuntu 5.4 interval output returns 'mon' and 'mons' + # and not 'month' or 'months' as expected. I've fixed this and left + # the original matches there too in case this is dependant on + # OS or PostgreSQL release. + for i in range(0, len(elements) - 1, 2): + count, unit = elements[i:i+2] + if unit == 'day' and count == '1': + days += 1 + elif unit == 'days': + days += int(count) + elif unit == 'month' and count == '1': + months += 1 + elif unit == 'mon' and count == '1': + months += 1 + elif unit == 'months': + months += int(count) + elif unit == 'mons': + months += int(count) + elif unit == 'year' and count == '1': + years += 1 + elif unit == 'years': + years += int(count) + else: + raise ValueError, 'unknown time interval %s %s' % (count, unit) + if len(elements) % 2 == 1: + hours, minutes, seconds = parse_time(elements[-1]) + return (years, months, days, hours, minutes, seconds) + + +# Type conversions +def _conv_date(s, cursor): + if s: + return date(*parse_date(s)) + +def _conv_time(s, cursor): + if s: + hr, mn, sc = parse_time(s) + sc, micro = divmod(sc, 1.0) + micro = round(micro * 1000000) + return time(hr, mn, int(sc), int(micro)) + +def _conv_timetz(s, cursor): + if s: + from zope.datetime import tzinfo + hr, mn, sc, tz = parse_timetz(s) + sc, micro = divmod(sc, 1.0) + micro = round(micro * 1000000) + if tz: tz = tzinfo(tz) + return time(hr, mn, int(sc), int(micro), tz) + +def _conv_timestamp(s, cursor): + if s: + y, m, d, hr, mn, sc = parse_datetime(s) + sc, micro = divmod(sc, 1.0) + micro = round(micro * 1000000) + return datetime(y, m, d, hr, mn, int(sc), int(micro)) + +def _conv_timestamptz(s, cursor): + if s: + from zope.datetime import tzinfo + y, m, d, hr, mn, sc, tz = parse_datetimetz(s) + sc, micro = divmod(sc, 1.0) + micro = round(micro * 1000000) + if tz: tz = tzinfo(tz) + return datetime(y, m, d, hr, mn, int(sc), int(micro), tz) + +def _conv_interval(s, cursor): + if s: + y, m, d, hr, mn, sc = parse_interval(s) + if (y, m) != (0, 0): + # XXX: Currently there's no way to represent years and months as + # timedeltas + return s + else: + return timedelta(days=d, hours=hr, minutes=mn, seconds=sc) + +def _get_string_conv(encoding): + def _conv_string(s, cursor): + if s is not None: + s = s.decode(encoding) + return s + return _conv_string + +# User-defined types +DATE = psycopg2.extensions.new_type((DATE_OID,), "ZDATE", _conv_date) +TIME = psycopg2.extensions.new_type((TIME_OID,), "ZTIME", _conv_time) +TIMETZ = psycopg2.extensions.new_type((TIMETZ_OID,), "ZTIMETZ", _conv_timetz) +TIMESTAMP = psycopg2.extensions.new_type((TIMESTAMP_OID,), "ZTIMESTAMP", _conv_timestamp) +TIMESTAMPTZ = psycopg2.extensions.new_type((TIMESTAMPTZ_OID,), "ZTIMESTAMPTZ", _conv_timestamptz) +INTERVAL = psycopg2.extensions.new_type((INTERVAL_OID,), "ZINTERVAL", _conv_interval) + +def registerTypes(encoding): + """Register type conversions for psycopg""" + psycopg2.extensions.register_type(DATE) + psycopg2.extensions.register_type(TIME) + psycopg2.extensions.register_type(TIMETZ) + psycopg2.extensions.register_type(TIMESTAMP) + psycopg2.extensions.register_type(TIMESTAMPTZ) + psycopg2.extensions.register_type(INTERVAL) + STRING = psycopg2.extensions.new_type((CHAR_OID, TEXT_OID, BPCHAR_OID, VARCHAR_OID), "ZSTRING", _get_string_conv(encoding)) + psycopg2.extensions.register_type(STRING) + + +dsn2option_mapping = {'host': 'host', + 'port': 'port', + 'dbname': 'dbname', + 'username': 'user', + 'password': 'password'} + +class Psycopg2Adapter(ZopeDatabaseAdapter): + """A psycopg2 adapter for Zope3. + + The following type conversions are performed: + + DATE -> datetime.date + TIME -> datetime.time + TIMETZ -> datetime.time + TIMESTAMP -> datetime.datetime + TIMESTAMPTZ -> datetime.datetime + + XXX: INTERVAL cannot be represented exactly as datetime.timedelta since + it might be something like '1 month', which is a variable number of days. + """ + + def connect(self): + if not self.isConnected(): + try: + self._v_connection = Psycopg2Connection( + self._connection_factory(), self + ) + except psycopg2.Error, error: + raise DatabaseException, str(error) + + def registerTypes(self): + registerTypes(self.getEncoding()) + + def _connection_factory(self): + """Create a psycopg2 DBI connection based on the DSN""" + self.registerTypes() + conn_info = parseDSN(self.dsn) + conn_list = [] + for dsnname, optname in dsn2option_mapping.iteritems(): + if conn_info[dsnname]: + conn_list.append('%s=%s' % (optname, conn_info[dsnname])) + conn_str = ' '.join(conn_list) + connection = psycopg2.connect(conn_str) + connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + return connection + + def disconnect(self): + if self.isConnected(): + try: + self._v_connection.close() + except psycopg2.InterfaceError: + pass + self._v_connection = None + + +def _handle_psycopg_exception(error): + """Called from a exception handler for psycopg2.Error. + + If we have a serialization exception or a deadlock, we should retry the + transaction by raising a Retry exception. Otherwise, we reraise. + """ + if not error.args: + raise + msg = error.args[0] + # These messages are from PostgreSQL 8.0. They may change between + # PostgreSQL releases - if so, the different messages should be added + # rather than the existing ones changed so this logic works with + # different versions. + if msg.startswith( + 'ERROR: could not serialize access due to concurrent update' + ): + raise Retry(sys.exc_info()) + if msg.startswith('ERROR: deadlock detected'): + raise Retry(sys.exc_info()) + raise + + +class IPsycopg2ZopeConnection(IZopeConnection): + """A marker interface stating that this connection uses PostgreSQL.""" + + +class Psycopg2Connection(ZopeConnection): + + implements(IPsycopg2ZopeConnection) + + def cursor(self): + """See IZopeConnection""" + return Psycopg2Cursor(self.conn.cursor(), self) + + def commit(self): + try: + ZopeConnection.commit(self) + except psycopg2.Error, error: + _handle_psycopg_exception(error) + + +class Psycopg2Cursor(ZopeCursor): + + def execute(self, operation, parameters=None): + """See IZopeCursor""" + try: + return ZopeCursor.execute(self, operation, parameters) + except psycopg2.Error, error: + _handle_psycopg_exception(error) + + def executemany(operation, seq_of_parameters=None): + """See IZopeCursor""" + raise RuntimeError, 'Oos' + try: + return ZopeCursor.execute(self, operation, seq_of_parameters) + except psycopg2.Error, error: + _handle_psycopg_exception(error) diff --git a/psycopg2da/configure.zcml b/psycopg2da/configure.zcml new file mode 100644 index 00000000..7671fb75 --- /dev/null +++ b/psycopg2da/configure.zcml @@ -0,0 +1,51 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/psycopg2da/psycopgda-configure.zcml b/psycopg2da/psycopgda-configure.zcml new file mode 100644 index 00000000..2a395437 --- /dev/null +++ b/psycopg2da/psycopgda-configure.zcml @@ -0,0 +1 @@ + diff --git a/psycopg2da/tests.py b/psycopg2da/tests.py new file mode 100644 index 00000000..4a75f4e8 --- /dev/null +++ b/psycopg2da/tests.py @@ -0,0 +1,389 @@ +# psycopg2da +# Copyright (C) 2006 Fabio Tranchitella +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +"""Unit tests for Psycopg2DA.""" + +from unittest import TestCase, TestSuite, main, makeSuite +from datetime import tzinfo, timedelta + +import psycopg2 +import psycopg2.extensions + + +class Stub(object): + + def __init__(self, **kw): + self.__dict__.update(kw) + +class TZStub(tzinfo): + + def __init__(self, h, m): + self.offset = h * 60 + m + + def utcoffset(self, dt): + return timedelta(minutes=self.offset) + + def dst(self, dt): + return 0 + + def tzname(self, dt): + return '' + + def __repr__(self): + return 'tzinfo(%d)' % self.offset + + def __reduce__(self): + return type(self), (), self.__dict__ + +class ConnectionStub(object): + + def set_isolation_level(self, level): + pass + +class Psycopg2Stub(object): + + __shared_state = {} # 'Borg' design pattern + + DATE = psycopg2.extensions.DATE + TIME = psycopg2.extensions.TIME + DATETIME = psycopg2.DATETIME + INTERVAL = psycopg2.extensions.INTERVAL + ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE + + def __init__(self): + self.__dict__ = self.__shared_state + self.types = {} + + def connect(self, connection_string): + self.last_connection_string = connection_string + return ConnectionStub() + + def new_type(self, values, name, converter): + return Stub(name=name, values=values) + + def register_type(self, type): + for typeid in type.values: + self.types[typeid] = type + + def getExtensions(self): + return self + extensions = property(getExtensions) + + +class TestPsycopg2TypeConversion(TestCase): + + def test_conv_date(self): + from psycopg2da.adapter import _conv_date + from datetime import date + def c(s): + return _conv_date(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('2001-03-02'), date(2001, 3, 2)) + + def test_conv_time(self): + from psycopg2da.adapter import _conv_time + from datetime import time + def c(s): + return _conv_time(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('23:17:57'), time(23, 17, 57)) + self.assertEquals(c('23:17:57.037'), time(23, 17, 57, 37000)) + + def test_conv_timetz(self): + from psycopg2da.adapter import _conv_timetz + from datetime import time + def c(s): + return _conv_timetz(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('12:44:01+01:00'), time(12, 44, 01, 0, TZStub(1,0))) + self.assertEquals(c('12:44:01.037-00:30'), time(12, 44, 01, 37000, TZStub(0,-30))) + + def test_conv_timestamp(self): + from psycopg2da.adapter import _conv_timestamp + from datetime import datetime + def c(s): + return _conv_timestamp(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('2001-03-02 12:44:01'), + datetime(2001, 3, 2, 12, 44, 01)) + self.assertEquals(c('2001-03-02 12:44:01.037'), + datetime(2001, 3, 2, 12, 44, 01, 37000)) + self.assertEquals(c('2001-03-02 12:44:01.000001'), + datetime(2001, 3, 2, 12, 44, 01, 1)) + + def test_conv_timestamptz(self): + from psycopg2da.adapter import _conv_timestamptz + from datetime import datetime + def c(s): + return _conv_timestamptz(s, None) + self.assertEquals(c(''), None) + + self.assertEquals(c('2001-03-02 12:44:01+01:00'), + datetime(2001, 3, 2, 12, 44, 01, 0, TZStub(1,0))) + self.assertEquals(c('2001-03-02 12:44:01.037-00:30'), + datetime(2001, 3, 2, 12, 44, 01, 37000, TZStub(0,-30))) + self.assertEquals(c('2001-03-02 12:44:01.000001+12:00'), + datetime(2001, 3, 2, 12, 44, 01, 1, TZStub(12,0))) + self.assertEquals(c('2001-06-25 12:14:00-07'), + datetime(2001, 6, 25, 12, 14, 00, 0, TZStub(-7,0))) + + def test_conv_interval(self): + from psycopg2da.adapter import _conv_interval + from datetime import timedelta + def c(s): + return _conv_interval(s, None) + + self.assertEquals(c(''), None) + self.assertEquals(c('01:00'), timedelta(hours=1)) + self.assertEquals(c('00:15'), timedelta(minutes=15)) + self.assertEquals(c('00:00:47'), timedelta(seconds=47)) + self.assertEquals(c('00:00:00.037'), timedelta(microseconds=37000)) + self.assertEquals(c('00:00:00.111111'), timedelta(microseconds=111111)) + self.assertEquals(c('1 day'), timedelta(days=1)) + self.assertEquals(c('2 days'), timedelta(days=2)) + self.assertEquals(c('374 days'), timedelta(days=374)) + self.assertEquals(c('2 days 03:20:15.123456'), + timedelta(days=2, hours=3, minutes=20, + seconds=15, microseconds=123456)) + # XXX There's a problem with years and months. Currently timedelta + # cannot represent them accurately + self.assertEquals(c('1 month'), '1 month') + self.assertEquals(c('2 months'), '2 months') + self.assertEquals(c('1 mon'), '1 mon') + self.assertEquals(c('2 mons'), '2 mons') + self.assertEquals(c('1 year'), '1 year') + self.assertEquals(c('3 years'), '3 years') + # Later we might be able to use + ## self.assertEquals(c('1 month'), timedelta(months=1)) + ## self.assertEquals(c('2 months'), timedelta(months=2)) + ## self.assertEquals(c('1 year'), timedelta(years=1)) + ## self.assertEquals(c('3 years'), timedelta(years=3)) + + self.assertRaises(ValueError, c, '2 day') + self.assertRaises(ValueError, c, '2days') + self.assertRaises(ValueError, c, '123') + + def test_conv_string(self): + from psycopg2da.adapter import _get_string_conv + _conv_string = _get_string_conv("utf-8") + def c(s): + return _conv_string(s, None) + self.assertEquals(c(None), None) + self.assertEquals(c(''), u'') + self.assertEquals(c('c'), u'c') + self.assertEquals(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2') + self.assertEquals(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2') + +class TestPsycopg2Adapter(TestCase): + + def setUp(self): + import psycopg2da.adapter as adapter + self.real_psycopg2 = adapter.psycopg2 + adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub() + + def tearDown(self): + import psycopg2da.adapter as adapter + adapter.psycopg2 = self.real_psycopg2 + + def test_connection_factory(self): + from psycopg2da.adapter import Psycopg2Adapter + a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored') + c = a._connection_factory() + args = self.psycopg2_stub.last_connection_string.split() + args.sort() + self.assertEquals(args, ['dbname=dbname', 'host=hostname', + 'password=password', 'port=port', + 'user=username']) + + def test_registerTypes(self): + import psycopg2da.adapter as adapter + from psycopg2da.adapter import Psycopg2Adapter + a = Psycopg2Adapter('dbi://') + a.registerTypes() + for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP', + 'TIMESTAMPTZ', 'INTERVAL'): + typeid = getattr(adapter, '%s_OID' % typename) + result = self.psycopg2_stub.types.get(typeid, None) + if not result: + # comparing None with psycopg2.type object segfaults + self.fail("did not register %s (%d): got None, not Z%s" + % (typename, typeid, typename)) + else: + result_name = getattr(result, 'name', 'None') + self.assertEquals(result, getattr(adapter, typename), + "did not register %s (%d): got %s, not Z%s" + % (typename, typeid, result_name, typename)) + + +class TestISODateTime(TestCase): + + # Test if date/time parsing functions accept a sensible subset of ISO-8601 + # compliant date/time strings. + # + # Resources: + # http://www.cl.cam.ac.uk/~mgk25/iso-time.html + # http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html + # http://www.w3.org/TR/NOTE-datetime + # http://www.ietf.org/rfc/rfc3339.txt + + basic_dates = (('20020304', (2002, 03, 04)), + ('20000229', (2000, 02, 29))) + + extended_dates = (('2002-03-04', (2002, 03, 04)), + ('2000-02-29', (2000, 02, 29))) + + basic_times = (('12', (12, 0, 0)), + ('1234', (12, 34, 0)), + ('123417', (12, 34, 17)), + ('123417.5', (12, 34, 17.5)), + ('123417,5', (12, 34, 17.5))) + + extended_times = (('12', (12, 0, 0)), + ('12:34', (12, 34, 0)), + ('12:34:17', (12, 34, 17)), + ('12:34:17.5', (12, 34, 17.5)), + ('12:34:17,5', (12, 34, 17.5))) + + basic_tzs = (('Z', 0), + ('+02', 2*60), + ('+1130', 11*60+30), + ('-05', -5*60), + ('-0030', -30)) + + extended_tzs = (('Z', 0), + ('+02', 2*60), + ('+11:30', 11*60+30), + ('-05', -5*60), + ('-00:30', -30)) + + time_separators = (' ', 'T') + + bad_dates = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29', + '1990/13/14') + + bad_times = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,') + + bad_timetzs = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q') + + bad_datetimes = ('', 'foo', '2002-03-0412:33') + + bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200') + + exception_type = ValueError + + # We need the following funcions: + # parse_date -> (year, month, day) + # parse_time -> (hour, minute, second) + # parse_timetz -> (hour, minute, second, tzoffset) + # parse_datetime -> (year, month, day, hour, minute, second) + # parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset) + # second can be a float, all other values are ints + # tzoffset is offset in minutes east of UTC + + def setUp(self): + from psycopg2da.adapter import parse_date, parse_time, \ + parse_timetz, parse_datetime, parse_datetimetz + self.parse_date = parse_date + self.parse_time = parse_time + self.parse_timetz = parse_timetz + self.parse_datetime = parse_datetime + self.parse_datetimetz = parse_datetimetz + + def test_basic_date(self): + for s, d in self.basic_dates: + self.assertEqual(self.parse_date(s), d) + + def test_extended_date(self): + for s, d in self.extended_dates: + self.assertEqual(self.parse_date(s), d) + + def test_bad_date(self): + for s in self.bad_dates: + self.assertRaises(self.exception_type, self.parse_date, s) + + def test_basic_time(self): + for s, t in self.basic_times: + self.assertEqual(self.parse_time(s), t) + + def test_extended_time(self): + for s, t in self.extended_times: + self.assertEqual(self.parse_time(s), t) + + def test_bad_time(self): + for s in self.bad_times: + self.assertRaises(self.exception_type, self.parse_time, s) + + def test_basic_timetz(self): + for s, t in self.basic_times: + for tz, off in self.basic_tzs: + self.assertEqual(self.parse_timetz(s+tz), t + (off,)) + + def test_extended_timetz(self): + for s, t in self.extended_times: + for tz, off in self.extended_tzs: + self.assertEqual(self.parse_timetz(s+tz), t + (off,)) + + def test_bad_timetzs(self): + for s in self.bad_timetzs: + self.assertRaises(self.exception_type, self.parse_timetz, s) + + def test_basic_datetime(self): + for ds, d in self.basic_dates: + for ts, t in self.basic_times: + for sep in self.time_separators: + self.assertEqual(self.parse_datetime(ds+sep+ts), d + t) + + def test_extended_datetime(self): + for ds, d in self.extended_dates: + for ts, t in self.extended_times: + for sep in self.time_separators: + self.assertEqual(self.parse_datetime(ds+sep+ts), d + t) + + def test_bad_datetimes(self): + for s in self.bad_datetimes: + self.assertRaises(self.exception_type, self.parse_datetime, s) + + def test_basic_datetimetz(self): + for ds, d in self.basic_dates: + for ts, t in self.basic_times: + for tz, off in self.basic_tzs: + for sep in self.time_separators: + self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz), + d + t + (off,)) + + def test_extended_datetimetz(self): + for ds, d in self.extended_dates: + for ts, t in self.extended_times: + for tz, off in self.extended_tzs: + for sep in self.time_separators: + self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz), + d + t + (off,)) + + def test_bad_datetimetzs(self): + for s in self.bad_datetimetzs: + self.assertRaises(self.exception_type, self.parse_datetimetz, s) + + +def test_suite(): + return TestSuite(( + makeSuite(TestPsycopg2TypeConversion), + makeSuite(TestPsycopg2Adapter), + makeSuite(TestISODateTime), + )) + +if __name__=='__main__': + main(defaultTest='test_suite')