|
@@ -0,0 +1,245 @@
|
|
|
+package com.tzld.piaoquan.ad.engine.service.score.container;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.alibaba.fastjson.TypeReference;
|
|
|
+import com.aliyun.oss.OSS;
|
|
|
+import com.aliyun.oss.OSSClientBuilder;
|
|
|
+import com.aliyun.oss.common.auth.CredentialsProvider;
|
|
|
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
|
|
|
+import com.aliyun.oss.model.CopyObjectResult;
|
|
|
+import com.aliyun.oss.model.OSSObject;
|
|
|
+import com.aliyun.oss.model.PutObjectResult;
|
|
|
+import com.tzld.piaoquan.ad.engine.commons.util.DateUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Component
|
|
|
+public class PidLambdaForCpcContainer {
|
|
|
+ private final static Logger log = LoggerFactory.getLogger(PidLambdaForCpcContainer.class);
|
|
|
+
|
|
|
+ private static final int SCHEDULE_PERIOD = 10;
|
|
|
+ private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
|
|
+ @Value("${model.oss.internal.endpoint:oss-cn-hangzhou.aliyuncs.com}")
|
|
|
+ String endpoint = "";
|
|
|
+ @Value("${model.oss.accessKeyId:LTAIP6x1l3DXfSxm}")
|
|
|
+ String accessKeyId = "";
|
|
|
+ @Value("${model.oss.accessKetSecret:KbTaM9ars4OX3PMS6Xm7rtxGr1FLon}")
|
|
|
+ String accessKetSecret = "";
|
|
|
+ @Value("${model.oss.bucketName:art-recommend}")
|
|
|
+ String bucketName = "";
|
|
|
+
|
|
|
+ @Value("${model.oss.pid.cpc.filename.lambda:pid/lambda_cpc.txt}")
|
|
|
+ String lambdaFileName = "";
|
|
|
+
|
|
|
+ @Value("${model.oss.pid.cpc.filename.dCpa:pid/dCpa_cpc.txt}")
|
|
|
+ String dCpaFileName = "";
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.cpc.kp:0.4}")
|
|
|
+ Double kp = 0d;
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.cpc.ki:0.4}")
|
|
|
+ Double ki = 0d;
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.cpc.kd:0.2}")
|
|
|
+ Double kd = 0d;
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.cpc.lambda.max:5.0}")
|
|
|
+ Double maxLambda = 0d;
|
|
|
+
|
|
|
+ @Value("${ad.model.pid.cpc.lambda.min:0.2}")
|
|
|
+ Double minLambda = 0d;
|
|
|
+ OSS client;
|
|
|
+
|
|
|
+ private static ConcurrentHashMap<Long, CpcCacheItem> lambdaCache=new ConcurrentHashMap<>();
|
|
|
+ private Date cacheDate;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ private void init(){
|
|
|
+ instanceClient();
|
|
|
+ final Runnable task = new Runnable() {
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ loadAndCalIfNeed();
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ scheduler.scheduleAtFixedRate(task, 0, SCHEDULE_PERIOD, TimeUnit.MINUTES); // 10分钟
|
|
|
+ }
|
|
|
+
|
|
|
+ private void instanceClient(){
|
|
|
+ CredentialsProvider credentialsProvider = new DefaultCredentialProvider(accessKeyId, accessKetSecret);
|
|
|
+ this.client = new OSSClientBuilder().build(endpoint, credentialsProvider);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadAndCalIfNeed(){
|
|
|
+ loadLambdaFile();
|
|
|
+ OSSObject dCpaFileOjb=client.getObject(bucketName,dCpaFileName);
|
|
|
+ if(cacheDate==null||dCpaFileOjb.getObjectMetadata().getLastModified().after(cacheDate)){
|
|
|
+ calNewLambda(dCpaFileOjb);
|
|
|
+ writeLambdaFileToOss();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void calNewLambda(OSSObject object) {
|
|
|
+ try {
|
|
|
+ InputStream is=object.getObjectContent();
|
|
|
+ InputStreamReader isr=new InputStreamReader(is);
|
|
|
+ BufferedReader bufferedReader = new BufferedReader(isr);
|
|
|
+ String line = null;
|
|
|
+ ConcurrentHashMap<Long, CpcCacheItem> temp=new ConcurrentHashMap<>();
|
|
|
+ Double conversion=0d;
|
|
|
+ Double cpa=0d;
|
|
|
+ Double realCost=0d;
|
|
|
+ Double latestRealCPA=0d;
|
|
|
+ double sumE=0d;
|
|
|
+ while ((line = bufferedReader.readLine()) != null){
|
|
|
+ try {
|
|
|
+ String[] cols=line.split(",");
|
|
|
+ Long creativeId=Long.parseLong(cols[0]);
|
|
|
+ CpcCacheItem cacheItem=lambdaCache.getOrDefault(creativeId,new CpcCacheItem(creativeId));
|
|
|
+ if(DateUtils.getCurrentHour()<=8){
|
|
|
+ temp.put(creativeId,cacheItem);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ conversion=Double.parseDouble(cols[1]);
|
|
|
+ cpa=Double.parseDouble(cols[2]);
|
|
|
+ realCost=Double.parseDouble(cols[3]);
|
|
|
+ if(conversion<1d){
|
|
|
+ temp.put(creativeId,cacheItem);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ latestRealCPA=realCost/conversion;
|
|
|
+ if(Math.abs(latestRealCPA-cacheItem.latestRealCpa)<0.01){
|
|
|
+ temp.put(creativeId,cacheItem);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ Double lambdaNew =cacheItem.calculate(kp,ki,kd,cpa,latestRealCPA);
|
|
|
+ if(lambdaNew<minLambda){
|
|
|
+ lambdaNew=minLambda;
|
|
|
+ }
|
|
|
+ cacheItem.lambda=lambdaNew;
|
|
|
+ cacheItem.latestRealCpa=latestRealCPA;
|
|
|
+ cacheItem.sumError=sumE;
|
|
|
+ cacheItem.latestConv=conversion;
|
|
|
+
|
|
|
+ temp.put(creativeId,cacheItem);
|
|
|
+
|
|
|
+ log.info("svc=calCPCLambda creativeId={} lambdaNew={}", creativeId,lambdaNew);
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ lambdaCache.clear();
|
|
|
+ lambdaCache=temp;
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("svc=calCPCLambda status=failed error={}", Arrays.toString(e.getStackTrace()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void writeLambdaFileToOss(){
|
|
|
+ //先不考虑各种更新失败及重复更新问题。
|
|
|
+ try {
|
|
|
+ String tempFile=lambdaFileName+"_temp";
|
|
|
+ String content= JSONObject.toJSONString(lambdaCache);
|
|
|
+ PutObjectResult putObjectResult=client.putObject(bucketName,tempFile,new ByteArrayInputStream(content.getBytes()));
|
|
|
+ CopyObjectResult copyObjectResult=client.copyObject(bucketName, tempFile, bucketName, lambdaFileName);
|
|
|
+ this.cacheDate= copyObjectResult.getLastModified();
|
|
|
+ client.deleteObject(bucketName, tempFile);
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("svc=writeCPCLambdaFileToOss status=failed error={}", Arrays.toString(e.getStackTrace()));
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadLambdaFile(){
|
|
|
+ try {
|
|
|
+ OSSObject object=client.getObject(bucketName,lambdaFileName);
|
|
|
+ if(object==null) return;
|
|
|
+ if(cacheDate!=null&& !cacheDate.before(object.getObjectMetadata().getLastModified())) return;
|
|
|
+ StringBuilder builder=new StringBuilder();
|
|
|
+ InputStream is=object.getObjectContent();
|
|
|
+ InputStreamReader isr=new InputStreamReader(is);
|
|
|
+ BufferedReader bufferedReader = new BufferedReader(isr);
|
|
|
+ String line = null;
|
|
|
+ while ((line=bufferedReader.readLine())!=null){
|
|
|
+ builder.append(line);
|
|
|
+ }
|
|
|
+ lambdaCache=JSONObject.parseObject(builder.toString(),new TypeReference<ConcurrentHashMap<Long, CpcCacheItem>>(){});
|
|
|
+ this.cacheDate=object.getObjectMetadata().getLastModified();
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("svc=loadCPCLambdaFile status=failed error={}", Arrays.toString(e.getStackTrace()));
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Double getPidLambda(Long creativeId){
|
|
|
+ try {
|
|
|
+ return lambdaCache.getOrDefault(creativeId,new CpcCacheItem(creativeId)).lambda;
|
|
|
+ }catch (Exception e){
|
|
|
+ return 1d;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public static class CpcCacheItem {
|
|
|
+
|
|
|
+ public CpcCacheItem(){
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public CpcCacheItem(Long creativeId){
|
|
|
+ this.creativeId=creativeId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long creativeId;
|
|
|
+
|
|
|
+ public double lambda=-1d;
|
|
|
+
|
|
|
+ public double latestConv=0d;
|
|
|
+
|
|
|
+ public double sumError=0d;
|
|
|
+
|
|
|
+ public double latestRealCpa=0d;
|
|
|
+
|
|
|
+ public double processVariable=0d; // 处理变量
|
|
|
+ public double integral=0d; // 积分项
|
|
|
+ public double lastError=0d; // 上一个误差
|
|
|
+
|
|
|
+ public double lastPidValue=0d;
|
|
|
+
|
|
|
+ public double calculate(double kp, double ki, double kd, double setPoint,double currentValue) {
|
|
|
+ processVariable = currentValue;
|
|
|
+ double error = setPoint - processVariable;
|
|
|
+
|
|
|
+ integral += error;
|
|
|
+ if(Math.abs(integral)>2*setPoint){
|
|
|
+ integral=(Math.abs(integral)/integral)*2*setPoint;
|
|
|
+ }
|
|
|
+ double derivative = (error - lastError) / 1; // 假设采样间隔为1
|
|
|
+ lastError = error;
|
|
|
+ return lambda+kp * error + ki * integral + kd * derivative;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void reset() {
|
|
|
+ integral = 0;
|
|
|
+ lastError = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|