python数据库链接池模块DBUtils

899次阅读
没有评论

共计 4126 个字符,预计需要花费 11 分钟才能阅读完成。

引入

👉Pymysql 模块的基本使用

  • 使用示例
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='shawn')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute("select id,username,mobile from ss_users where username=%(username)s",{'username':'shawn'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()

print(obj)  # {'id': 1, 'username': 'shawn', 'mobile': '15979302285'}

如果每到一个请求过来就创建一个链接这样效率会很低(Django 就是这样)

建立连接 >> 通信 >> 这一环节会很耗时间: 网络延时等

所以建立连接池

DBUtils 模块

DBUtils 是 Python 的一个用于实现数据库连接池的模块

1. 模块安装

pip3 install DBUtils

2. 模块导入

  • 老版本导入方式
from DBUtils.PooledDB import PooledDB
  • 新版本(因为 Python 后面导入模块全变成小写的形式)
from dbutils.pooled_db import PooledDB

3. 简单使用步骤

  • mysql_pool.py : 新建一个 Py 文件, 创建一个池对象
import pymysql

from dbutils.pooled_db import PooledDB


# 创建一个连接池对象
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=10,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
    maxshared=3,
    # 链接池中最多共享的链接数量,0 和 None 表示全部共享。PS: 无用,因为 pymysql 和 MySQLdb 等模块的 threadsafety 都为 1,所有值无论设置为多少,_maxcached 永远为 0,所以永远是所有链接都共享。blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
    setsession=[],  # 开始会话前执行的命令列表。ping=0,  # ping MySQL 服务端,检查是否服务可用。host='127.0.0.1',
    port=3306,
    user='root',
    password='123456',
    database='shawn',
    charset='utf8'
)

为每个线程创建一个连接,线程即使调用了 close 方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭

  • 视图函数中使用
from flask import Flask
from mysql_pool import POOL

app = Flask(__name__)

# 在视图函数中使用
@app.route('/')
def index():
    conn = POOL.connection()
    cursor = conn.cursor()
    cursor.execute('select id,username,mobile from ss_users where username=%(username)s',{"username":"shawn"})
    result = cursor.fetchall()
    cursor.close()
    conn.close()  # 这里的关闭也没有彻底关闭,而是将线程放回数据库连接池中去了
    print(result)  # ((1, 'shawn', '15979302285'),)
    return str(result)

if __name__ == '__main__':
    app.run()

4. 封装成一个类, 调用其方法

  • settings.py 中书写 Config 配置类, 调用该类得到一个池对象
from datetime import timedelta
from redis import Redis
import pymysql
from dbutils.pooled_db import PooledDB, SharedDBConnection

class Config(object):
    DEBUG = True
    SECRET_KEY = "random_key"
    PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
    SESSION_REFRESH_EACH_REQUEST = True
    SESSION_TYPE = "redis"
    PYMYSQL_POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0 和 None 表示全部共享。PS: 无用,因为 pymysql 和 MySQLdb 等模块的 threadsafety 都为 1,所有值无论设置为多少,_maxcached 永远为 0,所以永远是所有链接都共享。blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL 服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='shawn',
        charset='utf8'
    )


# 生产环境
class ProductionConfig(Config):
    SESSION_REDIS = Redis(host='192.168.0.94', port='6379')


# 开发环境
class DevelopmentConfig(Config):
    SESSION_REDIS = Redis(host='127.0.0.1', port='6379')


# 测试环境
class TestingConfig(Config):
    pass
  • 封装池对象操作方法
import pymysql
from settings import Config
from flask import Flask


class SQLHelper(object):
    # 建立链接、游标
    @staticmethod
    def open(cursor):
        POOL = Config.PYMYSQL_POOL
        conn = POOL.connection()
        cursor = conn.cursor(cursor=cursor)
        return conn, cursor

    # 关闭游标、链接
    @staticmethod
    def close(conn, cursor):
        conn.commit()
        cursor.close()
        conn.close()

    # 接收原生 SQL 语句并执行, 得到一条结果返回 (期间调用 close() 方法)
    @classmethod
    def fetch_one(cls, sql, args, cursor=pymysql.cursors.DictCursor):
        conn, cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn, cursor)
        return obj

    # 接收原生 SQL 语句并执行, 得到所有结果返回 (期间调用 close() 方法)
    @classmethod
    def fetch_all(cls, sql, args, cursor=pymysql.cursors.DictCursor):
        conn, cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchall()
        cls.close(conn, cursor)
        return obj

# 视图函数中使用
@app.route('/')
def index():
    obj = SQLHelper.fetch_one("select id,username,mobile from ss_users where username=%(username)s",
                              {'username': 'shawn'})
    print(obj)  # {'id': 1, 'username': 'shawn', 'mobile': '15979302285'}
    return str(obj)


if __name__ == '__main__':
    app.run()
去数据库的服务通过一条命令,是可以看到当前服务的连接数是多少
使用 requests+ 线程池发送 100 个请求
看一下不使用连接池和使用连接池的 mysql 的连接数

https://blog.csdn.net/weixin_33146151/article/details/113459478

https://blog.csdn.net/caodongfang126/article/details/52764213

正文完
 
shawn
版权声明:本站原创文章,由 shawn 2023-06-16发表,共计4126字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)