背景

在 office-gateway 三期优化过程中,我们针对任务数据实现了缓存机制,旨在提高查询性能。考虑到数据写操作频繁,且当前仅有一个接口负责读取,属于典型的写多读少场景。于是决定选用 GORM 的 Hook 机制来更新缓存。

实现方案

在 Model 结构体上实现 GORM 的 AfterUpdate 和 AfterCreate 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (c *GatewayTasks) AfterUpdate(tx *gorm.DB) error {
    err := invokerbase.TaskCache.SetMainTaskCache(context.Background(), c.TaskID, c.TaskType, c.Bytes())
    if err != nil {
       elog.Error("hook.GatewayTasks.SetCache.failed", l.E(err))
    }

    return nil
}

func (c *GatewayTasks) AfterCreate(tx *gorm.DB) error {
    err := invokerbase.TaskCache.SetMainTaskCache(context.Background(), c.TaskID, c.TaskType, c.Bytes())
    if err != nil {
       elog.Error("hook.GatewayTasks.SetCache.failed", l.E(err))
    }

    return nil
}

问题诊断

代码发布后,我们发现了以下异常:

  1. 缓存更新仅在创建数据时生效
  2. 更新数据表时虽然触发了 Hook 函数,但写入的数据不正确
  3. 测试反馈获取进度非常缓慢,不论文件大小,耗时均约 5s

问题分析

通过观察发现:

  • 新部署机器规格较高,转码时间很快
  • 缓存过期时间为 5s
  • 文件大小不同,转码耗时却保持一致

这些现象强烈暗示可能存在缓存脏数据问题,导致流程无法正常结束。仅在缓存过期后,系统才能穿透到数据表获取最终结果。

源码分析

GORM 回调机制简述

GORM 的 Update 方法实质上是触发一系列回调函数,而非直接执行 SQL:

1
2
3
4
5
6

// Update updates column with value using callbacks. Reference: <https://gorm.io/docs/update.html#Update-Changed-Fieldsfunc> (db *DB) Update(column string, value interface{}) (tx *DB) {
tx = db.getInstance()
tx.Statement.Dest = map[string]interface{}{column: value}
return tx.callbacks.Update().Execute(tx)
}

回调注册与执行

  • gorm.Open 时会初始化 Dialecter
  • Dialecter 初始化时通过 RegisterDefaultCallbacks 注册默认回调函数
    • after_update 就是在这里初始化的
  • 执行顺序由 sortCallbacks 决定回调执行顺序
  • 不同类型的回调(如 before_updateafter_update)在数据库操作的不同阶段被触发

AfterUpdate 定义

通过 AfterUpdate 来看,传给自定义回调函数中的 Model 本质上是参数 i,而 i 的值是 callmethod 通过 value 给的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// AfterUpdate after update hooksfunc AfterUpdate(db *gorm.DB) {
    if db.Error == nil && db.Statement.Schema != nil && !db.Statement.SkipHooks && (db.Statement.Schema.AfterSave || db.Statement.Schema.AfterUpdate) {
       callMethod(db, func(value interface{}, tx *gorm.DB) (called bool) {
          if db.Statement.Schema.AfterUpdate {
             if i, ok := value.(AfterUpdateInterface); ok {
                called = true
                db.AddError(i.AfterUpdate(tx))
             }
          }

          if db.Statement.Schema.AfterSave {
             if i, ok := value.(AfterSaveInterface); ok {
                called = true
                db.AddError(i.AfterSave(tx))
             }
          }

          return called
       })
    }
}

再看 callmethod 方法的定义,不难发现,db.Statement.ReflectValue 最终会影响 value 的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func callMethod(db *gorm.DB, fc func(value interface{}, tx *gorm.DB) bool) {
    tx := db.Session(&gorm.Session{NewDB: true})
    if called := fc(db.Statement.ReflectValue.Interface(), tx); !called {
       switch db.Statement.ReflectValue.Kind() {
       case reflect.Slice, reflect.Array:
          db.Statement.CurDestIndex = 0
          for i := 0; i < db.Statement.ReflectValue.Len(); i++ {
             if value := reflect.Indirect(db.Statement.ReflectValue.Index(i)); value.CanAddr() {
                fc(value.Addr().Interface(), tx)
             } else {
                db.AddError(gorm.ErrInvalidValue)
                return
             }
             db.Statement.CurDestIndex++
          }
       case reflect.Struct:
          if db.Statement.ReflectValue.CanAddr() {
             fc(db.Statement.ReflectValue.Addr().Interface(), tx)
          } else {
             db.AddError(gorm.ErrInvalidValue)
          }
       }
    }
}

继续追踪源码发现,Update 的 ReflectValue 是通过 SetupUpdateReflectValue 这个 Hook 函数实现的,最终是来自 db.Statement.Model

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func SetupUpdateReflectValue(db *gorm.DB) {
    if db.Error == nil && db.Statement.Schema != nil {
       if !db.Statement.ReflectValue.CanAddr() || db.Statement.Model != db.Statement.Dest {
          db.Statement.ReflectValue = reflect.ValueOf(db.Statement.Model)
          for db.Statement.ReflectValue.Kind() == reflect.Ptr {
             db.Statement.ReflectValue = db.Statement.ReflectValue.Elem()
          }

          if dest, ok := db.Statement.Dest.(map[string]interface{}); ok {
             for _, rel := range db.Statement.Schema.Relationships.BelongsTo {
                if _, ok := dest[rel.Name]; ok {
                   db.AddError(rel.Field.Set(db.Statement.Context, db.Statement.ReflectValue, dest[rel.Name]))
                }
             }
          }
       }
    }
}

关键点在于 db.Statement.Model 的赋值机制:

  1. Model 方法通常传入一个空结构体:db.Model(&GatewayTasks{})
  2. Update 回调在与数据库交互后会更新 db.Statement.Model
  3. AfterUpdate Hook 中的 tx 参数即是这个更新后的 Statement

根本原因

问题的根源在于 Update 调用时仅更新部分字段(如 taskStatus),而未包含构建缓存所需的 taskIDtaskType。这导致 Hook 函数虽然执行,但无法正确设置缓存。 而 Create 时,包含了这些字段,导致在 Create 阶段产生了缓存,后续的更新操作未更新缓存,导致了脏数据。

解决方法

通过分析业务代码发现,更新任务之前,都会先拿到任务信息,都具备获取到 Model 结构体。 调整更新逻辑:

  1. 更新前获取完整任务信息
  2. 直接修改 Model 中需要变更的值
  3. 使用完整 Model 执行 Update 操作

总结

  • GORM 不会凭空产生数据,并不具备类似 binlog 的能力
  • GORM Hook 中的 Model 的数据,是从 Update 方法中解析出来的
  • 在使用 Hook 时,一定要清楚 Model 中哪些字段能够拿到,哪些拿不到