|
@@ -9,8 +9,12 @@ import com.aliyun.odps.data.Record;
|
|
|
import com.aliyun.odps.task.SQLTask;
|
|
|
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
|
|
|
import com.google.common.collect.ListMultimap;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
|
|
|
import com.tzld.piaoquan.recommend.server.common.base.*;
|
|
|
import com.tzld.piaoquan.recommend.server.dataloader.OfflineSamplesLoader;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
@@ -20,41 +24,34 @@ import java.util.Map;
|
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
+public class FeatureToRedisLoader {
|
|
|
|
|
|
-class Task implements Runnable {
|
|
|
- // 使用list避免线程队列增长无限制,所以采用10万条内容提交一次线程
|
|
|
- private List<String> keys;
|
|
|
- public Task(List<String> keys){
|
|
|
- this.keys = keys;
|
|
|
- }
|
|
|
- public void run() {
|
|
|
- //TODO write List<String> to Redis
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
+ @Autowired
|
|
|
+ private RedisTemplate<String, String> redisTemplate;
|
|
|
+ private final String userKeyFormat = "user:%s";
|
|
|
+ private ExecutorService pool = ThreadPoolFactory.defaultPool();
|
|
|
|
|
|
-public class FeatureToRedisLoader {
|
|
|
|
|
|
- private static final ExecutorService persistOnlineFeaturesExecutor = Executors.newFixedThreadPool(10);
|
|
|
|
|
|
- public static void loadFeatureToRedis(String userTable, String dt) {
|
|
|
+ public void loadFeatureToRedis(String userTable, String dt) {
|
|
|
TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(userTable, dt);
|
|
|
Record record;
|
|
|
- ArrayList<String> keyList = new ArrayList<>();
|
|
|
+ Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
|
|
|
int count = 0;
|
|
|
try {
|
|
|
while ((record = reader.read()) != null) {
|
|
|
UserFeature userFeature = FeatureConstructor.constructUserFeature(record);
|
|
|
- // TODO add string format for redis
|
|
|
- String tmp = userFeature.parsetoRedisString();
|
|
|
- keyList.add(tmp);
|
|
|
- if(count < 100000) {
|
|
|
+ String key = String.format(userKeyFormat, userFeature.getKey());
|
|
|
+ String value = userFeature.getValue();
|
|
|
+ userFeaRedisFormat.put(key, value);
|
|
|
+ if(count < 10000) {
|
|
|
count++;
|
|
|
- } else if (count == 100000) {
|
|
|
- persistOnlineFeaturesExecutor.execute(new Task(keyList));
|
|
|
- keyList = new ArrayList<>();
|
|
|
+ } else if (count == 10000) {
|
|
|
+ redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
|
|
|
+ userFeaRedisFormat = new HashMap<String, String>();
|
|
|
count = 0;
|
|
|
}
|
|
|
}
|
|
@@ -65,7 +62,6 @@ public class FeatureToRedisLoader {
|
|
|
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
- String Path = "";
|
|
|
|
|
|
}
|
|
|
|