|  | @@ -1,6 +1,9 @@
 | 
	
		
			
				|  |  |  package com.tzld.piaoquan.recommend.server.dataloader;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import com.aliyun.odps.data.ResultSet;
 | 
	
		
			
				|  |  | +import com.aliyun.odps.tunnel.InstanceTunnel;
 | 
	
		
			
				|  |  | +import com.aliyun.odps.tunnel.io.TunnelRecordReader;
 | 
	
		
			
				|  |  |  import com.fasterxml.jackson.databind.ser.Serializers;
 | 
	
		
			
				|  |  |  import com.google.common.collect.ListMultimap;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.recommend.server.common.base.*;
 | 
	
	
		
			
				|  | @@ -16,9 +19,11 @@ import com.aliyun.odps.account.Account;
 | 
	
		
			
				|  |  |  import com.aliyun.odps.account.AliyunAccount;
 | 
	
		
			
				|  |  |  import com.aliyun.odps.data.Record;
 | 
	
		
			
				|  |  |  import com.aliyun.odps.task.SQLTask;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import java.io.IOException;
 | 
	
		
			
				|  |  |  import java.util.*;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -public class FeatureConstructor {
 | 
	
		
			
				|  |  | +public class  FeatureConstructor {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private static final String BUCKET_NAME = "ali-recommend";
 | 
	
		
			
				|  |  |      private static  final Map<String, String> ODPS_CONFIG =  new HashMap<String, String>();
 | 
	
	
		
			
				|  | @@ -28,12 +33,14 @@ public class FeatureConstructor {
 | 
	
		
			
				|  |  |          ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P");
 | 
	
		
			
				|  |  |      };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private static final Account account =new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public static List<Record> loadStreamDataFromOSS(String table, String dt) {
 | 
	
		
			
				|  |  | -        Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
 | 
	
		
			
				|  |  |          Odps odps = new Odps(account);
 | 
	
		
			
				|  |  |          odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT"));
 | 
	
		
			
				|  |  |          odps.setDefaultProject("loghubods");
 | 
	
		
			
				|  |  | -        String sql = String.format("select * from %s where dt ='%s' limit 100;", table, dt);
 | 
	
		
			
				|  |  | +        String sql = String.format("select * from %s where dt ='%s' limit 100000;", table, dt);
 | 
	
		
			
				|  |  |          Instance instance;
 | 
	
		
			
				|  |  |          List<Record> records = new ArrayList<Record>();
 | 
	
		
			
				|  |  |          try {
 | 
	
	
		
			
				|  | @@ -47,21 +54,42 @@ public class FeatureConstructor {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public static TunnelRecordReader loadDataFromOSSSession(String table, String dt) {
 | 
	
		
			
				|  |  | +        Odps odps = new Odps(account);
 | 
	
		
			
				|  |  | +        odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT"));
 | 
	
		
			
				|  |  | +        odps.setDefaultProject("loghubods");
 | 
	
		
			
				|  |  | +        String sql = String.format("select * from %s where dt ='%s';", table, dt);
 | 
	
		
			
				|  |  | +        TunnelRecordReader reader = null;
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            Instance instance = SQLTask.run(odps, sql);
 | 
	
		
			
				|  |  | +            instance.waitForSuccess();
 | 
	
		
			
				|  |  | +            InstanceTunnel tunnel = new InstanceTunnel(odps);
 | 
	
		
			
				|  |  | +            InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), instance.getId());
 | 
	
		
			
				|  |  | +            long count = session.getRecordCount();
 | 
	
		
			
				|  |  | +            reader = session.openRecordReader(0, count);
 | 
	
		
			
				|  |  | +        } catch (OdpsException e) {
 | 
	
		
			
				|  |  | +            e.printStackTrace();
 | 
	
		
			
				|  |  | +        } catch (IOException e) {
 | 
	
		
			
				|  |  | +            e.printStackTrace();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        return reader;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public static RequestContext constructRequestContext(Record record) {
 | 
	
		
			
				|  |  |          RequestContext requestContext = new RequestContext();
 | 
	
		
			
				|  |  | -        requestContext.setApptype(record.get("apptype").toString());
 | 
	
		
			
				|  |  | -        requestContext.setMachineinfo_brand(record.get("machineinfo_brand").toString());
 | 
	
		
			
				|  |  | -        requestContext.setMachineinfo_model(record.get("machineinfo_model").toString());
 | 
	
		
			
				|  |  | -        requestContext.setMachineinfo_platform(record.get("machineinfo_platform").toString());
 | 
	
		
			
				|  |  | -        requestContext.setMachineinfo_sdkversion(record.get("machineinfo_sdkversion").toString());
 | 
	
		
			
				|  |  | -        requestContext.setMachineinfo_system(record.get("machineinfo_system").toString());
 | 
	
		
			
				|  |  | -        requestContext.setMachineinfo_wechatversion(record.get("machineinfo_wechatversion").toString());
 | 
	
		
			
				|  |  | -        requestContext.setDay(record.get("ctx_day").toString());
 | 
	
		
			
				|  |  | -        requestContext.setWeek(record.get("ctx_week").toString());
 | 
	
		
			
				|  |  | -        requestContext.setHour(record.get("ctx_hour").toString());
 | 
	
		
			
				|  |  | -        requestContext.setRegion(record.get("ctx_region").toString());
 | 
	
		
			
				|  |  | -        requestContext.setCity(record.get("ctx_city").toString());
 | 
	
		
			
				|  |  | +        requestContext.setApptype(record.getString("apptype"));
 | 
	
		
			
				|  |  | +        requestContext.setMachineinfo_brand(record.getString("machineinfo_brand"));
 | 
	
		
			
				|  |  | +        requestContext.setMachineinfo_model(record.getString("machineinfo_model"));
 | 
	
		
			
				|  |  | +        requestContext.setMachineinfo_platform(record.getString("machineinfo_platform"));
 | 
	
		
			
				|  |  | +        requestContext.setMachineinfo_sdkversion(record.getString("machineinfo_sdkversion"));
 | 
	
		
			
				|  |  | +        requestContext.setMachineinfo_system(record.getString("machineinfo_system"));
 | 
	
		
			
				|  |  | +        requestContext.setMachineinfo_wechatversion(record.getString("machineinfo_wechatversion"));
 | 
	
		
			
				|  |  | +        requestContext.setDay(record.getString("ctx_day"));
 | 
	
		
			
				|  |  | +        requestContext.setWeek(record.getString("ctx_week"));
 | 
	
		
			
				|  |  | +        requestContext.setHour(record.getString("ctx_hour"));
 | 
	
		
			
				|  |  | +        requestContext.setRegion(record.getString("ctx_region"));
 | 
	
		
			
				|  |  | +        requestContext.setCity(record.getString("ctx_city"));
 | 
	
		
			
				|  |  |          return requestContext;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -129,12 +157,12 @@ public class FeatureConstructor {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public static ItemFeature constructItemFeature(Record record){
 | 
	
		
			
				|  |  |          ItemFeature itemFeature = new ItemFeature();
 | 
	
		
			
				|  |  | -        itemFeature.setVideoId(record.get("videoid").toString());
 | 
	
		
			
				|  |  | -        itemFeature.setUpId(record.get("i_up_id").toString());
 | 
	
		
			
				|  |  | -        itemFeature.setTitleLength(record.get("i_title_len").toString());
 | 
	
		
			
				|  |  | -        itemFeature.setPlayLength(record.get("i_play_len").toString());
 | 
	
		
			
				|  |  | -        itemFeature.setTotalTime(record.get("total_time").toString());
 | 
	
		
			
				|  |  | -        itemFeature.setDaysSinceUpload(record.get("i_days_since_upload").toString());
 | 
	
		
			
				|  |  | +        itemFeature.setVideoId(record.getString("videoid"));
 | 
	
		
			
				|  |  | +        itemFeature.setUpId(record.getString("i_up_id"));
 | 
	
		
			
				|  |  | +        itemFeature.setTitleLength(record.getString("i_title_len"));
 | 
	
		
			
				|  |  | +        itemFeature.setPlayLength(record.getString("i_play_len"));
 | 
	
		
			
				|  |  | +        itemFeature.setTotalTime(record.getString("total_time"));
 | 
	
		
			
				|  |  | +        itemFeature.setDaysSinceUpload(record.getString("i_days_since_upload"));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          UserActionFeature user1dayActionFeature = new UserActionFeature();
 | 
	
		
			
				|  |  |          user1dayActionFeature.setExp_cnt(record.getString("i_1day_exp_cnt"));
 |