فهرست منبع

广告ctr模型特征抽取 1213

sunmingze 1 سال پیش
والد
کامیت
3dae11b0b0

+ 6 - 1
pom.xml

@@ -39,13 +39,18 @@
             <version>3.12.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.tzld.piaoquan</groupId>
+            <artifactId>ad-engine-commons</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>recommend-server-client</artifactId>
             <version>1.0.1</version>
         </dependency>
 
-
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>

+ 2 - 2
src/main/java/com/tzld/piaoquan/data/dataloader/RedisFeatureConstructor.java → src/main/java/examples/dataloader/AdRedisFeatureConstructor.java

@@ -1,4 +1,4 @@
-package com.tzld.piaoquan.data.dataloader;
+package examples.dataloader;
 
 
 import com.aliyun.odps.account.Account;
@@ -12,7 +12,7 @@ import com.tzld.piaoquan.data.base.UserFeature;
 import java.util.HashMap;
 import java.util.Map;
 
-public class RedisFeatureConstructor {
+public class AdRedisFeatureConstructor {
 
     private static final String BUCKET_NAME = "ali-recommend";
     private static final Map<String, String> ODPS_CONFIG = new HashMap<String, String>();

+ 141 - 0
src/main/java/examples/dataloader/AdSampleConstructor.java

@@ -0,0 +1,141 @@
+package examples.dataloader;
+
+
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.ad.engine.commons.base.AdActionFeature;
+import com.tzld.piaoquan.ad.engine.commons.base.AdRequestContext;
+import com.tzld.piaoquan.ad.engine.commons.base.UserAdFeature;
+import com.tzld.piaoquan.ad.engine.commons.base.AdItemFeature;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AdSampleConstructor {
+
+    private static final String BUCKET_NAME = "ali-recommend";
+    private static final Map<String, String> ODPS_CONFIG = new HashMap<String, String>();
+
+    static {
+        ODPS_CONFIG.put("ENDPOINT", "http://service.cn.maxcompute.aliyun.com/api");
+        ODPS_CONFIG.put("ACCESSID", "LTAIWYUujJAm7CbH");
+        ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P");
+    }
+
+    private static final Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
+
+
+    public static AdRequestContext constructRequestContext(Record record) {
+        AdRequestContext requestContext = new AdRequestContext();
+        requestContext.setApptype(record.getString("apptype"));
+        requestContext.setMachineinfoBrand(record.getString("machineinfo_brand"));
+        requestContext.setMachineinfoModel(record.getString("machineinfo_model"));
+        requestContext.setMachineinfoSdkversion(record.getString("machineinfo_sdkversion"));
+        requestContext.setMachineinfoSdkversion(record.getString("machineinfo_wechatversion"));
+        requestContext.setDay(record.getString("day"));
+        requestContext.setWeek(record.getString("week"));
+        requestContext.setHour(record.getString("hour"));
+        requestContext.setRegion(record.getString("province"));
+        requestContext.setCity(record.getString("city"));
+        return requestContext;
+    }
+
+
+    public static UserAdFeature constructUserFeature(Record record) {
+        UserAdFeature userFeature = new UserAdFeature();
+        userFeature.setMid(record.get("machinecode").toString());
+
+        // 1day features
+        AdActionFeature user1dayActionFeature = new AdActionFeature();
+        user1dayActionFeature.setAdView(record.getString("view_ad_1day"));
+        user1dayActionFeature.setAdClick(record.getString("click_ad_1day"));
+        user1dayActionFeature.setAdConversion(record.getString("conversion_ad_1day"));
+        user1dayActionFeature.setCtr(record.getString("ctr_ad_1day"));
+        user1dayActionFeature.setCvr(record.getString("cvr_ad_1day"));
+        userFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        // 3day features
+        AdActionFeature user3dayActionFeature = new AdActionFeature();
+        user3dayActionFeature.setAdView(record.getString("view_ad_3day"));
+        user3dayActionFeature.setAdClick(record.getString("click_ad_3day"));
+        user3dayActionFeature.setAdConversion(record.getString("conversion_ad_3day"));
+        user3dayActionFeature.setCtr(record.getString("ctr_ad_3day"));
+        user3dayActionFeature.setCvr(record.getString("cvr_ad_3day"));
+        userFeature.setDay3_cnt_features(user3dayActionFeature);
+
+
+        // 7day features
+        AdActionFeature user7dayActionFeature = new AdActionFeature();
+        user7dayActionFeature.setAdView(record.getString("view_ad_7day"));
+        user7dayActionFeature.setAdClick(record.getString("click_ad_7day"));
+        user7dayActionFeature.setAdConversion(record.getString("conversion_ad_7day"));
+        user7dayActionFeature.setCtr(record.getString("ctr_ad_7day"));
+        user7dayActionFeature.setCvr(record.getString("cvr_ad_7day"));
+        userFeature.setDay3_cnt_features(user7dayActionFeature);
+
+        // 3month features
+        AdActionFeature user3MonthActionFeature = new AdActionFeature();
+        user3MonthActionFeature.setAdView(record.getString("view_ad_3month"));
+        user3MonthActionFeature.setAdClick(record.getString("click_ad_3month"));
+        user3MonthActionFeature.setAdConversion(record.getString("conversion_ad_3month"));
+        user3MonthActionFeature.setCtr(record.getString("ctr_ad_3month"));
+        user3MonthActionFeature.setCvr(record.getString("cvr_ad_3month"));
+        userFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        return userFeature;
+    }
+
+
+    public static AdItemFeature constructItemFeature(Record record) {
+        AdItemFeature itemFeature = new AdItemFeature();
+
+
+        itemFeature.setAdId(record.getString("adid"));
+        itemFeature.setAdvertiserId(record.getString("advertiserid"));
+        itemFeature.setCampaignId(record.getString("campaignid"));
+        itemFeature.setCreativeId(record.getString("create"));
+
+        // 1day features
+        AdActionFeature user1dayActionFeature = new AdActionFeature();
+        user1dayActionFeature.setAdView(record.getString("view_ad_1day"));
+        user1dayActionFeature.setAdClick(record.getString("click_ad_1day"));
+        user1dayActionFeature.setAdConversion(record.getString("conversion_ad_1day"));
+        user1dayActionFeature.setCtr(record.getString("ctr_ad_1day"));
+        user1dayActionFeature.setCvr(record.getString("cvr_ad_1day"));
+        itemFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        // 3day features
+        AdActionFeature user3dayActionFeature = new AdActionFeature();
+        user3dayActionFeature.setAdView(record.getString("view_ad_3day"));
+        user3dayActionFeature.setAdClick(record.getString("click_ad_3day"));
+        user3dayActionFeature.setAdConversion(record.getString("conversion_ad_3day"));
+        user3dayActionFeature.setCtr(record.getString("ctr_ad_3day"));
+        user3dayActionFeature.setCvr(record.getString("cvr_ad_3day"));
+        itemFeature.setDay3_cnt_features(user3dayActionFeature);
+
+
+        // 7day features
+        AdActionFeature user7dayActionFeature = new AdActionFeature();
+        user7dayActionFeature.setAdView(record.getString("view_ad_7day"));
+        user7dayActionFeature.setAdClick(record.getString("click_ad_7day"));
+        user7dayActionFeature.setAdConversion(record.getString("conversion_ad_7day"));
+        user7dayActionFeature.setCtr(record.getString("ctr_ad_7day"));
+        user7dayActionFeature.setCvr(record.getString("cvr_ad_7day"));
+        itemFeature.setDay3_cnt_features(user7dayActionFeature);
+
+        // 3month features
+        AdActionFeature user3MonthActionFeature = new AdActionFeature();
+        user3MonthActionFeature.setAdView(record.getString("view_ad_3month"));
+        user3MonthActionFeature.setAdClick(record.getString("click_ad_3month"));
+        user3MonthActionFeature.setAdConversion(record.getString("conversion_ad_3month"));
+        user3MonthActionFeature.setCtr(record.getString("ctr_ad_3month"));
+        user3MonthActionFeature.setCvr(record.getString("cvr_ad_3month"));
+        itemFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        return itemFeature;
+    }
+
+
+}

+ 2 - 2
src/main/java/com/tzld/piaoquan/data/dataloader/SampleFeatureConstructor.java → src/main/java/examples/dataloader/RecommRedisFeatureConstructor.java

@@ -1,4 +1,4 @@
-package com.tzld.piaoquan.data.dataloader;
+package examples.dataloader;
 
 
 import com.aliyun.odps.account.Account;
@@ -12,7 +12,7 @@ import com.tzld.piaoquan.data.base.UserFeature;
 import java.util.HashMap;
 import java.util.Map;
 
-public class SampleFeatureConstructor {
+public class RecommRedisFeatureConstructor {
 
     private static final String BUCKET_NAME = "ali-recommend";
     private static final Map<String, String> ODPS_CONFIG = new HashMap<String, String>();

+ 165 - 0
src/main/java/examples/dataloader/RecommendSampleConstructor.java

@@ -0,0 +1,165 @@
+package examples.dataloader;
+
+
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.data.base.ItemFeature;
+import com.tzld.piaoquan.data.base.RequestContext;
+import com.tzld.piaoquan.data.base.UserActionFeature;
+import com.tzld.piaoquan.data.base.UserFeature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RecommendSampleConstructor {
+
+    private static final String BUCKET_NAME = "ali-recommend";
+    private static final Map<String, String> ODPS_CONFIG = new HashMap<String, String>();
+
+    static {
+        ODPS_CONFIG.put("ENDPOINT", "http://service.cn.maxcompute.aliyun.com/api");
+        ODPS_CONFIG.put("ACCESSID", "LTAIWYUujJAm7CbH");
+        ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P");
+    }
+
+    ;
+
+    private static final Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
+
+
+    public static RequestContext constructRequestContext(Record record) {
+        RequestContext requestContext = new RequestContext();
+        requestContext.setApptype(record.getString("apptype"));
+        requestContext.setMachineinfo_brand(record.getString("machineinfo_brand"));
+        requestContext.setMachineinfo_model(record.getString("machineinfo_model"));
+        requestContext.setMachineinfo_platform(record.getString("machineinfo_platform"));
+        requestContext.setMachineinfo_sdkversion(record.getString("machineinfo_sdkversion"));
+        requestContext.setMachineinfo_system(record.getString("machineinfo_system"));
+        requestContext.setMachineinfo_wechatversion(record.getString("machineinfo_wechatversion"));
+        requestContext.setDay(record.getString("ctx_day"));
+        requestContext.setWeek(record.getString("ctx_week"));
+        requestContext.setHour(record.getString("ctx_hour"));
+        requestContext.setRegion(record.getString("ctx_region"));
+        requestContext.setCity(record.getString("ctx_city"));
+        return requestContext;
+    }
+
+
+    public static UserFeature constructUserFeature(Record record) {
+        UserFeature userFeature = new UserFeature();
+        userFeature.setUid(record.get("uid").toString());
+        userFeature.setUser_cycle_bucket_7days(record.getString("u_cycle_bucket_7days"));
+        userFeature.setUser_cycle_bucket_30days(record.getString("u_cycle_bucket_30days"));
+        userFeature.setUser_share_bucket_30days(record.getString("u_share_bucket_30days"));
+
+
+        // 1day features
+        UserActionFeature user1dayActionFeature = new UserActionFeature();
+        user1dayActionFeature.setExp_cnt(record.getString("u_1day_exp_cnt"));
+        user1dayActionFeature.setClick_cnt(record.getString("u_1day_click_cnt"));
+        user1dayActionFeature.setShare_cnt(record.getString("u_1day_share_cnt"));
+        user1dayActionFeature.setReturn_cnt(record.getString("u_1day_return_cnt"));
+        user1dayActionFeature.setCtr(record.getString("u_ctr_1day"));
+        user1dayActionFeature.setStr(record.getString("u_str_1day"));
+        user1dayActionFeature.setRov(record.getString("u_rov_1day"));
+        user1dayActionFeature.setRos(record.getString("u_ros_1day"));
+        userFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        // 3day features
+        UserActionFeature user3dayActionFeature = new UserActionFeature();
+        user3dayActionFeature.setExp_cnt(record.getString("u_3day_exp_cnt"));
+        user3dayActionFeature.setClick_cnt(record.getString("u_3day_click_cnt"));
+        user3dayActionFeature.setShare_cnt(record.getString("u_3day_share_cnt"));
+        user3dayActionFeature.setReturn_cnt(record.getString("u_3day_return_cnt"));
+        user3dayActionFeature.setCtr(record.getString("u_ctr_3day"));
+        user3dayActionFeature.setStr(record.getString("u_str_3day"));
+        user3dayActionFeature.setRov(record.getString("u_rov_3day"));
+        user3dayActionFeature.setRos(record.getString("u_ros_3day"));
+        userFeature.setDay3_cnt_features(user3dayActionFeature);
+
+        // 7day features
+        UserActionFeature user7dayActionFeature = new UserActionFeature();
+        user7dayActionFeature.setExp_cnt(record.getString("u_7day_exp_cnt"));
+        user7dayActionFeature.setClick_cnt(record.getString("u_7day_click_cnt"));
+        user7dayActionFeature.setShare_cnt(record.getString("u_7day_share_cnt"));
+        user7dayActionFeature.setReturn_cnt(record.getString("u_7day_return_cnt"));
+        user7dayActionFeature.setCtr(record.getString("u_ctr_7day"));
+        user7dayActionFeature.setStr(record.getString("u_str_7day"));
+        user7dayActionFeature.setRov(record.getString("u_rov_7day"));
+        user7dayActionFeature.setRos(record.getString("u_ros_7day"));
+        userFeature.setDay7_cnt_features(user7dayActionFeature);
+
+        // 3month features
+        UserActionFeature user3monthActionFeature = new UserActionFeature();
+        user3monthActionFeature.setExp_cnt(record.getString("u_3month_exp_cnt"));
+        user3monthActionFeature.setClick_cnt(record.getString("u_3month_click_cnt"));
+        user3monthActionFeature.setShare_cnt(record.getString("u_3month_share_cnt"));
+        user3monthActionFeature.setReturn_cnt(record.getString("u_3month_return_cnt"));
+        user3monthActionFeature.setCtr(record.getString("u_ctr_3month"));
+        user3monthActionFeature.setStr(record.getString("u_str_3month"));
+        user3monthActionFeature.setRov(record.getString("u_rov_3month"));
+        user3monthActionFeature.setRos(record.getString("u_ros_3month"));
+        userFeature.setMonth3_cnt_features(user3monthActionFeature);
+
+        return userFeature;
+    }
+
+
+    public static ItemFeature constructItemFeature(Record record) {
+        ItemFeature itemFeature = new ItemFeature();
+        itemFeature.setVideoId(record.getString("videoid"));
+        itemFeature.setUpId(record.getString("uid"));
+        itemFeature.setTitleLength(record.getString("i_title_len"));
+        itemFeature.setPlayLength(record.getString("i_play_len"));
+        itemFeature.setTotalTime(record.getString("total_time"));
+        itemFeature.setDaysSinceUpload(record.getString("i_days_since_upload"));
+
+        UserActionFeature user1dayActionFeature = new UserActionFeature();
+        user1dayActionFeature.setExp_cnt(record.getString("i_1day_exp_cnt"));
+        user1dayActionFeature.setClick_cnt(record.getString("i_1day_click_cnt"));
+        user1dayActionFeature.setShare_cnt(record.getString("i_1day_share_cnt"));
+        user1dayActionFeature.setReturn_cnt(record.getString("i_1day_return_cnt"));
+        user1dayActionFeature.setCtr(record.getString("i_ctr_1day"));
+        user1dayActionFeature.setStr(record.getString("i_str_1day"));
+        user1dayActionFeature.setRov(record.getString("i_rov_1day"));
+        user1dayActionFeature.setRos(record.getString("i_ros_1day"));
+        itemFeature.setDay1_cnt_features(user1dayActionFeature);
+
+        UserActionFeature user3dayActionFeature = new UserActionFeature();
+        user3dayActionFeature.setExp_cnt(record.getString("i_3day_exp_cnt"));
+        user3dayActionFeature.setClick_cnt(record.getString("i_3day_click_cnt"));
+        user3dayActionFeature.setShare_cnt(record.getString("i_3day_share_cnt"));
+        user3dayActionFeature.setReturn_cnt(record.getString("i_3day_return_cnt"));
+        user3dayActionFeature.setCtr(record.getString("i_ctr_3day"));
+        user3dayActionFeature.setStr(record.getString("i_str_3day"));
+        user3dayActionFeature.setRov(record.getString("i_rov_3day"));
+        user3dayActionFeature.setRos(record.getString("i_ros_3day"));
+        itemFeature.setDay3_cnt_features(user1dayActionFeature);
+
+        UserActionFeature user7dayActionFeature = new UserActionFeature();
+        user7dayActionFeature.setExp_cnt(record.getString("i_7day_exp_cnt"));
+        user7dayActionFeature.setClick_cnt(record.getString("i_7day_click_cnt"));
+        user7dayActionFeature.setShare_cnt(record.getString("i_7day_share_cnt"));
+        user7dayActionFeature.setReturn_cnt(record.getString("i_7day_return_cnt"));
+        user7dayActionFeature.setCtr(record.getString("i_ctr_7day"));
+        user7dayActionFeature.setStr(record.getString("i_str_7day"));
+        user7dayActionFeature.setRov(record.getString("i_rov_7day"));
+        user7dayActionFeature.setRos(record.getString("i_ros_7day"));
+        itemFeature.setDay7_cnt_features(user1dayActionFeature);
+
+        UserActionFeature user3monthActionFeature = new UserActionFeature();
+        user3monthActionFeature.setExp_cnt(record.getString("i_3month_exp_cnt"));
+        user3monthActionFeature.setClick_cnt(record.getString("i_3month_click_cnt"));
+        user3monthActionFeature.setShare_cnt(record.getString("i_3month_share_cnt"));
+        user3monthActionFeature.setReturn_cnt(record.getString("i_3month_return_cnt"));
+        user3monthActionFeature.setCtr(record.getString("i_ctr_3month"));
+        user3monthActionFeature.setStr(record.getString("i_str_3month"));
+        user3monthActionFeature.setRov(record.getString("i_rov_3month"));
+        user3monthActionFeature.setRos(record.getString("i_ros_3month"));
+        itemFeature.setMonth3_cnt_features(user3monthActionFeature);
+        return itemFeature;
+    }
+
+
+}

+ 100 - 0
src/main/java/examples/sparksql/SparkAdCTRSampleLoader.java

@@ -0,0 +1,100 @@
+package examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.google.common.collect.ListMultimap;
+import com.tzld.piaoquan.ad.engine.commons.base.*;
+import com.tzld.piaoquan.data.base.*;
+import com.tzld.piaoquan.ad.engine.commons.score.feature.VlogAdCtrLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
+import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
+import examples.dataloader.AdSampleConstructor;
+import examples.dataloader.RecommendSampleConstructor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+public class SparkAdCTRSampleLoader {
+
+    public static void main(String[] args) {
+
+        String partition = args[0];
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "alg_recsys_view_sample";
+        String hdfsPath = "/dw/recommend/model/ad_ctr_samples/" + partition;
+
+        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
+
+        JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(50));
+        readData.saveAsTextFile(hdfsPath);
+    }
+
+
+    static class RecordsToSamples implements Function2<Record, TableSchema, String> {
+        @Override
+        public String call(Record record, TableSchema schema) throws Exception {
+            String labelName = "adclick_ornot";
+            String ret = singleParse(record, labelName);
+            return ret;
+        }
+    }
+
+
+    // 单条日志处理逻辑
+    public static String singleParse(Record record, String labelName) {
+        // 数据解析
+        String label = record.getString(labelName);
+        if (label == null || label == "0") {
+            label = "1";
+        } else {
+            label = "0";
+        }
+
+        // 从sql的 record中 初始化对象内容
+        AdRequestContext requestContext = AdSampleConstructor.constructRequestContext(record);
+        UserAdFeature userFeature = AdSampleConstructor.constructUserFeature(record);
+        AdItemFeature itemFeature = AdSampleConstructor.constructItemFeature(record);
+
+        // 转化成bytes
+        AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
+        UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
+        AdItemBytesFeature  videoBytesFeature = new AdItemBytesFeature(itemFeature);
+
+        // 特征抽取
+        VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
+
+        bytesFeatureExtractor.getUserFeatures(userBytesFeature);
+        bytesFeatureExtractor.getItemFeature(videoBytesFeature);
+        bytesFeatureExtractor.getContextFeatures(adRequestContextBytesFeature);
+
+        ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();
+        return parseSamplesToString(label, featureMap);
+    }
+
+    // 构建样本的字符串
+    public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
+        ArrayList<String> featureList = new ArrayList<String>();
+        for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
+            FeatureGroup groupedFeature = entry.getKey();
+            BaseFeature baseFeature = entry.getValue();
+            Long featureIdentifier = baseFeature.getIdentifier();
+            featureList.add(String.valueOf(featureIdentifier) + ":1");
+        }
+        return label + "\t" + String.join("\t", featureList);
+    }
+
+}

+ 1 - 1
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRLoaderDemo.java → src/main/java/examples/sparksql/SparkEMRLoaderDemo.java

@@ -1,4 +1,4 @@
-package com.aliyun.odps.spark.examples.sparksql;
+package examples.sparksql;
 
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.data.Record;

+ 1 - 1
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkGrapDataLoader.java → src/main/java/examples/sparksql/SparkGrapDataLoader.java

@@ -1,4 +1,4 @@
-package com.aliyun.odps.spark.examples.sparksql;
+package examples.sparksql;
 
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.data.Record;

+ 10 - 8
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRShareRatioSampleLoader.java → src/main/java/examples/sparksql/SparkShareRatioSampleLoader.java

@@ -1,10 +1,10 @@
-package com.aliyun.odps.spark.examples.sparksql;
+package examples.sparksql;
 
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.data.Record;
 import com.google.common.collect.ListMultimap;
 import com.tzld.piaoquan.data.base.*;
-import com.tzld.piaoquan.data.dataloader.SampleFeatureConstructor;
+import examples.dataloader.RecommendSampleConstructor;
 import com.tzld.piaoquan.data.score.feature.VlogShareLRFeatureExtractor;
 import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
 import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
@@ -18,11 +18,11 @@ import java.util.ArrayList;
 import java.util.Map;
 
 
-public class SparkEMRShareRatioSampleLoader {
+public class SparkShareRatioSampleLoader {
 
     public static void main(String[] args) {
 
-        String partition = "dt=20231211";
+        String partition = args[0];
         String accessId = "LTAIWYUujJAm7CbH";
         String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
         String odpsUrl = "http://service.odps.aliyun.com/api";
@@ -55,14 +55,16 @@ public class SparkEMRShareRatioSampleLoader {
     public static String singleParse(Record record, String labelName) {
         // 数据解析
         String label = record.getString(labelName);
-        if (label == null) {
+        if (label == null || label == "1") {
             label = "0";
+        } else {
+            label = "1";
         }
 
         // 从sql的 record中 初始化对象内容
-        RequestContext requestContext = SampleFeatureConstructor.constructRequestContext(record);
-        UserFeature userFeature = SampleFeatureConstructor.constructUserFeature(record);
-        ItemFeature itemFeature = SampleFeatureConstructor.constructItemFeature(record);
+        RequestContext requestContext = RecommendSampleConstructor.constructRequestContext(record);
+        UserFeature userFeature = RecommendSampleConstructor.constructUserFeature(record);
+        ItemFeature itemFeature = RecommendSampleConstructor.constructItemFeature(record);
 
         // 转化成bytes
         RequestContextBytesFeature requestContextBytesFeature = new RequestContextBytesFeature(requestContext);

+ 5 - 9
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkUserFeaToRedisLoader.java → src/main/java/examples/sparksql/SparkUserFeaToRedisLoader.java

@@ -1,8 +1,8 @@
-package com.aliyun.odps.spark.examples.sparksql;
+package examples.sparksql;
 
 import com.aliyun.odps.data.Record;
 import com.tzld.piaoquan.data.base.*;
-import com.tzld.piaoquan.data.dataloader.RedisFeatureConstructor;
+import examples.dataloader.RecommRedisFeatureConstructor;
 import org.apache.spark.SparkConf;
 import org.apache.spark.aliyun.odps.OdpsOps;
 import org.apache.spark.api.java.JavaRDD;
@@ -36,7 +36,7 @@ public class SparkUserFeaToRedisLoader {
 
     public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
         Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
-        UserFeature userFeature = RedisFeatureConstructor.constructUserFeature(line);
+        UserFeature userFeature = RecommRedisFeatureConstructor.constructUserFeature(line);
         String key = String.format(userKeyFormat, userFeature.getKey());
         String value = userFeature.getValue();
         userFeaRedisFormat.put(key, value);
@@ -46,13 +46,13 @@ public class SparkUserFeaToRedisLoader {
 
     public static void main(String[] args) {
 
-        String partition = "dt=20231211";
+        String partition = args[0];
         String accessId = "LTAIWYUujJAm7CbH";
         String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
         String odpsUrl = "http://service.odps.aliyun.com/api";
         String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
         String project = "loghubods";
-        String table = "alg_recsys_view_sample";
+        String table = "alg_recsys_user_info";
 
         SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
@@ -68,10 +68,6 @@ public class SparkUserFeaToRedisLoader {
                 }
         );
 
-
-
-
-
     }