|
@@ -5,6 +5,8 @@ import com.aliyun.odps.TableSchema
|
|
|
import com.aliyun.odps.data.Record
|
|
|
import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
|
|
|
import examples.extractor.v20250218.ExtractFeature20250218
|
|
|
+import examples.extractor.ExtractorUtils
|
|
|
+import examples.utils.SimilarityUtils
|
|
|
import org.apache.hadoop.io.compress.GzipCodec
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
import org.xm.Similarity
|
|
@@ -51,102 +53,110 @@ object makedata_recsys_41_originData_20250218 {
|
|
|
partition = partition,
|
|
|
transfer = func,
|
|
|
numPartition = tablePart)
|
|
|
- .map(record => {
|
|
|
-
|
|
|
- val featureMap = new JSONObject()
|
|
|
- val vid = if (record.isNull("vid")) "" else record.getString("vid")
|
|
|
-
|
|
|
- // a 视频特征
|
|
|
- val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else JSON.parseObject(record.getString("b1_feature"))
|
|
|
- val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else JSON.parseObject(record.getString("b2_feature"))
|
|
|
- val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else JSON.parseObject(record.getString("b3_feature"))
|
|
|
- val b4: JSONObject = if (record.isNull("b4_feature")) new JSONObject() else JSON.parseObject(record.getString("b3_feature"))
|
|
|
- val b5: JSONObject = if (record.isNull("b5_feature")) new JSONObject() else JSON.parseObject(record.getString("b3_feature"))
|
|
|
- val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else JSON.parseObject(record.getString("b6_feature"))
|
|
|
- val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else JSON.parseObject(record.getString("b7_feature"))
|
|
|
- val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else JSON.parseObject(record.getString("b8_feature"))
|
|
|
- val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else JSON.parseObject(record.getString("b9_feature"))
|
|
|
- val b10: JSONObject = if (record.isNull("b10_feature")) new JSONObject() else JSON.parseObject(record.getString("b10_feature"))
|
|
|
- val b11: JSONObject = if (record.isNull("b11_feature")) new JSONObject() else JSON.parseObject(record.getString("b11_feature"))
|
|
|
- val b12: JSONObject = if (record.isNull("b12_feature")) new JSONObject() else JSON.parseObject(record.getString("b12_feature"))
|
|
|
- val b13: JSONObject = if (record.isNull("b13_feature")) new JSONObject() else JSON.parseObject(record.getString("b13_feature"))
|
|
|
-
|
|
|
- // 用户特征
|
|
|
- val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else JSON.parseObject(record.getString("c1_feature"))
|
|
|
- val c2: JSONObject = if (record.isNull("c2_feature")) new JSONObject() else JSON.parseObject(record.getString("c2_feature"))
|
|
|
- val c3: JSONObject = if (record.isNull("c3_feature")) new JSONObject() else JSON.parseObject(record.getString("c3_feature"))
|
|
|
- val c4: JSONObject = if (record.isNull("c4_feature")) new JSONObject() else JSON.parseObject(record.getString("c3_feature"))
|
|
|
- val c5: JSONObject = if (record.isNull("c5_feature")) new JSONObject() else JSON.parseObject(record.getString("c3_feature"))
|
|
|
- val c6: JSONObject = if (record.isNull("c6_feature")) new JSONObject() else JSON.parseObject(record.getString("c6_feature"))
|
|
|
- val c7: JSONObject = if (record.isNull("c7_feature")) new JSONObject() else JSON.parseObject(record.getString("c7_feature"))
|
|
|
- val c8: JSONObject = if (record.isNull("c8_feature")) new JSONObject() else JSON.parseObject(record.getString("c8_feature"))
|
|
|
-
|
|
|
- // 视频基础信息 v1-待推荐视频,v2-头部视频
|
|
|
- val v1: JSONObject = if (record.isNull("v1_feature")) new JSONObject() else JSON.parseObject(record.getString("v1_feature"))
|
|
|
- val v2: JSONObject = if (record.isNull("v2_feature")) new JSONObject() else JSON.parseObject(record.getString("v2_feature"))
|
|
|
-
|
|
|
- // CF特征
|
|
|
- val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else JSON.parseObject(record.getString("d1_feature"))
|
|
|
- val d2: JSONObject = if (record.isNull("d2_feature")) new JSONObject() else JSON.parseObject(record.getString("d2_feature"))
|
|
|
- val d3: JSONObject = if (record.isNull("d3_feature")) new JSONObject() else JSON.parseObject(record.getString("d3_feature"))
|
|
|
-
|
|
|
- val bFeatureMap = new util.HashMap[String, util.Map[String, Object]]();
|
|
|
- bFeatureMap.put("b1", b1);
|
|
|
- bFeatureMap.put("b2", b2);
|
|
|
- bFeatureMap.put("b3", b3);
|
|
|
- bFeatureMap.put("b4", b4);
|
|
|
- bFeatureMap.put("b5", b5);
|
|
|
- bFeatureMap.put("b6", b6);
|
|
|
- bFeatureMap.put("b7", b7);
|
|
|
- bFeatureMap.put("b8", b8);
|
|
|
- bFeatureMap.put("b9", b9);
|
|
|
- bFeatureMap.put("b10", b10);
|
|
|
- bFeatureMap.put("b11", b11);
|
|
|
- bFeatureMap.put("b12", b12);
|
|
|
- bFeatureMap.put("b13", b13);
|
|
|
-
|
|
|
- ExtractFeature20250218.handleB1ToB13(bFeatureMap, featureMap);
|
|
|
- ExtractFeature20250218.handleC1(c1, featureMap)
|
|
|
- ExtractFeature20250218.handleC2ToC3(c2, c3, featureMap)
|
|
|
- ExtractFeature20250218.handleC4(c4, featureMap)
|
|
|
- ExtractFeature20250218.handleC5ToC6(c5, c6, v1, featureMap)
|
|
|
-
|
|
|
- val c67Map = ExtractFeature20250218.handleC6ToC7(c6, c7)
|
|
|
- ExtractFeature20250218.useC6ToC7(c67Map, vid, featureMap)
|
|
|
-
|
|
|
- ExtractFeature20250218.handleD1(d1, featureMap)
|
|
|
- ExtractFeature20250218.handleD2(d2, featureMap)
|
|
|
- ExtractFeature20250218.handleD3(d3, featureMap)
|
|
|
- ExtractFeature20250218.handleVideoBasicFeature(v1, featureMap)
|
|
|
- ExtractFeature20250218.handleVideoSimilarity(v1, v2, featureMap)
|
|
|
-
|
|
|
- //4 处理label信息。
|
|
|
- val labels = new JSONObject
|
|
|
- for (labelKey <- List(
|
|
|
- "is_share", "share_cnt",
|
|
|
- "is_return_1", "return_1_uv",
|
|
|
- "is_return_n", "return_n_uv",
|
|
|
- "is_return_noself", "return_1_uv_noself",
|
|
|
- "is_return_n_noself", "return_n_uv_noself"
|
|
|
- )) {
|
|
|
- if (!record.isNull(labelKey)) {
|
|
|
- labels.put(labelKey, record.getString(labelKey))
|
|
|
+ .mapPartitions(p => {
|
|
|
+ SimilarityUtils.init()
|
|
|
+ p.map(record => {
|
|
|
+
|
|
|
+ val featureMap = new JSONObject()
|
|
|
+ val vid = if (record.isNull("vid")) "" else record.getString("vid")
|
|
|
+ // vid 已经提取了
|
|
|
+ val ts = record.getString("ts").toLong
|
|
|
+ // a 视频特征
|
|
|
+ val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else JSON.parseObject(record.getString("b1_feature"))
|
|
|
+ val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else JSON.parseObject(record.getString("b2_feature"))
|
|
|
+ val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else JSON.parseObject(record.getString("b3_feature"))
|
|
|
+ val b4: JSONObject = if (record.isNull("b4_feature")) new JSONObject() else JSON.parseObject(record.getString("b3_feature"))
|
|
|
+ val b5: JSONObject = if (record.isNull("b5_feature")) new JSONObject() else JSON.parseObject(record.getString("b3_feature"))
|
|
|
+ val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else JSON.parseObject(record.getString("b6_feature"))
|
|
|
+ val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else JSON.parseObject(record.getString("b7_feature"))
|
|
|
+ val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else JSON.parseObject(record.getString("b8_feature"))
|
|
|
+ val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else JSON.parseObject(record.getString("b9_feature"))
|
|
|
+ val b10: JSONObject = if (record.isNull("b10_feature")) new JSONObject() else JSON.parseObject(record.getString("b10_feature"))
|
|
|
+ val b11: JSONObject = if (record.isNull("b11_feature")) new JSONObject() else JSON.parseObject(record.getString("b11_feature"))
|
|
|
+ val b12: JSONObject = if (record.isNull("b12_feature")) new JSONObject() else JSON.parseObject(record.getString("b12_feature"))
|
|
|
+ val b13: JSONObject = if (record.isNull("b13_feature")) new JSONObject() else JSON.parseObject(record.getString("b13_feature"))
|
|
|
+
|
|
|
+ // 用户特征
|
|
|
+ val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else JSON.parseObject(record.getString("c1_feature"))
|
|
|
+ val c2: JSONObject = if (record.isNull("c2_feature")) new JSONObject() else JSON.parseObject(record.getString("c2_feature"))
|
|
|
+ val c3: JSONObject = if (record.isNull("c3_feature")) new JSONObject() else JSON.parseObject(record.getString("c3_feature"))
|
|
|
+ val c4: JSONObject = if (record.isNull("c4_feature")) new JSONObject() else JSON.parseObject(record.getString("c3_feature"))
|
|
|
+ val c5: JSONObject = if (record.isNull("c5_feature")) new JSONObject() else JSON.parseObject(record.getString("c3_feature"))
|
|
|
+ val c6: JSONObject = if (record.isNull("c6_feature")) new JSONObject() else JSON.parseObject(record.getString("c6_feature"))
|
|
|
+ val c7: JSONObject = if (record.isNull("c7_feature")) new JSONObject() else JSON.parseObject(record.getString("c7_feature"))
|
|
|
+ val c8: JSONObject = if (record.isNull("c8_feature")) new JSONObject() else JSON.parseObject(record.getString("c8_feature"))
|
|
|
+
|
|
|
+ // 视频基础信息 v1-待推荐视频,v2-头部视频
|
|
|
+ val v1: JSONObject = if (record.isNull("v1_feature")) new JSONObject() else JSON.parseObject(record.getString("v1_feature"))
|
|
|
+ val v2: JSONObject = if (record.isNull("v2_feature")) new JSONObject() else JSON.parseObject(record.getString("v2_feature"))
|
|
|
+
|
|
|
+ // CF特征
|
|
|
+ val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else JSON.parseObject(record.getString("d1_feature"))
|
|
|
+ val d2: JSONObject = if (record.isNull("d2_feature")) new JSONObject() else JSON.parseObject(record.getString("d2_feature"))
|
|
|
+ val d3: JSONObject = if (record.isNull("d3_feature")) new JSONObject() else JSON.parseObject(record.getString("d3_feature"))
|
|
|
+
|
|
|
+ val bFeatureMap = new util.HashMap[String, util.Map[String, Object]]();
|
|
|
+ bFeatureMap.put("b1", b1);
|
|
|
+ bFeatureMap.put("b2", b2);
|
|
|
+ bFeatureMap.put("b3", b3);
|
|
|
+ bFeatureMap.put("b4", b4);
|
|
|
+ bFeatureMap.put("b5", b5);
|
|
|
+ bFeatureMap.put("b6", b6);
|
|
|
+ bFeatureMap.put("b7", b7);
|
|
|
+ bFeatureMap.put("b8", b8);
|
|
|
+ bFeatureMap.put("b9", b9);
|
|
|
+ bFeatureMap.put("b10", b10);
|
|
|
+ bFeatureMap.put("b11", b11);
|
|
|
+ bFeatureMap.put("b13", b13);
|
|
|
+
|
|
|
+ ExtractFeature20250218.handleB1ToB11AndB13(bFeatureMap, featureMap);
|
|
|
+ ExtractFeature20250218.handleB12(b12, featureMap)
|
|
|
+ ExtractFeature20250218.handleC1(c1, featureMap)
|
|
|
+ ExtractFeature20250218.handleC2ToC3(c2, c3, featureMap)
|
|
|
+ ExtractFeature20250218.handleC4(c4, featureMap)
|
|
|
+ ExtractFeature20250218.handleC5ToC6(c5, c6, v1, featureMap)
|
|
|
+
|
|
|
+ val c78Map = ExtractFeature20250218.handleC7ToC8(c7, c8)
|
|
|
+ ExtractFeature20250218.useC7ToC8(c78Map, vid, featureMap)
|
|
|
+
|
|
|
+ ExtractFeature20250218.handleD1(d1, featureMap)
|
|
|
+ ExtractFeature20250218.handleD2(d2, featureMap)
|
|
|
+ ExtractFeature20250218.handleD3(d3, featureMap)
|
|
|
+ ExtractFeature20250218.handleVideoBasicFeature(v1, ts, featureMap)
|
|
|
+ ExtractFeature20250218.handleVideoSimilarity(v1, v2, featureMap)
|
|
|
+
|
|
|
+
|
|
|
+ //4 处理label信息。
|
|
|
+ val labels = new JSONObject
|
|
|
+ for (labelKey <- List(
|
|
|
+ "is_share", "share_cnt",
|
|
|
+ "is_return_1", "return_1_uv",
|
|
|
+ "is_return_n", "return_n_uv",
|
|
|
+ "is_return_noself", "return_1_uv_noself",
|
|
|
+ "is_return_n_noself", "return_n_uv_noself"
|
|
|
+ )) {
|
|
|
+ if (!record.isNull(labelKey)) {
|
|
|
+ labels.put(labelKey, record.getString(labelKey))
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- //5 处理log key表头。
|
|
|
- val apptype = record.getString("apptype")
|
|
|
- val pagesource = record.getString("pagesource")
|
|
|
- val mid = record.getString("mid")
|
|
|
- // vid 已经提取了
|
|
|
- val ts = record.getString("ts")
|
|
|
- val abcode = record.getString("abcode")
|
|
|
- val level = if (record.isNull("level")) "0" else record.getString("level")
|
|
|
- val logKey = (apptype, pagesource, mid, vid, ts, abcode, level).productIterator.mkString(",")
|
|
|
- val labelKey = labels.toString()
|
|
|
- val featureKey = featureMap.toString()
|
|
|
- //6 拼接数据,保存。
|
|
|
- logKey + "\t" + labelKey + "\t" + featureKey
|
|
|
|
|
|
+ //5 处理log key表头。
|
|
|
+ val logs = new JSONObject()
|
|
|
+ for (key <- List("apptype", "abcode", "mid", "vid", "page", "recommendpagetype", "level", "ts", "headvideoid")) {
|
|
|
+ if (!record.isNull(key)) {
|
|
|
+ logs.put(key, record.getString(key))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ logs.put("hour", ExtractorUtils.getHourByTimestamp(ts))
|
|
|
+
|
|
|
+ val logKey = logs.toString()
|
|
|
+ val labelKey = labels.toString()
|
|
|
+ val featureKey = featureMap.toString()
|
|
|
+ //6 拼接数据,保存。
|
|
|
+ logKey + "\t" + labelKey + "\t" + featureKey
|
|
|
+
|
|
|
+ })
|
|
|
})
|
|
|
|
|
|
// 4 保存数据到hdfs
|
|
@@ -162,6 +172,7 @@ object makedata_recsys_41_originData_20250218 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
def func(record: Record, schema: TableSchema): Record = {
|
|
|
record
|
|
|
}
|