| 
					
				 | 
			
			
				@@ -1,6 +1,9 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 package com.tzld.piaoquan.recommend.server.dataloader; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.data.ResultSet; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.tunnel.InstanceTunnel; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.tunnel.io.TunnelRecordReader; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.fasterxml.jackson.databind.ser.Serializers; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.google.common.collect.ListMultimap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.tzld.piaoquan.recommend.server.common.base.*; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -16,9 +19,11 @@ import com.aliyun.odps.account.Account; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.aliyun.odps.account.AliyunAccount; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.aliyun.odps.data.Record; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.aliyun.odps.task.SQLTask; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.io.IOException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import java.util.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-public class FeatureConstructor { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+public class  FeatureConstructor { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private static final String BUCKET_NAME = "ali-recommend"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private static  final Map<String, String> ODPS_CONFIG =  new HashMap<String, String>(); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -28,12 +33,14 @@ public class FeatureConstructor { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static final Account account =new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     public static List<Record> loadStreamDataFromOSS(String table, String dt) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         Odps odps = new Odps(account); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         odps.setDefaultProject("loghubods"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        String sql = String.format("select * from %s where dt ='%s' limit 100;", table, dt); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        String sql = String.format("select * from %s where dt ='%s' limit 100000;", table, dt); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         Instance instance; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         List<Record> records = new ArrayList<Record>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         try { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -47,21 +54,42 @@ public class FeatureConstructor { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public static TunnelRecordReader loadDataFromOSSSession(String table, String dt) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Odps odps = new Odps(account); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        odps.setDefaultProject("loghubods"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        String sql = String.format("select * from %s where dt ='%s';", table, dt); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        TunnelRecordReader reader = null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Instance instance = SQLTask.run(odps, sql); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            instance.waitForSuccess(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            InstanceTunnel tunnel = new InstanceTunnel(odps); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), instance.getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            long count = session.getRecordCount(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            reader = session.openRecordReader(0, count); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } catch (OdpsException e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            e.printStackTrace(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } catch (IOException e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            e.printStackTrace(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return reader; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     public static RequestContext constructRequestContext(Record record) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         RequestContext requestContext = new RequestContext(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setApptype(record.get("apptype").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setMachineinfo_brand(record.get("machineinfo_brand").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setMachineinfo_model(record.get("machineinfo_model").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setMachineinfo_platform(record.get("machineinfo_platform").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setMachineinfo_sdkversion(record.get("machineinfo_sdkversion").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setMachineinfo_system(record.get("machineinfo_system").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setMachineinfo_wechatversion(record.get("machineinfo_wechatversion").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setDay(record.get("ctx_day").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setWeek(record.get("ctx_week").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setHour(record.get("ctx_hour").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setRegion(record.get("ctx_region").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        requestContext.setCity(record.get("ctx_city").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setApptype(record.getString("apptype")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setMachineinfo_brand(record.getString("machineinfo_brand")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setMachineinfo_model(record.getString("machineinfo_model")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setMachineinfo_platform(record.getString("machineinfo_platform")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setMachineinfo_sdkversion(record.getString("machineinfo_sdkversion")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setMachineinfo_system(record.getString("machineinfo_system")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setMachineinfo_wechatversion(record.getString("machineinfo_wechatversion")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setDay(record.getString("ctx_day")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setWeek(record.getString("ctx_week")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setHour(record.getString("ctx_hour")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setRegion(record.getString("ctx_region")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        requestContext.setCity(record.getString("ctx_city")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         return requestContext; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -129,12 +157,12 @@ public class FeatureConstructor { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     public static ItemFeature constructItemFeature(Record record){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         ItemFeature itemFeature = new ItemFeature(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        itemFeature.setVideoId(record.get("videoid").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        itemFeature.setUpId(record.get("i_up_id").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        itemFeature.setTitleLength(record.get("i_title_len").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        itemFeature.setPlayLength(record.get("i_play_len").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        itemFeature.setTotalTime(record.get("total_time").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        itemFeature.setDaysSinceUpload(record.get("i_days_since_upload").toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        itemFeature.setVideoId(record.getString("videoid")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        itemFeature.setUpId(record.getString("i_up_id")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        itemFeature.setTitleLength(record.getString("i_title_len")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        itemFeature.setPlayLength(record.getString("i_play_len")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        itemFeature.setTotalTime(record.getString("total_time")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        itemFeature.setDaysSinceUpload(record.getString("i_days_since_upload")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         UserActionFeature user1dayActionFeature = new UserActionFeature(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         user1dayActionFeature.setExp_cnt(record.getString("i_1day_exp_cnt")); 
			 |