Просмотр исходного кода

feat: add panic recovery and retry mechanism for InitChannelCache; improve batch deletion of abilities in FixAbility

CaIon 9 месяцев назад
Родитель
Сommit
c53a48cde5
4 измененных файлов с 68 добавлено и 16 удалено
  1. 16 3
      main.go
  2. 36 11
      model/ability.go
  3. 5 2
      model/cache.go
  4. 11 0
      model/channel.go

+ 16 - 3
main.go

@@ -89,9 +89,22 @@ func main() {
 	if common.MemoryCacheEnabled {
 		common.SysLog("memory cache enabled")
 		common.SysError(fmt.Sprintf("sync frequency: %d seconds", common.SyncFrequency))
-		model.InitChannelCache()
-	}
-	if common.MemoryCacheEnabled {
+
+		// Add panic recovery and retry for InitChannelCache
+		func() {
+			defer func() {
+				if r := recover(); r != nil {
+					common.SysError(fmt.Sprintf("InitChannelCache panic: %v, retrying once", r))
+					// Retry once
+					_, fixErr := model.FixAbility()
+					if fixErr != nil {
+						common.SysError(fmt.Sprintf("InitChannelCache failed: %s", fixErr.Error()))
+					}
+				}
+			}()
+			model.InitChannelCache()
+		}()
+
 		go model.SyncOptions(common.SyncFrequency)
 		go model.SyncChannelCache(common.SyncFrequency)
 	}

+ 36 - 11
model/ability.go

@@ -50,7 +50,7 @@ func getPriority(group string, model string, retry int) (int, error) {
 	err := DB.Model(&Ability{}).
 		Select("DISTINCT(priority)").
 		Where(groupCol+" = ? and model = ? and enabled = "+trueVal, group, model).
-		Order("priority DESC").              // 按优先级降序排序
+		Order("priority DESC"). // 按优先级降序排序
 		Pluck("priority", &priorities).Error // Pluck用于将查询的结果直接扫描到一个切片中
 
 	if err != nil {
@@ -261,12 +261,28 @@ func FixAbility() (int, error) {
 		common.SysError(fmt.Sprintf("Get channel ids from channel table failed: %s", err.Error()))
 		return 0, err
 	}
-	// Delete abilities of channels that are not in channel table
-	err = DB.Where("channel_id NOT IN (?)", channelIds).Delete(&Ability{}).Error
-	if err != nil {
-		common.SysError(fmt.Sprintf("Delete abilities of channels that are not in channel table failed: %s", err.Error()))
-		return 0, err
+
+	// Delete abilities of channels that are not in channel table - in batches to avoid too many placeholders
+	if len(channelIds) > 0 {
+		// Process deletion in chunks to avoid "too many placeholders" error
+		for _, chunk := range lo.Chunk(channelIds, 100) {
+			err = DB.Where("channel_id NOT IN (?)", chunk).Delete(&Ability{}).Error
+			if err != nil {
+				common.SysError(fmt.Sprintf("Delete abilities of channels (batch) that are not in channel table failed: %s", err.Error()))
+				return 0, err
+			}
+		}
+	} else {
+		// If no channels exist, delete all abilities
+		err = DB.Delete(&Ability{}).Error
+		if err != nil {
+			common.SysError(fmt.Sprintf("Delete all abilities failed: %s", err.Error()))
+			return 0, err
+		}
+		common.SysLog("Delete all abilities successfully")
+		return 0, nil
 	}
+
 	common.SysLog(fmt.Sprintf("Delete abilities of channels that are not in channel table successfully, ids: %v", channelIds))
 	count += len(channelIds)
 
@@ -275,17 +291,26 @@ func FixAbility() (int, error) {
 	err = DB.Table("abilities").Distinct("channel_id").Pluck("channel_id", &abilityChannelIds).Error
 	if err != nil {
 		common.SysError(fmt.Sprintf("Get channel ids from abilities table failed: %s", err.Error()))
-		return 0, err
+		return count, err
 	}
+
 	var channels []Channel
 	if len(abilityChannelIds) == 0 {
 		err = DB.Find(&channels).Error
 	} else {
-		err = DB.Where("id NOT IN (?)", abilityChannelIds).Find(&channels).Error
-	}
-	if err != nil {
-		return 0, err
+		// Process query in chunks to avoid "too many placeholders" error
+		err = nil
+		for _, chunk := range lo.Chunk(abilityChannelIds, 100) {
+			var channelsChunk []Channel
+			err = DB.Where("id NOT IN (?)", chunk).Find(&channelsChunk).Error
+			if err != nil {
+				common.SysError(fmt.Sprintf("Find channels not in abilities table failed: %s", err.Error()))
+				return count, err
+			}
+			channels = append(channels, channelsChunk...)
+		}
 	}
+
 	for _, channel := range channels {
 		err := channel.UpdateAbilities(nil)
 		if err != nil {

+ 5 - 2
model/cache.go

@@ -16,6 +16,9 @@ var channelsIDM map[int]*Channel
 var channelSyncLock sync.RWMutex
 
 func InitChannelCache() {
+	if !common.MemoryCacheEnabled {
+		return
+	}
 	newChannelId2channel := make(map[int]*Channel)
 	var channels []*Channel
 	DB.Where("status = ?", common.ChannelStatusEnabled).Find(&channels)
@@ -84,11 +87,11 @@ func CacheGetRandomSatisfiedChannel(group string, model string, retry int) (*Cha
 	if !common.MemoryCacheEnabled {
 		return GetRandomSatisfiedChannel(group, model, retry)
 	}
-	
+
 	channelSyncLock.RLock()
 	channels := group2model2channels[group][model]
 	channelSyncLock.RUnlock()
-	
+
 	if len(channels) == 0 {
 		return nil, errors.New("channel not found")
 	}

+ 11 - 0
model/channel.go

@@ -46,6 +46,17 @@ func (channel *Channel) GetModels() []string {
 	return strings.Split(strings.Trim(channel.Models, ","), ",")
 }
 
+func (channel *Channel) GetGroups() []string {
+	if channel.Group == "" {
+		return []string{}
+	}
+	groups := strings.Split(strings.Trim(channel.Group, ","), ",")
+	for i, group := range groups {
+		groups[i] = strings.TrimSpace(group)
+	}
+	return groups
+}
+
 func (channel *Channel) GetOtherInfo() map[string]interface{} {
 	otherInfo := make(map[string]interface{})
 	if channel.OtherInfo != "" {