|
@@ -0,0 +1,94 @@
|
|
|
+package com.tzld.piaoquan.recommend.server.dataloader;
|
|
|
+
|
|
|
+import com.aliyun.odps.data.Record;
|
|
|
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
|
|
|
+import com.google.common.collect.ListMultimap;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.base.*;
|
|
|
+import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
|
|
|
+import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.score.feature.VlogShareLRFeatureExtractor;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+
|
|
|
+public class OfflineShareSamplesLoader {
|
|
|
+
|
|
|
+ private static final Map<String, String> ODPS_CONFIG = new HashMap<String, String>();
|
|
|
+ static {
|
|
|
+ ODPS_CONFIG.put("ENDPOINT", "http://service.cn.maxcompute.aliyun.com/api");
|
|
|
+ ODPS_CONFIG.put("ACCESSID", "LTAIWYUujJAm7CbH");
|
|
|
+ ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P");
|
|
|
+ };
|
|
|
+
|
|
|
+
|
|
|
+ // 单条日志处理逻辑
|
|
|
+ public static String singleParse(Record record, String labelName) {
|
|
|
+ // 数据解析
|
|
|
+ String label = record.getString(labelName);
|
|
|
+ if(label == null){
|
|
|
+ label = "0";
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从sql的 record中 初始化对象内容
|
|
|
+ RequestContext requestContext = FeatureConstructor.constructRequestContext(record);
|
|
|
+ UserFeature userFeature = FeatureConstructor.constructUserFeature(record);
|
|
|
+ ItemFeature itemFeature = FeatureConstructor.constructItemFeature(record);
|
|
|
+
|
|
|
+ // 转化成bytes
|
|
|
+ RequestContextBytesFeature requestContextBytesFeature = new RequestContextBytesFeature(requestContext);
|
|
|
+ UserBytesFeature userBytesFeature = new UserBytesFeature(userFeature);
|
|
|
+ VideoBytesFeature videoBytesFeature = new VideoBytesFeature(itemFeature);
|
|
|
+
|
|
|
+ // 特征抽取
|
|
|
+ VlogShareLRFeatureExtractor bytesFeatureExtractor;
|
|
|
+ bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
|
|
|
+
|
|
|
+ bytesFeatureExtractor.getUserFeatures(userBytesFeature);
|
|
|
+ bytesFeatureExtractor.getItemFeature(videoBytesFeature);
|
|
|
+ bytesFeatureExtractor.getContextFeatures(requestContextBytesFeature);
|
|
|
+
|
|
|
+ ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();
|
|
|
+ return parseSamplesToString(label, featureMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 构建样本的字符串
|
|
|
+ public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
|
|
|
+ ArrayList<String> featureList = new ArrayList<String>();
|
|
|
+ for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
|
|
|
+ FeatureGroup groupedFeature = entry.getKey();
|
|
|
+ BaseFeature baseFeature = entry.getValue();
|
|
|
+ Long featureIdentifier = baseFeature.getIdentifier();
|
|
|
+ featureList.add(String.valueOf(featureIdentifier) + ":1");
|
|
|
+ }
|
|
|
+ return label + "\t" + String.join("\t", featureList);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 主处理逻辑
|
|
|
+ public static void mutiplyParser(String table, String startDay, String endDay, String labelName) {
|
|
|
+ String sql = String.format("select * from %s apptype != '13' and dt >='%s' and dt <='%s';", table, startDay, endDay);
|
|
|
+ TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql);
|
|
|
+ Record record;
|
|
|
+ try {
|
|
|
+ while ((record = reader.read()) != null) {
|
|
|
+ String samples = singleParse(record, labelName);
|
|
|
+ System.out.println(samples);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public static void main(String[] args) {
|
|
|
+ if(args.length < 2){
|
|
|
+ System.out.println("--------args 缺失---------");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ OfflineShareSamplesLoader.mutiplyParser(args[0], args[1], args[2], args[3]);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|