| 
					
				 | 
			
			
				@@ -0,0 +1,171 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+package com.tzld.piaoquan.ad.engine.service.score.container; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.alibaba.fastjson.JSONObject; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.alibaba.fastjson.TypeReference; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.OSS; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.OSSClientBuilder; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.common.auth.CredentialsProvider; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.common.auth.DefaultCredentialProvider; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.model.CopyObjectResult; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.model.OSSObject; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.oss.model.PutObjectResult; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.slf4j.Logger; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.slf4j.LoggerFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Value; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.stereotype.Component; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import javax.annotation.PostConstruct; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.io.BufferedReader; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.io.ByteArrayInputStream; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.io.InputStream; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.io.InputStreamReader; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Arrays; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.Date; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.ConcurrentHashMap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.Executors; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.ScheduledExecutorService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@Component 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+public class PidLambdaV2Container { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private final static Logger log = LoggerFactory.getLogger(PidLambdaV2Container.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static final int SCHEDULE_PERIOD = 10; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${model.oss.internal.endpoint:oss-cn-hangzhou.aliyuncs.com}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    String endpoint = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${model.oss.accessKeyId:LTAIP6x1l3DXfSxm}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    String accessKeyId = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${model.oss.accessKetSecret:KbTaM9ars4OX3PMS6Xm7rtxGr1FLon}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    String accessKetSecret = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${model.oss.bucketName:art-recommend}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    String bucketName = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${model.oss.pid.v2.filename.lambda:pid/lambdaV2.txt}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    String lambdaFileName = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${model.oss.pid.v2.filename.dCpa:pid/dCpaV2.txt}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    String dCpaFileName = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${ad.model.pid.v2.lambda.max:5.0}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    Double maxLambda = 0d; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${ad.model.pid.v2.lambda.min:0.8}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    Double minLambda = 0d; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    OSS client; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static ConcurrentHashMap<Long,Double>  lambdaCache=new ConcurrentHashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private Date cacheDate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @PostConstruct 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void init(){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        instanceClient(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        final Runnable task = new Runnable() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            public void run() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    loadAndCalIfNeed(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }catch (Exception e){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    e.printStackTrace(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        scheduler.scheduleAtFixedRate(task, 0, SCHEDULE_PERIOD, TimeUnit.MINUTES); // 10分钟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void instanceClient(){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        CredentialsProvider credentialsProvider = new DefaultCredentialProvider(accessKeyId, accessKetSecret); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        this.client = new OSSClientBuilder().build(endpoint, credentialsProvider); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void loadAndCalIfNeed(){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        loadLambdaFile(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        OSSObject dCpaFileOjb=client.getObject(bucketName,dCpaFileName); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if(cacheDate==null||dCpaFileOjb.getObjectMetadata().getLastModified().after(cacheDate)){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            calNewLambda(dCpaFileOjb); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            writeLambdaFileToOss(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void calNewLambda(OSSObject object) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            InputStream is=object.getObjectContent(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            InputStreamReader isr=new InputStreamReader(is); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            BufferedReader bufferedReader = new BufferedReader(isr); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String line = null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ConcurrentHashMap<Long,Double>  temp=new ConcurrentHashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            while ((line = bufferedReader.readLine()) != null){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    String[] cols=line.split(","); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Long creativeId=Long.parseLong(cols[0]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Double conversion=Double.parseDouble(cols[1]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Double cpa=Double.parseDouble(cols[2]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Double realCost=Double.parseDouble(cols[3]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Double lambdaNew=1d; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if(((cpa*conversion)==0&&realCost.equals(0d))){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        lambdaNew=cpa*conversion/realCost; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if(lambdaNew>maxLambda){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        lambdaNew=maxLambda; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    }else if(lambdaNew<minLambda){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        lambdaNew=maxLambda; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    temp.put(creativeId,lambdaNew); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    log.info("svc=calNewLambdaV2 creativeId={} lambdaNew={}", creativeId,lambdaNew); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }catch (Exception e){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    e.printStackTrace(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            lambdaCache.clear(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            lambdaCache=temp; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }catch (Exception e){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log.error("svc=calNewLambdaV2 status=failed error={}", Arrays.toString(e.getStackTrace())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void writeLambdaFileToOss(){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        //先不考虑各种更新失败及重复更新问题。 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String tempFile=lambdaFileName+"_temp"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String content= JSONObject.toJSONString(lambdaCache); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            PutObjectResult putObjectResult=client.putObject(bucketName,tempFile,new ByteArrayInputStream(content.getBytes())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            CopyObjectResult copyObjectResult=client.copyObject(bucketName, tempFile, bucketName, lambdaFileName); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            this.cacheDate= copyObjectResult.getLastModified(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            client.deleteObject(bucketName, tempFile); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }catch (Exception e){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log.error("svc=writeLambdaV2FileToOss status=failed error={}", Arrays.toString(e.getStackTrace())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            e.printStackTrace(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void loadLambdaFile(){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            OSSObject object=client.getObject(bucketName,lambdaFileName); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if(object==null) return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if(cacheDate!=null&& !cacheDate.before(object.getObjectMetadata().getLastModified())) return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            StringBuilder builder=new StringBuilder(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            InputStream is=object.getObjectContent(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            InputStreamReader isr=new InputStreamReader(is); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            BufferedReader bufferedReader = new BufferedReader(isr); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String line = null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            while ((line=bufferedReader.readLine())!=null){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                builder.append(line); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            lambdaCache=JSONObject.parseObject(builder.toString(),new TypeReference<ConcurrentHashMap<Long,Double>>(){}); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            this.cacheDate=object.getObjectMetadata().getLastModified(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }catch (Exception e){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log.error("svc=loadLambdaV2File status=failed error={}", Arrays.toString(e.getStackTrace())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            e.printStackTrace(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public static Double getPidLambda(Long creativeId){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return lambdaCache.getOrDefault(creativeId,1d); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }catch (Exception e){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return 1d; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 |