壽陽網(wǎng)站建設(shè)哈爾濱網(wǎng)絡(luò)推廣優(yōu)化
背景
?分表組件改造的背景,我在這篇文章《gorm.io/sharding改造:賦能單表,靈活支持多分表策略(上)》中已經(jīng)做了詳細(xì)的介紹——這個組件不支持單表多個分表策略,為了突破這個限制做的改造。
在上一篇文章中,我們討論了注冊的改造,注冊的改造修改邏輯比較簡單,但是,上一篇文章中遺留了一個很重要的議題——在增刪改查的實際業(yè)務(wù)操作中,分表組件究竟如何精準(zhǔn)地定位到對應(yīng)的分表策略,以確保業(yè)務(wù)邏輯的順利執(zhí)行?這篇文章,我們重點討論這個邏輯。
源碼解讀
首先,我們需要看一下當(dāng)我們執(zhí)行查詢,新增,更新或是刪除邏輯,其執(zhí)行流程是什么。比如,這么一個查詢。
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
我們大概梳理一下其執(zhí)行流程。
- 初始化查詢:
- 當(dāng)我們執(zhí)行查詢?
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
,首先會通過?db.Model(&Order{})
?初始化一個查詢實例,設(shè)置相關(guān)的模型信息。
- 當(dāng)我們執(zhí)行查詢?
- 構(gòu)建查詢條件:
- 接著,通過?
.Where("user_id = ?", userID)
?方法,將查詢條件?user_id = ?
?以及對應(yīng)的參數(shù)?userID
?添加到查詢實例中。
- 接著,通過?
- 執(zhí)行查詢:
- 調(diào)用?
.Find(&orders)
?方法時,開始執(zhí)行查詢流程。 - 在?
Find
?方法中,首先通過?db.getInstance()
?獲取數(shù)據(jù)庫實例。 - 然后,檢查是否存在查詢條件,如果有,則構(gòu)建 SQL 條件表達(dá)式,并將其添加到查詢語句中。
- 設(shè)置查詢結(jié)果的目標(biāo)對象?
dest
,即?&orders
。
func (db *DB) Find(dest interface{}, conds ...interface{}) (tx *DB) {tx = db.getInstance()if len(conds) > 0 {if exprs := tx.Statement.BuildCondition(conds[0], conds[1:]...); len(exprs) > 0 {tx.Statement.AddClause(clause.Where{Exprs: exprs})}}tx.Statement.Dest = destreturn tx.callbacks.Query().Execute(tx) }
- 調(diào)用?
- 執(zhí)行回調(diào)和處理:
- 調(diào)用?
tx.callbacks.Query().Execute(tx)
?執(zhí)行查詢回調(diào)鏈。 - 在?
Execute
?方法中,會遍歷并執(zhí)行所有注冊的查詢前和查詢后的回調(diào)函數(shù)。
func (p *processor) Execute(db *DB) *DB {//省略其他代碼邏輯 ...... for _, f := range p.fns {f(db)}//省略其他代碼邏輯 ...... return db }
- 調(diào)用?
- 分片和查詢執(zhí)行:
- 最終,調(diào)用?
pool.QueryContext
?方法,根據(jù)上下文、SQL 查詢語句和參數(shù)執(zhí)行實際的數(shù)據(jù)庫查詢。 - 在?
QueryContext
?方法中,會調(diào)用?pool.sharding.resolve
?方法解析并修改查詢語句,以處理數(shù)據(jù)庫分片邏輯。 resolve
?方法解析 SQL 查詢語句,提取表名,并根據(jù)表名獲取相應(yīng)的分片配置。- 根據(jù)分片配置,可能會修改原始查詢語句,以適應(yīng)分片策略。
func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {var (curTime = time.Now())//該方法根據(jù)傳入的SQL查詢(及其參數(shù))和上下文信息,動態(tài)地解析、修改并返回最終的分片 //查詢、原始查詢、目標(biāo)表名以及可能出現(xiàn)的錯誤。_, stQuery, _, err := pool.sharding.resolve(query, args...)if err != nil {return nil, err}// 省略......return rows, err }
func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) {ftQuery = querystQuery = queryif len(s.configs) == 0 {return}expr, err := sqlparser.NewParser(strings.NewReader(query)).ParseStatement()if err != nil {return ftQuery, stQuery, tableName, nil}// 省略......tableName = table.Name.Namer, ok := s.configs[tableName]if !ok {return} // 省略......return }
- 最終,調(diào)用?
- 返回結(jié)果:
- 執(zhí)行查詢后,將結(jié)果填充到目標(biāo)對象?
&orders
?中,并返回查詢結(jié)果或錯誤。
- 執(zhí)行查詢后,將結(jié)果填充到目標(biāo)對象?
我們重點關(guān)注resolve方法,這個方法包含了分表邏輯的處理邏輯:r, ok := s.configs[tableName]獲取對應(yīng)表的分表策略。
通過上述代碼的解析,我們現(xiàn)在應(yīng)該有了解決方案。原來的邏輯獲取分表策略是根據(jù)表明獲取的。那我們只要修改這個邏輯,根據(jù)表名+分表鍵名作為唯一鍵獲取對應(yīng)的分表策略就能實現(xiàn)我們的目標(biāo)。
方案
接下來,我們需要思考的是,如何把分表鍵傳進(jìn)來呢?
我一開始想的是通過解析query獲取查詢條件中的分表鍵。但是,當(dāng)我深入的看了這個邏輯之后,發(fā)現(xiàn)這個設(shè)想不能實現(xiàn),因為value, id, keyFind, err = s.nonInsertValue(r.ShardingKey, condition, args...)這個方法中獲取查詢條件的字段是在這個函數(shù)內(nèi)部實現(xiàn)的,不能保持一個統(tǒng)一的結(jié)構(gòu),而且改造復(fù)雜度比較高。
context在go語言有著廣泛的使用場景,所以,我想著通過context的方式把分表鍵傳遞進(jìn)來。有了這個想法,改造起來就很簡單了。我們只需要resolve方法增加一個context的傳參,并且r, ok := s.configs[tableName]這個獲取分表策略,改成用表名+從context中獲取的分表鍵作為鍵來獲取分表策略即可。
如此,我們就實現(xiàn)了根據(jù)表名+分表鍵獲取對應(yīng)分表策略的邏輯,至此,我們的改造任務(wù)完成。
案例
我目前也只是簡單的測試了兩種分表策略的場景,僅僅只覆蓋了查詢和插入的場景。更復(fù)雜的場景還沒有測試。諸如并發(fā)情況下的場景。
package testimport ("context""fmt""testing""time""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/sharding"
)
var globalDB *gorm.DBtype Order struct {ID int64 `gorm:"primaryKey"`OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片鍵UserID int64 `gorm:"sharding:user_id"`ProductID int64OrderDate time.TimeOrderYear int
}
// 自定義 ShardingAlgorithm
func customShardingAlgorithm4(value any) (suffix string, err error) {if year, ok := value.(int); ok {return fmt.Sprintf("_%d", year), nil}return "", fmt.Errorf("invalid order_date")
}func customShardingAlgorithmUserId(value any) (suffix string, err error) {if userId, ok := value.(int64); ok {return fmt.Sprintf("_%d", userId%4), nil}return "", fmt.Errorf("invalid user_id")
}// customePrimaryKeyGeneratorFn 自定義主鍵生成函數(shù)
func customePrimaryKeyGeneratorFn(tableIdx int64) int64 {var id int64seqTableName := "gorm_sharding_orders_id_seq" // 序列表名db := globalDB// 使用事務(wù)來確保主鍵生成的原子性tx := db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 鎖定序列表以確保并發(fā)安全(可選,取決于你的 MySQL 配置和并發(fā)級別)// 注意:在某些 MySQL 版本和配置中,使用 LOCK TABLES 可能不是最佳選擇// 這里僅作為示例,實際應(yīng)用中可能需要更精細(xì)的并發(fā)控制策略tx.Exec("LOCK TABLES " + seqTableName + " WRITE")// 查詢當(dāng)前的最大 IDtx.Raw("SELECT id FROM " + seqTableName + " ORDER BY id DESC LIMIT 1").Scan(&id)// 更新序列表(這里直接遞增 1,實際應(yīng)用中可能需要更復(fù)雜的邏輯)newID := id + 1tx.Exec("INSERT INTO "+seqTableName+" (id) VALUES (?)", newID) // 這里假設(shè)序列表允許插入任意 ID,實際應(yīng)用中可能需要其他機(jī)制來確保 ID 的唯一性和連續(xù)性// 釋放鎖定tx.Exec("UNLOCK TABLES")// 提交事務(wù)if err := tx.Commit().Error; err != nil {panic(err) // 實際應(yīng)用中應(yīng)該使用更優(yōu)雅的錯誤處理機(jī)制}return newID
}// Test_Gorm_Sharding 用于測試 Gorm Sharding 插件
func Test_Gorm_Sharding6(t *testing.T) {// 連接到 MySQL 數(shù)據(jù)庫dsn := "dev:xxxx@tcp(ip:port)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"db, err := gorm.Open(mysql.New(mysql.Config{DSN: dsn,}), &gorm.Config{})if err != nil {panic("failed to connect database")}globalDB = dbconfig1 := sharding.Config{ShardingKey: "order_year",ShardingAlgorithm: customShardingAlgorithm4, // 使用自定義的分片算法//PrimaryKeyGenerator: sharding.PKMySQLSequence,PrimaryKeyGenerator: sharding.PKCustom,PrimaryKeyGeneratorFn: customePrimaryKeyGeneratorFn,}config2 := sharding.Config{ShardingKey: "user_id",NumberOfShards: 4,ShardingAlgorithm: customShardingAlgorithmUserId, // 使用自定義的分片算法PrimaryKeyGenerator: sharding.PKSnowflake, // 使用 Snowflake 算法生成主鍵}mapConfig := make(map[string]sharding.Config)mapConfig["orders_order_year"] = config1mapConfig["orders_user_id"] = config2// 配置 Gorm Sharding 中間件,使用自定義的分片算法middleware := sharding.RegisterWithKeys(mapConfig) // 邏輯表名為 "orders"db.Use(middleware)// 查詢示例var orders []Orderctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "order_year")db = db.WithContext(ctx)err = db.Model(&Order{}).Where("order_year=? and product_id=?", 2025, 102).Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("sharding key order_year Selected orders: %#v\n", orders)// 查詢示例FindByUserID2(db, int64(1))// 示例:插入訂單數(shù)據(jù)InsertOrderByUserId(db)InsertOrderByOrderYear(db)
}func FindByUserID2(db *gorm.DB, userID int64) ([]Order, error) {var orders []Order// 查詢示例ctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "user_id")db = db.WithContext(ctx)err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("no sharding key user_id Selected orders: %#v\n", orders)return orders, err
}type OrderByUserId struct {ID int64 `gorm:"primaryKey"`OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片鍵UserID int64 `gorm:"sharding:user_id"`ProductID int64OrderDate time.Time
}func InsertOrderByUserId(db *gorm.DB) error {ctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "user_id")db = db.WithContext(ctx)// 示例:插入訂單數(shù)據(jù)order := OrderByUserId{OrderId: "20240101ORDER0001",UserID: 100,ProductID: 100,OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),}err := db.Table("orders").Create(&order).Errorif err != nil {fmt.Println("Error creating order:", err)}order2 := OrderByUserId{OrderId: "20250101ORDER0001",UserID: 105,ProductID: 100,OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),}err = db.Table("orders").Create(&order2).Errorif err != nil {fmt.Println("Error creating order:", err)}return err
}func InsertOrderByOrderYear(db *gorm.DB) error {ctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "order_year")db = db.WithContext(ctx)orderYear := 2024// 示例:插入訂單數(shù)據(jù)order := Order{OrderId: "20240101ORDER0002",UserID: 1,ProductID: 100,OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),OrderYear: orderYear,}err := db.Create(&order).Errorif err != nil {fmt.Println("Error creating order:", err)}orderYear = 2025order2 := Order{OrderId: "20250101ORDER0002",UserID: 1,ProductID: 100,OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),OrderYear: orderYear,}err = db.Create(&order2).Errorif err != nil {fmt.Println("Error creating order:", err)}return err
}
總結(jié)
通過改造gorm.io/sharding
組件,我們實現(xiàn)了根據(jù)表名+分表鍵獲取對應(yīng)分表策略的邏輯。這一改造使得組件能夠支持單表多個分表策略,更加靈活和強(qiáng)大。目前,我們已經(jīng)簡單測試了查詢和插入場景,更復(fù)雜的場景和并發(fā)情況還需進(jìn)一步測試和優(yōu)化。通過這一改造,我們?yōu)闃I(yè)務(wù)邏輯的執(zhí)行提供了更加精準(zhǔn)和高效的分表策略定位。