zhangbo vor 4 Monaten
Ursprung
Commit
cfeb03d925

+ 5 - 0
pom.xml

@@ -181,6 +181,11 @@
             <artifactId>xgboost4j-spark_2.11</artifactId>
             <version>1.1.2</version>
         </dependency>
+        <dependency>
+            <groupId>com.tzld.piaoquan</groupId>
+            <artifactId>recommend-similarity</artifactId>
+            <version>1.0.0</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 22 - 0
src/main/java/examples/utils/PropertiesUtil.java

@@ -0,0 +1,22 @@
+package examples.utils;
+
+import org.springframework.context.EnvironmentAware;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+@Component
+public class PropertiesUtil implements EnvironmentAware {
+
+
+    private static Environment environment;
+
+
+    @Override
+    public void setEnvironment(Environment environment) {
+        this.environment = environment;
+    }
+
+    public static String getString(String name) {
+        return environment.getProperty(name);
+    }
+}

+ 64 - 0
src/main/java/examples/utils/SimilarityUtils.java

@@ -0,0 +1,64 @@
+package examples.utils;
+
+import com.tzld.piaoquan.recommend.similarity.word2vec.Segment;
+import com.tzld.piaoquan.recommend.similarity.word2vec.Word2Vec;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Word2Vec-based sentence similarity helper.
+ *
+ * <p>Copied from recommend-server (@zhangbo, 2024-12-11).
+ *
+ * <p>{@link #init()} schedules a background task that loads the model from OSS and retries every
+ * five minutes until one load succeeds. Until then {@link #word2VecSimilarity(String, String)}
+ * uses the default-constructed {@link Word2Vec} instance.
+ *
+ * @author dyp
+ */
+@Slf4j
+public final class SimilarityUtils {
+
+    private static final String OSS_BUCKET = "art-recommend";
+    private static final String MODEL_PATH = "similarity/word2vec/Google_word2vec_zhwiki210720_300d.bin";
+    // NOTE(review): OSS credentials are committed in source. They should be supplied through
+    // configuration and the exposed key rotated; values are kept here only to preserve behaviour.
+    private static final String ACCESS_KEY_ID = "LTAIP6x1l3DXfSxm";
+    private static final String ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon";
+
+    /** Current model. Volatile: replaced by the scheduler thread, read by caller threads. */
+    private static volatile Word2Vec vec = new Word2Vec();
+
+    /** Set once the first load has succeeded. */
+    private static final AtomicBoolean modelLoaded = new AtomicBoolean(false);
+    /** Guards {@link #init()} so the load task is scheduled at most once. */
+    private static final AtomicBoolean init = new AtomicBoolean(false);
+
+    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+    /**
+     * Schedules the model load: immediately, then every five minutes until it succeeds.
+     * Calls after the first one are no-ops.
+     */
+    public static void init() {
+        if (!init.compareAndSet(false, true)) {
+            return;
+        }
+        // Presumably warms up the segmenter before first real use -- TODO confirm.
+        Segment.getWords("1");
+        scheduler.scheduleAtFixedRate(SimilarityUtils::loadModel, 0, 5, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Loads the model from OSS into a fresh instance and publishes it on success.
+     * After the first successful load the scheduler is shut down.
+     */
+    private static void loadModel() {
+        try {
+            long start = System.currentTimeMillis();
+            String endpoint = PropertiesUtil.getString("oss.endpoint");
+            Word2Vec temp = new Word2Vec();
+            temp.loadGoogleModelFromOss(endpoint, OSS_BUCKET, MODEL_PATH, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
+            // Publish only a fully loaded model.
+            vec = temp;
+            long end = System.currentTimeMillis();
+
+            if (modelLoaded.compareAndSet(false, true)) {
+                scheduler.shutdown();
+                log.info("Model loaded successfully cost {}. Scheduled tasks cancelled.", end - start);
+            }
+        } catch (Exception e) {
+            // Catch every exception, not only IOException: an exception escaping a task run by
+            // scheduleAtFixedRate suppresses all later executions, which would silently end
+            // the retry loop.
+            log.error("loadGoogleModelFromOss error", e);
+        }
+    }
+
+    /**
+     * Computes the Word2Vec sentence similarity of two strings.
+     *
+     * @param str1 first text, segmented with {@link Segment#getWords(String)}
+     * @param str2 second text, segmented the same way
+     * @return the score returned by {@code Word2Vec.sentenceSimilarity} for the two word lists
+     */
+    public static float word2VecSimilarity(String str1, String str2) {
+        List<String> words1 = Segment.getWords(str1);
+        List<String> words2 = Segment.getWords(str2);
+        return vec.sentenceSimilarity(words1, words2);
+    }
+
+}

+ 30 - 35
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_61_originData_20241209.scala

@@ -8,7 +8,7 @@ import examples.extractor.RankExtractorFeature_20240530
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.SparkSession
 import org.xm.Similarity
-
+import examples.utils.SimilarityUtils
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 /*
@@ -25,15 +25,16 @@ object makedata_recsys_61_originData_20241209 {
 
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
+
+    val beginStr = param.getOrElse("beginStr", "2024120912")
+    val endStr = param.getOrElse("endStr", "2024120912")
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "alg_recsys_sample_all_v2")
     val tablePart = param.getOrElse("tablePart", "64").toInt
-    val beginStr = param.getOrElse("beginStr", "2023010100")
-    val endStr = param.getOrElse("endStr", "2023010123")
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_origin_data/")
-    val project = param.getOrElse("project", "loghubods")
-    val table = param.getOrElse("table", "XXXX")
     val repartition = param.getOrElse("repartition", "32").toInt
 
-    // 2 读取odps+表信息
+    // 2 odps
     val odpsOps = env.getODPS(sc)
 
     // 3 循环执行数据生产
@@ -49,9 +50,7 @@ object makedata_recsys_61_originData_20241209 {
           transfer = func,
           numPartition = tablePart)
         .map(record => {
-
           val featureMap = new JSONObject()
-
           // a 视频特征
           val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
             JSON.parseObject(record.getString("b1_feature"))
@@ -145,7 +144,7 @@ object makedata_recsys_61_originData_20241209 {
               for (key_time <- List("tags_1d", "tags_3d", "tags_7d")) {
                 val tags = if (c34567.containsKey(key_time)) c34567.getString(key_time) else ""
                 if (!tags.equals("")) {
-                  val (f1, f2, f3, f4) = funcC34567ForTags(tags, title)
+                  val (f1, f2, f3, f4) = funcC34567ForTagsW2V(tags, title)
                   featureMap.put(key_feature + "_" + key_time + "_matchnum", f1)
                   featureMap.put(key_feature + "_" + key_time + "_maxscore", f3)
                   featureMap.put(key_feature + "_" + key_time + "_avgscore", f4)
@@ -187,31 +186,27 @@ object makedata_recsys_61_originData_20241209 {
 
 
           /*
-
-
-          视频:
-          曝光使用pv 分享使用pv 回流使用uv --> 1h 2h 3h 4h 12h 1d 3d 7d
-          STR log(share) ROV log(return) ROV*log(return)
-          40个特征组合
-          整体、整体曝光对应、推荐非冷启root、推荐冷启root、分省份root
-          200个特征值
-
-          视频:
-          视频时长、比特率
-
-          人:
-          播放次数 --> 6h 1d 3d 7d --> 4个
-          带回来的分享pv 回流uv --> 12h 1d 3d 7d --> 8个
-          人+vid-title:
-          播放点/回流点/分享点/累积分享/累积回流 --> 1d 3d 7d --> 匹配数量 语义最高相似度分 语义平均相似度分 --> 45个
-          人+vid-cf
-          基于分享行为/基于回流行为 -->  “分享cf”+”回流点击cf“ 相似分 相似数量 相似rank的倒数 --> 12个
-
-          头部视频:
-          曝光 回流 ROVn 3个特征
-
+          视频特征: 8*6*5 = 240个
+                    曝光使用pv 分享使用pv 回流使用uv --> 1h 2h 3h 4h 12h 1d 3d 7d
+                    STR log(share) ROV log(return) ROV*log(return) ROS
+                    整体、整体曝光对应、推荐非冷启root、推荐冷启root、分省份root
+          视频基础: 2个   视频时长、比特率
+          用户: 4+8 = 12个
+                    播放次数 --> 6h 1d 3d 7d --> 4个
+                    带回来的分享pv 回流uv --> 12h 1d 3d 7d --> 8个
+          人+vid-title:  5*3*3 = 45
+                    播放点/回流点/分享点/累积分享/累积回流 --> 1d 3d 7d --> 匹配数量 语义最高相似度分 语义平均相似度分 --> 45个
+          人+vid-cf: 2*2*3 = 12
+                    基于分享行为/基于回流行为 -->  “分享cf”+”回流点击cf“ 相似分 相似数量 相似rank的倒数 --> 12个
+          头部视频:  3
+                    曝光 回流 ROVn 3个特征
           场景:
-          小时 星期 apptype city province pagesource 机器型号
+                    小时 星期 apptype city province pagesource 机器型号
+          总量:     240+2+12+45+12+3 = 314
+          ---------------------------------------------------------------
+          视频特征:
+                  
+
            */
 
 
@@ -258,7 +253,7 @@ object makedata_recsys_61_originData_20241209 {
     record
   }
 
-  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
+  def funcC34567ForTagsW2V(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
     // 匹配数量 匹配词 语义最高相似度分 语义平均相似度分
     val tagsList = tags.split(",")
     var d1 = 0.0
@@ -270,7 +265,7 @@ object makedata_recsys_61_originData_20241209 {
         d1 = d1 + 1.0
         d2.add(tag)
       }
-      val score = Similarity.conceptSimilarity(tag, title)
+      val score = SimilarityUtils.word2VecSimilarity(tag, title)
       d3 = if (score > d3) score else d3
       d4 = d4 + score
     }