Explorar o código

ad feature to redis

sunmingze hai 1 ano
pai
achega
d2ddb94c61

+ 82 - 99
src/main/java/examples/dataloader/AdRedisFeatureConstructor.java

@@ -4,6 +4,9 @@ package examples.dataloader;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.ad.engine.commons.base.AdActionFeature;
+import com.tzld.piaoquan.ad.engine.commons.base.AdItemFeature;
+import com.tzld.piaoquan.ad.engine.commons.base.UserAdFeature;
 import com.tzld.piaoquan.data.base.ItemFeature;
 import com.tzld.piaoquan.data.base.RequestContext;
 import com.tzld.piaoquan.data.base.UserActionFeature;
@@ -46,118 +49,98 @@ public class AdRedisFeatureConstructor {
     }
 
 
-    public static UserFeature constructUserFeature(Record record) {
-        UserFeature userFeature = new UserFeature();
-        userFeature.setUid(record.get("uid").toString());
-        userFeature.setUser_cycle_bucket_7days(record.getString("u_cycle_bucket_7days"));
-        userFeature.setUser_cycle_bucket_30days(record.getString("u_cycle_bucket_30days"));
-        userFeature.setUser_share_bucket_30days(record.getString("u_share_bucket_30days"));
-
+    public static UserAdFeature constructUserFeature(Record record) {
+        UserAdFeature userFeature = new UserAdFeature();
+        userFeature.setMid(record.get("mids").toString());
 
         // 1day features
-        UserActionFeature user1dayActionFeature = new UserActionFeature();
-        user1dayActionFeature.setExp_cnt(record.getString("u_1day_exp_cnt"));
-        user1dayActionFeature.setClick_cnt(record.getString("u_1day_click_cnt"));
-        user1dayActionFeature.setShare_cnt(record.getString("u_1day_share_cnt"));
-        user1dayActionFeature.setReturn_cnt(record.getString("u_1day_return_cnt"));
-        user1dayActionFeature.setCtr(record.getString("u_ctr_1day"));
-        user1dayActionFeature.setStr(record.getString("u_str_1day"));
-        user1dayActionFeature.setRov(record.getString("u_rov_1day"));
-        user1dayActionFeature.setRos(record.getString("u_ros_1day"));
-        userFeature.setDay1_cnt_features(user1dayActionFeature);
+        AdActionFeature userAd1dayActionFeature = new AdActionFeature();
+        userAd1dayActionFeature.setAdView(record.getString("ad_view_1day"));
+        userAd1dayActionFeature.setAdClick(record.getString("ad_click_1day"));
+        userAd1dayActionFeature.setAdConversion(record.getString("ad_conversion_1day"));
+        userAd1dayActionFeature.setCtr(record.getString("ad_ctr_1day"));
+        userAd1dayActionFeature.setCvr(record.getString("ad_cvr_1day"));
+        userFeature.setDay1_cnt_features(userAd1dayActionFeature);
+
+
 
         // 3day features
-        UserActionFeature user3dayActionFeature = new UserActionFeature();
-        user3dayActionFeature.setExp_cnt(record.getString("u_3day_exp_cnt"));
-        user3dayActionFeature.setClick_cnt(record.getString("u_3day_click_cnt"));
-        user3dayActionFeature.setShare_cnt(record.getString("u_3day_share_cnt"));
-        user3dayActionFeature.setReturn_cnt(record.getString("u_3day_return_cnt"));
-        user3dayActionFeature.setCtr(record.getString("u_ctr_3day"));
-        user3dayActionFeature.setStr(record.getString("u_str_3day"));
-        user3dayActionFeature.setRov(record.getString("u_rov_3day"));
-        user3dayActionFeature.setRos(record.getString("u_ros_3day"));
-        userFeature.setDay3_cnt_features(user3dayActionFeature);
+        AdActionFeature userAd3dayActionFeature = new AdActionFeature();
+        userAd1dayActionFeature.setAdView(record.getString("ad_view_3day"));
+        userAd1dayActionFeature.setAdClick(record.getString("ad_click_3day"));
+        userAd1dayActionFeature.setAdConversion(record.getString("ad_conversion_3day"));
+        userAd1dayActionFeature.setCtr(record.getString("ad_ctr_3day"));
+        userAd1dayActionFeature.setCvr(record.getString("ad_cvr_3day"));
+        userFeature.setDay3_cnt_features(userAd3dayActionFeature);
+
 
         // 7day features
-        UserActionFeature user7dayActionFeature = new UserActionFeature();
-        user7dayActionFeature.setExp_cnt(record.getString("u_7day_exp_cnt"));
-        user7dayActionFeature.setClick_cnt(record.getString("u_7day_click_cnt"));
-        user7dayActionFeature.setShare_cnt(record.getString("u_7day_share_cnt"));
-        user7dayActionFeature.setReturn_cnt(record.getString("u_7day_return_cnt"));
-        user7dayActionFeature.setCtr(record.getString("u_ctr_7day"));
-        user7dayActionFeature.setStr(record.getString("u_str_7day"));
-        user7dayActionFeature.setRov(record.getString("u_rov_7day"));
-        user7dayActionFeature.setRos(record.getString("u_ros_7day"));
-        userFeature.setDay7_cnt_features(user7dayActionFeature);
+        AdActionFeature userAd7dayActionFeature = new AdActionFeature();
+        userAd1dayActionFeature.setAdView(record.getString("ad_view_7day"));
+        userAd1dayActionFeature.setAdClick(record.getString("ad_click7day"));
+        userAd1dayActionFeature.setAdConversion(record.getString("ad_conversion_7day"));
+        userAd1dayActionFeature.setCtr(record.getString("ad_ctr_7day"));
+        userAd1dayActionFeature.setCvr(record.getString("ad_cvr_7day"));
+        userFeature.setDay7_cnt_features(userAd7dayActionFeature);
+
+
 
         // 3month features
-        UserActionFeature user3monthActionFeature = new UserActionFeature();
-        user3monthActionFeature.setExp_cnt(record.getString("u_3month_exp_cnt"));
-        user3monthActionFeature.setClick_cnt(record.getString("u_3month_click_cnt"));
-        user3monthActionFeature.setShare_cnt(record.getString("u_3month_share_cnt"));
-        user3monthActionFeature.setReturn_cnt(record.getString("u_3month_return_cnt"));
-        user3monthActionFeature.setCtr(record.getString("u_ctr_3month"));
-        user3monthActionFeature.setStr(record.getString("u_str_3month"));
-        user3monthActionFeature.setRov(record.getString("u_rov_3month"));
-        user3monthActionFeature.setRos(record.getString("u_ros_3month"));
-        userFeature.setMonth3_cnt_features(user3monthActionFeature);
+        AdActionFeature userAd3MonthActionFeature = new AdActionFeature();
+        userAd3MonthActionFeature.setAdView(record.getString("ad_view_3month"));
+        userAd3MonthActionFeature.setAdClick(record.getString("ad_click_3month"));
+        userAd3MonthActionFeature.setAdConversion(record.getString("ad_conversion_3month"));
+        userAd3MonthActionFeature.setCtr(record.getString("ad_ctr_3month"));
+        userAd3MonthActionFeature.setCvr(record.getString("ad_cvr_3month"));
+        userFeature.setMonth3_cnt_features(userAd3MonthActionFeature);
+
 
         return userFeature;
     }
 
 
-    public static ItemFeature constructItemFeature(Record record) {
-        ItemFeature itemFeature = new ItemFeature();
-        itemFeature.setVideoId(record.getString("videoid"));
-        itemFeature.setUpId(record.getString("uid"));
-        itemFeature.setTitleLength(record.getString("i_title_len"));
-        itemFeature.setPlayLength(record.getString("i_play_len"));
-        itemFeature.setTotalTime(record.getString("total_time"));
-        itemFeature.setDaysSinceUpload(record.getString("i_days_since_upload"));
-
-        UserActionFeature user1dayActionFeature = new UserActionFeature();
-        user1dayActionFeature.setExp_cnt(record.getString("i_1day_exp_cnt"));
-        user1dayActionFeature.setClick_cnt(record.getString("i_1day_click_cnt"));
-        user1dayActionFeature.setShare_cnt(record.getString("i_1day_share_cnt"));
-        user1dayActionFeature.setReturn_cnt(record.getString("i_1day_return_cnt"));
-        user1dayActionFeature.setCtr(record.getString("i_ctr_1day"));
-        user1dayActionFeature.setStr(record.getString("i_str_1day"));
-        user1dayActionFeature.setRov(record.getString("i_rov_1day"));
-        user1dayActionFeature.setRos(record.getString("i_ros_1day"));
-        itemFeature.setDay1_cnt_features(user1dayActionFeature);
-
-        UserActionFeature user3dayActionFeature = new UserActionFeature();
-        user3dayActionFeature.setExp_cnt(record.getString("i_3day_exp_cnt"));
-        user3dayActionFeature.setClick_cnt(record.getString("i_3day_click_cnt"));
-        user3dayActionFeature.setShare_cnt(record.getString("i_3day_share_cnt"));
-        user3dayActionFeature.setReturn_cnt(record.getString("i_3day_return_cnt"));
-        user3dayActionFeature.setCtr(record.getString("i_ctr_3day"));
-        user3dayActionFeature.setStr(record.getString("i_str_3day"));
-        user3dayActionFeature.setRov(record.getString("i_rov_3day"));
-        user3dayActionFeature.setRos(record.getString("i_ros_3day"));
-        itemFeature.setDay3_cnt_features(user1dayActionFeature);
-
-        UserActionFeature user7dayActionFeature = new UserActionFeature();
-        user7dayActionFeature.setExp_cnt(record.getString("i_7day_exp_cnt"));
-        user7dayActionFeature.setClick_cnt(record.getString("i_7day_click_cnt"));
-        user7dayActionFeature.setShare_cnt(record.getString("i_7day_share_cnt"));
-        user7dayActionFeature.setReturn_cnt(record.getString("i_7day_return_cnt"));
-        user7dayActionFeature.setCtr(record.getString("i_ctr_7day"));
-        user7dayActionFeature.setStr(record.getString("i_str_7day"));
-        user7dayActionFeature.setRov(record.getString("i_rov_7day"));
-        user7dayActionFeature.setRos(record.getString("i_ros_7day"));
-        itemFeature.setDay7_cnt_features(user1dayActionFeature);
-
-        UserActionFeature user3monthActionFeature = new UserActionFeature();
-        user3monthActionFeature.setExp_cnt(record.getString("i_3month_exp_cnt"));
-        user3monthActionFeature.setClick_cnt(record.getString("i_3month_click_cnt"));
-        user3monthActionFeature.setShare_cnt(record.getString("i_3month_share_cnt"));
-        user3monthActionFeature.setReturn_cnt(record.getString("i_3month_return_cnt"));
-        user3monthActionFeature.setCtr(record.getString("i_ctr_3month"));
-        user3monthActionFeature.setStr(record.getString("i_str_3month"));
-        user3monthActionFeature.setRov(record.getString("i_rov_3month"));
-        user3monthActionFeature.setRos(record.getString("i_ros_3month"));
-        itemFeature.setMonth3_cnt_features(user3monthActionFeature);
+    public static AdItemFeature constructItemFeature(Record record) {
+        AdItemFeature itemFeature = new AdItemFeature();
+        itemFeature.setAdId(record.getString("adid"));
+        itemFeature.setCampaignId(record.getString("campaignid"));
+        itemFeature.setAdvertiserId(record.getString("advertiserid"));
+        itemFeature.setCreativeId(record.getString("creativeid"));
+
+        // ad 维度特征
+        AdActionFeature adIdActionFeature1day = new AdActionFeature();
+        adIdActionFeature1day.setAdView(record.getString("view_ad_1day"));
+        adIdActionFeature1day.setAdClick(record.getString("click_ad_1day"));
+        adIdActionFeature1day.setAdConversion(record.getString("conversion_ad_1day"));
+        adIdActionFeature1day.setCtr(record.getString("ctr_ad_1day"));
+        adIdActionFeature1day.setCvr(record.getString("cvr_ad_1day"));
+        itemFeature.setDay1_cnt_features(adIdActionFeature1day);
+
+        AdActionFeature adIdActionFeature3day = new AdActionFeature();
+        adIdActionFeature3day.setAdView(record.getString("view_ad_3day"));
+        adIdActionFeature3day.setAdClick(record.getString("click_ad_3day"));
+        adIdActionFeature3day.setAdConversion(record.getString("conversion_ad_3day"));
+        adIdActionFeature3day.setCtr(record.getString("ctr_ad_3day"));
+        adIdActionFeature3day.setCvr(record.getString("cvr_ad_3day"));
+        itemFeature.setDay7_cnt_features(adIdActionFeature3day);
+
+        AdActionFeature adIdActionFeature7day = new AdActionFeature();
+        adIdActionFeature7day.setAdView(record.getString("view_ad_7day"));
+        adIdActionFeature7day.setAdClick(record.getString("click_ad_7day"));
+        adIdActionFeature7day.setAdConversion(record.getString("conversion_ad_7day"));
+        adIdActionFeature7day.setCtr(record.getString("ctr_ad_7day"));
+        adIdActionFeature7day.setCvr(record.getString("cvr_ad_7day"));
+        itemFeature.setDay7_cnt_features(adIdActionFeature7day);
+
+        AdActionFeature adIdActionFeature3month = new AdActionFeature();
+        adIdActionFeature3month.setAdView(record.getString("view_ad_7day"));
+        adIdActionFeature3month.setAdClick(record.getString("click_ad_7day"));
+        adIdActionFeature3month.setAdConversion(record.getString("conversion_ad_7day"));
+        adIdActionFeature3month.setCtr(record.getString("ctr_ad_7day"));
+        adIdActionFeature3month.setCvr(record.getString("cvr_ad_7day"));
+        itemFeature.setDay7_cnt_features(adIdActionFeature7day);
+
+        // TODO campaignId等维度特征
+
         return itemFeature;
     }
 

+ 2 - 2
src/main/java/examples/sparksql/SparkAdCTRSampleLoader.java

@@ -71,14 +71,14 @@ public class SparkAdCTRSampleLoader {
         // 转化成bytes
         AdRequestContextBytesFeature adRequestContextBytesFeature = new AdRequestContextBytesFeature(requestContext);
         UserAdBytesFeature userBytesFeature = new UserAdBytesFeature(userFeature);
-        AdItemBytesFeature  videoBytesFeature = new AdItemBytesFeature(itemFeature);
+        AdItemBytesFeature  adItemBytesFeature = new AdItemBytesFeature(itemFeature);
 
         // 特征抽取
         VlogAdCtrLRFeatureExtractor bytesFeatureExtractor;
         bytesFeatureExtractor = new VlogAdCtrLRFeatureExtractor();
 
         bytesFeatureExtractor.getUserFeatures(userBytesFeature);
-        bytesFeatureExtractor.getItemFeature(videoBytesFeature);
+        bytesFeatureExtractor.getItemFeature(adItemBytesFeature);
         bytesFeatureExtractor.getContextFeatures(adRequestContextBytesFeature);
 
         ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();

+ 103 - 0
src/main/java/examples/sparksql/SparkAdFeaToRedisLoader.java

@@ -0,0 +1,103 @@
+package examples.sparksql;
+
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.ad.engine.commons.base.AdItemFeature;
+import com.tzld.piaoquan.ad.engine.commons.base.UserAdFeature;
+import com.tzld.piaoquan.data.base.UserFeature;
+import examples.dataloader.AdRedisFeatureConstructor;
+import examples.dataloader.RecommRedisFeatureConstructor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class SparkAdFeaToRedisLoader {
+
+    private static final String userKeyFormat = "user:ad:%s";
+
+    private static final String adKeyFormat = "ad:%s";
+
+
+    public static RedisTemplate<String, String> buildRedisTemplate() {
+        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+        rsc.setPort(6379);
+        rsc.setPassword("Wqsd@2019");
+        rsc.setHostName("r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com");
+        RedisTemplate<String, String> template = new RedisTemplate<>();
+        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+        fac.afterPropertiesSet();
+        template.setDefaultSerializer(new StringRedisSerializer());
+        template.setConnectionFactory(fac);
+        template.afterPropertiesSet();
+        return template;
+    }
+
+    public static void loadUserAdFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
+        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
+        UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(line);
+        String key = String.format(userKeyFormat, userFeature.getKey());
+        String value = userFeature.getValue();
+        userFeaRedisFormat.put(key, value);
+        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+    }
+
+
+    public static void loadAdFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
+        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
+        AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(line);
+        String key = String.format(userKeyFormat, adItemFeature.getKey());
+        String value = adItemFeature.getValue();
+        userFeaRedisFormat.put(key, value);
+        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+    }
+
+
+    public static void main(String[] args) {
+
+        String partition = args[0];
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String tableAdInfo = "alg_ad_item_info";
+        String tableUserInfo = "alg_ad_user_info";
+
+
+        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
+
+
+        // load Ad features
+        JavaRDD<Record> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, null, Integer.valueOf(10));
+        readAdData.sample(false, 0.0001).foreachPartition(
+                rowIterator -> {
+                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+                    rowIterator.forEachRemaining(line -> loadAdFeatureToRedis(redisTemplate, line));
+                }
+        );
+
+
+        // load user features
+        JavaRDD<Record> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, null, Integer.valueOf(10));
+        readUserData.sample(false, 0.0001).foreachPartition(
+                rowIterator -> {
+                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+                    rowIterator.forEachRemaining(line -> loadUserAdFeatureToRedis(redisTemplate, line));
+                }
+        );
+
+
+    }
+
+}