package controller

import (
	"fmt"
	"net/http"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/QuantumNous/new-api/common"
	"github.com/QuantumNous/new-api/constant"
	"github.com/QuantumNous/new-api/dto"
	"github.com/QuantumNous/new-api/model"
	"github.com/QuantumNous/new-api/relay/channel/gemini"
	"github.com/QuantumNous/new-api/relay/channel/ollama"
	"github.com/QuantumNous/new-api/service"

	"github.com/gin-gonic/gin"
	"github.com/samber/lo"
)

// Tunables for the upstream model update task: the default tick interval, the
// channel query batch size, the minimum seconds between two non-forced checks
// of one channel, the notification suppression window, and the caps on how
// many channels / models / failed channel IDs a notification lists.
const (
	channelUpstreamModelUpdateTaskDefaultIntervalMinutes  = 30
	channelUpstreamModelUpdateTaskBatchSize               = 100
	channelUpstreamModelUpdateMinCheckIntervalSeconds     = 300
	channelUpstreamModelUpdateNotifySuppressWindowSeconds = 86400
	channelUpstreamModelUpdateNotifyMaxChannelDetails     = 8
	channelUpstreamModelUpdateNotifyMaxModelDetails       = 12
	channelUpstreamModelUpdateNotifyMaxFailedChannelIDs   = 10
)

// Package-level state of the background task: a once-guard for starting it, a
// flag that prevents overlapping runs, and the mutex-protected record of the
// last notification that was sent.
var (
	channelUpstreamModelUpdateTaskOnce    sync.Once
	channelUpstreamModelUpdateTaskRunning atomic.Bool
	channelUpstreamModelUpdateNotifyState = struct {
		sync.Mutex
		lastNotifiedAt      int64
		lastChangedChannels int
		lastFailedChannels  int
	}{}
)

// applyChannelUpstreamModelUpdatesRequest is the JSON body accepted by
// ApplyChannelUpstreamModelUpdates and DetectChannelUpstreamModelUpdates
// (the latter only reads ID).
type applyChannelUpstreamModelUpdatesRequest struct {
	ID           int      `json:"id"`
	AddModels    []string `json:"add_models"`
	RemoveModels []string `json:"remove_models"`
	IgnoreModels []string `json:"ignore_models"`
}

// applyAllChannelUpstreamModelUpdatesResult is the per-channel entry reported
// by ApplyAllChannelUpstreamModelUpdates.
type applyAllChannelUpstreamModelUpdatesResult struct {
	ChannelID             int      `json:"channel_id"`
	ChannelName           string   `json:"channel_name"`
	AddedModels           []string `json:"added_models"`
	RemovedModels         []string `json:"removed_models"`
	RemainingModels       []string `json:"remaining_models"`
	RemainingRemoveModels []string `json:"remaining_remove_models"`
}

// detectChannelUpstreamModelUpdatesResult is the per-channel entry reported by
// the detect endpoints.
type detectChannelUpstreamModelUpdatesResult struct {
	ChannelID       int      `json:"channel_id"`
	ChannelName     string   `json:"channel_name"`
	AddModels       []string `json:"add_models"`
	RemoveModels    []string `json:"remove_models"`
	LastCheckTime   int64    `json:"last_check_time"`
	AutoAddedModels int      `json:"auto_added_models"`
}

// upstreamModelUpdateChannelSummary is one "changed channel" line of the task
// notification.
type upstreamModelUpdateChannelSummary struct {
	ChannelName string
	AddCount    int
	RemoveCount int
}

// normalizeModelNames trims every name, drops the ones that end up empty and
// removes duplicates while keeping first-occurrence order.
func normalizeModelNames(models []string) []string {
	return lo.Uniq(lo.FilterMap(models, func(name string, _ int) (string, bool) {
		trimmed := strings.TrimSpace(name)
		return trimmed, trimmed != ""
	}))
}

// mergeModelNames returns the normalized base followed by every normalized
// name of appended that base does not already contain.
func mergeModelNames(base []string, appended []string) []string {
	merged := normalizeModelNames(base)
	seen := make(map[string]struct{}, len(merged))
	for _, name := range merged {
		seen[name] = struct{}{}
	}
	for _, name := range normalizeModelNames(appended) {
		if _, ok := seen[name]; ok {
			continue
		}
		seen[name] = struct{}{}
		merged = append(merged, name)
	}
	return merged
}

// subtractModelNames returns the normalized base without any name that occurs
// in removed.
func subtractModelNames(base []string, removed []string) []string {
	removeSet := make(map[string]struct{}, len(removed))
	for _, name := range normalizeModelNames(removed) {
		removeSet[name] = struct{}{}
	}
	return lo.Filter(normalizeModelNames(base), func(name string, _ int) bool {
		_, ok := removeSet[name]
		return !ok
	})
}

// intersectModelNames returns the normalized base restricted to the names that
// also occur in allowed.
func intersectModelNames(base []string, allowed []string) []string {
	allowedSet := make(map[string]struct{}, len(allowed))
	for _, name := range normalizeModelNames(allowed) {
		allowedSet[name] = struct{}{}
	}
	return lo.Filter(normalizeModelNames(base), func(name string, _ int) bool {
		_, ok := allowedSet[name]
		return ok
	})
}

// applySelectedModelChanges adds addModels to originModels and removes
// removeModels from the result.
func applySelectedModelChanges(originModels []string, addModels []string, removeModels []string) []string {
	// Add wins when the same model appears in both selected lists.
	normalizedAdd := normalizeModelNames(addModels)
	normalizedRemove := subtractModelNames(normalizeModelNames(removeModels), normalizedAdd)
	return subtractModelNames(mergeModelNames(originModels, normalizedAdd), normalizedRemove)
}

// normalizeChannelModelMapping parses the channel's JSON model mapping into a
// map with trimmed keys and values. It returns nil when the channel or its
// mapping is nil, the mapping is blank or "{}", the JSON cannot be parsed, or
// no entry with a non-empty source and target remains.
func normalizeChannelModelMapping(channel *model.Channel) map[string]string {
	if channel == nil || channel.ModelMapping == nil {
		return nil
	}
	rawMapping := strings.TrimSpace(*channel.ModelMapping)
	if rawMapping == "" || rawMapping == "{}" {
		return nil
	}
	parsed := make(map[string]string)
	if err := common.UnmarshalJsonStr(rawMapping, &parsed); err != nil {
		return nil
	}
	normalized := make(map[string]string, len(parsed))
	for source, target := range parsed {
		normalizedSource := strings.TrimSpace(source)
		normalizedTarget := strings.TrimSpace(target)
		if normalizedSource == "" || normalizedTarget == "" {
			continue
		}
		normalized[normalizedSource] = normalizedTarget
	}
	if len(normalized) == 0 {
		return nil
	}
	return normalized
}

// collectPendingUpstreamModelChangesFromModels diffs the local model list
// against the upstream one.
//
// pendingAddModels are upstream models that are neither configured locally,
// nor the target of a model mapping entry, nor ignored. pendingRemoveModels
// are local models missing upstream, except names that are the source of a
// model mapping entry.
func collectPendingUpstreamModelChangesFromModels(
	localModels []string,
	upstreamModels []string,
	ignoredModels []string,
	modelMapping map[string]string,
) (pendingAddModels []string, pendingRemoveModels []string) {
	localModels = normalizeModelNames(localModels)
	upstreamModels = normalizeModelNames(upstreamModels)

	upstreamSet := make(map[string]struct{}, len(upstreamModels))
	for _, modelName := range upstreamModels {
		upstreamSet[modelName] = struct{}{}
	}

	ignoredSet := make(map[string]struct{})
	for _, modelName := range normalizeModelNames(ignoredModels) {
		ignoredSet[modelName] = struct{}{}
	}

	// Upstream names that are already "covered": configured locally or
	// reached through a mapping target.
	redirectSourceSet := make(map[string]struct{}, len(modelMapping))
	coveredUpstreamSet := make(map[string]struct{}, len(localModels)+len(modelMapping))
	for _, modelName := range localModels {
		coveredUpstreamSet[modelName] = struct{}{}
	}
	for source, target := range modelMapping {
		redirectSourceSet[source] = struct{}{}
		coveredUpstreamSet[target] = struct{}{}
	}

	pendingAdd := lo.Filter(upstreamModels, func(modelName string, _ int) bool {
		if _, ok := coveredUpstreamSet[modelName]; ok {
			return false
		}
		_, ignored := ignoredSet[modelName]
		return !ignored
	})
	pendingRemove := lo.Filter(localModels, func(modelName string, _ int) bool {
		// Redirect source models are virtual aliases and should not be removed
		// only because they are absent from upstream model list.
		if _, ok := redirectSourceSet[modelName]; ok {
			return false
		}
		_, ok := upstreamSet[modelName]
		return !ok
	})
	return normalizeModelNames(pendingAdd), normalizeModelNames(pendingRemove)
}

// collectPendingUpstreamModelChanges fetches the channel's upstream model list
// and diffs it against the channel's configured models, honouring the ignored
// models stored in settings and the channel's model mapping.
func collectPendingUpstreamModelChanges(channel *model.Channel, settings dto.ChannelOtherSettings) (pendingAddModels []string, pendingRemoveModels []string, err error) {
	upstreamModels, err := fetchChannelUpstreamModelIDs(channel)
	if err != nil {
		return nil, nil, err
	}
	pendingAddModels, pendingRemoveModels = collectPendingUpstreamModelChangesFromModels(
		channel.GetModels(),
		upstreamModels,
		settings.UpstreamModelUpdateIgnoredModels,
		normalizeChannelModelMapping(channel),
	)
	return pendingAddModels, pendingRemoveModels, nil
}

// getUpstreamModelUpdateMinCheckIntervalSeconds reads the minimum interval
// between two non-forced checks from
// CHANNEL_UPSTREAM_MODEL_UPDATE_MIN_CHECK_INTERVAL_SECONDS, falling back to the
// built-in default when the configured value is negative.
func getUpstreamModelUpdateMinCheckIntervalSeconds() int64 {
	interval := int64(common.GetEnvOrDefault(
		"CHANNEL_UPSTREAM_MODEL_UPDATE_MIN_CHECK_INTERVAL_SECONDS",
		channelUpstreamModelUpdateMinCheckIntervalSeconds,
	))
	if interval < 0 {
		return channelUpstreamModelUpdateMinCheckIntervalSeconds
	}
	return interval
}

// fetchChannelUpstreamModelIDs asks the channel's upstream for its model list
// and returns the normalized model IDs.
//
// Ollama and Gemini channels use their dedicated fetchers; every other channel
// type issues a GET against a type-specific models URL and decodes the body as
// an OpenAIModelsResponse.
func fetchChannelUpstreamModelIDs(channel *model.Channel) ([]string, error) {
	baseURL := constant.ChannelBaseURLs[channel.Type]
	if channel.GetBaseURL() != "" {
		baseURL = channel.GetBaseURL()
	}

	if channel.Type == constant.ChannelTypeOllama {
		// Only the first line of the stored key is used for Ollama.
		firstKey, _, _ := strings.Cut(channel.Key, "\n")
		models, err := ollama.FetchOllamaModels(baseURL, strings.TrimSpace(firstKey))
		if err != nil {
			return nil, err
		}
		return normalizeModelNames(lo.Map(models, func(item ollama.OllamaModel, _ int) string {
			return item.Name
		})), nil
	}

	if channel.Type == constant.ChannelTypeGemini {
		key, _, apiErr := channel.GetNextEnabledKey()
		if apiErr != nil {
			return nil, fmt.Errorf("获取渠道密钥失败: %w", apiErr)
		}
		key = strings.TrimSpace(key)
		models, err := gemini.FetchGeminiModels(baseURL, key, channel.GetSetting().Proxy)
		if err != nil {
			return nil, err
		}
		return normalizeModelNames(models), nil
	}

	var url string
	switch channel.Type {
	case constant.ChannelTypeAli:
		url = fmt.Sprintf("%s/compatible-mode/v1/models", baseURL)
	case constant.ChannelTypeZhipu_v4:
		if plan, ok := constant.ChannelSpecialBases[baseURL]; ok && plan.OpenAIBaseURL != "" {
			url = fmt.Sprintf("%s/models", plan.OpenAIBaseURL)
		} else {
			url = fmt.Sprintf("%s/api/paas/v4/models", baseURL)
		}
	case constant.ChannelTypeVolcEngine:
		if plan, ok := constant.ChannelSpecialBases[baseURL]; ok && plan.OpenAIBaseURL != "" {
			url = fmt.Sprintf("%s/v1/models", plan.OpenAIBaseURL)
		} else {
			url = fmt.Sprintf("%s/v1/models", baseURL)
		}
	case constant.ChannelTypeMoonshot:
		if plan, ok := constant.ChannelSpecialBases[baseURL]; ok && plan.OpenAIBaseURL != "" {
			url = fmt.Sprintf("%s/models", plan.OpenAIBaseURL)
		} else {
			url = fmt.Sprintf("%s/v1/models", baseURL)
		}
	default:
		url = fmt.Sprintf("%s/v1/models", baseURL)
	}

	key, _, apiErr := channel.GetNextEnabledKey()
	if apiErr != nil {
		return nil, fmt.Errorf("获取渠道密钥失败: %w", apiErr)
	}
	key = strings.TrimSpace(key)

	headers, err := buildFetchModelsHeaders(channel, key)
	if err != nil {
		return nil, err
	}

	body, err := GetResponseBody(http.MethodGet, url, channel, headers)
	if err != nil {
		return nil, err
	}

	var result OpenAIModelsResponse
	if err := common.Unmarshal(body, &result); err != nil {
		return nil, err
	}

	// Gemini channels returned above, so IDs are used verbatim here.
	ids := lo.Map(result.Data, func(item OpenAIModel, _ int) string {
		return item.ID
	})
	return normalizeModelNames(ids), nil
}

// updateChannelUpstreamModelSettings stores settings on the channel and
// persists the channel's "settings" column, plus the "models" column when
// updateModels is true.
func updateChannelUpstreamModelSettings(channel *model.Channel, settings dto.ChannelOtherSettings, updateModels bool) error {
	channel.SetOtherSettings(settings)
	updates := map[string]any{
		"settings": channel.OtherSettings,
	}
	if updateModels {
		updates["models"] = channel.Models
	}
	return model.DB.Model(&model.Channel{}).Where("id = ?", channel.Id).Updates(updates).Error
}

// checkAndPersistChannelUpstreamModelUpdates runs one upstream check for the
// channel and persists the outcome into settings (which is mutated in place).
//
// Unless force is set, the check is skipped (returning false, 0, nil) when the
// previous check is more recent than the minimum check interval. A fetch
// failure still records the check time before the fetch error is returned.
// When allowAutoApply is set and auto sync is enabled in settings, pending
// additions are merged into the channel's models immediately.
//
// modelsChanged can be true together with a non-nil err: that happens when the
// channel row was already updated but refreshing its abilities failed.
func checkAndPersistChannelUpstreamModelUpdates(
	channel *model.Channel,
	settings *dto.ChannelOtherSettings,
	force bool,
	allowAutoApply bool,
) (modelsChanged bool, autoAdded int, err error) {
	now := common.GetTimestamp()
	if !force {
		minInterval := getUpstreamModelUpdateMinCheckIntervalSeconds()
		if settings.UpstreamModelUpdateLastCheckTime > 0 &&
			now-settings.UpstreamModelUpdateLastCheckTime < minInterval {
			return false, 0, nil
		}
	}

	pendingAddModels, pendingRemoveModels, fetchErr := collectPendingUpstreamModelChanges(channel, *settings)
	settings.UpstreamModelUpdateLastCheckTime = now
	if fetchErr != nil {
		if err = updateChannelUpstreamModelSettings(channel, *settings, false); err != nil {
			return false, 0, err
		}
		return false, 0, fetchErr
	}

	if allowAutoApply && settings.UpstreamModelUpdateAutoSyncEnabled && len(pendingAddModels) > 0 {
		originModels := normalizeModelNames(channel.GetModels())
		mergedModels := mergeModelNames(originModels, pendingAddModels)
		if len(mergedModels) > len(originModels) {
			channel.Models = strings.Join(mergedModels, ",")
			autoAdded = len(mergedModels) - len(originModels)
			modelsChanged = true
		}
		settings.UpstreamModelUpdateLastDetectedModels = []string{}
	} else {
		settings.UpstreamModelUpdateLastDetectedModels = pendingAddModels
	}
	settings.UpstreamModelUpdateLastRemovedModels = pendingRemoveModels

	if err = updateChannelUpstreamModelSettings(channel, *settings, modelsChanged); err != nil {
		return false, autoAdded, err
	}
	if modelsChanged {
		if err = channel.UpdateAbilities(nil); err != nil {
			return true, autoAdded, err
		}
	}
	return modelsChanged, autoAdded, nil
}

// refreshChannelRuntimeCache rebuilds the in-memory channel cache (when memory
// caching is enabled) and resets the proxy client cache. A panic raised by the
// cache rebuild is recovered and logged.
func refreshChannelRuntimeCache() {
	if common.MemoryCacheEnabled {
		func() {
			defer func() {
				if r := recover(); r != nil {
					common.SysLog(fmt.Sprintf("InitChannelCache panic: %v", r))
				}
			}()
			model.InitChannelCache()
		}()
	}
	service.ResetProxyClientCache()
}

// shouldSendUpstreamModelUpdateNotification reports whether a notification
// should be sent for the given counts.
//
// It returns false when a notification with the same changed/failed counts was
// already sent within the suppression window; otherwise it records the counts
// and now as the last notification and returns true. When both counts are
// non-positive it returns true without touching the recorded state.
func shouldSendUpstreamModelUpdateNotification(now int64, changedChannels int, failedChannels int) bool {
	if changedChannels <= 0 && failedChannels <= 0 {
		return true
	}

	state := &channelUpstreamModelUpdateNotifyState
	state.Lock()
	defer state.Unlock()

	if state.lastNotifiedAt > 0 &&
		now-state.lastNotifiedAt < channelUpstreamModelUpdateNotifySuppressWindowSeconds &&
		state.lastChangedChannels == changedChannels &&
		state.lastFailedChannels == failedChannels {
		return false
	}

	state.lastNotifiedAt = now
	state.lastChangedChannels = changedChannels
	state.lastFailedChannels = failedChannels
	return true
}

// buildUpstreamModelUpdateTaskNotificationContent renders the notification
// text for one task run: a summary line followed by optional, length-capped
// sections for changed channels, added model samples, removed model samples
// and failed channel IDs.
func buildUpstreamModelUpdateTaskNotificationContent(
	checkedChannels int,
	changedChannels int,
	detectedAddModels int,
	detectedRemoveModels int,
	autoAddedModels int,
	failedChannelIDs []int,
	channelSummaries []upstreamModelUpdateChannelSummary,
	addModelSamples []string,
	removeModelSamples []string,
) string {
	var builder strings.Builder
	failedChannels := len(failedChannelIDs)

	fmt.Fprintf(&builder,
		"上游模型巡检摘要:检测渠道 %d 个,发现变更 %d 个,新增 %d 个,删除 %d 个,自动同步新增 %d 个,失败 %d 个。",
		checkedChannels,
		changedChannels,
		detectedAddModels,
		detectedRemoveModels,
		autoAddedModels,
		failedChannels,
	)

	if len(channelSummaries) > 0 {
		displayCount := min(len(channelSummaries), channelUpstreamModelUpdateNotifyMaxChannelDetails)
		fmt.Fprintf(&builder, "\n\n变更渠道明细(展示 %d/%d):", displayCount, len(channelSummaries))
		for _, summary := range channelSummaries[:displayCount] {
			fmt.Fprintf(&builder, "\n- %s (+%d / -%d)", summary.ChannelName, summary.AddCount, summary.RemoveCount)
		}
		if len(channelSummaries) > displayCount {
			fmt.Fprintf(&builder, "\n- 其余 %d 个渠道已省略", len(channelSummaries)-displayCount)
		}
	}

	normalizedAddModelSamples := normalizeModelNames(addModelSamples)
	if len(normalizedAddModelSamples) > 0 {
		displayCount := min(len(normalizedAddModelSamples), channelUpstreamModelUpdateNotifyMaxModelDetails)
		fmt.Fprintf(&builder, "\n\n新增模型示例(展示 %d/%d):%s",
			displayCount,
			len(normalizedAddModelSamples),
			strings.Join(normalizedAddModelSamples[:displayCount], ", "),
		)
		if len(normalizedAddModelSamples) > displayCount {
			fmt.Fprintf(&builder, "(其余 %d 个已省略)", len(normalizedAddModelSamples)-displayCount)
		}
	}

	normalizedRemoveModelSamples := normalizeModelNames(removeModelSamples)
	if len(normalizedRemoveModelSamples) > 0 {
		displayCount := min(len(normalizedRemoveModelSamples), channelUpstreamModelUpdateNotifyMaxModelDetails)
		fmt.Fprintf(&builder, "\n\n删除模型示例(展示 %d/%d):%s",
			displayCount,
			len(normalizedRemoveModelSamples),
			strings.Join(normalizedRemoveModelSamples[:displayCount], ", "),
		)
		if len(normalizedRemoveModelSamples) > displayCount {
			fmt.Fprintf(&builder, "(其余 %d 个已省略)", len(normalizedRemoveModelSamples)-displayCount)
		}
	}

	if failedChannels > 0 {
		displayCount := min(failedChannels, channelUpstreamModelUpdateNotifyMaxFailedChannelIDs)
		displayIDs := lo.Map(failedChannelIDs[:displayCount], func(channelID int, _ int) string {
			return fmt.Sprintf("%d", channelID)
		})
		fmt.Fprintf(&builder,
			"\n\n失败渠道 ID(展示 %d/%d):%s",
			displayCount,
			failedChannels,
			strings.Join(displayIDs, ", "),
		)
		if failedChannels > displayCount {
			fmt.Fprintf(&builder, "(其余 %d 个已省略)", failedChannels-displayCount)
		}
	}
	return builder.String()
}

// runChannelUpstreamModelUpdateTaskOnce performs one full pass over all
// enabled channels that have upstream checks enabled: it checks each channel,
// aggregates statistics, refreshes the runtime cache when any channel's models
// changed, logs a summary and sends a notification when there is something to
// report. A pass that starts while another one is still running returns
// immediately.
func runChannelUpstreamModelUpdateTaskOnce() {
	if !channelUpstreamModelUpdateTaskRunning.CompareAndSwap(false, true) {
		return
	}
	defer channelUpstreamModelUpdateTaskRunning.Store(false)

	checkedChannels := 0
	failedChannels := 0
	failedChannelIDs := make([]int, 0)
	changedChannels := 0
	detectedAddModels := 0
	detectedRemoveModels := 0
	autoAddedModels := 0
	channelSummaries := make([]upstreamModelUpdateChannelSummary, 0)
	addModelSamples := make([]string, 0)
	removeModelSamples := make([]string, 0)
	refreshNeeded := false

	lastID := 0
	for {
		channels, err := findEnabledChannelsAfterID(lastID, channelUpstreamModelUpdateTaskBatchSize)
		if err != nil {
			common.SysLog(fmt.Sprintf("upstream model update task query failed: %v", err))
			break
		}
		if len(channels) == 0 {
			break
		}
		lastID = channels[len(channels)-1].Id

		for _, channel := range channels {
			if channel == nil {
				continue
			}

			settings := channel.GetOtherSettings()
			if !settings.UpstreamModelUpdateCheckEnabled {
				continue
			}

			checkedChannels++
			modelsChanged, autoAdded, err := checkAndPersistChannelUpstreamModelUpdates(channel, &settings, false, true)
			if modelsChanged {
				// Also set on the error path: the channel row may already have
				// been rewritten when a later step failed.
				refreshNeeded = true
			}
			if err != nil {
				failedChannels++
				failedChannelIDs = append(failedChannelIDs, channel.Id)
				common.SysLog(fmt.Sprintf("upstream model update check failed: channel_id=%d channel_name=%s err=%v", channel.Id, channel.Name, err))
				continue
			}

			currentAddModels := normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels)
			currentRemoveModels := normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
			currentAddCount := len(currentAddModels) + autoAdded
			currentRemoveCount := len(currentRemoveModels)
			detectedAddModels += currentAddCount
			detectedRemoveModels += currentRemoveCount
			if currentAddCount > 0 || currentRemoveCount > 0 {
				changedChannels++
				channelSummaries = append(channelSummaries, upstreamModelUpdateChannelSummary{
					ChannelName: channel.Name,
					AddCount:    currentAddCount,
					RemoveCount: currentRemoveCount,
				})
			}
			addModelSamples = mergeModelNames(addModelSamples, currentAddModels)
			removeModelSamples = mergeModelNames(removeModelSamples, currentRemoveModels)
			autoAddedModels += autoAdded

			if common.RequestInterval > 0 {
				time.Sleep(common.RequestInterval)
			}
		}

		if len(channels) < channelUpstreamModelUpdateTaskBatchSize {
			break
		}
	}

	if refreshNeeded {
		refreshChannelRuntimeCache()
	}

	if checkedChannels > 0 || common.DebugEnabled {
		common.SysLog(fmt.Sprintf(
			"upstream model update task done: checked_channels=%d changed_channels=%d detected_add_models=%d detected_remove_models=%d failed_channels=%d auto_added_models=%d",
			checkedChannels,
			changedChannels,
			detectedAddModels,
			detectedRemoveModels,
			failedChannels,
			autoAddedModels,
		))
	}

	if changedChannels > 0 || failedChannels > 0 {
		now := common.GetTimestamp()
		if !shouldSendUpstreamModelUpdateNotification(now, changedChannels, failedChannels) {
			common.SysLog(fmt.Sprintf(
				"upstream model update notification skipped in 24h window: changed_channels=%d failed_channels=%d",
				changedChannels,
				failedChannels,
			))
			return
		}
		service.NotifyUpstreamModelUpdateWatchers(
			"上游模型巡检通知",
			buildUpstreamModelUpdateTaskNotificationContent(
				checkedChannels,
				changedChannels,
				detectedAddModels,
				detectedRemoveModels,
				autoAddedModels,
				failedChannelIDs,
				channelSummaries,
				addModelSamples,
				removeModelSamples,
			),
		)
	}
}

// StartChannelUpstreamModelUpdateTask starts the periodic upstream model
// update task at most once per process. Nothing is started on non-master
// nodes or when CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_ENABLED is false. The
// interval comes from CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_INTERVAL_MINUTES
// (values below 1 fall back to the default); the first pass runs immediately.
func StartChannelUpstreamModelUpdateTask() {
	channelUpstreamModelUpdateTaskOnce.Do(func() {
		if !common.IsMasterNode {
			return
		}
		if !common.GetEnvOrDefaultBool("CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_ENABLED", true) {
			common.SysLog("upstream model update task disabled by CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_ENABLED")
			return
		}

		intervalMinutes := common.GetEnvOrDefault(
			"CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_INTERVAL_MINUTES",
			channelUpstreamModelUpdateTaskDefaultIntervalMinutes,
		)
		if intervalMinutes < 1 {
			intervalMinutes = channelUpstreamModelUpdateTaskDefaultIntervalMinutes
		}
		interval := time.Duration(intervalMinutes) * time.Minute

		go func() {
			common.SysLog(fmt.Sprintf("upstream model update task started: interval=%s", interval))
			runChannelUpstreamModelUpdateTaskOnce()
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for range ticker.C {
				runChannelUpstreamModelUpdateTaskOnce()
			}
		}()
	})
}

// ApplyChannelUpstreamModelUpdates is the HTTP handler that applies the
// selected add / remove / ignore model changes to a single channel. The body
// is decoded as applyChannelUpstreamModelUpdatesRequest.
func ApplyChannelUpstreamModelUpdates(c *gin.Context) {
	var req applyChannelUpstreamModelUpdatesRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		common.ApiError(c, err)
		return
	}
	if req.ID <= 0 {
		c.JSON(http.StatusOK, gin.H{
			"success": false,
			"message": "invalid channel id",
		})
		return
	}

	channel, err := model.GetChannelById(req.ID, true)
	if err != nil {
		common.ApiError(c, err)
		return
	}

	// Computed before applying, because applying rewrites the pending list.
	beforeSettings := channel.GetOtherSettings()
	ignoredModels := intersectModelNames(req.IgnoreModels, beforeSettings.UpstreamModelUpdateLastDetectedModels)

	addedModels, removedModels, remainingModels, remainingRemoveModels, modelsChanged, err := applyChannelUpstreamModelUpdates(
		channel,
		req.AddModels,
		req.IgnoreModels,
		req.RemoveModels,
	)
	if modelsChanged {
		// Refresh even when err is non-nil: the channel row was already
		// persisted in that case.
		refreshChannelRuntimeCache()
	}
	if err != nil {
		common.ApiError(c, err)
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"success": true,
		"message": "",
		"data": gin.H{
			"id":                      channel.Id,
			"added_models":            addedModels,
			"removed_models":          removedModels,
			"ignored_models":          ignoredModels,
			"remaining_models":        remainingModels,
			"remaining_remove_models": remainingRemoveModels,
			"models":                  channel.Models,
			"settings":                channel.OtherSettings,
		},
	})
}

// DetectChannelUpstreamModelUpdates is the HTTP handler that forces an
// upstream check for a single channel (without auto-applying) and reports the
// pending additions and removals.
func DetectChannelUpstreamModelUpdates(c *gin.Context) {
	var req applyChannelUpstreamModelUpdatesRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		common.ApiError(c, err)
		return
	}
	if req.ID <= 0 {
		c.JSON(http.StatusOK, gin.H{
			"success": false,
			"message": "invalid channel id",
		})
		return
	}

	channel, err := model.GetChannelById(req.ID, true)
	if err != nil {
		common.ApiError(c, err)
		return
	}

	settings := channel.GetOtherSettings()
	modelsChanged, autoAdded, err := checkAndPersistChannelUpstreamModelUpdates(channel, &settings, true, false)
	if modelsChanged {
		refreshChannelRuntimeCache()
	}
	if err != nil {
		common.ApiError(c, err)
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"success": true,
		"message": "",
		"data": detectChannelUpstreamModelUpdatesResult{
			ChannelID:       channel.Id,
			ChannelName:     channel.Name,
			AddModels:       normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels),
			RemoveModels:    normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels),
			LastCheckTime:   settings.UpstreamModelUpdateLastCheckTime,
			AutoAddedModels: autoAdded,
		},
	})
}

// applyChannelUpstreamModelUpdates applies a selection of pending changes to
// the channel and persists the result.
//
// Only names that are currently pending are honoured: add and ignore inputs
// are restricted to the stored detected models, remove inputs to the stored
// removed models. A model selected for both add and remove is added. Ignored
// models are appended to the channel's ignore list, and added models are taken
// off it.
//
// modelsChanged can be true together with a non-nil err when the channel row
// was persisted but updating its abilities failed.
func applyChannelUpstreamModelUpdates(
	channel *model.Channel,
	addModelsInput []string,
	ignoreModelsInput []string,
	removeModelsInput []string,
) (
	addedModels []string,
	removedModels []string,
	remainingModels []string,
	remainingRemoveModels []string,
	modelsChanged bool,
	err error,
) {
	settings := channel.GetOtherSettings()
	pendingAddModels, pendingRemoveModels := collectPendingApplyUpstreamModelChanges(settings)

	addModels := intersectModelNames(addModelsInput, pendingAddModels)
	ignoreModels := intersectModelNames(ignoreModelsInput, pendingAddModels)
	removeModels := intersectModelNames(removeModelsInput, pendingRemoveModels)
	removeModels = subtractModelNames(removeModels, addModels)

	originModels := normalizeModelNames(channel.GetModels())
	nextModels := applySelectedModelChanges(originModels, addModels, removeModels)
	modelsChanged = !slices.Equal(originModels, nextModels)
	if modelsChanged {
		channel.Models = strings.Join(nextModels, ",")
	}

	settings.UpstreamModelUpdateIgnoredModels = mergeModelNames(settings.UpstreamModelUpdateIgnoredModels, ignoreModels)
	if len(addModels) > 0 {
		settings.UpstreamModelUpdateIgnoredModels = subtractModelNames(
			settings.UpstreamModelUpdateIgnoredModels,
			addModels,
		)
	}

	// mergeModelNames builds a fresh slice, so addModels (which is returned to
	// the caller) is never written through.
	handledModels := mergeModelNames(addModels, ignoreModels)
	remainingModels = subtractModelNames(pendingAddModels, handledModels)
	remainingRemoveModels = subtractModelNames(pendingRemoveModels, removeModels)
	settings.UpstreamModelUpdateLastDetectedModels = remainingModels
	settings.UpstreamModelUpdateLastRemovedModels = remainingRemoveModels
	settings.UpstreamModelUpdateLastCheckTime = common.GetTimestamp()

	if err := updateChannelUpstreamModelSettings(channel, settings, modelsChanged); err != nil {
		return nil, nil, nil, nil, false, err
	}

	if modelsChanged {
		if err := channel.UpdateAbilities(nil); err != nil {
			return addModels, removeModels, remainingModels, remainingRemoveModels, true, err
		}
	}
	return addModels, removeModels, remainingModels, remainingRemoveModels, modelsChanged, nil
}

// collectPendingApplyUpstreamModelChanges returns the normalized pending add
// and pending remove lists stored in settings.
func collectPendingApplyUpstreamModelChanges(settings dto.ChannelOtherSettings) (pendingAddModels []string, pendingRemoveModels []string) {
	return normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels),
		normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
}

// findEnabledChannelsAfterID loads up to batchSize enabled channels ordered by
// ascending id; when lastID is positive only channels with a greater id are
// returned.
func findEnabledChannelsAfterID(lastID int, batchSize int) ([]*model.Channel, error) {
	var channels []*model.Channel
	query := model.DB.
		Select("id", "name", "type", "key", "status", "base_url", "models", "settings", "setting", "other", "group", "priority", "weight", "tag", "channel_info", "header_override").
		Where("status = ?", common.ChannelStatusEnabled).
		Order("id asc").
		Limit(batchSize)
	if lastID > 0 {
		query = query.Where("id > ?", lastID)
	}
	return channels, query.Find(&channels).Error
}

// ApplyAllChannelUpstreamModelUpdates is the HTTP handler that applies every
// stored pending addition and removal on all enabled channels that have
// upstream checks enabled, and reports per-channel results plus the IDs of
// channels that failed.
func ApplyAllChannelUpstreamModelUpdates(c *gin.Context) {
	results := make([]applyAllChannelUpstreamModelUpdatesResult, 0)
	failed := make([]int, 0)
	refreshNeeded := false
	addedModelCount := 0
	removedModelCount := 0

	lastID := 0
	for {
		channels, err := findEnabledChannelsAfterID(lastID, channelUpstreamModelUpdateTaskBatchSize)
		if err != nil {
			// Earlier batches may already have changed channels.
			if refreshNeeded {
				refreshChannelRuntimeCache()
			}
			common.ApiError(c, err)
			return
		}
		if len(channels) == 0 {
			break
		}
		lastID = channels[len(channels)-1].Id

		for _, channel := range channels {
			if channel == nil {
				continue
			}

			settings := channel.GetOtherSettings()
			if !settings.UpstreamModelUpdateCheckEnabled {
				continue
			}

			pendingAddModels, pendingRemoveModels := collectPendingApplyUpstreamModelChanges(settings)
			if len(pendingAddModels) == 0 && len(pendingRemoveModels) == 0 {
				continue
			}

			addedModels, removedModels, remainingModels, remainingRemoveModels, modelsChanged, err := applyChannelUpstreamModelUpdates(
				channel,
				pendingAddModels,
				nil,
				pendingRemoveModels,
			)
			if modelsChanged {
				refreshNeeded = true
			}
			if err != nil {
				failed = append(failed, channel.Id)
				continue
			}

			addedModelCount += len(addedModels)
			removedModelCount += len(removedModels)
			results = append(results, applyAllChannelUpstreamModelUpdatesResult{
				ChannelID:             channel.Id,
				ChannelName:           channel.Name,
				AddedModels:           addedModels,
				RemovedModels:         removedModels,
				RemainingModels:       remainingModels,
				RemainingRemoveModels: remainingRemoveModels,
			})
		}

		if len(channels) < channelUpstreamModelUpdateTaskBatchSize {
			break
		}
	}

	if refreshNeeded {
		refreshChannelRuntimeCache()
	}

	c.JSON(http.StatusOK, gin.H{
		"success": true,
		"message": "",
		"data": gin.H{
			"processed_channels": len(results),
			"added_models":       addedModelCount,
			"removed_models":     removedModelCount,
			"failed_channel_ids": failed,
			"results":            results,
		},
	})
}

// DetectAllChannelUpstreamModelUpdates is the HTTP handler that forces an
// upstream check (without auto-applying) on all enabled channels that have
// upstream checks enabled, and reports the pending changes per channel plus
// the IDs of channels whose check failed.
func DetectAllChannelUpstreamModelUpdates(c *gin.Context) {
	results := make([]detectChannelUpstreamModelUpdatesResult, 0)
	failed := make([]int, 0)
	detectedAddCount := 0
	detectedRemoveCount := 0
	refreshNeeded := false

	lastID := 0
	for {
		channels, err := findEnabledChannelsAfterID(lastID, channelUpstreamModelUpdateTaskBatchSize)
		if err != nil {
			// Earlier batches may already have changed channels.
			if refreshNeeded {
				refreshChannelRuntimeCache()
			}
			common.ApiError(c, err)
			return
		}
		if len(channels) == 0 {
			break
		}
		lastID = channels[len(channels)-1].Id

		for _, channel := range channels {
			if channel == nil {
				continue
			}

			settings := channel.GetOtherSettings()
			if !settings.UpstreamModelUpdateCheckEnabled {
				continue
			}

			modelsChanged, autoAdded, err := checkAndPersistChannelUpstreamModelUpdates(channel, &settings, true, false)
			if modelsChanged {
				refreshNeeded = true
			}
			if err != nil {
				failed = append(failed, channel.Id)
				continue
			}

			addModels := normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels)
			removeModels := normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
			detectedAddCount += len(addModels)
			detectedRemoveCount += len(removeModels)
			results = append(results, detectChannelUpstreamModelUpdatesResult{
				ChannelID:       channel.Id,
				ChannelName:     channel.Name,
				AddModels:       addModels,
				RemoveModels:    removeModels,
				LastCheckTime:   settings.UpstreamModelUpdateLastCheckTime,
				AutoAddedModels: autoAdded,
			})
		}

		if len(channels) < channelUpstreamModelUpdateTaskBatchSize {
			break
		}
	}

	if refreshNeeded {
		refreshChannelRuntimeCache()
	}

	c.JSON(http.StatusOK, gin.H{
		"success": true,
		"message": "",
		"data": gin.H{
			"processed_channels":       len(results),
			"failed_channel_ids":       failed,
			"detected_add_models":      detectedAddCount,
			"detected_remove_models":   detectedRemoveCount,
			"channel_detected_results": results,
		},
	})
}