"""SteadyDB - hardened DB-API 2 connections.
Implements steady connections to a database based on an
arbitrary DB-API 2 compliant database interface module.
The connections are transparently reopened when they are
closed or the database connection has been lost or when
they are used more often than an optional usage limit.
Database cursors are transparently reopened as well when
the execution of a database operation cannot be performed
due to a lost connection. Only if the connection is lost
after the execution, when rows are already fetched from the
database, this will give an error and the cursor will not
be reopened automatically, because there is no reliable way
to recover the state of the cursor in such a situation.
Connections which have been marked as being in a transaction
with a begin() call will not be silently replaced either.
A typical situation where database connections are lost
is when the database server or an intervening firewall is
shutdown and restarted for maintenance reasons. In such a
case, all database connections would become unusable, even
though the database service may be already available again.
The "hardened" connections provided by this module will
make the database connections immediately available again.
This approach results in a steady database connection that
can be used by PooledDB or PersistentDB to create pooled or
persistent connections to a database in a threaded environment
such as the application server of "Webware for Python."
Note, however, that the connections themselves may not be
thread-safe (depending on the used DB-API module).
For the Python DB-API 2 specification, see:
http://www.python.org/peps/pep-0249.html
For information on Webware for Python, see:
http://www.webwareforpython.org
Usage:
You can use the connection constructor connect() in the same
way as you would use the connection constructor of a DB-API 2
module if you specify the DB-API 2 module to be used as the
first parameter, or alternatively you can specify an arbitrary
constructor function returning new DB-API 2 compliant connection
objects as the first parameter. Passing just a function allows
implementing failover mechanisms and load balancing strategies.
You may also specify a usage limit as the second parameter
(set it to None if you prefer unlimited usage), an optional
list of commands that may serve to prepare the session as a
third parameter, the exception classes for which the failover
mechanism shall be applied, and you can specify whether is is
allowed to close the connection (by default this is true).
When the connection to the database is lost or has been used
too often, it will be transparently reset in most situations,
without further notice.
import pgdb # import used DB-API 2 module
from DBUtils.SteadyDB import connect
db = connect(pgdb, 10000, ["set datestyle to german"],
host=..., database=..., user=..., ...)
...
cursor = db.cursor()
...
cursor.execute('select ...')
result = cursor.fetchall()
...
cursor.close()
...
db.close()
Ideas for improvement:
* Alternatively to the maximum number of uses,
implement a maximum time to live for connections.
* Optionally log usage and loss of connection.
Copyright, credits and license:
* Contributed as supplement for Webware for Python and PyGreSQL
by Christoph Zwerschke in September 2005
* Allowing creator functions as first parameter as in SQLAlchemy
suggested by Ezio Vernacotola in December 2006
Licensed under the Open Software License version 2.1.
"""
__version__ = '1.1'
__revision__ = "$Rev: 8218 $"
__date__ = "$Date: 2011-08-14 13:57:11 +0200 (So, 14. Aug 2011) $"
import sys
class SteadyDBError(Exception):
"""General SteadyDB error."""
class InvalidCursor(SteadyDBError):
"""Database cursor is invalid."""
def connect(creator, maxusage=None, setsession=None,
failures=None, ping=1, closeable=True, *args, **kwargs):
"""A tough version of the connection constructor of a DB-API 2 module.
creator: either an arbitrary function returning new DB-API 2 compliant
connection objects or a DB-API 2 compliant database module
maxusage: maximum usage limit for the underlying DB-API 2 connection
(number of database operations, 0 or None means unlimited usage)
callproc(), execute() and executemany() count as one operation.
When the limit is reached, the connection is automatically reset.
setsession: an optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to german", "set time zone mez"]
failures: an optional exception class or a tuple of exception classes
for which the failover mechanism shall be applied, if the default
(OperationalError, InternalError) is not adequate
ping: determines when the connection should be checked with ping()
(0 = None = never, 1 = default = when _ping_check() is called,
2 = whenever a cursor is created, 4 = when a query is executed,
7 = always, and all other bit combinations of these values)
closeable: if this is set to false, then closing the connection will
be silently ignored, but by default the connection can be closed
args, kwargs: the parameters that shall be passed to the creator
function or the connection constructor of the DB-API 2 module
"""
return SteadyDBConnection(creator, maxusage, setsession,
failures, ping, closeable, *args, **kwargs)
class SteadyDBConnection:
"""A "tough" version of DB-API 2 connections."""
version = __version__
def __init__(self, creator, maxusage=None, setsession=None,
failures=None, ping=1, closeable=True, *args, **kwargs):
"""Create a "tough" DB-API 2 connection."""
self._con = None
self._closed = True
try:
self._creator = creator.connect
self._dbapi = creator
except AttributeError:
self._creator = creator
try:
self._dbapi = creator.dbapi
except AttributeError:
try:
self._dbapi = sys.modules[creator.__module__]
if self._dbapi.connect != creator:
raise AttributeError
except (AttributeError, KeyError):
self._dbapi = None
try:
self._threadsafety = creator.threadsafety
except AttributeError:
try:
self._threadsafety = self._dbapi.threadsafety
except AttributeError:
self._threadsafety = None
if not callable(self._creator):
raise TypeError("%r is not a connection provider." % (creator,))
if maxusage is None:
maxusage = 0
if not isinstance(maxusage, (int, long)):
raise TypeError("'maxusage' must be an integer value.")
self._maxusage = maxusage
self._setsession_sql = setsession
if failures is not None and not isinstance(
failures, tuple) and not issubclass(failures, Exception):
raise TypeError("'failures' must be a tuple of exceptions.")
self._failures = failures
self._ping = isinstance(ping, int) and ping or 0
self._closeable = closeable
self._args, self._kwargs = args, kwargs
self._store(self._create())
def _create(self):
"""Create a new connection using the creator function."""
con = self._creator(*self._args, **self._kwargs)
try:
try:
if self._dbapi.connect != self._creator:
raise AttributeError
except AttributeError:
try:
mod = con.__module__
except AttributeError:
mod = None
while mod:
try:
self._dbapi = sys.modules[mod]
if not callable(self._dbapi.connect):
raise AttributeError
except (AttributeError, KeyError):
pass
else:
break
i = mod.rfind('.')
if i < 0:
mod = None
else:
mod = mod[:i]
else:
try:
mod = con.OperationalError.__module__
except AttributeError:
mod = None
while mod:
try:
self._dbapi = sys.modules[mod]
if not callable(self._dbapi.connect):
raise AttributeError
except (AttributeError, KeyError):
pass
else:
break
i = mod.rfind('.')
if i < 0:
mod = None
else:
mod = mod[:i]
else:
self._dbapi = None
if self._threadsafety is None:
try:
self._threadsafety = self._dbapi.threadsafety
except AttributeError:
try:
self._threadsafety = con.threadsafety
except AttributeError:
pass
if self._failures is None:
try:
self._failures = (self._dbapi.OperationalError,
self._dbapi.InternalError)
except AttributeError:
try:
self._failures = (self._creator.OperationalError,
self._creator.InternalError)
except AttributeError:
try:
self._failures = (con.OperationalError,
con.InternalError)
except AttributeError:
raise AttributeError(
"Could not determine failure exceptions"
" (please set failures or creator.dbapi).")
if isinstance(self._failures, tuple):
self._failure = self._failures[0]
else:
self._failure = self._failures
self._setsession(con)
except Exception, error:
try:
con.close()
except Exception:
pass
raise error
return con
def _setsession(self, con=None):
"""Execute the SQL commands for session preparation."""
if con is None:
con = self._con
if self._setsession_sql:
cursor = con.cursor()
for sql in self._setsession_sql:
cursor.execute(sql)
cursor.close()
def _store(self, con):
"""Store a database connection for subsequent use."""
self._con = con
self._transaction = False
self._closed = False
self._usage = 0
def _close(self):
"""Close the tough connection.
You can always close a tough connection with this method
and it will not complain if you close it more than once.
"""
if not self._closed:
try:
self._con.close()
except Exception:
pass
self._transaction = False
self._closed = True
def _reset(self, force=False):
"""Reset a tough connection.
Rollback if forced or the connection was in a transaction.
"""
if force or self._transaction:
try:
self.rollback()
except Exception:
pass
def _ping_check(self, ping=1, reconnect=True):
"""Check whether the connection is still alive using ping().
If the the underlying connection is not active and the ping
parameter is set accordingly, the connection will be recreated
unless the connection is currently inside a transaction.
"""
if ping & self._ping:
try:
alive = self._con.ping()
except (AttributeError, IndexError, TypeError, ValueError):
self._ping = 0
alive = None
reconnect = False
except Exception:
alive = False
else:
if alive is None:
alive = True
if alive:
reconnect = False
if reconnect and not self._transaction:
try:
con = self._create()
except Exception:
pass
else:
self._close()
self._store(con)
alive = True
return alive
def dbapi(self):
"""Return the underlying DB-API 2 module of the connection."""
if self._dbapi is None:
raise AttributeError("Could not determine DB-API 2 module"
" (please set creator.dbapi).")
return self._dbapi
def threadsafety(self):
"""Return the thread safety level of the connection."""
if self._threadsafety is None:
if self._dbapi is None:
raise AttributeError("Could not determine threadsafety"
" (please set creator.dbapi or creator.threadsafety).")
return 0
return self._threadsafety
def close(self):
"""Close the tough connection.
You are allowed to close a tough connection by default
and it will not complain if you close it more than once.
You can disallow closing connections by setting
the closeable parameter to something false. In this case,
closing tough connections will be silently ignored.
"""
if self._closeable:
self._close()
elif self._transaction:
self._reset()
def begin(self, *args, **kwargs):
"""Indicate the beginning of a transaction.
During a transaction, connections will not not transparently
replaced, and all errors will be raised to the application.
If the underlying driver supports this method, it will be called
with the given parameters (e.g. for distributed transactions).
"""
self._transaction = True
try:
begin = self._con.begin
except AttributeError:
pass
else:
begin(*args, **kwargs)
def commit(self):
"""Commit any pending transaction."""
self._transaction = False
self._con.commit()
def rollback(self):
"""Rollback pending transaction."""
self._transaction = False
self._con.rollback()
def cancel(self):
"""Cancel a long-running transaction.
If the underlying driver supports this method, it will be called.
"""
self._transaction = False
try:
cancel = self._con.cancel
except AttributeError:
pass
else:
cancel()
def ping(self, *args, **kwargs):
"""Ping connection."""
return self._con.ping(*args, **kwargs)
def _cursor(self, *args, **kwargs):
"""A "tough" version of the method cursor()."""
transaction = self._transaction
if not transaction:
self._ping_check(2)
try:
if self._maxusage:
if self._usage >= self._maxusage:
raise self._failure
cursor = self._con.cursor(*args, **kwargs)
except self._failures, error:
try:
con = self._create()
except Exception:
pass
else:
try:
cursor = con.cursor(*args, **kwargs)
except Exception:
pass
else:
self._close()
self._store(con)
if transaction:
raise error
return cursor
try:
con.close()
except Exception:
pass
if transaction:
self._transaction = False
raise error
return cursor
def cursor(self, *args, **kwargs):
"""Return a new Cursor Object using the connection."""
return SteadyDBCursor(self, *args, **kwargs)
def __del__(self):
"""Delete the steady connection."""
try:
self._close()
except Exception:
pass
class SteadyDBCursor:
"""A "tough" version of DB-API 2 cursors."""
def __init__(self, con, *args, **kwargs):
"""Create a "tough" DB-API 2 cursor."""
self._cursor = None
self._closed = True
self._con = con
self._args, self._kwargs = args, kwargs
self._clearsizes()
try:
self._cursor = con._cursor(*args, **kwargs)
except AttributeError:
raise TypeError("%r is not a SteadyDBConnection." % (con,))
self._closed = False
def setinputsizes(self, sizes):
"""Store input sizes in case cursor needs to be reopened."""
self._inputsizes = sizes
def setoutputsize(self, size, column=None):
"""Store output sizes in case cursor needs to be reopened."""
self._outputsizes[column] = size
def _clearsizes(self):
"""Clear stored input and output sizes."""
self._inputsizes = []
self._outputsizes = {}
def _setsizes(self, cursor=None):
"""Set stored input and output sizes for cursor execution."""
if cursor is None:
cursor = self._cursor
if self._inputsizes:
cursor.setinputsizes(self._inputsizes)
for column, size in self._outputsizes.iteritems():
if column is None:
cursor.setoutputsize(size)
else:
cursor.setoutputsize(size, column)
def close(self):
"""Close the tough cursor.
It will not complain if you close it more than once.
"""
if not self._closed:
try:
self._cursor.close()
except Exception:
pass
self._closed = True
def _get_tough_method(self, name):
"""Return a "tough" version of the given cursor method."""
def tough_method(*args, **kwargs):
execute = name.startswith('execute')
con = self._con
transaction = con._transaction
if not transaction:
con._ping_check(4)
try:
if con._maxusage:
if con._usage >= con._maxusage:
raise con._failure
if execute:
self._setsizes()
method = getattr(self._cursor, name)
result = method(*args, **kwargs)
if execute:
self._clearsizes()
except con._failures, error:
if not transaction:
try:
cursor2 = con._cursor(
*self._args, **self._kwargs)
except Exception:
pass
else:
try:
if execute:
self._setsizes(cursor2)
method = getattr(cursor2, name)
result = method(*args, **kwargs)
if execute:
self._clearsizes()
except Exception:
pass
else:
self.close()
self._cursor = cursor2
con._usage += 1
return result
try:
cursor2.close()
except Exception:
pass
try:
con2 = con._create()
except Exception:
pass
else:
try:
cursor2 = con2.cursor(
*self._args, **self._kwargs)
except Exception:
pass
else:
if transaction:
self.close()
con._close()
con._store(con2)
self._cursor = cursor2
raise error
try:
if execute:
self._setsizes(cursor2)
method2 = getattr(cursor2, name)
result = method2(*args, **kwargs)
if execute:
self._clearsizes()
except error.__class__:
use2 = False
except Exception, error:
use2 = True
else:
use2 = True
error = None
if use2:
self.close()
con._close()
con._store(con2)
self._cursor = cursor2
con._usage += 1
if error:
raise error
return result
try:
cursor2.close()
except Exception:
pass
try:
con2.close()
except Exception:
pass
if transaction:
self._transaction = False
raise error
else:
con._usage += 1
return result
return tough_method
def __getattr__(self, name):
"""Inherit methods and attributes of underlying cursor."""
if self._cursor:
if name.startswith('execute') or name.startswith('call'):
return self._get_tough_method(name)
else:
return getattr(self._cursor, name)
else:
raise InvalidCursor
def __del__(self):
"""Delete the steady cursor."""
try:
self.close()
except Exception:
pass