zhangbo hai 1 ano
pai
achega
382916607c

+ 13 - 6
src/main/java/examples/dataloader/RequestContextOffline.java

@@ -3,8 +3,11 @@ package examples.dataloader;
 import com.tzld.piaoquan.recommend.feature.domain.video.base.RequestContext;
 import com.aliyun.odps.data.Record;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class RequestContextOffline extends RequestContext {
     public Map<String, Object> featureMap = new HashMap<>();
@@ -49,19 +52,23 @@ public class RequestContextOffline extends RequestContext {
         setKVinMap(record, "ctx_city", "string","");
     }
     public void setKVinMap(Record record, String key, String instance, String cntOrRate){
-        if (record.getString(key) == null){
+        if (!Arrays.stream(record.getColumns()).map(r-> r.getName()).collect(Collectors.toSet()).contains(key)){
+            return;
+        }
+        String value = record.getString(key);
+        if (value == null){
             return;
         }
         String ins = instance.toLowerCase();
         switch (ins){
             case "string":
-                featureMap.put(key, record.getString(key));
+                featureMap.put(key, value);
                 return;
             case "double":
                 if ("cnt".equals(cntOrRate)){
-                    featureMap.put(key, this.bucketRatioFeature(Double.valueOf(record.getString(key))));
+                    featureMap.put(key, this.bucketRatioFeature(Double.valueOf(value)));
                 }else if ("rate".equals(cntOrRate)){
-                    featureMap.put(key, this.ceilLog(Double.valueOf(record.getString(key))));
+                    featureMap.put(key, this.ceilLog(Double.valueOf(value)));
                 }
                 return;
             case "int":
@@ -74,11 +81,11 @@ public class RequestContextOffline extends RequestContext {
     }
 
 
-    private double ceilLog(Double key) {
+    public double ceilLog(Double key) {
         return Math.ceil(Math.log(key + 1.0));
     }
 
-    private double bucketRatioFeature(Double key) {
+    public double bucketRatioFeature(Double key) {
         long bucket = Math.round(Math.log((key + 1.0) * 50.0));
         if (bucket > 50L) {
             bucket = 50L;

+ 14 - 5
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -133,16 +133,25 @@ object makedata_02_writeredis {
     val videoid = record.getBigint(videoKey).toString
     val reqContext: RequestContextOffline = new RequestContextOffline()
 
-    //todo 有特征不在表里 临时修复
-    record.setString("i_title_len", if (record.getString("title") != null) record.getString("title").length.toString else "")
-    if (record.getDatetime("gmt_create") != null){
+    //---------todo 有特征不在表里 临时修复---------
+    val i_title_len =  if (record.getString("title") != null) record.getString("title").length.toString else ""
+    val i_days_since_upload = if (record.getDatetime("gmt_create") != null){
       val format = new SimpleDateFormat("yyyyMMdd")
       val dateOld = format.format(record.getDatetime("gmt_create"))
       val dayDiff = MyDateUtils.calculateDateDifference(dateOld, date)
-      record.setString("i_days_since_upload", dayDiff.toString)
+      dayDiff.toString
     }else{
-      record.setString("i_days_since_upload", "")
+      ""
     }
+    if (i_title_len.nonEmpty){
+      reqContext.featureMap.put("i_title_len", reqContext.bucketRatioFeature(i_title_len.toDouble))
+    }
+    if (i_days_since_upload.nonEmpty) {
+      reqContext.featureMap.put("i_days_since_upload", reqContext.bucketRatioFeature(i_days_since_upload.toDouble))
+    }
+    //------修复完成---------
+
+
 
 
     reqContext.putItemFeature(record)