channel_affinity.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966
  1. package service
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/pkg/cachex"
  13. "github.com/QuantumNous/new-api/setting/operation_setting"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/samber/hot"
  17. "github.com/tidwall/gjson"
  18. )
  19. const (
  20. ginKeyChannelAffinityCacheKey = "channel_affinity_cache_key"
  21. ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
  22. ginKeyChannelAffinityMeta = "channel_affinity_meta"
  23. ginKeyChannelAffinityLogInfo = "channel_affinity_log_info"
  24. ginKeyChannelAffinitySkipRetry = "channel_affinity_skip_retry_on_failure"
  25. channelAffinityCacheNamespace = "new-api:channel_affinity:v1"
  26. channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
  27. )
  28. var (
  29. channelAffinityCacheOnce sync.Once
  30. channelAffinityCache *cachex.HybridCache[int]
  31. channelAffinityUsageCacheStatsOnce sync.Once
  32. channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]
  33. channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp
  34. )
  35. type channelAffinityMeta struct {
  36. CacheKey string
  37. TTLSeconds int
  38. RuleName string
  39. SkipRetry bool
  40. ParamTemplate map[string]interface{}
  41. KeySourceType string
  42. KeySourceKey string
  43. KeySourcePath string
  44. KeyHint string
  45. KeyFingerprint string
  46. UsingGroup string
  47. ModelName string
  48. RequestPath string
  49. }
  50. type ChannelAffinityStatsContext struct {
  51. RuleName string
  52. UsingGroup string
  53. KeyFingerprint string
  54. TTLSeconds int64
  55. }
  56. const (
  57. cacheTokenRateModeCachedOverPrompt = "cached_over_prompt"
  58. cacheTokenRateModeCachedOverPromptPlusCached = "cached_over_prompt_plus_cached"
  59. cacheTokenRateModeMixed = "mixed"
  60. )
  61. type ChannelAffinityCacheStats struct {
  62. Enabled bool `json:"enabled"`
  63. Total int `json:"total"`
  64. Unknown int `json:"unknown"`
  65. ByRuleName map[string]int `json:"by_rule_name"`
  66. CacheCapacity int `json:"cache_capacity"`
  67. CacheAlgo string `json:"cache_algo"`
  68. }
  69. func getChannelAffinityCache() *cachex.HybridCache[int] {
  70. channelAffinityCacheOnce.Do(func() {
  71. setting := operation_setting.GetChannelAffinitySetting()
  72. capacity := setting.MaxEntries
  73. if capacity <= 0 {
  74. capacity = 100_000
  75. }
  76. defaultTTLSeconds := setting.DefaultTTLSeconds
  77. if defaultTTLSeconds <= 0 {
  78. defaultTTLSeconds = 3600
  79. }
  80. channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
  81. Namespace: cachex.Namespace(channelAffinityCacheNamespace),
  82. Redis: common.RDB,
  83. RedisEnabled: func() bool {
  84. return common.RedisEnabled && common.RDB != nil
  85. },
  86. RedisCodec: cachex.IntCodec{},
  87. Memory: func() *hot.HotCache[string, int] {
  88. return hot.NewHotCache[string, int](hot.LRU, capacity).
  89. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  90. WithJanitor().
  91. Build()
  92. },
  93. })
  94. })
  95. return channelAffinityCache
  96. }
  97. func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
  98. setting := operation_setting.GetChannelAffinitySetting()
  99. if setting == nil {
  100. return ChannelAffinityCacheStats{
  101. Enabled: false,
  102. Total: 0,
  103. Unknown: 0,
  104. ByRuleName: map[string]int{},
  105. }
  106. }
  107. cache := getChannelAffinityCache()
  108. mainCap, _ := cache.Capacity()
  109. mainAlgo, _ := cache.Algorithm()
  110. rules := setting.Rules
  111. ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
  112. for _, r := range rules {
  113. name := strings.TrimSpace(r.Name)
  114. if name == "" {
  115. continue
  116. }
  117. if !r.IncludeRuleName {
  118. continue
  119. }
  120. ruleByName[name] = r
  121. }
  122. byRuleName := make(map[string]int, len(ruleByName))
  123. for name := range ruleByName {
  124. byRuleName[name] = 0
  125. }
  126. keys, err := cache.Keys()
  127. if err != nil {
  128. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  129. keys = nil
  130. }
  131. total := len(keys)
  132. unknown := 0
  133. for _, k := range keys {
  134. prefix := channelAffinityCacheNamespace + ":"
  135. if !strings.HasPrefix(k, prefix) {
  136. unknown++
  137. continue
  138. }
  139. rest := strings.TrimPrefix(k, prefix)
  140. parts := strings.Split(rest, ":")
  141. if len(parts) < 2 {
  142. unknown++
  143. continue
  144. }
  145. ruleName := parts[0]
  146. rule, ok := ruleByName[ruleName]
  147. if !ok {
  148. unknown++
  149. continue
  150. }
  151. if rule.IncludeModelName {
  152. if len(parts) < 3 {
  153. unknown++
  154. continue
  155. }
  156. }
  157. if rule.IncludeUsingGroup {
  158. minParts := 3
  159. if rule.IncludeModelName {
  160. minParts = 4
  161. }
  162. if len(parts) < minParts {
  163. unknown++
  164. continue
  165. }
  166. }
  167. byRuleName[ruleName]++
  168. }
  169. return ChannelAffinityCacheStats{
  170. Enabled: setting.Enabled,
  171. Total: total,
  172. Unknown: unknown,
  173. ByRuleName: byRuleName,
  174. CacheCapacity: mainCap,
  175. CacheAlgo: mainAlgo,
  176. }
  177. }
  178. func ClearChannelAffinityCacheAll() int {
  179. cache := getChannelAffinityCache()
  180. keys, err := cache.Keys()
  181. if err != nil {
  182. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  183. keys = nil
  184. }
  185. if len(keys) > 0 {
  186. if _, err := cache.DeleteMany(keys); err != nil {
  187. common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
  188. }
  189. }
  190. return len(keys)
  191. }
  192. func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
  193. ruleName = strings.TrimSpace(ruleName)
  194. if ruleName == "" {
  195. return 0, fmt.Errorf("rule_name 不能为空")
  196. }
  197. setting := operation_setting.GetChannelAffinitySetting()
  198. if setting == nil {
  199. return 0, fmt.Errorf("channel_affinity_setting 未初始化")
  200. }
  201. var matchedRule *operation_setting.ChannelAffinityRule
  202. for i := range setting.Rules {
  203. r := &setting.Rules[i]
  204. if strings.TrimSpace(r.Name) != ruleName {
  205. continue
  206. }
  207. matchedRule = r
  208. break
  209. }
  210. if matchedRule == nil {
  211. return 0, fmt.Errorf("未知规则名称")
  212. }
  213. if !matchedRule.IncludeRuleName {
  214. return 0, fmt.Errorf("该规则未启用 include_rule_name,无法按规则清空缓存")
  215. }
  216. cache := getChannelAffinityCache()
  217. deleted, err := cache.DeleteByPrefix(ruleName)
  218. if err != nil {
  219. return 0, err
  220. }
  221. return deleted, nil
  222. }
  223. func matchAnyRegexCached(patterns []string, s string) bool {
  224. if len(patterns) == 0 || s == "" {
  225. return false
  226. }
  227. for _, pattern := range patterns {
  228. if pattern == "" {
  229. continue
  230. }
  231. re, ok := channelAffinityRegexCache.Load(pattern)
  232. if !ok {
  233. compiled, err := regexp.Compile(pattern)
  234. if err != nil {
  235. continue
  236. }
  237. re = compiled
  238. channelAffinityRegexCache.Store(pattern, re)
  239. }
  240. if re.(*regexp.Regexp).MatchString(s) {
  241. return true
  242. }
  243. }
  244. return false
  245. }
  246. func matchAnyIncludeFold(patterns []string, s string) bool {
  247. if len(patterns) == 0 || s == "" {
  248. return false
  249. }
  250. sLower := strings.ToLower(s)
  251. for _, p := range patterns {
  252. p = strings.TrimSpace(p)
  253. if p == "" {
  254. continue
  255. }
  256. if strings.Contains(sLower, strings.ToLower(p)) {
  257. return true
  258. }
  259. }
  260. return false
  261. }
  262. func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
  263. switch src.Type {
  264. case "context_int":
  265. if src.Key == "" {
  266. return ""
  267. }
  268. v := c.GetInt(src.Key)
  269. if v <= 0 {
  270. return ""
  271. }
  272. return strconv.Itoa(v)
  273. case "context_string":
  274. if src.Key == "" {
  275. return ""
  276. }
  277. return strings.TrimSpace(c.GetString(src.Key))
  278. case "gjson":
  279. if src.Path == "" {
  280. return ""
  281. }
  282. storage, err := common.GetBodyStorage(c)
  283. if err != nil {
  284. return ""
  285. }
  286. body, err := storage.Bytes()
  287. if err != nil || len(body) == 0 {
  288. return ""
  289. }
  290. res := gjson.GetBytes(body, src.Path)
  291. if !res.Exists() {
  292. return ""
  293. }
  294. switch res.Type {
  295. case gjson.String, gjson.Number, gjson.True, gjson.False:
  296. return strings.TrimSpace(res.String())
  297. default:
  298. return strings.TrimSpace(res.Raw)
  299. }
  300. default:
  301. return ""
  302. }
  303. }
  304. func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, modelName string, usingGroup string, affinityValue string) string {
  305. parts := make([]string, 0, 4)
  306. if rule.IncludeRuleName && rule.Name != "" {
  307. parts = append(parts, rule.Name)
  308. }
  309. if rule.IncludeModelName && modelName != "" {
  310. parts = append(parts, modelName)
  311. }
  312. if rule.IncludeUsingGroup && usingGroup != "" {
  313. parts = append(parts, usingGroup)
  314. }
  315. parts = append(parts, affinityValue)
  316. return strings.Join(parts, ":")
  317. }
  318. func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
  319. c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
  320. c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
  321. c.Set(ginKeyChannelAffinityMeta, meta)
  322. }
  323. func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
  324. keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
  325. if !ok {
  326. return "", 0, false
  327. }
  328. key, ok := keyAny.(string)
  329. if !ok || key == "" {
  330. return "", 0, false
  331. }
  332. ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
  333. if !ok {
  334. return key, 0, true
  335. }
  336. ttlSeconds, _ := ttlAny.(int)
  337. return key, ttlSeconds, true
  338. }
  339. func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
  340. anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
  341. if !ok {
  342. return channelAffinityMeta{}, false
  343. }
  344. meta, ok := anyMeta.(channelAffinityMeta)
  345. if !ok {
  346. return channelAffinityMeta{}, false
  347. }
  348. return meta, true
  349. }
  350. func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
  351. if c == nil {
  352. return ChannelAffinityStatsContext{}, false
  353. }
  354. meta, ok := getChannelAffinityMeta(c)
  355. if !ok {
  356. return ChannelAffinityStatsContext{}, false
  357. }
  358. ruleName := strings.TrimSpace(meta.RuleName)
  359. keyFp := strings.TrimSpace(meta.KeyFingerprint)
  360. usingGroup := strings.TrimSpace(meta.UsingGroup)
  361. if ruleName == "" || keyFp == "" {
  362. return ChannelAffinityStatsContext{}, false
  363. }
  364. ttlSeconds := int64(meta.TTLSeconds)
  365. if ttlSeconds <= 0 {
  366. return ChannelAffinityStatsContext{}, false
  367. }
  368. return ChannelAffinityStatsContext{
  369. RuleName: ruleName,
  370. UsingGroup: usingGroup,
  371. KeyFingerprint: keyFp,
  372. TTLSeconds: ttlSeconds,
  373. }, true
  374. }
  375. func affinityFingerprint(s string) string {
  376. if s == "" {
  377. return ""
  378. }
  379. hex := common.Sha1([]byte(s))
  380. if len(hex) >= 8 {
  381. return hex[:8]
  382. }
  383. return hex
  384. }
  385. func buildChannelAffinityKeyHint(s string) string {
  386. s = strings.TrimSpace(s)
  387. if s == "" {
  388. return ""
  389. }
  390. s = strings.ReplaceAll(s, "\n", " ")
  391. s = strings.ReplaceAll(s, "\r", " ")
  392. if len(s) <= 12 {
  393. return s
  394. }
  395. return s[:4] + "..." + s[len(s)-4:]
  396. }
  397. func cloneStringAnyMap(src map[string]interface{}) map[string]interface{} {
  398. if len(src) == 0 {
  399. return map[string]interface{}{}
  400. }
  401. dst := make(map[string]interface{}, len(src))
  402. for k, v := range src {
  403. dst[k] = v
  404. }
  405. return dst
  406. }
  407. func mergeChannelOverride(base map[string]interface{}, tpl map[string]interface{}) map[string]interface{} {
  408. if len(base) == 0 && len(tpl) == 0 {
  409. return map[string]interface{}{}
  410. }
  411. if len(tpl) == 0 {
  412. return base
  413. }
  414. out := cloneStringAnyMap(base)
  415. for k, v := range tpl {
  416. if strings.EqualFold(strings.TrimSpace(k), "operations") {
  417. baseOps, hasBaseOps := extractParamOperations(out[k])
  418. tplOps, hasTplOps := extractParamOperations(v)
  419. if hasTplOps {
  420. if hasBaseOps {
  421. out[k] = append(tplOps, baseOps...)
  422. } else {
  423. out[k] = tplOps
  424. }
  425. continue
  426. }
  427. }
  428. if _, exists := out[k]; exists {
  429. continue
  430. }
  431. out[k] = v
  432. }
  433. return out
  434. }
  435. func extractParamOperations(value interface{}) ([]interface{}, bool) {
  436. switch ops := value.(type) {
  437. case []interface{}:
  438. if len(ops) == 0 {
  439. return []interface{}{}, true
  440. }
  441. cloned := make([]interface{}, 0, len(ops))
  442. cloned = append(cloned, ops...)
  443. return cloned, true
  444. case []map[string]interface{}:
  445. cloned := make([]interface{}, 0, len(ops))
  446. for _, op := range ops {
  447. cloned = append(cloned, op)
  448. }
  449. return cloned, true
  450. default:
  451. return nil, false
  452. }
  453. }
  454. func appendChannelAffinityTemplateAdminInfo(c *gin.Context, meta channelAffinityMeta) {
  455. if c == nil {
  456. return
  457. }
  458. if len(meta.ParamTemplate) == 0 {
  459. return
  460. }
  461. templateInfo := map[string]interface{}{
  462. "applied": true,
  463. "rule_name": meta.RuleName,
  464. "param_override_keys": len(meta.ParamTemplate),
  465. }
  466. if anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo); ok {
  467. if info, ok := anyInfo.(map[string]interface{}); ok {
  468. info["override_template"] = templateInfo
  469. c.Set(ginKeyChannelAffinityLogInfo, info)
  470. return
  471. }
  472. }
  473. c.Set(ginKeyChannelAffinityLogInfo, map[string]interface{}{
  474. "reason": meta.RuleName,
  475. "rule_name": meta.RuleName,
  476. "using_group": meta.UsingGroup,
  477. "model": meta.ModelName,
  478. "request_path": meta.RequestPath,
  479. "key_source": meta.KeySourceType,
  480. "key_key": meta.KeySourceKey,
  481. "key_path": meta.KeySourcePath,
  482. "key_hint": meta.KeyHint,
  483. "key_fp": meta.KeyFingerprint,
  484. "override_template": templateInfo,
  485. })
  486. }
  487. // ApplyChannelAffinityOverrideTemplate merges per-rule channel override templates onto the selected channel override config.
  488. func ApplyChannelAffinityOverrideTemplate(c *gin.Context, paramOverride map[string]interface{}) (map[string]interface{}, bool) {
  489. if c == nil {
  490. return paramOverride, false
  491. }
  492. meta, ok := getChannelAffinityMeta(c)
  493. if !ok {
  494. return paramOverride, false
  495. }
  496. if len(meta.ParamTemplate) == 0 {
  497. return paramOverride, false
  498. }
  499. mergedParam := mergeChannelOverride(paramOverride, meta.ParamTemplate)
  500. appendChannelAffinityTemplateAdminInfo(c, meta)
  501. return mergedParam, true
  502. }
  503. func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
  504. setting := operation_setting.GetChannelAffinitySetting()
  505. if setting == nil || !setting.Enabled {
  506. return 0, false
  507. }
  508. path := ""
  509. if c != nil && c.Request != nil && c.Request.URL != nil {
  510. path = c.Request.URL.Path
  511. }
  512. userAgent := ""
  513. if c != nil && c.Request != nil {
  514. userAgent = c.Request.UserAgent()
  515. }
  516. for _, rule := range setting.Rules {
  517. if !matchAnyRegexCached(rule.ModelRegex, modelName) {
  518. continue
  519. }
  520. if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
  521. continue
  522. }
  523. if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
  524. continue
  525. }
  526. var affinityValue string
  527. var usedSource operation_setting.ChannelAffinityKeySource
  528. for _, src := range rule.KeySources {
  529. affinityValue = extractChannelAffinityValue(c, src)
  530. if affinityValue != "" {
  531. usedSource = src
  532. break
  533. }
  534. }
  535. if affinityValue == "" {
  536. continue
  537. }
  538. if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
  539. continue
  540. }
  541. ttlSeconds := rule.TTLSeconds
  542. if ttlSeconds <= 0 {
  543. ttlSeconds = setting.DefaultTTLSeconds
  544. }
  545. cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, modelName, usingGroup, affinityValue)
  546. cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
  547. setChannelAffinityContext(c, channelAffinityMeta{
  548. CacheKey: cacheKeyFull,
  549. TTLSeconds: ttlSeconds,
  550. RuleName: rule.Name,
  551. SkipRetry: rule.SkipRetryOnFailure,
  552. ParamTemplate: cloneStringAnyMap(rule.ParamOverrideTemplate),
  553. KeySourceType: strings.TrimSpace(usedSource.Type),
  554. KeySourceKey: strings.TrimSpace(usedSource.Key),
  555. KeySourcePath: strings.TrimSpace(usedSource.Path),
  556. KeyHint: buildChannelAffinityKeyHint(affinityValue),
  557. KeyFingerprint: affinityFingerprint(affinityValue),
  558. UsingGroup: usingGroup,
  559. ModelName: modelName,
  560. RequestPath: path,
  561. })
  562. cache := getChannelAffinityCache()
  563. channelID, found, err := cache.Get(cacheKeySuffix)
  564. if err != nil {
  565. common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
  566. return 0, false
  567. }
  568. if found {
  569. return channelID, true
  570. }
  571. return 0, false
  572. }
  573. return 0, false
  574. }
  575. func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
  576. if c == nil {
  577. return false
  578. }
  579. v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
  580. if ok {
  581. b, ok := v.(bool)
  582. if ok {
  583. return b
  584. }
  585. }
  586. meta, ok := getChannelAffinityMeta(c)
  587. if !ok {
  588. return false
  589. }
  590. return meta.SkipRetry
  591. }
  592. func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
  593. if c == nil || channelID <= 0 {
  594. return
  595. }
  596. meta, ok := getChannelAffinityMeta(c)
  597. if !ok {
  598. return
  599. }
  600. c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
  601. info := map[string]interface{}{
  602. "reason": meta.RuleName,
  603. "rule_name": meta.RuleName,
  604. "using_group": meta.UsingGroup,
  605. "selected_group": selectedGroup,
  606. "model": meta.ModelName,
  607. "request_path": meta.RequestPath,
  608. "channel_id": channelID,
  609. "key_source": meta.KeySourceType,
  610. "key_key": meta.KeySourceKey,
  611. "key_path": meta.KeySourcePath,
  612. "key_hint": meta.KeyHint,
  613. "key_fp": meta.KeyFingerprint,
  614. }
  615. c.Set(ginKeyChannelAffinityLogInfo, info)
  616. }
  617. func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
  618. if c == nil || adminInfo == nil {
  619. return
  620. }
  621. anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
  622. if !ok || anyInfo == nil {
  623. return
  624. }
  625. adminInfo["channel_affinity"] = anyInfo
  626. }
  627. func RecordChannelAffinity(c *gin.Context, channelID int) {
  628. if channelID <= 0 {
  629. return
  630. }
  631. setting := operation_setting.GetChannelAffinitySetting()
  632. if setting == nil || !setting.Enabled {
  633. return
  634. }
  635. if setting.SwitchOnSuccess && c != nil {
  636. if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
  637. channelID = successChannelID
  638. }
  639. }
  640. cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
  641. if !ok {
  642. return
  643. }
  644. if ttlSeconds <= 0 {
  645. ttlSeconds = setting.DefaultTTLSeconds
  646. }
  647. if ttlSeconds <= 0 {
  648. ttlSeconds = 3600
  649. }
  650. cache := getChannelAffinityCache()
  651. if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
  652. common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
  653. }
  654. }
  655. type ChannelAffinityUsageCacheStats struct {
  656. RuleName string `json:"rule_name"`
  657. UsingGroup string `json:"using_group"`
  658. KeyFingerprint string `json:"key_fp"`
  659. CachedTokenRateMode string `json:"cached_token_rate_mode"`
  660. Hit int64 `json:"hit"`
  661. Total int64 `json:"total"`
  662. WindowSeconds int64 `json:"window_seconds"`
  663. PromptTokens int64 `json:"prompt_tokens"`
  664. CompletionTokens int64 `json:"completion_tokens"`
  665. TotalTokens int64 `json:"total_tokens"`
  666. CachedTokens int64 `json:"cached_tokens"`
  667. PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
  668. LastSeenAt int64 `json:"last_seen_at"`
  669. }
  670. type ChannelAffinityUsageCacheCounters struct {
  671. CachedTokenRateMode string `json:"cached_token_rate_mode"`
  672. Hit int64 `json:"hit"`
  673. Total int64 `json:"total"`
  674. WindowSeconds int64 `json:"window_seconds"`
  675. PromptTokens int64 `json:"prompt_tokens"`
  676. CompletionTokens int64 `json:"completion_tokens"`
  677. TotalTokens int64 `json:"total_tokens"`
  678. CachedTokens int64 `json:"cached_tokens"`
  679. PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
  680. LastSeenAt int64 `json:"last_seen_at"`
  681. }
  682. var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
  683. // ObserveChannelAffinityUsageCacheByRelayFormat records usage cache stats with a stable rate mode derived from relay format.
  684. func ObserveChannelAffinityUsageCacheByRelayFormat(c *gin.Context, usage *dto.Usage, relayFormat types.RelayFormat) {
  685. ObserveChannelAffinityUsageCacheFromContext(c, usage, cachedTokenRateModeByRelayFormat(relayFormat))
  686. }
  687. func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage, cachedTokenRateMode string) {
  688. statsCtx, ok := GetChannelAffinityStatsContext(c)
  689. if !ok {
  690. return
  691. }
  692. observeChannelAffinityUsageCache(statsCtx, usage, cachedTokenRateMode)
  693. }
  694. func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
  695. ruleName = strings.TrimSpace(ruleName)
  696. usingGroup = strings.TrimSpace(usingGroup)
  697. keyFp = strings.TrimSpace(keyFp)
  698. entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
  699. if entryKey == "" {
  700. return ChannelAffinityUsageCacheStats{
  701. RuleName: ruleName,
  702. UsingGroup: usingGroup,
  703. KeyFingerprint: keyFp,
  704. }
  705. }
  706. cache := getChannelAffinityUsageCacheStatsCache()
  707. v, found, err := cache.Get(entryKey)
  708. if err != nil || !found {
  709. return ChannelAffinityUsageCacheStats{
  710. RuleName: ruleName,
  711. UsingGroup: usingGroup,
  712. KeyFingerprint: keyFp,
  713. }
  714. }
  715. return ChannelAffinityUsageCacheStats{
  716. CachedTokenRateMode: v.CachedTokenRateMode,
  717. RuleName: ruleName,
  718. UsingGroup: usingGroup,
  719. KeyFingerprint: keyFp,
  720. Hit: v.Hit,
  721. Total: v.Total,
  722. WindowSeconds: v.WindowSeconds,
  723. PromptTokens: v.PromptTokens,
  724. CompletionTokens: v.CompletionTokens,
  725. TotalTokens: v.TotalTokens,
  726. CachedTokens: v.CachedTokens,
  727. PromptCacheHitTokens: v.PromptCacheHitTokens,
  728. LastSeenAt: v.LastSeenAt,
  729. }
  730. }
  731. func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage, cachedTokenRateMode string) {
  732. entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
  733. if entryKey == "" {
  734. return
  735. }
  736. windowSeconds := statsCtx.TTLSeconds
  737. if windowSeconds <= 0 {
  738. return
  739. }
  740. cache := getChannelAffinityUsageCacheStatsCache()
  741. ttl := time.Duration(windowSeconds) * time.Second
  742. lock := channelAffinityUsageCacheStatsLock(entryKey)
  743. lock.Lock()
  744. defer lock.Unlock()
  745. prev, found, err := cache.Get(entryKey)
  746. if err != nil {
  747. return
  748. }
  749. next := prev
  750. if !found {
  751. next = ChannelAffinityUsageCacheCounters{}
  752. }
  753. currentMode := normalizeCachedTokenRateMode(cachedTokenRateMode)
  754. if currentMode != "" {
  755. if next.CachedTokenRateMode == "" {
  756. next.CachedTokenRateMode = currentMode
  757. } else if next.CachedTokenRateMode != currentMode && next.CachedTokenRateMode != cacheTokenRateModeMixed {
  758. next.CachedTokenRateMode = cacheTokenRateModeMixed
  759. }
  760. }
  761. next.Total++
  762. hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
  763. if hit {
  764. next.Hit++
  765. }
  766. next.WindowSeconds = windowSeconds
  767. next.LastSeenAt = time.Now().Unix()
  768. next.CachedTokens += cachedTokens
  769. next.PromptCacheHitTokens += promptCacheHitTokens
  770. next.PromptTokens += int64(usagePromptTokens(usage))
  771. next.CompletionTokens += int64(usageCompletionTokens(usage))
  772. next.TotalTokens += int64(usageTotalTokens(usage))
  773. _ = cache.SetWithTTL(entryKey, next, ttl)
  774. }
  775. func normalizeCachedTokenRateMode(mode string) string {
  776. switch mode {
  777. case cacheTokenRateModeCachedOverPrompt:
  778. return cacheTokenRateModeCachedOverPrompt
  779. case cacheTokenRateModeCachedOverPromptPlusCached:
  780. return cacheTokenRateModeCachedOverPromptPlusCached
  781. case cacheTokenRateModeMixed:
  782. return cacheTokenRateModeMixed
  783. default:
  784. return ""
  785. }
  786. }
  787. func cachedTokenRateModeByRelayFormat(relayFormat types.RelayFormat) string {
  788. switch relayFormat {
  789. case types.RelayFormatOpenAI, types.RelayFormatOpenAIResponses, types.RelayFormatOpenAIResponsesCompaction:
  790. return cacheTokenRateModeCachedOverPrompt
  791. case types.RelayFormatClaude:
  792. return cacheTokenRateModeCachedOverPromptPlusCached
  793. default:
  794. return ""
  795. }
  796. }
  797. func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
  798. ruleName = strings.TrimSpace(ruleName)
  799. usingGroup = strings.TrimSpace(usingGroup)
  800. keyFp = strings.TrimSpace(keyFp)
  801. if ruleName == "" || keyFp == "" {
  802. return ""
  803. }
  804. return ruleName + "\n" + usingGroup + "\n" + keyFp
  805. }
  806. func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
  807. if usage == nil {
  808. return false, 0, 0
  809. }
  810. cached := int64(0)
  811. if usage.PromptTokensDetails.CachedTokens > 0 {
  812. cached = int64(usage.PromptTokensDetails.CachedTokens)
  813. } else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
  814. cached = int64(usage.InputTokensDetails.CachedTokens)
  815. }
  816. pcht := int64(0)
  817. if usage.PromptCacheHitTokens > 0 {
  818. pcht = int64(usage.PromptCacheHitTokens)
  819. }
  820. return cached > 0 || pcht > 0, cached, pcht
  821. }
  822. func usagePromptTokens(usage *dto.Usage) int {
  823. if usage == nil {
  824. return 0
  825. }
  826. if usage.PromptTokens > 0 {
  827. return usage.PromptTokens
  828. }
  829. return usage.InputTokens
  830. }
  831. func usageCompletionTokens(usage *dto.Usage) int {
  832. if usage == nil {
  833. return 0
  834. }
  835. if usage.CompletionTokens > 0 {
  836. return usage.CompletionTokens
  837. }
  838. return usage.OutputTokens
  839. }
  840. func usageTotalTokens(usage *dto.Usage) int {
  841. if usage == nil {
  842. return 0
  843. }
  844. if usage.TotalTokens > 0 {
  845. return usage.TotalTokens
  846. }
  847. pt := usagePromptTokens(usage)
  848. ct := usageCompletionTokens(usage)
  849. if pt > 0 || ct > 0 {
  850. return pt + ct
  851. }
  852. return 0
  853. }
  854. func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
  855. channelAffinityUsageCacheStatsOnce.Do(func() {
  856. setting := operation_setting.GetChannelAffinitySetting()
  857. capacity := 100_000
  858. defaultTTLSeconds := 3600
  859. if setting != nil {
  860. if setting.MaxEntries > 0 {
  861. capacity = setting.MaxEntries
  862. }
  863. if setting.DefaultTTLSeconds > 0 {
  864. defaultTTLSeconds = setting.DefaultTTLSeconds
  865. }
  866. }
  867. channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
  868. Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
  869. Redis: common.RDB,
  870. RedisEnabled: func() bool {
  871. return common.RedisEnabled && common.RDB != nil
  872. },
  873. RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
  874. Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
  875. return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
  876. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  877. WithJanitor().
  878. Build()
  879. },
  880. })
  881. })
  882. return channelAffinityUsageCacheStatsCache
  883. }
  884. func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
  885. h := fnv.New32a()
  886. _, _ = h.Write([]byte(key))
  887. idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
  888. return &channelAffinityUsageCacheStatsLocks[idx]
  889. }