1.mogodb支持事务的前提
1) MongoDB 版本:确保 MongoDB 版本大于或等于 4.0,因为事务支持是在 4.0 版本中引入的。
2) 副本集配置:MongoDB 必须以副本集(Replica Set)或者分片集群的方式运行,不能是单机模式,注意单节点副本集也是支持事务的(即只有一个 MongoDB 实例,但以副本集模式启动)。
2.安装docker
3.单副本集安装
3.1 创建目录和文件
1) 宿主机-mongodb的数据存储目录
mkdir /mongo/data
chmod 777 /mongo/data
2) 宿主机-mongodb的配置文件目录
mkdir /mongo/conf
chmod 777 /mongo/conf
3) 宿主机-mongodb的密钥文件
cd /mongo
openssl rand -base64 756 > keyFile
chmod 400 keyFile # 一定是400,不要赋权777
chown 999:999 keyFile
4) 在/mongo/conf目录下,生成配置文件mongod.conf
# 存储配置
storage:
dbPath: /data/db # 数据库文件存储路径
engine: wiredTiger # 存储引擎(通常为 wiredTiger)
wiredTiger:
engineConfig:
cacheSizeGB: 1 # WiredTiger 缓存大小(单位:GB)
directoryForIndexes: true # 是否为索引使用单独目录
# 网络配置
net:
port: 27017 # MongoDB 监听端口
bindIp: 0.0.0.0 # 绑定 IP 地址(0.0.0.0 表示监听所有网络接口)
# 安全配置
security:
authorization: enabled # 是否启用认证
keyFile: /data/mongodb/keyFile # 密钥文件路径(用于副本集或分片集群)
# 副本集配置(如果使用副本集)
replication:
replSetName: "rs0" # 副本集名称
3.2 启动MongoDB容器
docker run -d --name mongo -p 27017:27017 \
-v /mongo/data:/data/db \
-v /mongo/conf/mongod.conf:/etc/mongod.conf \
-v /mongo/keyFile:/data/mongodb/keyFile \
mongo:latest mongod --auth --bind_ip_all --config /etc/mongod.conf
# -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=123456
如果运行上述命令后,mongodb容器处于退出状态,请排查数据挂载目录权限、配置文件问题、keyFile文件问题、27017端口没有开放、容器占用系统太多资源导致系统资源不足等问题,可通过docker logs mongo查看具体原因。
3.3 查看存储引擎和版本
# 查看mongodb版本
db.version()
# 查看当前数据库的存储引擎(需要root角色用户)
db.serverStatus().storageEngine
3.4 初始化副本集和创建用户
# 进入MongoDB容器
docker exec -it mongo mongosh
# 初始化副本集
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "172.12.112.102:27017" }
]
})
# 创建管理员用户
use admin
db.createUser({
user: 'admin',
pwd: 'admin123456',
roles: [{role: 'userAdminAnyDatabase', db: 'admin'}]})
# 创建root用户
db.createUser({
user: "root",
pwd: "123456",
roles: [{ role: "root", db: "admin" }]
})
# 验证用户
db.auth("admin", "admin123456")
db.auth("root", "123456")
3.5 创建集合
use biobank
db.createUser({user: "test", pwd: "123456", roles: [{role: "readWrite", db: "printing"}]})
exit
docker exec -it mongo mongosh
use biobank
db.auth("test", "123456")
3.6 pymongo工具验证支持事务
from pymongo import MongoClient
from pymongo.errors import PyMongoError
# 连接到 MongoDB 副本集
client = MongoClient(
'mongodb://root:123456@172.16.210.206:27017',
replicaSet='rs0'
)
# 选择数据库和集合
db = client.test_database
collection = db.test_collection
collection2 = db.test_collection2
# 创建一个会话来支持事务
with client.start_session() as session:
try:
# 开始事务
with session.start_transaction():
# 插入文档
collection.insert_one({"_id": 1, "name": "Alice", "age": 20}, session=session)
collection2.insert_one({"_id": 1, "name": "Bob", "age": 25}, session=session)
# 提交事务
session.commit_transaction()
print("事务成功提交")
except PyMongoError as e:
# 如果发生错误,中止事务
print("事务中发生错误:", e)
# session.abort_transaction()
finally:
# 确保会话结束
session.end_session()
# 关闭客户端连接
client.close()
3.6 Flask-MongoEngine使用事务
在 MongoDB 中,事务是通过会话(Session)来管理的。Flask-MongoEngine 本身不直接提供事务管理,但可以通过 PyMongo 的会话功能来实现。
MONGODB_SETTINGS = {
'db': 'biobank',
'host': 'mongodb://test:123456@172.12.112.102:27017/biobank?replicaSet=rs0'
}
# 或者
MONGODB_SETTINGS = {
'host': '172.12.112.102',
'port': 27017,
'username': 'test',
'password': '123456',
'db': 'biobank',
'authSource': 'biobank', # 指定身份验证数据库
'replicaSet': 'rs0',
'connectTimeoutMS': 5000, # 设置套接字超时时间(毫秒)。
'socketTimeoutMS': 30000, # 设置套接字超时时间(毫秒)。
'serverSelectionTimeoutMS': 5000, # 设置服务器选择超时时间(毫秒),当客户端尝试连接到 MongoDB 服务器时,如果在此时间内没有找到可用的服务器,将抛出异常。
'retryWrites': True, # 写重试
'retryReads': True # 读重试
}
class SampleModel(db.Document):
"""
样本信息表,关联项目集合和患者集合
"""
sample = db.StringField() # 样本号
subject = db.ReferenceField(Subject) # 关联受检者
projects = db.ListField(db.ReferenceField(Project)) # 关联项目
created_at = db.DateTimeField(auto_now_add=True)
updated_at = db.DateTimeField(auto_now=True)
class Subject(db.Document):
"""
受试者集合
"""
subject_name = db.StringField(required=True, max_length=100)
gender = db.StringField(choices=['male', 'female', 'unknown'])
birthday = db.StringField(max_length=10)
contact_phone = db.StringField(max_length=20) # 联系电话
employee = db.ReferenceField(Employee) # 关联员工
created_at = db.DateTimeField(auto_now_add=True)
updated_at = db.DateTimeField(auto_now=True)
class Project(db.Document):
project_name = db.StringField(required=True, max_length=100, unique=True)
description = db.StringField(max_length=500)
investigator = db.StringField(max_length=100) # 项目负责人
created_at = db.DateTimeField(auto_now_add=True)
updated_at = db.DateTimeField(auto_now=True)
def bulk_samples(data=None):
"""
创建样本数据, save/ create / insert
@param data:
@return:
"""
if data is None:
data = []
client = db.connection
bio_bank = client.biobank
session = client.start_session()
try:
with session.start_transaction():
samples_to_create = []
sample_model_fields = SampleModel.get_fields()
subjects, projects = dict(), defaultdict(list)
for item in data:
if item['no'] is None or len(item['no']) == 0:
raise ValueError('no不能为空!')
sample_data = {k: item[k] for k in item.keys() if k in sample_model_fields}
if item.get('subject_name'):
if item['subject_name'] not in subjects.keys():
subject_obj = Subject.objects(subject_name=item['subject_name']).first()
if subject_obj is None:
kwargs = {
'subject_name': item['subject_name'],
'contact_phone': item.get('subject_phone'),
'gender': item['gender'] if item.get('gender') in ['male', 'female'] else 'unknown',
'birthday': item.get('birthday')
}
subject_obj = bio_bank.subject.insert_one(kwargs, session=session).inserted_id
subjects[item['subject_name']] = subject_obj
sample_data.update({'subject': subjects[item['subject_name']]})
if item.get('project_name'):
if item['project_name'] not in projects.keys():
project_obj = Project.objects(project_name=item['project_name']).first()
if project_obj is None:
kwargs = {'project_name': item.get('project_name')}
project_obj = bio_bank.project.insert_one(kwargs, session=session).inserted_id
projects[item['project_name']] = [project_obj]
sample_data.update({'projects': projects[item['project_name']]})
samples_to_create.append(sample_data)
bio_bank.sample_model.insert_many(samples_to_create, session=session)
return 200, 'success'
except NotUniqueError:
return 409, '样本已存在,不允许重复插入'
except Exception as e:
return 400, str(e)
4.三节点副本集安装-支持事务
4.1 创建三个单副本集容器
过程和单副本集一样,区别在于一点:可以选择创建docker网络,把三个mongo容器关联起来
docker network create mongo-net
# 三个容器共用一个keyFile
# primary
docker run -d --name mongo-primary -p 27018:27017 -v /docker/mongo-replica/mongo-primary/data:/data/db -v /docker/mongo-replica/mongo-primary/conf/mongod.conf:/etc/mongod.conf -v /docker/mongo-replica/keyFile:/data/mongodb/keyFile -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=123456 --network mongo-net docker-0.unsee.tech/mongo:latest mongod --bind_ip_all --config /etc/mongod.conf
# secondary
docker run -d --name mongo-secondary1 -p 27019:27017 -v /docker/mongo-replica/mongo-secondary1/data:/data/db -v /docker/mongo-replica/mongo-secondary1/conf/mongod.conf:/etc/mongod.conf -v /docker/mongo-replica/keyFile:/data/mongodb/keyFile -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=123456 --network mongo-net docker-0.unsee.tech/mongo:latest mongod --bind_ip_all --config /etc/mongod.conf
# secondary
docker run -d --name mongo-secondary1 -p 27020:27017 -v /docker/mongo-replica/mongo-secondary2/data:/data/db -v /docker/mongo-replica/mongo-secondary2/conf/mongod.conf:/etc/mongod.conf -v /docker/mongo-replica/keyFile:/data/mongodb/keyFile -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=123456 --network mongo-net docker-0.unsee.tech/mongo:latest mongod --bind_ip_all --config /etc/mongod.conf
4.2 在主节点中配置副本集
# host配置推荐是mongo-primary:27017这种形式,但是外部的python程序解析不了主机,因此改为ip+port的形式
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "172.115.290.287:27018" },
{ _id: 1, host: "172.115.290.287:27019"},
{ _id: 2, host: "172.115.290.287:27020" }
]
})
4.3 修改副本配置(如果配置错误,可以写js脚本修改)
var cfg = rs.conf();
for (var i = 0; i < cfg.members.length; i++) {
if (cfg.members[i].host === "mongo-primary:27017") {
cfg.members[i].host = "172.16.200.280:27018";
} else if (cfg.members[i].host === "mongo-secondary1:27017") {
cfg.members[i].host = "172.16.200.280:27019";
}else if (cfg.members[i].host === "mongo-secondary2:27017") {
cfg.members[i].host = "172.16.200.280:27020";
}
}
4.4 python验证是否支持事务
from pymongo import MongoClient
from pymongo.errors import PyMongoError
# 连接到 MongoDB 副本集
client = MongoClient(
'mongodb://root:123456@172.16.210.206:27018,
replicaSet='rs0'
)
# 选择数据库和集合
db = client.test_database
collection1 = db.test_collection1
collection2 = db.test_collection2
# 创建一个会话来支持事务, 可以插入同一个_id,验证事务是否回滚
with client.start_session() as session:
try:
# 开始事务
with session.start_transaction():
# 插入文档
collection1.insert_one({"_id": 1, "name": "Alice2", "age": 320}, session=session)
collection2.insert_one({"_id": 2, "name": "Bob", "age": 25}, session=session)
# 提交事务
session.commit_transaction()
print("事务成功提交")
except PyMongoError as e:
# 如果发生错误,中止事务
print("事务中发生错误:", e)
# session.abort_transaction()
finally:
# 确保会话结束
session.end_session()
# 关闭客户端连接
client.close()