DBUtils让连接数据库更高效

DBUtils让连接数据库更高效

 

DBUtils 是一套允许线程化 Python 程序可以安全和有效的访问数据库的模块。DBUtils实际上是一个包含两个子模块的Python包,一个用于连接DB-API 2模块,另一个用于连接典型的PyGreSQL 模块。详细内容请参考用户手册。

本文章利用DBUtils构造了一套简单的数据库连接模块,可支持mysql和pgdb,需要其他兼容DB-API2的数据库接口,可以根据例子自行扩展,不限于连接池方式,传统方式也可。下边直接上代码:

Connector.py

#coding:utf-8
"""
@author = yinzhixin
@version = 1.0

"""

from DBUtils.PooledDB import PooledDB
from DBUtils.PooledPg import PooledPg
from DBConfig import MYSQL_CONF, PGDB_CONF


class BaseDB(object):
    """
    @summary:数据库连接基类,包含公用方法,
            不同数据库定义各自的连接类,
            继承自该基类获取操作数据库方法
    @note:该类不可以实例化

    """
    def __init__(self):
        raise RuntimeError("The BaseDB can not be instanced")

    def __dictfetchall(self, cursor):
        """
        @return:[{k:v,k2:v2},{}....]
        """
        coloumns = [row[0] for row in cursor.description]
        result = []
        for row in cursor.fetchall():
            itemrow = []
            for item in row:
                itemrow.append(str(item))
            result.append(itemrow)
        return [dict(zip(coloumns, row)) for row in result]

    def __getInsertId(self):
        """
        获取当前连接最后一次插入操作生成的id,如果没有则为0
        """
        self._cursor.execute("SELECT @@IDENTITY AS id")
        result = self._cursor.fetchall()
        return result[0]['id']

    def __query(self, sql, *param):
        """
        内部共用方法,供增删改查方法调用
        """
        if not param:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql,*param)
        return count


    def query(self, sql, *param):
        """
        @summary:select查询
        @notes:count小于0时默认返回的是None
        """
        count = self.__query(sql, *param)
        if count > 0:
            result = self.__dictfetchall(self._cursor)
            return result

    def insert(self,sql,*param):
        self.__query(sql, *param)
        return self.__getInsertId()

    def update(self,sql,param=None):
        return self.__query(sql,param)

    def delete(self,sql,param=None):
        return self.__query(sql,param)

    def dispose(self,isEnd=1):
        """
        @summary: 释放连接池资源
        """
        self._cursor.close()
        self._conn.close()


class Mysql(BaseDB):
    """mysql连接类"""
    _pool = None

    def __init__(self):
        self._cursor = self._conn.cursor()  

    @property
    def _conn(self):
        if Mysql._pool is None:
            Mysql._pool = PooledDB(**MYSQL_CONF)
            #Mysql._pool = PersistentDB(**MYSQL_CONF)
        return Mysql._pool.connection()


class GPDB(BaseDB):
    """GPDB连接类"""
    _pool = None

    def __init__(self):
        self._cursor = self._conn.cursor()

    @property
    def _conn(self):
        if GPDB._pool is None:
            GPDB._pool = PooledPg(**GPDB_CONF)
        return GPDB._pool.connection()

if __name__ == '__main__':
    #forbidden = BaseDB()
    db = Mysql()
    result = db.query("select * from %s limit %s" % ('test_table',2))
    print result
    db.dispose()

DBConfig.py

#coding:utf-8
"""
@author = yinzhixin
@version = 1.0
@summary: 基于DB-API2.0的各种数据库连击配置信息
		  需要安装PyGreSQL、MySQLdb模块
@tips: 1、注意charset设置编码时为utf8,没有通常的横杠'-'
	   2、port为int类型,不能字符串
	   3、各配置参数意义请参考DBUtils文档
"""

import pgdb		
import MySQLdb
from MySQLdb.cursors import DictCursor
from pprint import pprint



PGDB_CONF = {
	'creator': 'pgdb',
	'mincached': 0,
	'maxcached': 0,
	'host': '127.0.0.1',
	'port': 5324,
	'user': 'test',
	'passwd': '1234',
	'db': 'test',
	'charset': 'utf8',
}

MYSQL_CONF = {
	'creator':MySQLdb,
	'mincached': 0,
	'maxcached': 0,
	'host': '127.0.0.1',
	'port': 3306,
	'user': 'test',
	'passwd': 'test',
	'db': 'test',
	'charset': 'utf8'
	#'cursorclass': DictCursor
}


if __name__ =='__main__':
	pprint(MYSQL)

 

Comments are closed.