feature.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package service
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/context"
  7. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/log"
  8. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/module"
  9. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/recconf"
  10. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/service/feature"
  11. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/utils"
  12. )
  13. var featureService *FeatureService
  14. type FeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item
  15. type LoadFeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext)
  16. func init() {
  17. featureService = &FeatureService{
  18. featureSceneMap: make(map[string][]*feature.Feature),
  19. featureSceneAsyncMap: make(map[string]bool),
  20. featureFuncMap: make(map[string]FeatureFunc),
  21. loadFeatureFuncMap: make(map[string]LoadFeatureFunc),
  22. }
  23. }
  24. func DefaultFeatureService() *FeatureService {
  25. return featureService
  26. }
  27. func RegisterFeatureFunc(sceneName string, f FeatureFunc) {
  28. DefaultFeatureService().AddFeatureFunc(sceneName, f)
  29. }
  30. func RegisterLoadFeatureFunc(sceneName string, f LoadFeatureFunc) {
  31. DefaultFeatureService().AddLoadFeatureFunc(sceneName, f)
  32. }
  33. type FeatureService struct {
  34. featureSceneMap map[string][]*feature.Feature
  35. featureSceneAsyncMap map[string]bool
  36. featureFuncMap map[string]FeatureFunc
  37. loadFeatureFuncMap map[string]LoadFeatureFunc
  38. }
  39. func (s *FeatureService) AddFeatureFunc(sceneName string, f FeatureFunc) {
  40. s.featureFuncMap[sceneName] = f
  41. }
  42. func (s *FeatureService) AddLoadFeatureFunc(sceneName string, f LoadFeatureFunc) {
  43. s.loadFeatureFuncMap[sceneName] = f
  44. }
  45. // LoadFeatures load user or item feature use feature.Feature
  46. func (s *FeatureService) LoadFeatures(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
  47. start := time.Now()
  48. var sceneName string
  49. if context.ExperimentResult != nil {
  50. sceneName = context.ExperimentResult.GetExperimentParams().GetString("features.scene.name", "")
  51. }
  52. if sceneName == "" {
  53. sceneName = context.GetParameter("scene").(string)
  54. }
  55. if features, ok := s.featureSceneMap[sceneName]; ok {
  56. async := s.featureSceneAsyncMap[sceneName]
  57. if async {
  58. var wg sync.WaitGroup
  59. for _, fea := range features {
  60. wg.Add(1)
  61. go func(fea *feature.Feature) {
  62. defer wg.Done()
  63. fea.LoadFeatures(user, items, context)
  64. }(fea)
  65. }
  66. if loadFeatureFunc, exist := s.loadFeatureFuncMap[sceneName]; exist {
  67. wg.Add(1)
  68. go func() {
  69. defer wg.Done()
  70. loadFeatureFunc(user, items, context)
  71. }()
  72. }
  73. wg.Wait()
  74. } else {
  75. for _, feature := range features {
  76. feature.LoadFeatures(user, items, context)
  77. }
  78. if loadFeatureFunc, exist := s.loadFeatureFuncMap[sceneName]; exist {
  79. loadFeatureFunc(user, items, context)
  80. }
  81. }
  82. featureFunc, ok := s.featureFuncMap[sceneName]
  83. if ok {
  84. items = featureFunc(user, items, context)
  85. }
  86. }
  87. log.Info(fmt.Sprintf("requestId=%s\tmodule=LoadFeatures\tcost=%d", context.RecommendId, utils.CostTime(start)))
  88. return items
  89. }
  90. func (s *FeatureService) LoadFeaturesForGeneralRank(user *module.User, items []*module.Item, context *context.RecommendContext) {
  91. start := time.Now()
  92. sceneName := context.GetParameter("scene").(string)
  93. if features, ok := s.featureSceneMap[sceneName]; ok {
  94. async := s.featureSceneAsyncMap[sceneName]
  95. if async {
  96. var wg sync.WaitGroup
  97. for _, fea := range features {
  98. wg.Add(1)
  99. go func(fea *feature.Feature) {
  100. defer wg.Done()
  101. fea.LoadFeatures(user, items, context)
  102. }(fea)
  103. }
  104. wg.Wait()
  105. } else {
  106. for _, feature := range features {
  107. feature.LoadFeatures(user, items, context)
  108. }
  109. }
  110. // featureFunc, ok := s.featureFuncMap[sceneName]
  111. // if ok {
  112. // featureFunc(user, items, context)
  113. // }
  114. }
  115. log.Info(fmt.Sprintf("requestId=%s\tmodule=LoadFeaturesForGeneralRank\tcost=%d", context.RecommendId, utils.CostTime(start)))
  116. }
  117. func LoadFeatureConfig(config *recconf.RecommendConfig) {
  118. for name, sceneConf := range config.FeatureConfs {
  119. var features []*feature.Feature
  120. for _, conf := range sceneConf.FeatureLoadConfs {
  121. f := feature.LoadWithConfig(conf)
  122. features = append(features, f)
  123. }
  124. featureService.featureSceneAsyncMap[name] = false
  125. if sceneConf.AsynLoadFeature {
  126. featureService.featureSceneAsyncMap[name] = true
  127. }
  128. featureService.featureSceneMap[name] = features
  129. }
  130. }