共计 4126 个字符,预计需要花费 11 分钟才能阅读完成。
引入
- 使用示例
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='shawn')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute("select id,username,mobile from ss_users where username=%(username)s",{'username':'shawn'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()
print(obj) # {'id': 1, 'username': 'shawn', 'mobile': '15979302285'}
如果每到一个请求过来就创建一个链接这样效率会很低(Django 就是这样)
建立连接 >> 通信 >> 这一环节会很耗时间: 网络延时等
所以建立连接池
DBUtils 模块
DBUtils 是 Python 的一个用于实现数据库连接池的模块
1. 模块安装
pip3 install DBUtils
2. 模块导入
- 老版本导入方式
from DBUtils.PooledDB import PooledDB
- 新版本(因为 Python 后面导入模块全变成小写的形式)
from dbutils.pooled_db import PooledDB
3. 简单使用步骤
- mysql_pool.py : 新建一个 Py 文件, 创建一个池对象
import pymysql
from dbutils.pooled_db import PooledDB
# 创建一个连接池对象
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=10, # 连接池允许的最大连接数,0 和 None 表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制
maxshared=3,
# 链接池中最多共享的链接数量,0 和 None 表示全部共享。PS: 无用,因为 pymysql 和 MySQLdb 等模块的 threadsafety 都为 1,所有值无论设置为多少,_maxcached 永远为 0,所以永远是所有链接都共享。blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制
setsession=[], # 开始会话前执行的命令列表。ping=0, # ping MySQL 服务端,检查是否服务可用。host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='shawn',
charset='utf8'
)
为每个线程创建一个连接,线程即使调用了 close 方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭
- 视图函数中使用
from flask import Flask
from mysql_pool import POOL
app = Flask(__name__)
# 在视图函数中使用
@app.route('/')
def index():
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select id,username,mobile from ss_users where username=%(username)s',{"username":"shawn"})
result = cursor.fetchall()
cursor.close()
conn.close() # 这里的关闭也没有彻底关闭,而是将线程放回数据库连接池中去了
print(result) # ((1, 'shawn', '15979302285'),)
return str(result)
if __name__ == '__main__':
app.run()
4. 封装成一个类, 调用其方法
- settings.py 中书写 Config 配置类, 调用该类得到一个池对象
from datetime import timedelta
from redis import Redis
import pymysql
from dbutils.pooled_db import PooledDB, SharedDBConnection
class Config(object):
DEBUG = True
SECRET_KEY = "random_key"
PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
SESSION_REFRESH_EACH_REQUEST = True
SESSION_TYPE = "redis"
PYMYSQL_POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0 和 None 表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制
maxshared=3,
# 链接池中最多共享的链接数量,0 和 None 表示全部共享。PS: 无用,因为 pymysql 和 MySQLdb 等模块的 threadsafety 都为 1,所有值无论设置为多少,_maxcached 永远为 0,所以永远是所有链接都共享。blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL 服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='shawn',
charset='utf8'
)
# 生产环境
class ProductionConfig(Config):
SESSION_REDIS = Redis(host='192.168.0.94', port='6379')
# 开发环境
class DevelopmentConfig(Config):
SESSION_REDIS = Redis(host='127.0.0.1', port='6379')
# 测试环境
class TestingConfig(Config):
pass
- 封装池对象操作方法
import pymysql
from settings import Config
from flask import Flask
class SQLHelper(object):
# 建立链接、游标
@staticmethod
def open(cursor):
POOL = Config.PYMYSQL_POOL
conn = POOL.connection()
cursor = conn.cursor(cursor=cursor)
return conn, cursor
# 关闭游标、链接
@staticmethod
def close(conn, cursor):
conn.commit()
cursor.close()
conn.close()
# 接收原生 SQL 语句并执行, 得到一条结果返回 (期间调用 close() 方法)
@classmethod
def fetch_one(cls, sql, args, cursor=pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchone()
cls.close(conn, cursor)
return obj
# 接收原生 SQL 语句并执行, 得到所有结果返回 (期间调用 close() 方法)
@classmethod
def fetch_all(cls, sql, args, cursor=pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchall()
cls.close(conn, cursor)
return obj
# 视图函数中使用
@app.route('/')
def index():
obj = SQLHelper.fetch_one("select id,username,mobile from ss_users where username=%(username)s",
{'username': 'shawn'})
print(obj) # {'id': 1, 'username': 'shawn', 'mobile': '15979302285'}
return str(obj)
if __name__ == '__main__':
app.run()
去数据库的服务通过一条命令,是可以看到当前服务的连接数是多少
使用 requests+ 线程池发送 100 个请求
看一下不使用连接池和使用连接池的 mysql 的连接数
https://blog.csdn.net/weixin_33146151/article/details/113459478
https://blog.csdn.net/caodongfang126/article/details/52764213
正文完