|
@@ -0,0 +1,148 @@
|
|
|
+package com.tzld.piaoquan.ad.engine.service.score.container;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.aliyun.oss.OSS;
|
|
|
+import com.aliyun.oss.OSSClientBuilder;
|
|
|
+import com.aliyun.oss.common.auth.CredentialsProvider;
|
|
|
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
|
|
|
+import com.aliyun.oss.model.CopyObjectResult;
|
|
|
+import com.aliyun.oss.model.OSSObject;
|
|
|
+import com.aliyun.oss.model.PutObjectResult;
|
|
|
+import com.tzld.piaoquan.ad.engine.service.predict.impl.PredictModelServiceImpl;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Component
|
|
|
+public class PidLambdaContainer {
|
|
|
+ private final static Logger log = LoggerFactory.getLogger(PidLambdaContainer.class);
|
|
|
+
|
|
|
+ private static final int SCHEDULE_PERIOD = 10;
|
|
|
+ private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
|
|
+ @Value("${model.oss.internal.endpoint:oss-cn-hangzhou.aliyuncs.com}")
|
|
|
+ String endpoint = "";
|
|
|
+ @Value("${model.oss.accessKeyId:LTAIP6x1l3DXfSxm}")
|
|
|
+ String accessKeyId = "";
|
|
|
+ @Value("${model.oss.accessKetSecret:KbTaM9ars4OX3PMS6Xm7rtxGr1FLon}")
|
|
|
+ String accessKetSecret = "";
|
|
|
+ @Value("${model.oss.bucketName:art-recommend}")
|
|
|
+ String bucketName = "";
|
|
|
+
|
|
|
+ @Value("${model.oss.filename.lambda:art-recommend}")
|
|
|
+ String lambdaFileName = "";
|
|
|
+
|
|
|
+ @Value("${model.oss.filename.dCpa:art-recommend}")
|
|
|
+ String dCpaFileName = "";
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.kp:0}")
|
|
|
+ Double kp = 0d;
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.ki:0}")
|
|
|
+ Double ki = 0d;
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.kd:0}")
|
|
|
+ Double kd = 0d;
|
|
|
+
|
|
|
+ OSS client;
|
|
|
+
|
|
|
+ private static ConcurrentHashMap<Long,Double> lambdaCache=new ConcurrentHashMap<>();
|
|
|
+ private Date cacheDate;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ private void init(){
|
|
|
+ instanceClient();
|
|
|
+ final Runnable task = new Runnable() {
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ loadAndCalIfNeed();
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ scheduler.scheduleAtFixedRate(task, 10, SCHEDULE_PERIOD, TimeUnit.MINUTES); // 10分钟
|
|
|
+ }
|
|
|
+
|
|
|
+ private void instanceClient(){
|
|
|
+ CredentialsProvider credentialsProvider = new DefaultCredentialProvider(accessKeyId, accessKetSecret);
|
|
|
+ this.client = new OSSClientBuilder().build(endpoint, credentialsProvider);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadAndCalIfNeed(){
|
|
|
+ loadLambdaFile();
|
|
|
+ OSSObject dCpaFileOjb=client.getObject(bucketName,dCpaFileName);
|
|
|
+ if(dCpaFileOjb.getObjectMetadata().getLastModified().after(cacheDate)){
|
|
|
+ calNewLambda(dCpaFileOjb);
|
|
|
+ writeLambdaFileToOss();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void calNewLambda(OSSObject object) {
|
|
|
+ try {
|
|
|
+ InputStream is=object.getObjectContent();
|
|
|
+ InputStreamReader isr=new InputStreamReader(is);
|
|
|
+ BufferedReader bufferedReader = new BufferedReader(isr);
|
|
|
+ String line = null;
|
|
|
+ while ((line = bufferedReader.readLine()) != null){
|
|
|
+ try {
|
|
|
+ String[] cols=line.split(",");
|
|
|
+ Long creativeId=Long.parseLong(cols[0]);
|
|
|
+ Double lambdaNew=lambdaCache.getOrDefault(creativeId,0d)+
|
|
|
+ kp*Double.parseDouble(cols[1])+ki*Double.parseDouble(cols[2])+kd*Double.parseDouble(cols[3]);
|
|
|
+ lambdaCache.put(creativeId,lambdaNew);
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("svc=calNewLambda status=failed error={}", Arrays.toString(e.getStackTrace()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void writeLambdaFileToOss(){
|
|
|
+ //先不考虑各种更新失败及重复更新问题。
|
|
|
+ String tempFile=lambdaFileName+"_temp";
|
|
|
+ String content= JSONObject.toJSONString(lambdaCache);
|
|
|
+ PutObjectResult putObjectResult=client.putObject(bucketName,tempFile,new ByteArrayInputStream(content.getBytes()));
|
|
|
+ CopyObjectResult copyObjectResult=client.copyObject(bucketName, tempFile, bucketName, lambdaFileName);
|
|
|
+ this.cacheDate= copyObjectResult.getLastModified();
|
|
|
+ client.deleteObject(bucketName, tempFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadLambdaFile(){
|
|
|
+ try {
|
|
|
+ OSSObject object=client.getObject(bucketName,lambdaFileName);
|
|
|
+ if(cacheDate!=null&& !cacheDate.before(object.getObjectMetadata().getLastModified())) return;
|
|
|
+// if(cacheDate!=null&& cacheDate.after(object.getObjectMetadata().getLastModified())) return;
|
|
|
+ StringBuilder builder=new StringBuilder();
|
|
|
+ InputStream is=object.getObjectContent();
|
|
|
+ InputStreamReader isr=new InputStreamReader(is);
|
|
|
+ BufferedReader bufferedReader = new BufferedReader(isr);
|
|
|
+ String line = null;
|
|
|
+ while ((line=bufferedReader.readLine())!=null){
|
|
|
+ builder.append(line);
|
|
|
+ }
|
|
|
+ lambdaCache=JSONObject.parseObject(builder.toString(),ConcurrentHashMap.class);
|
|
|
+ this.cacheDate=object.getObjectMetadata().getLastModified();
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Double getPidLambda(Long creativeId){
|
|
|
+ return lambdaCache.getOrDefault(creativeId,0d);
|
|
|
+ }
|
|
|
+}
|