| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975 |
- package controller
- import (
- "fmt"
- "net/http"
- "slices"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/QuantumNous/new-api/common"
- "github.com/QuantumNous/new-api/constant"
- "github.com/QuantumNous/new-api/dto"
- "github.com/QuantumNous/new-api/model"
- "github.com/QuantumNous/new-api/relay/channel/gemini"
- "github.com/QuantumNous/new-api/relay/channel/ollama"
- "github.com/QuantumNous/new-api/service"
- "github.com/gin-gonic/gin"
- "github.com/samber/lo"
- )
// Tunables for the upstream model update task and its notifications.
const (
	// channelUpstreamModelUpdateTaskDefaultIntervalMinutes is the period between
	// two background task runs when no valid interval is configured.
	channelUpstreamModelUpdateTaskDefaultIntervalMinutes = 30
	// channelUpstreamModelUpdateTaskBatchSize is the number of channels loaded
	// per paginated query.
	channelUpstreamModelUpdateTaskBatchSize = 100
	// channelUpstreamModelUpdateMinCheckIntervalSeconds is the default minimum
	// time between two non-forced checks of the same channel (5 minutes).
	channelUpstreamModelUpdateMinCheckIntervalSeconds = 5 * 60
	// channelUpstreamModelUpdateNotifySuppressWindowSeconds is how long a
	// notification with unchanged counts is suppressed (24 hours).
	channelUpstreamModelUpdateNotifySuppressWindowSeconds = 24 * 60 * 60
	// channelUpstreamModelUpdateNotifyMaxChannelDetails caps the per-channel
	// lines listed in a notification.
	channelUpstreamModelUpdateNotifyMaxChannelDetails = 8
	// channelUpstreamModelUpdateNotifyMaxModelDetails caps the model names
	// listed in each sample section of a notification.
	channelUpstreamModelUpdateNotifyMaxModelDetails = 12
	// channelUpstreamModelUpdateNotifyMaxFailedChannelIDs caps the failed
	// channel ids listed in a notification.
	channelUpstreamModelUpdateNotifyMaxFailedChannelIDs = 10
)
// Package-level state shared by the background task and its notifications.
var (
	// channelUpstreamModelUpdateTaskOnce guards the one-time start of the
	// background task.
	channelUpstreamModelUpdateTaskOnce sync.Once
	// channelUpstreamModelUpdateTaskRunning is true while a task run is in
	// progress; a run that finds it already set returns immediately.
	channelUpstreamModelUpdateTaskRunning atomic.Bool
	// channelUpstreamModelUpdateNotifyState remembers when the last
	// notification was sent and with which counts. The embedded mutex guards
	// the three fields.
	channelUpstreamModelUpdateNotifyState struct {
		sync.Mutex
		lastNotifiedAt      int64
		lastChangedChannels int
		lastFailedChannels  int
	}
)
// applyChannelUpstreamModelUpdatesRequest is the JSON body bound by the
// single-channel apply and detect handlers. The detect handler reads only ID.
type applyChannelUpstreamModelUpdatesRequest struct {
	// ID selects the channel; the handlers reject values <= 0.
	ID int `json:"id"`
	// AddModels names pending upstream models to add to the channel.
	AddModels []string `json:"add_models"`
	// RemoveModels names pending removals to apply to the channel.
	RemoveModels []string `json:"remove_models"`
	// IgnoreModels names pending upstream models to record as ignored.
	IgnoreModels []string `json:"ignore_models"`
}
// applyAllChannelUpstreamModelUpdatesResult is the per-channel entry of the
// bulk apply response.
type applyAllChannelUpstreamModelUpdatesResult struct {
	ChannelID   int    `json:"channel_id"`
	ChannelName string `json:"channel_name"`
	// AddedModels and RemovedModels are the changes applied to the channel.
	AddedModels   []string `json:"added_models"`
	RemovedModels []string `json:"removed_models"`
	// RemainingModels and RemainingRemoveModels are the pending additions and
	// removals still stored on the channel after the apply.
	RemainingModels       []string `json:"remaining_models"`
	RemainingRemoveModels []string `json:"remaining_remove_models"`
}
// detectChannelUpstreamModelUpdatesResult is the detection outcome reported
// for one channel by the detect handlers.
type detectChannelUpstreamModelUpdatesResult struct {
	ChannelID   int    `json:"channel_id"`
	ChannelName string `json:"channel_name"`
	// AddModels and RemoveModels are the pending additions and removals
	// recorded on the channel by the check.
	AddModels    []string `json:"add_models"`
	RemoveModels []string `json:"remove_models"`
	// LastCheckTime is the timestamp stored for the check.
	LastCheckTime int64 `json:"last_check_time"`
	// AutoAddedModels is the number of models the check merged automatically.
	AutoAddedModels int `json:"auto_added_models"`
}
// upstreamModelUpdateChannelSummary is one "changed channel" line of the task
// notification: the channel name with its added and removed model counts.
type upstreamModelUpdateChannelSummary struct {
	ChannelName string
	AddCount    int
	RemoveCount int
}
// normalizeModelNames trims surrounding whitespace from every entry, drops
// entries that are empty after trimming, and removes duplicates while keeping
// the first occurrence of each name in place. The result is never nil.
func normalizeModelNames(models []string) []string {
	cleaned := make([]string, 0, len(models))
	known := make(map[string]struct{}, len(models))
	for _, raw := range models {
		name := strings.TrimSpace(raw)
		if name == "" {
			continue
		}
		if _, dup := known[name]; dup {
			continue
		}
		known[name] = struct{}{}
		cleaned = append(cleaned, name)
	}
	return cleaned
}
- func mergeModelNames(base []string, appended []string) []string {
- merged := normalizeModelNames(base)
- seen := make(map[string]struct{}, len(merged))
- for _, model := range merged {
- seen[model] = struct{}{}
- }
- for _, model := range normalizeModelNames(appended) {
- if _, ok := seen[model]; ok {
- continue
- }
- seen[model] = struct{}{}
- merged = append(merged, model)
- }
- return merged
- }
- func subtractModelNames(base []string, removed []string) []string {
- removeSet := make(map[string]struct{}, len(removed))
- for _, model := range normalizeModelNames(removed) {
- removeSet[model] = struct{}{}
- }
- return lo.Filter(normalizeModelNames(base), func(model string, _ int) bool {
- _, ok := removeSet[model]
- return !ok
- })
- }
- func intersectModelNames(base []string, allowed []string) []string {
- allowedSet := make(map[string]struct{}, len(allowed))
- for _, model := range normalizeModelNames(allowed) {
- allowedSet[model] = struct{}{}
- }
- return lo.Filter(normalizeModelNames(base), func(model string, _ int) bool {
- _, ok := allowedSet[model]
- return ok
- })
- }
- func applySelectedModelChanges(originModels []string, addModels []string, removeModels []string) []string {
- // Add wins when the same model appears in both selected lists.
- normalizedAdd := normalizeModelNames(addModels)
- normalizedRemove := subtractModelNames(normalizeModelNames(removeModels), normalizedAdd)
- return subtractModelNames(mergeModelNames(originModels, normalizedAdd), normalizedRemove)
- }
- func normalizeChannelModelMapping(channel *model.Channel) map[string]string {
- if channel == nil || channel.ModelMapping == nil {
- return nil
- }
- rawMapping := strings.TrimSpace(*channel.ModelMapping)
- if rawMapping == "" || rawMapping == "{}" {
- return nil
- }
- parsed := make(map[string]string)
- if err := common.UnmarshalJsonStr(rawMapping, &parsed); err != nil {
- return nil
- }
- normalized := make(map[string]string, len(parsed))
- for source, target := range parsed {
- normalizedSource := strings.TrimSpace(source)
- normalizedTarget := strings.TrimSpace(target)
- if normalizedSource == "" || normalizedTarget == "" {
- continue
- }
- normalized[normalizedSource] = normalizedTarget
- }
- if len(normalized) == 0 {
- return nil
- }
- return normalized
- }
- func collectPendingUpstreamModelChangesFromModels(
- localModels []string,
- upstreamModels []string,
- ignoredModels []string,
- modelMapping map[string]string,
- ) (pendingAddModels []string, pendingRemoveModels []string) {
- localSet := make(map[string]struct{})
- localModels = normalizeModelNames(localModels)
- upstreamModels = normalizeModelNames(upstreamModels)
- for _, modelName := range localModels {
- localSet[modelName] = struct{}{}
- }
- upstreamSet := make(map[string]struct{}, len(upstreamModels))
- for _, modelName := range upstreamModels {
- upstreamSet[modelName] = struct{}{}
- }
- ignoredSet := make(map[string]struct{})
- for _, modelName := range normalizeModelNames(ignoredModels) {
- ignoredSet[modelName] = struct{}{}
- }
- redirectSourceSet := make(map[string]struct{}, len(modelMapping))
- redirectTargetSet := make(map[string]struct{}, len(modelMapping))
- for source, target := range modelMapping {
- redirectSourceSet[source] = struct{}{}
- redirectTargetSet[target] = struct{}{}
- }
- coveredUpstreamSet := make(map[string]struct{}, len(localSet)+len(redirectTargetSet))
- for modelName := range localSet {
- coveredUpstreamSet[modelName] = struct{}{}
- }
- for modelName := range redirectTargetSet {
- coveredUpstreamSet[modelName] = struct{}{}
- }
- pendingAdd := lo.Filter(upstreamModels, func(modelName string, _ int) bool {
- if _, ok := coveredUpstreamSet[modelName]; ok {
- return false
- }
- if _, ok := ignoredSet[modelName]; ok {
- return false
- }
- return true
- })
- pendingRemove := lo.Filter(localModels, func(modelName string, _ int) bool {
- // Redirect source models are virtual aliases and should not be removed
- // only because they are absent from upstream model list.
- if _, ok := redirectSourceSet[modelName]; ok {
- return false
- }
- _, ok := upstreamSet[modelName]
- return !ok
- })
- return normalizeModelNames(pendingAdd), normalizeModelNames(pendingRemove)
- }
- func collectPendingUpstreamModelChanges(channel *model.Channel, settings dto.ChannelOtherSettings) (pendingAddModels []string, pendingRemoveModels []string, err error) {
- upstreamModels, err := fetchChannelUpstreamModelIDs(channel)
- if err != nil {
- return nil, nil, err
- }
- pendingAddModels, pendingRemoveModels = collectPendingUpstreamModelChangesFromModels(
- channel.GetModels(),
- upstreamModels,
- settings.UpstreamModelUpdateIgnoredModels,
- normalizeChannelModelMapping(channel),
- )
- return pendingAddModels, pendingRemoveModels, nil
- }
- func getUpstreamModelUpdateMinCheckIntervalSeconds() int64 {
- interval := int64(common.GetEnvOrDefault(
- "CHANNEL_UPSTREAM_MODEL_UPDATE_MIN_CHECK_INTERVAL_SECONDS",
- channelUpstreamModelUpdateMinCheckIntervalSeconds,
- ))
- if interval < 0 {
- return channelUpstreamModelUpdateMinCheckIntervalSeconds
- }
- return interval
- }
- func fetchChannelUpstreamModelIDs(channel *model.Channel) ([]string, error) {
- baseURL := constant.ChannelBaseURLs[channel.Type]
- if channel.GetBaseURL() != "" {
- baseURL = channel.GetBaseURL()
- }
- if channel.Type == constant.ChannelTypeOllama {
- key := strings.TrimSpace(strings.Split(channel.Key, "\n")[0])
- models, err := ollama.FetchOllamaModels(baseURL, key)
- if err != nil {
- return nil, err
- }
- return normalizeModelNames(lo.Map(models, func(item ollama.OllamaModel, _ int) string {
- return item.Name
- })), nil
- }
- if channel.Type == constant.ChannelTypeGemini {
- key, _, apiErr := channel.GetNextEnabledKey()
- if apiErr != nil {
- return nil, fmt.Errorf("获取渠道密钥失败: %w", apiErr)
- }
- key = strings.TrimSpace(key)
- models, err := gemini.FetchGeminiModels(baseURL, key, channel.GetSetting().Proxy)
- if err != nil {
- return nil, err
- }
- return normalizeModelNames(models), nil
- }
- var url string
- switch channel.Type {
- case constant.ChannelTypeAli:
- url = fmt.Sprintf("%s/compatible-mode/v1/models", baseURL)
- case constant.ChannelTypeZhipu_v4:
- if plan, ok := constant.ChannelSpecialBases[baseURL]; ok && plan.OpenAIBaseURL != "" {
- url = fmt.Sprintf("%s/models", plan.OpenAIBaseURL)
- } else {
- url = fmt.Sprintf("%s/api/paas/v4/models", baseURL)
- }
- case constant.ChannelTypeVolcEngine:
- if plan, ok := constant.ChannelSpecialBases[baseURL]; ok && plan.OpenAIBaseURL != "" {
- url = fmt.Sprintf("%s/v1/models", plan.OpenAIBaseURL)
- } else {
- url = fmt.Sprintf("%s/v1/models", baseURL)
- }
- case constant.ChannelTypeMoonshot:
- if plan, ok := constant.ChannelSpecialBases[baseURL]; ok && plan.OpenAIBaseURL != "" {
- url = fmt.Sprintf("%s/models", plan.OpenAIBaseURL)
- } else {
- url = fmt.Sprintf("%s/v1/models", baseURL)
- }
- default:
- url = fmt.Sprintf("%s/v1/models", baseURL)
- }
- key, _, apiErr := channel.GetNextEnabledKey()
- if apiErr != nil {
- return nil, fmt.Errorf("获取渠道密钥失败: %w", apiErr)
- }
- key = strings.TrimSpace(key)
- headers, err := buildFetchModelsHeaders(channel, key)
- if err != nil {
- return nil, err
- }
- body, err := GetResponseBody(http.MethodGet, url, channel, headers)
- if err != nil {
- return nil, err
- }
- var result OpenAIModelsResponse
- if err := common.Unmarshal(body, &result); err != nil {
- return nil, err
- }
- ids := lo.Map(result.Data, func(item OpenAIModel, _ int) string {
- if channel.Type == constant.ChannelTypeGemini {
- return strings.TrimPrefix(item.ID, "models/")
- }
- return item.ID
- })
- return normalizeModelNames(ids), nil
- }
- func updateChannelUpstreamModelSettings(channel *model.Channel, settings dto.ChannelOtherSettings, updateModels bool) error {
- channel.SetOtherSettings(settings)
- updates := map[string]interface{}{
- "settings": channel.OtherSettings,
- }
- if updateModels {
- updates["models"] = channel.Models
- }
- return model.DB.Model(&model.Channel{}).Where("id = ?", channel.Id).Updates(updates).Error
- }
- func checkAndPersistChannelUpstreamModelUpdates(
- channel *model.Channel,
- settings *dto.ChannelOtherSettings,
- force bool,
- allowAutoApply bool,
- ) (modelsChanged bool, autoAdded int, err error) {
- now := common.GetTimestamp()
- if !force {
- minInterval := getUpstreamModelUpdateMinCheckIntervalSeconds()
- if settings.UpstreamModelUpdateLastCheckTime > 0 &&
- now-settings.UpstreamModelUpdateLastCheckTime < minInterval {
- return false, 0, nil
- }
- }
- pendingAddModels, pendingRemoveModels, fetchErr := collectPendingUpstreamModelChanges(channel, *settings)
- settings.UpstreamModelUpdateLastCheckTime = now
- if fetchErr != nil {
- if err = updateChannelUpstreamModelSettings(channel, *settings, false); err != nil {
- return false, 0, err
- }
- return false, 0, fetchErr
- }
- if allowAutoApply && settings.UpstreamModelUpdateAutoSyncEnabled && len(pendingAddModels) > 0 {
- originModels := normalizeModelNames(channel.GetModels())
- mergedModels := mergeModelNames(originModels, pendingAddModels)
- if len(mergedModels) > len(originModels) {
- channel.Models = strings.Join(mergedModels, ",")
- autoAdded = len(mergedModels) - len(originModels)
- modelsChanged = true
- }
- settings.UpstreamModelUpdateLastDetectedModels = []string{}
- } else {
- settings.UpstreamModelUpdateLastDetectedModels = pendingAddModels
- }
- settings.UpstreamModelUpdateLastRemovedModels = pendingRemoveModels
- if err = updateChannelUpstreamModelSettings(channel, *settings, modelsChanged); err != nil {
- return false, autoAdded, err
- }
- if modelsChanged {
- if err = channel.UpdateAbilities(nil); err != nil {
- return true, autoAdded, err
- }
- }
- return modelsChanged, autoAdded, nil
- }
- func refreshChannelRuntimeCache() {
- if common.MemoryCacheEnabled {
- func() {
- defer func() {
- if r := recover(); r != nil {
- common.SysLog(fmt.Sprintf("InitChannelCache panic: %v", r))
- }
- }()
- model.InitChannelCache()
- }()
- }
- service.ResetProxyClientCache()
- }
- func shouldSendUpstreamModelUpdateNotification(now int64, changedChannels int, failedChannels int) bool {
- if changedChannels <= 0 && failedChannels <= 0 {
- return true
- }
- channelUpstreamModelUpdateNotifyState.Lock()
- defer channelUpstreamModelUpdateNotifyState.Unlock()
- if channelUpstreamModelUpdateNotifyState.lastNotifiedAt > 0 &&
- now-channelUpstreamModelUpdateNotifyState.lastNotifiedAt < channelUpstreamModelUpdateNotifySuppressWindowSeconds &&
- channelUpstreamModelUpdateNotifyState.lastChangedChannels == changedChannels &&
- channelUpstreamModelUpdateNotifyState.lastFailedChannels == failedChannels {
- return false
- }
- channelUpstreamModelUpdateNotifyState.lastNotifiedAt = now
- channelUpstreamModelUpdateNotifyState.lastChangedChannels = changedChannels
- channelUpstreamModelUpdateNotifyState.lastFailedChannels = failedChannels
- return true
- }
- func buildUpstreamModelUpdateTaskNotificationContent(
- checkedChannels int,
- changedChannels int,
- detectedAddModels int,
- detectedRemoveModels int,
- autoAddedModels int,
- failedChannelIDs []int,
- channelSummaries []upstreamModelUpdateChannelSummary,
- addModelSamples []string,
- removeModelSamples []string,
- ) string {
- var builder strings.Builder
- failedChannels := len(failedChannelIDs)
- builder.WriteString(fmt.Sprintf(
- "上游模型巡检摘要:检测渠道 %d 个,发现变更 %d 个,新增 %d 个,删除 %d 个,自动同步新增 %d 个,失败 %d 个。",
- checkedChannels,
- changedChannels,
- detectedAddModels,
- detectedRemoveModels,
- autoAddedModels,
- failedChannels,
- ))
- if len(channelSummaries) > 0 {
- displayCount := min(len(channelSummaries), channelUpstreamModelUpdateNotifyMaxChannelDetails)
- builder.WriteString(fmt.Sprintf("\n\n变更渠道明细(展示 %d/%d):", displayCount, len(channelSummaries)))
- for _, summary := range channelSummaries[:displayCount] {
- builder.WriteString(fmt.Sprintf("\n- %s (+%d / -%d)", summary.ChannelName, summary.AddCount, summary.RemoveCount))
- }
- if len(channelSummaries) > displayCount {
- builder.WriteString(fmt.Sprintf("\n- 其余 %d 个渠道已省略", len(channelSummaries)-displayCount))
- }
- }
- normalizedAddModelSamples := normalizeModelNames(addModelSamples)
- if len(normalizedAddModelSamples) > 0 {
- displayCount := min(len(normalizedAddModelSamples), channelUpstreamModelUpdateNotifyMaxModelDetails)
- builder.WriteString(fmt.Sprintf("\n\n新增模型示例(展示 %d/%d):%s",
- displayCount,
- len(normalizedAddModelSamples),
- strings.Join(normalizedAddModelSamples[:displayCount], ", "),
- ))
- if len(normalizedAddModelSamples) > displayCount {
- builder.WriteString(fmt.Sprintf("(其余 %d 个已省略)", len(normalizedAddModelSamples)-displayCount))
- }
- }
- normalizedRemoveModelSamples := normalizeModelNames(removeModelSamples)
- if len(normalizedRemoveModelSamples) > 0 {
- displayCount := min(len(normalizedRemoveModelSamples), channelUpstreamModelUpdateNotifyMaxModelDetails)
- builder.WriteString(fmt.Sprintf("\n\n删除模型示例(展示 %d/%d):%s",
- displayCount,
- len(normalizedRemoveModelSamples),
- strings.Join(normalizedRemoveModelSamples[:displayCount], ", "),
- ))
- if len(normalizedRemoveModelSamples) > displayCount {
- builder.WriteString(fmt.Sprintf("(其余 %d 个已省略)", len(normalizedRemoveModelSamples)-displayCount))
- }
- }
- if failedChannels > 0 {
- displayCount := min(failedChannels, channelUpstreamModelUpdateNotifyMaxFailedChannelIDs)
- displayIDs := lo.Map(failedChannelIDs[:displayCount], func(channelID int, _ int) string {
- return fmt.Sprintf("%d", channelID)
- })
- builder.WriteString(fmt.Sprintf(
- "\n\n失败渠道 ID(展示 %d/%d):%s",
- displayCount,
- failedChannels,
- strings.Join(displayIDs, ", "),
- ))
- if failedChannels > displayCount {
- builder.WriteString(fmt.Sprintf("(其余 %d 个已省略)", failedChannels-displayCount))
- }
- }
- return builder.String()
- }
- func runChannelUpstreamModelUpdateTaskOnce() {
- if !channelUpstreamModelUpdateTaskRunning.CompareAndSwap(false, true) {
- return
- }
- defer channelUpstreamModelUpdateTaskRunning.Store(false)
- checkedChannels := 0
- failedChannels := 0
- failedChannelIDs := make([]int, 0)
- changedChannels := 0
- detectedAddModels := 0
- detectedRemoveModels := 0
- autoAddedModels := 0
- channelSummaries := make([]upstreamModelUpdateChannelSummary, 0)
- addModelSamples := make([]string, 0)
- removeModelSamples := make([]string, 0)
- refreshNeeded := false
- lastID := 0
- for {
- var channels []*model.Channel
- query := model.DB.
- Select("id", "name", "type", "key", "status", "base_url", "models", "settings", "setting", "other", "group", "priority", "weight", "tag", "channel_info", "header_override").
- Where("status = ?", common.ChannelStatusEnabled).
- Order("id asc").
- Limit(channelUpstreamModelUpdateTaskBatchSize)
- if lastID > 0 {
- query = query.Where("id > ?", lastID)
- }
- err := query.Find(&channels).Error
- if err != nil {
- common.SysLog(fmt.Sprintf("upstream model update task query failed: %v", err))
- break
- }
- if len(channels) == 0 {
- break
- }
- lastID = channels[len(channels)-1].Id
- for _, channel := range channels {
- if channel == nil {
- continue
- }
- settings := channel.GetOtherSettings()
- if !settings.UpstreamModelUpdateCheckEnabled {
- continue
- }
- checkedChannels++
- modelsChanged, autoAdded, err := checkAndPersistChannelUpstreamModelUpdates(channel, &settings, false, true)
- if err != nil {
- failedChannels++
- failedChannelIDs = append(failedChannelIDs, channel.Id)
- common.SysLog(fmt.Sprintf("upstream model update check failed: channel_id=%d channel_name=%s err=%v", channel.Id, channel.Name, err))
- continue
- }
- currentAddModels := normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels)
- currentRemoveModels := normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
- currentAddCount := len(currentAddModels) + autoAdded
- currentRemoveCount := len(currentRemoveModels)
- detectedAddModels += currentAddCount
- detectedRemoveModels += currentRemoveCount
- if currentAddCount > 0 || currentRemoveCount > 0 {
- changedChannels++
- channelSummaries = append(channelSummaries, upstreamModelUpdateChannelSummary{
- ChannelName: channel.Name,
- AddCount: currentAddCount,
- RemoveCount: currentRemoveCount,
- })
- }
- addModelSamples = mergeModelNames(addModelSamples, currentAddModels)
- removeModelSamples = mergeModelNames(removeModelSamples, currentRemoveModels)
- if modelsChanged {
- refreshNeeded = true
- }
- autoAddedModels += autoAdded
- if common.RequestInterval > 0 {
- time.Sleep(common.RequestInterval)
- }
- }
- if len(channels) < channelUpstreamModelUpdateTaskBatchSize {
- break
- }
- }
- if refreshNeeded {
- refreshChannelRuntimeCache()
- }
- if checkedChannels > 0 || common.DebugEnabled {
- common.SysLog(fmt.Sprintf(
- "upstream model update task done: checked_channels=%d changed_channels=%d detected_add_models=%d detected_remove_models=%d failed_channels=%d auto_added_models=%d",
- checkedChannels,
- changedChannels,
- detectedAddModels,
- detectedRemoveModels,
- failedChannels,
- autoAddedModels,
- ))
- }
- if changedChannels > 0 || failedChannels > 0 {
- now := common.GetTimestamp()
- if !shouldSendUpstreamModelUpdateNotification(now, changedChannels, failedChannels) {
- common.SysLog(fmt.Sprintf(
- "upstream model update notification skipped in 24h window: changed_channels=%d failed_channels=%d",
- changedChannels,
- failedChannels,
- ))
- return
- }
- service.NotifyUpstreamModelUpdateWatchers(
- "上游模型巡检通知",
- buildUpstreamModelUpdateTaskNotificationContent(
- checkedChannels,
- changedChannels,
- detectedAddModels,
- detectedRemoveModels,
- autoAddedModels,
- failedChannelIDs,
- channelSummaries,
- addModelSamples,
- removeModelSamples,
- ),
- )
- }
- }
- func StartChannelUpstreamModelUpdateTask() {
- channelUpstreamModelUpdateTaskOnce.Do(func() {
- if !common.IsMasterNode {
- return
- }
- if !common.GetEnvOrDefaultBool("CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_ENABLED", true) {
- common.SysLog("upstream model update task disabled by CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_ENABLED")
- return
- }
- intervalMinutes := common.GetEnvOrDefault(
- "CHANNEL_UPSTREAM_MODEL_UPDATE_TASK_INTERVAL_MINUTES",
- channelUpstreamModelUpdateTaskDefaultIntervalMinutes,
- )
- if intervalMinutes < 1 {
- intervalMinutes = channelUpstreamModelUpdateTaskDefaultIntervalMinutes
- }
- interval := time.Duration(intervalMinutes) * time.Minute
- go func() {
- common.SysLog(fmt.Sprintf("upstream model update task started: interval=%s", interval))
- runChannelUpstreamModelUpdateTaskOnce()
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for range ticker.C {
- runChannelUpstreamModelUpdateTaskOnce()
- }
- }()
- })
- }
- func ApplyChannelUpstreamModelUpdates(c *gin.Context) {
- var req applyChannelUpstreamModelUpdatesRequest
- if err := c.ShouldBindJSON(&req); err != nil {
- common.ApiError(c, err)
- return
- }
- if req.ID <= 0 {
- c.JSON(http.StatusOK, gin.H{
- "success": false,
- "message": "invalid channel id",
- })
- return
- }
- channel, err := model.GetChannelById(req.ID, true)
- if err != nil {
- common.ApiError(c, err)
- return
- }
- beforeSettings := channel.GetOtherSettings()
- ignoredModels := intersectModelNames(req.IgnoreModels, beforeSettings.UpstreamModelUpdateLastDetectedModels)
- addedModels, removedModels, remainingModels, remainingRemoveModels, modelsChanged, err := applyChannelUpstreamModelUpdates(
- channel,
- req.AddModels,
- req.IgnoreModels,
- req.RemoveModels,
- )
- if err != nil {
- common.ApiError(c, err)
- return
- }
- if modelsChanged {
- refreshChannelRuntimeCache()
- }
- c.JSON(http.StatusOK, gin.H{
- "success": true,
- "message": "",
- "data": gin.H{
- "id": channel.Id,
- "added_models": addedModels,
- "removed_models": removedModels,
- "ignored_models": ignoredModels,
- "remaining_models": remainingModels,
- "remaining_remove_models": remainingRemoveModels,
- "models": channel.Models,
- "settings": channel.OtherSettings,
- },
- })
- }
- func DetectChannelUpstreamModelUpdates(c *gin.Context) {
- var req applyChannelUpstreamModelUpdatesRequest
- if err := c.ShouldBindJSON(&req); err != nil {
- common.ApiError(c, err)
- return
- }
- if req.ID <= 0 {
- c.JSON(http.StatusOK, gin.H{
- "success": false,
- "message": "invalid channel id",
- })
- return
- }
- channel, err := model.GetChannelById(req.ID, true)
- if err != nil {
- common.ApiError(c, err)
- return
- }
- settings := channel.GetOtherSettings()
- modelsChanged, autoAdded, err := checkAndPersistChannelUpstreamModelUpdates(channel, &settings, true, false)
- if err != nil {
- common.ApiError(c, err)
- return
- }
- if modelsChanged {
- refreshChannelRuntimeCache()
- }
- c.JSON(http.StatusOK, gin.H{
- "success": true,
- "message": "",
- "data": detectChannelUpstreamModelUpdatesResult{
- ChannelID: channel.Id,
- ChannelName: channel.Name,
- AddModels: normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels),
- RemoveModels: normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels),
- LastCheckTime: settings.UpstreamModelUpdateLastCheckTime,
- AutoAddedModels: autoAdded,
- },
- })
- }
- func applyChannelUpstreamModelUpdates(
- channel *model.Channel,
- addModelsInput []string,
- ignoreModelsInput []string,
- removeModelsInput []string,
- ) (
- addedModels []string,
- removedModels []string,
- remainingModels []string,
- remainingRemoveModels []string,
- modelsChanged bool,
- err error,
- ) {
- settings := channel.GetOtherSettings()
- pendingAddModels := normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels)
- pendingRemoveModels := normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
- addModels := intersectModelNames(addModelsInput, pendingAddModels)
- ignoreModels := intersectModelNames(ignoreModelsInput, pendingAddModels)
- removeModels := intersectModelNames(removeModelsInput, pendingRemoveModels)
- removeModels = subtractModelNames(removeModels, addModels)
- originModels := normalizeModelNames(channel.GetModels())
- nextModels := applySelectedModelChanges(originModels, addModels, removeModels)
- modelsChanged = !slices.Equal(originModels, nextModels)
- if modelsChanged {
- channel.Models = strings.Join(nextModels, ",")
- }
- settings.UpstreamModelUpdateIgnoredModels = mergeModelNames(settings.UpstreamModelUpdateIgnoredModels, ignoreModels)
- if len(addModels) > 0 {
- settings.UpstreamModelUpdateIgnoredModels = subtractModelNames(settings.UpstreamModelUpdateIgnoredModels, addModels)
- }
- remainingModels = subtractModelNames(pendingAddModels, append(addModels, ignoreModels...))
- remainingRemoveModels = subtractModelNames(pendingRemoveModels, removeModels)
- settings.UpstreamModelUpdateLastDetectedModels = remainingModels
- settings.UpstreamModelUpdateLastRemovedModels = remainingRemoveModels
- settings.UpstreamModelUpdateLastCheckTime = common.GetTimestamp()
- if err := updateChannelUpstreamModelSettings(channel, settings, modelsChanged); err != nil {
- return nil, nil, nil, nil, false, err
- }
- if modelsChanged {
- if err := channel.UpdateAbilities(nil); err != nil {
- return addModels, removeModels, remainingModels, remainingRemoveModels, true, err
- }
- }
- return addModels, removeModels, remainingModels, remainingRemoveModels, modelsChanged, nil
- }
- func collectPendingApplyUpstreamModelChanges(settings dto.ChannelOtherSettings) (pendingAddModels []string, pendingRemoveModels []string) {
- return normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels), normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
- }
- func findEnabledChannelsAfterID(lastID int, batchSize int) ([]*model.Channel, error) {
- var channels []*model.Channel
- query := model.DB.
- Select("id", "name", "type", "key", "status", "base_url", "models", "settings", "setting", "other", "group", "priority", "weight", "tag", "channel_info", "header_override").
- Where("status = ?", common.ChannelStatusEnabled).
- Order("id asc").
- Limit(batchSize)
- if lastID > 0 {
- query = query.Where("id > ?", lastID)
- }
- return channels, query.Find(&channels).Error
- }
- func ApplyAllChannelUpstreamModelUpdates(c *gin.Context) {
- results := make([]applyAllChannelUpstreamModelUpdatesResult, 0)
- failed := make([]int, 0)
- refreshNeeded := false
- addedModelCount := 0
- removedModelCount := 0
- lastID := 0
- for {
- channels, err := findEnabledChannelsAfterID(lastID, channelUpstreamModelUpdateTaskBatchSize)
- if err != nil {
- common.ApiError(c, err)
- return
- }
- if len(channels) == 0 {
- break
- }
- lastID = channels[len(channels)-1].Id
- for _, channel := range channels {
- if channel == nil {
- continue
- }
- settings := channel.GetOtherSettings()
- if !settings.UpstreamModelUpdateCheckEnabled {
- continue
- }
- pendingAddModels, pendingRemoveModels := collectPendingApplyUpstreamModelChanges(settings)
- if len(pendingAddModels) == 0 && len(pendingRemoveModels) == 0 {
- continue
- }
- addedModels, removedModels, remainingModels, remainingRemoveModels, modelsChanged, err := applyChannelUpstreamModelUpdates(
- channel,
- pendingAddModels,
- nil,
- pendingRemoveModels,
- )
- if err != nil {
- failed = append(failed, channel.Id)
- continue
- }
- if modelsChanged {
- refreshNeeded = true
- }
- addedModelCount += len(addedModels)
- removedModelCount += len(removedModels)
- results = append(results, applyAllChannelUpstreamModelUpdatesResult{
- ChannelID: channel.Id,
- ChannelName: channel.Name,
- AddedModels: addedModels,
- RemovedModels: removedModels,
- RemainingModels: remainingModels,
- RemainingRemoveModels: remainingRemoveModels,
- })
- }
- if len(channels) < channelUpstreamModelUpdateTaskBatchSize {
- break
- }
- }
- if refreshNeeded {
- refreshChannelRuntimeCache()
- }
- c.JSON(http.StatusOK, gin.H{
- "success": true,
- "message": "",
- "data": gin.H{
- "processed_channels": len(results),
- "added_models": addedModelCount,
- "removed_models": removedModelCount,
- "failed_channel_ids": failed,
- "results": results,
- },
- })
- }
- func DetectAllChannelUpstreamModelUpdates(c *gin.Context) {
- results := make([]detectChannelUpstreamModelUpdatesResult, 0)
- failed := make([]int, 0)
- detectedAddCount := 0
- detectedRemoveCount := 0
- refreshNeeded := false
- lastID := 0
- for {
- channels, err := findEnabledChannelsAfterID(lastID, channelUpstreamModelUpdateTaskBatchSize)
- if err != nil {
- common.ApiError(c, err)
- return
- }
- if len(channels) == 0 {
- break
- }
- lastID = channels[len(channels)-1].Id
- for _, channel := range channels {
- if channel == nil {
- continue
- }
- settings := channel.GetOtherSettings()
- if !settings.UpstreamModelUpdateCheckEnabled {
- continue
- }
- modelsChanged, autoAdded, err := checkAndPersistChannelUpstreamModelUpdates(channel, &settings, true, false)
- if err != nil {
- failed = append(failed, channel.Id)
- continue
- }
- if modelsChanged {
- refreshNeeded = true
- }
- addModels := normalizeModelNames(settings.UpstreamModelUpdateLastDetectedModels)
- removeModels := normalizeModelNames(settings.UpstreamModelUpdateLastRemovedModels)
- detectedAddCount += len(addModels)
- detectedRemoveCount += len(removeModels)
- results = append(results, detectChannelUpstreamModelUpdatesResult{
- ChannelID: channel.Id,
- ChannelName: channel.Name,
- AddModels: addModels,
- RemoveModels: removeModels,
- LastCheckTime: settings.UpstreamModelUpdateLastCheckTime,
- AutoAddedModels: autoAdded,
- })
- }
- if len(channels) < channelUpstreamModelUpdateTaskBatchSize {
- break
- }
- }
- if refreshNeeded {
- refreshChannelRuntimeCache()
- }
- c.JSON(http.StatusOK, gin.H{
- "success": true,
- "message": "",
- "data": gin.H{
- "processed_channels": len(results),
- "failed_channel_ids": failed,
- "detected_add_models": detectedAddCount,
- "detected_remove_models": detectedRemoveCount,
- "channel_detected_results": results,
- },
- })
- }
|