|
@@ -0,0 +1,61 @@
|
|
|
+package com.tzld.piaoquan.recommend.server.dataloader;
|
|
|
+
|
|
|
+import com.aliyun.odps.data.Record;
|
|
|
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.base.ItemFeature;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.base.UserFeature;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+
|
|
|
+
|
|
|
+@Component
|
|
|
+public class ItemFeatureToRedisLoader {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RedisTemplate<String, String> redisTemplate;
|
|
|
+ private final String videoKeyFormat = "video:%s";
|
|
|
+ private ExecutorService pool = ThreadPoolFactory.defaultPool();
|
|
|
+
|
|
|
+ public void loadFeatureToRedis(String table, String dt) {
|
|
|
+ String sql = String.format("select * from %s where dt ='%s';", table, dt);
|
|
|
+ TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql, table, dt);
|
|
|
+ Record record;
|
|
|
+ Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
|
|
|
+ int count = 0;
|
|
|
+ try {
|
|
|
+ while ((record = reader.read()) != null) {
|
|
|
+ ItemFeature itemFeature = FeatureConstructor.constructItemFeature(record);
|
|
|
+ String key = String.format(videoKeyFormat, itemFeature.getKey());
|
|
|
+ String value = itemFeature.getValue();
|
|
|
+ userFeaRedisFormat.put(key, value);
|
|
|
+ if (count < 10000) {
|
|
|
+ count++;
|
|
|
+ } else if (count == 10000) {
|
|
|
+ redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
|
|
|
+ userFeaRedisFormat = new HashMap<String, String>();
|
|
|
+ count = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public static void main(String[] args) {
|
|
|
+ if (args.length < 2) {
|
|
|
+ System.out.println("--------args 缺失---------");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ ItemFeatureToRedisLoader itemFeatureToRedisLoader = new ItemFeatureToRedisLoader();
|
|
|
+ itemFeatureToRedisLoader.loadFeatureToRedis(args[0], args[1]);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|