Skip to content

Commit

Permalink
feat: sqlite实现读写分离,原理上阻止死锁
Browse files Browse the repository at this point in the history
  • Loading branch information
PaienNate committed Jan 16, 2025
1 parent d1d5f3e commit be9975b
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 106 deletions.
5 changes: 1 addition & 4 deletions api/api_bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,7 @@ func forceStop(c echo.Context) error {

for _, i := range diceManager.Dice {
d := i
err := d.DBOperator.Close()
if err != nil {
return
}
d.DBOperator.Close()
}

// 清理gocqhttp
Expand Down
1 change: 1 addition & 0 deletions dice/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2565,4 +2565,5 @@ func (d *Dice) Save(isAuto bool) {
ep.StatsDump(d)
}
}
log.Info("自动保存完毕")
}
99 changes: 78 additions & 21 deletions dice/dice_attrs_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,55 +168,99 @@ func (am *AttrsManager) Init(d *Dice) {
// 正常工作
am.CheckForSave()

Check failure on line 169 in dice/dice_attrs_manager.go

View workflow job for this annotation

GitHub Actions / Test & Lint

Error return value of `am.CheckForSave` is not checked (errcheck)
am.CheckAndFreeUnused()

Check failure on line 170 in dice/dice_attrs_manager.go

View workflow job for this annotation

GitHub Actions / Test & Lint

Error return value of `am.CheckAndFreeUnused` is not checked (errcheck)
time.Sleep(15 * time.Second)
time.Sleep(60 * time.Second)
}
}
}()
am.cancel = cancel
}

func (am *AttrsManager) CheckForSave() (int, int) {
times := 0
saved := 0

db := am.db
if db == nil {
func (am *AttrsManager) CheckForSave() error {
if am.db == nil {
// 尚未初始化
return 0, 0
return errors.New("数据库尚未初始化")
}

var resultList []*model.AttributesBatchUpsertModel
prepareToSave := map[string]int{}
am.m.Range(func(key string, value *AttributesItem) bool {
if !value.IsSaved {
saved += 1
value.SaveToDB(db)
saveModel, err := value.GetBatchSaveModel()
if err != nil {
// 打印日志
log.Errorf("定期写入用户数据出错(获取批量保存模型): %v", err)
return true
}
prepareToSave[key] = 1
resultList = append(resultList, saveModel)
}
times += 1
return true
})
return times, saved
// 整体落盘
if len(resultList) == 0 {
log.Infof("[松子调试用]定期写入用户数据(批量保存) %v 条", len(resultList))
return nil
}

if err := model.AttrsPutsByIDBatch(am.db, resultList); err != nil {
log.Errorf("定期写入用户数据出错(批量保存): %v", err)
return err
}
for key := range prepareToSave {
// 理应不存在这个数据没有的情况
v, _ := am.m.Load(key)
v.IsSaved = true
}
// 输出日志本次落盘了几个数据
log.Infof("[松子调试用]定期写入用户数据(批量保存) %v 条", len(resultList))

return nil
}

// CheckAndFreeUnused 此函数会被定期调用,释放最近不用的对象
func (am *AttrsManager) CheckAndFreeUnused() {
db := am.db
func (am *AttrsManager) CheckAndFreeUnused() error {
db := am.db.GetDataDB(model.WRITE)
if db == nil {
// 尚未初始化
return
return errors.New("数据库尚未初始化")
}

prepareToFree := map[string]int{}
currentTime := time.Now().Unix()
currentTime := time.Now()
var resultList []*model.AttributesBatchUpsertModel
am.m.Range(func(key string, value *AttributesItem) bool {
if value.LastUsedTime-currentTime > 60*10 {
lastUsedTime := time.Unix(value.LastUsedTime, 0)
if lastUsedTime.Sub(currentTime) > 10*time.Minute {
saveModel, err := value.GetBatchSaveModel()
if err != nil {
// 打印日志
log.Errorf("定期清理用户数据出错(获取批量保存模型): %v", err)
return true
}
prepareToFree[key] = 1
// 直接保存
value.SaveToDB(db)
resultList = append(resultList, saveModel)
}
return true
})

// 整体落盘
if len(resultList) == 0 {
log.Infof("[松子调试用]定期清理用户数据(批量保存) %v 条", len(resultList))
return nil
}

if err := model.AttrsPutsByIDBatch(am.db, resultList); err != nil {
log.Errorf("定期清理写入用户数据出错(批量保存): %v", err)
return err
}

for key := range prepareToFree {
am.m.Delete(key)
// 理应不存在这个数据没有的情况
v, _ := am.m.LoadAndDelete(key)
v.IsSaved = true
}
// 输出日志本次落盘了几个数据
log.Infof("[松子调试用]定期清理用户数据(批量保存) %v 条", len(resultList))
return nil
}

func (am *AttrsManager) CharBind(charId string, groupId string, userId string) error {
Expand Down Expand Up @@ -292,6 +336,19 @@ func (i *AttributesItem) SaveToDB(db model.DatabaseOperator) {
i.IsSaved = true
}

// GetBatchSaveModel converts this in-memory attribute item into the flat
// row model consumed by model.AttrsPutsByIDBatch, serializing the value
// map to JSON. Returns an error when serialization fails.
func (i *AttributesItem) GetBatchSaveModel() (*model.AttributesBatchUpsertModel, error) {
	data, err := ds.NewDictVal(i.valueMap).V().ToJSON()
	if err != nil {
		return nil, err
	}
	saveModel := &model.AttributesBatchUpsertModel{
		Id:        i.ID,
		Data:      data,
		Name:      i.Name,
		SheetType: i.SheetType,
	}
	return saveModel, nil
}

func (i *AttributesItem) Load(name string) *ds.VMValue {
v, _ := i.valueMap.Load(name)
i.LastUsedTime = time.Now().Unix()
Expand Down
70 changes: 68 additions & 2 deletions dice/model/attrs_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"fmt"
"time"

"sealdice-core/utils"

ds "github.com/sealdice/dicescript"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"sealdice-core/utils"
)

const (
Expand Down Expand Up @@ -141,6 +143,70 @@ func AttrsPutById(operator DatabaseOperator, id string, data []byte, name, sheet
return nil // 操作成功,返回 nil
}

// AttributesBatchUpsertModel is a lightweight carrier for one attributes
// row in a batch upsert; it is produced by AttributesItem.GetBatchSaveModel
// and consumed by AttrsPutsByIDBatch.
type AttributesBatchUpsertModel struct {
	// Id is the primary key of the attributes row.
	Id string `json:"id"`
	// Data is the serialized attribute payload (JSON bytes).
	Data []byte `json:"data"`
	// Name mirrors AttributesItem.Name.
	Name string `json:"name"`
	// SheetType mirrors AttributesItem.SheetType.
	SheetType string `json:"sheetType"`
}

// AttrsPutsByIDBatch upserts a batch of attribute rows inside a single
// transaction. It is a special-purpose writer: GORM's CreateInBatches
// cannot be combined with map[string]any rows (see the note inside), so
// batching is done manually.
func AttrsPutsByIDBatch(operator DatabaseOperator, saveList []*AttributesBatchUpsertModel) error {
	db := operator.GetDataDB(WRITE)
	now := time.Now().Unix() // one timestamp shared by the whole batch
	trulySaveList := make([]map[string]any, 0, len(saveList))
	for _, singleSave := range saveList {
		trulySaveList = append(trulySaveList, map[string]any{
			"id": singleSave.Id,
			// Wrap with BYTE to work around values that otherwise fail to insert.
			"data":             BYTE(singleSave.Data),
			"is_hidden":        true,
			"binding_sheet_id": "",
			"name":             singleSave.Name,
			"sheet_type":       singleSave.SheetType,
			"created_at":       now,
			"updated_at":       now,
		})
	}
	// Conservatively insert 1000 rows per statement; enough for most
	// scenarios (the equivalent of 1000 users binding a sheet within 60s).
	const batchSize = 1000

	// On conflict, take the INCOMING row's values. Fix: the previous
	// clause.Assignments(map[...]{"data": clause.Column{Name: "data"}, ...})
	// generated `SET data = data`; in SQLite's DO UPDATE an unqualified
	// column refers to the EXISTING row, so conflicting rows were never
	// actually updated. AssignmentColumns generates `data = excluded.data`
	// etc., which applies the new values.
	assignments := clause.AssignmentColumns([]string{"data", "name", "sheet_type"})
	assignments = append(assignments, clause.Assignment{
		Column: clause.Column{Name: "updated_at"},
		Value:  now,
	})

	// The callback receives a tx, so any failed insert rolls the whole
	// transaction back automatically.
	err := db.Transaction(func(tx *gorm.DB) error {
		for i := 0; i < len(trulySaveList); i += batchSize {
			end := i + batchSize
			if end > len(trulySaveList) {
				end = len(trulySaveList)
			}
			batch := trulySaveList[i:end]
			// .Debug() removed: it forced verbose SQL logging on every batch.
			res := tx.Clauses(clause.OnConflict{
				// Conflict target: the primary key column.
				Columns: []clause.Column{
					{Name: "id"},
				},
				DoUpdates: assignments,
			}).
				Model(&AttributesItemModel{}).
				// NOTE: CreateInBatches + map[string]any does not work here: it sets
				// subtx.Statement.Dest = reflectValue.Slice(i, ends).Interface(),
				// which cannot write results back into maps. Only Create works, and
				// CreateBatchSize must NOT be set in the config, or GORM falls back
				// to the same broken code path.
				Create(&batch)
			if res.Error != nil {
				return res.Error
			}
		}
		return nil
	})
	return err
}

func AttrsDeleteById(operator DatabaseOperator, id string) error {
db := operator.GetDataDB(WRITE)
// 使用 GORM 的 Delete 方法删除指定 id 的记录
Expand Down
1 change: 1 addition & 0 deletions dice/model/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func Vacuum(db *gorm.DB, path string) error {
}

// FlushWAL 执行 WAL 日志的检查点和内存收缩
// TODO: 在确认备份逻辑后删除该函数并收归到engine内,由engine统一做备份
func FlushWAL(db *gorm.DB) error {
// 检查数据库驱动是否为 SQLite
if !strings.Contains(db.Dialector.Name(), "sqlite") {
Expand Down
25 changes: 18 additions & 7 deletions dice/model/database/sqlite_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

func SQLiteDBInit(path string, useWAL bool) (*gorm.DB, error) {
// 使用即时事务
path = fmt.Sprintf("%v?_txlock=immediate&_busy_timeout=99999", path)
path = fmt.Sprintf("%v?_txlock=immediate&_busy_timeout=15000", path)
open, err := gorm.Open(sqlite.Open(path), &gorm.Config{
// 注意,这里虽然是Info,但实际上打印就变成了Debug.
Logger: logger.Default.LogMode(logger.Info),
Expand All @@ -29,7 +29,6 @@ func SQLiteDBInit(path string, useWAL bool) (*gorm.DB, error) {
}
// Enable Cache Mode
open, err = cache.GetOtterCacheDB(open)
// 所有优化增加
if err != nil {
return nil, err
}
Expand All @@ -43,6 +42,8 @@ func SQLiteDBInit(path string, useWAL bool) (*gorm.DB, error) {
}

func createReadDB(path string, gormConf *gorm.Config) (*gorm.DB, error) {
// _txlock=immediate 解决BEGIN IMMEDIATELY
path = fmt.Sprintf("%v?_txlock=immediate", path)
// ---- 创建读连接 -----
readDB, err := gorm.Open(sqlite.Open(path), gormConf)
if err != nil {
Expand Down Expand Up @@ -112,18 +113,22 @@ func SQLiteDBRWInit(path string) (*gorm.DB, *gorm.DB, error) {
// https://highperformancesqlite.com/articles/sqlite-recommended-pragmas
// https://litestream.io/tips/
// copied from https://github.com/bihe/monorepo
// add PRAGMA optimize=0x10002; from https://github.com/Palats/mastopoof
func SetDefaultPragmas(db *sql.DB) error {
var (
stmt string
val string
)
// 外键的暂时弃用,反正咱也不用外键
// "foreign_keys": "1", // 1(bool) --> https://www.sqlite.org/pragma.html#pragma_foreign_keys
defaultPragmas := map[string]string{
"journal_mode": "wal", // https://www.sqlite.org/pragma.html#pragma_journal_mode
"busy_timeout": "5000", // https://www.sqlite.org/pragma.html#pragma_busy_timeout
"synchronous": "1", // NORMAL --> https://www.sqlite.org/pragma.html#pragma_synchronous
"cache_size": "10000", // 10000 pages = 40MB --> https://www.sqlite.org/pragma.html#pragma_cache_size
// 外键的暂时弃用,反正咱也不用外键(乐)
// "foreign_keys": "1", // 1(bool) --> https://www.sqlite.org/pragma.html#pragma_foreign_keys
"busy_timeout": "15000", // https://www.sqlite.org/pragma.html#pragma_busy_timeout
// 在 WAL 模式下使用 synchronous=NORMAL 提交的事务可能会在断电或系统崩溃后回滚。
// 无论同步设置或日志模式如何,事务在应用程序崩溃时都是持久的。
// 对于在 WAL 模式下运行的大多数应用程序来说,synchronous=NORMAL 设置是一个不错的选择。
"synchronous": "1", // NORMAL --> https://www.sqlite.org/pragma.html#pragma_synchronous
"cache_size": "536870912", // 536870912 = 512MB --> https://www.sqlite.org/pragma.html#pragma_cache_size
}

// set the pragmas
Expand All @@ -145,6 +150,12 @@ func SetDefaultPragmas(db *sql.DB) error {
return fmt.Errorf("could not set pragma %s to %s", k, defaultPragmas[k])
}
}
// 这个不能在上面,因为他没有任何返回值
// Setup some regular optimization according to sqlite doc:
// https://www.sqlite.org/lang_analyze.html
if _, err := db.Exec("PRAGMA optimize=0x10002;"); err != nil {
return fmt.Errorf("unable set optimize pragma: %w", err)
}

return nil
}
2 changes: 1 addition & 1 deletion dice/model/engine_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type DatabaseOperator interface {
GetDataDB(mode DBMode) *gorm.DB
GetLogDB(mode DBMode) *gorm.DB
GetCensorDB(mode DBMode) *gorm.DB
Close() error
Close()
}

// 实现检查 copied from platform
Expand Down
Loading

0 comments on commit be9975b

Please sign in to comment.