|
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
|
|
|
import com.aliyun.odps.data.Record;
|
|
import com.aliyun.odps.data.Record;
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.tzld.piaoquan.api.common.enums.contentplatform.AccountStatusEnum;
|
|
import com.tzld.piaoquan.api.common.enums.contentplatform.AccountStatusEnum;
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import com.tzld.piaoquan.api.common.enums.contentplatform.BussinessTypeEnum;
|
|
import com.tzld.piaoquan.api.common.enums.contentplatform.BussinessTypeEnum;
|
|
|
import com.tzld.piaoquan.api.component.AigcApiService;
|
|
import com.tzld.piaoquan.api.component.AigcApiService;
|
|
|
import com.tzld.piaoquan.api.dao.mapper.contentplatform.*;
|
|
import com.tzld.piaoquan.api.dao.mapper.contentplatform.*;
|
|
@@ -73,19 +74,22 @@ public class ContentPlatformDatastatJob {
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
// 公众号自动回复数据统计
|
|
// 公众号自动回复数据统计
|
|
|
String sql = String.format(
|
|
String sql = String.format(
|
|
|
- "SELECT first_level.channel_shortname, first_level.subchannel, first_level.first_uv, fission.split_uv " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.channel_shortname = fission.channel_shortname and first_level.subchannel = fission.subchannel " +
|
|
|
|
|
- "and first_level.dt = fission.dt and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '公众号即时回复' and first_level.tag = '分投放渠道客户分账号去重';", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.channel_shortname, first_level.subchannel, first_level.first_uv, fission.split_uv, fission.裂变arpu " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.channel_shortname = fission.channel_shortname and first_level.subchannel = fission.subchannel " +
|
|
|
|
|
+ "and first_level.dt = fission.dt and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '公众号即时回复' and first_level.tag = '分投放渠道客户分账号去重';", dt);
|
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
+ List<ContentPlatformAccount> accountList = getAllAccount();
|
|
|
|
|
+ Map<String, ContentPlatformAccount> accountMap = accountList.stream()
|
|
|
|
|
+ .collect(Collectors.toMap(ContentPlatformAccount::getChannel, account -> account));
|
|
|
// 所有公众号
|
|
// 所有公众号
|
|
|
- List<ContentPlatformGzhAccount> accountList = getAllGzhAccount();
|
|
|
|
|
- Map<String, ContentPlatformGzhAccount> accountMap = accountList.stream()
|
|
|
|
|
- .collect(Collectors.toMap(ContentPlatformGzhAccount::getGhId, account -> account));
|
|
|
|
|
- List<String> ghIds = accountList.stream().map(ContentPlatformGzhAccount::getGhId).collect(Collectors.toList());
|
|
|
|
|
- List<String> accountExternalIds = accountList.stream().map(ContentPlatformGzhAccount::getExternalId)
|
|
|
|
|
|
|
+ List<ContentPlatformGzhAccount> gzhAccountList = getAllGzhAccount();
|
|
|
|
|
+ Map<Long, Map<String, ContentPlatformGzhAccount>> gzhAccountMap = gzhAccountList.stream()
|
|
|
|
|
+ .collect(Collectors.groupingBy(ContentPlatformGzhAccount::getCreateAccountId,
|
|
|
|
|
+ Collectors.toMap(ContentPlatformGzhAccount::getGhId, account -> account)));
|
|
|
|
|
+ List<String> accountExternalIds = gzhAccountList.stream().map(ContentPlatformGzhAccount::getExternalId)
|
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
String dateStr = dt.substring(0, 4) + "-" + dt.substring(4, 6) + "-" + dt.substring(6, 8);
|
|
String dateStr = dt.substring(0, 4) + "-" + dt.substring(4, 6) + "-" + dt.substring(6, 8);
|
|
|
List<WxAccountDatastatVO> wxAccountDatastatVOList = aigcApiService.getWxAccountDatastat(dateStr, accountExternalIds);
|
|
List<WxAccountDatastatVO> wxAccountDatastatVOList = aigcApiService.getWxAccountDatastat(dateStr, accountExternalIds);
|
|
@@ -93,16 +97,38 @@ public class ContentPlatformDatastatJob {
|
|
|
.collect(Collectors.toMap(WxAccountDatastatVO::getAccountId, wxAccountDatastatVO -> wxAccountDatastatVO));
|
|
.collect(Collectors.toMap(WxAccountDatastatVO::getAccountId, wxAccountDatastatVO -> wxAccountDatastatVO));
|
|
|
if (CollectionUtils.isNotEmpty(dataList)) {
|
|
if (CollectionUtils.isNotEmpty(dataList)) {
|
|
|
List<ContentPlatformGzhDataStat> saveList = new ArrayList<>();
|
|
List<ContentPlatformGzhDataStat> saveList = new ArrayList<>();
|
|
|
|
|
+ BigDecimal sumScore = BigDecimal.ZERO;
|
|
|
for (Record record : dataList) {
|
|
for (Record record : dataList) {
|
|
|
- ContentPlatformGzhDataStat item = new ContentPlatformGzhDataStat();
|
|
|
|
|
|
|
+ String channel = (String) record.get(0);
|
|
|
String ghId = (String) record.get(1);
|
|
String ghId = (String) record.get(1);
|
|
|
- int firstLevelCount = Integer.parseInt((String) record.get(2));
|
|
|
|
|
|
|
+ Integer firstLevelCount = parseInteger(record.get(2));
|
|
|
Integer fissionCount = parseInteger(record.get(3));
|
|
Integer fissionCount = parseInteger(record.get(3));
|
|
|
- item.setDateStr(dt);
|
|
|
|
|
- if (!ghIds.contains(ghId)) {
|
|
|
|
|
|
|
+ Double fissionArpu = parseDouble(record.get(4));
|
|
|
|
|
+ if (fissionArpu > 0.3) {
|
|
|
|
|
+ fissionArpu = 0.3;
|
|
|
|
|
+ }
|
|
|
|
|
+ if ("SUM".equals(channel)) {
|
|
|
|
|
+ BigDecimal fissionRate = BigDecimal.valueOf(fissionCount.doubleValue() * 10 / firstLevelCount)
|
|
|
|
|
+ .setScale(2, RoundingMode.HALF_UP);
|
|
|
|
|
+ fissionArpu = BigDecimal.valueOf(fissionArpu).setScale(2, RoundingMode.HALF_UP).doubleValue();
|
|
|
|
|
+ sumScore = fissionRate.add(new BigDecimal(fissionArpu));
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
- ContentPlatformGzhAccount gzhAccount = accountMap.get(ghId);
|
|
|
|
|
|
|
+ ContentPlatformAccount account = accountMap.get(channel);
|
|
|
|
|
+ if (Objects.isNull(account)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ Map<String, ContentPlatformGzhAccount> gzhAccounts = gzhAccountMap.get(account.getId());
|
|
|
|
|
+ if (Objects.isNull(gzhAccounts)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ContentPlatformGzhAccount gzhAccount = gzhAccounts.get(ghId);
|
|
|
|
|
+ if (Objects.isNull(gzhAccount)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ ContentPlatformGzhDataStat item = new ContentPlatformGzhDataStat();
|
|
|
|
|
+ item.setDateStr(dt);
|
|
|
item.setAccountId(gzhAccount.getId());
|
|
item.setAccountId(gzhAccount.getId());
|
|
|
item.setFirstLevelCount(firstLevelCount);
|
|
item.setFirstLevelCount(firstLevelCount);
|
|
|
WxAccountDatastatVO wxAccountDatastatVO = wxAccountDatastatMap.get(gzhAccount.getExternalId());
|
|
WxAccountDatastatVO wxAccountDatastatVO = wxAccountDatastatMap.get(gzhAccount.getExternalId());
|
|
@@ -111,13 +137,25 @@ public class ContentPlatformDatastatJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (fissionCount > 0 && firstLevelCount > 0) {
|
|
if (fissionCount > 0 && firstLevelCount > 0) {
|
|
|
- BigDecimal num = BigDecimal.valueOf(fissionCount.doubleValue() * 10 / firstLevelCount);
|
|
|
|
|
- BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
|
|
|
|
|
- item.setScore(rounded.doubleValue());
|
|
|
|
|
|
|
+ BigDecimal fissionRate = BigDecimal.valueOf(fissionCount.doubleValue() * 10 / firstLevelCount)
|
|
|
|
|
+ .setScale(2, RoundingMode.HALF_UP);
|
|
|
|
|
+ fissionArpu = BigDecimal.valueOf(fissionArpu).setScale(2, RoundingMode.HALF_UP).doubleValue();
|
|
|
|
|
+ BigDecimal totalScore = fissionRate.add(new BigDecimal(fissionArpu));
|
|
|
|
|
+ item.setScore(totalScore.doubleValue());
|
|
|
}
|
|
}
|
|
|
item.setCreateTimestamp(now);
|
|
item.setCreateTimestamp(now);
|
|
|
saveList.add(item);
|
|
saveList.add(item);
|
|
|
}
|
|
}
|
|
|
|
|
+ for (ContentPlatformGzhDataStat item : saveList) {
|
|
|
|
|
+ if (item.getFirstLevelCount() < 10) {
|
|
|
|
|
+ item.setScore(0.0);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (Objects.nonNull(item.getScore())) {
|
|
|
|
|
+ BigDecimal score = BigDecimal.valueOf(item.getScore());
|
|
|
|
|
+ item.setScore(score.divide(sumScore, 3, RoundingMode.HALF_UP).multiply(BigDecimal.valueOf(10)).doubleValue());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
if (CollectionUtils.isNotEmpty(saveList)) {
|
|
if (CollectionUtils.isNotEmpty(saveList)) {
|
|
|
dataStatMapperExt.deleteGzhDatastat(dt);
|
|
dataStatMapperExt.deleteGzhDatastat(dt);
|
|
|
dataStatMapperExt.batchInsertGzhDatastat(saveList);
|
|
dataStatMapperExt.batchInsertGzhDatastat(saveList);
|
|
@@ -186,15 +224,15 @@ public class ContentPlatformDatastatJob {
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
// 公众号自动回复数据统计
|
|
// 公众号自动回复数据统计
|
|
|
String sql = String.format(
|
|
String sql = String.format(
|
|
|
- "SELECT first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
- "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
- "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
- "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '公众号即时回复' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.channel_shortname, first_level.first_uv, fission.split_uv, fission.裂变arpu, price.arpu " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
+ "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
+ "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
+ "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '公众号即时回复' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
|
// 所有公众号
|
|
// 所有公众号
|
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
@@ -210,16 +248,24 @@ public class ContentPlatformDatastatJob {
|
|
|
Map<String, WxAccountDatastatVO> wxAccountDatastatMap = wxAccountDatastatVOList.stream()
|
|
Map<String, WxAccountDatastatVO> wxAccountDatastatMap = wxAccountDatastatVOList.stream()
|
|
|
.collect(Collectors.toMap(WxAccountDatastatVO::getAccountId, wxAccountDatastatVO -> wxAccountDatastatVO));
|
|
.collect(Collectors.toMap(WxAccountDatastatVO::getAccountId, wxAccountDatastatVO -> wxAccountDatastatVO));
|
|
|
if (CollectionUtils.isNotEmpty(dataList)) {
|
|
if (CollectionUtils.isNotEmpty(dataList)) {
|
|
|
|
|
+ BigDecimal sumScore = BigDecimal.ZERO;
|
|
|
List<ContentPlatformGzhDataStatTotal> saveList = new ArrayList<>();
|
|
List<ContentPlatformGzhDataStatTotal> saveList = new ArrayList<>();
|
|
|
for (Record record : dataList) {
|
|
for (Record record : dataList) {
|
|
|
- ContentPlatformGzhDataStatTotal item = new ContentPlatformGzhDataStatTotal();
|
|
|
|
|
String channel = record.getString(0);
|
|
String channel = record.getString(0);
|
|
|
- int firstLevelCount = Integer.parseInt((String) record.get(1));
|
|
|
|
|
|
|
+ Integer firstLevelCount = parseInteger(record.get(1));
|
|
|
Integer fissionCount = parseInteger(record.get(2));
|
|
Integer fissionCount = parseInteger(record.get(2));
|
|
|
- Double arpu = parseDouble(record.get(3));
|
|
|
|
|
- item.setDateStr(dt);
|
|
|
|
|
- item.setChannel(channel);
|
|
|
|
|
- item.setFirstLevelCount(firstLevelCount);
|
|
|
|
|
|
|
+ Double fissionArpu = parseDouble(record.get(3));
|
|
|
|
|
+ if (fissionArpu > 0.3) {
|
|
|
|
|
+ fissionArpu = 0.3;
|
|
|
|
|
+ }
|
|
|
|
|
+ Double arpu = parseDouble(record.get(4));
|
|
|
|
|
+ if ("SUM".equals(channel)) {
|
|
|
|
|
+ BigDecimal fissionRate = BigDecimal.valueOf(fissionCount.doubleValue() * 10 / firstLevelCount)
|
|
|
|
|
+ .setScale(2, RoundingMode.HALF_UP);
|
|
|
|
|
+ fissionArpu = BigDecimal.valueOf(fissionArpu).setScale(2, RoundingMode.HALF_UP).doubleValue();
|
|
|
|
|
+ sumScore = fissionRate.add(new BigDecimal(fissionArpu));
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
ContentPlatformAccount account = accountMap.get(channel);
|
|
ContentPlatformAccount account = accountMap.get(channel);
|
|
|
if (Objects.isNull(account)) {
|
|
if (Objects.isNull(account)) {
|
|
|
continue;
|
|
continue;
|
|
@@ -234,12 +280,19 @@ public class ContentPlatformDatastatJob {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ ContentPlatformGzhDataStatTotal item = new ContentPlatformGzhDataStatTotal();
|
|
|
|
|
+ item.setDateStr(dt);
|
|
|
|
|
+ item.setChannel(channel);
|
|
|
|
|
+ item.setFirstLevelCount(firstLevelCount);
|
|
|
item.setFansIncreaseCount(fansIncreaseCount);
|
|
item.setFansIncreaseCount(fansIncreaseCount);
|
|
|
|
|
|
|
|
if (fissionCount > 0 && firstLevelCount > 0) {
|
|
if (fissionCount > 0 && firstLevelCount > 0) {
|
|
|
- BigDecimal fissionRate = BigDecimal.valueOf(fissionCount.doubleValue() / firstLevelCount);
|
|
|
|
|
- BigDecimal rounded = fissionRate.multiply(new BigDecimal(10)).setScale(2, RoundingMode.HALF_UP);
|
|
|
|
|
- item.setScore(rounded.doubleValue());
|
|
|
|
|
|
|
+ BigDecimal fissionRate = BigDecimal.valueOf(fissionCount.doubleValue() * 10 / firstLevelCount)
|
|
|
|
|
+ .setScale(2, RoundingMode.HALF_UP);
|
|
|
|
|
+ fissionArpu = BigDecimal.valueOf(fissionArpu).setScale(2, RoundingMode.HALF_UP).doubleValue();
|
|
|
|
|
+ BigDecimal totalScore = fissionRate.add(new BigDecimal(fissionArpu));
|
|
|
|
|
+ item.setScore(totalScore.doubleValue());
|
|
|
|
|
+
|
|
|
BigDecimal unitPrice = getUnitPrice(account.getPrice(), fissionRate, arpu, BussinessTypeEnum.GZH_AUTO_REPLY);
|
|
BigDecimal unitPrice = getUnitPrice(account.getPrice(), fissionRate, arpu, BussinessTypeEnum.GZH_AUTO_REPLY);
|
|
|
if (Objects.nonNull(unitPrice)) {
|
|
if (Objects.nonNull(unitPrice)) {
|
|
|
item.setUnitPrice(unitPrice.doubleValue());
|
|
item.setUnitPrice(unitPrice.doubleValue());
|
|
@@ -254,6 +307,16 @@ public class ContentPlatformDatastatJob {
|
|
|
item.setCreateTimestamp(now);
|
|
item.setCreateTimestamp(now);
|
|
|
saveList.add(item);
|
|
saveList.add(item);
|
|
|
}
|
|
}
|
|
|
|
|
+ for (ContentPlatformGzhDataStatTotal item : saveList) {
|
|
|
|
|
+ if (item.getFirstLevelCount() < 25) {
|
|
|
|
|
+ item.setScore(0.0);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (Objects.nonNull(item.getScore())) {
|
|
|
|
|
+ BigDecimal score = BigDecimal.valueOf(item.getScore());
|
|
|
|
|
+ item.setScore(score.divide(sumScore, 3, RoundingMode.HALF_UP).multiply(BigDecimal.valueOf(10)).doubleValue());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
if (CollectionUtils.isNotEmpty(saveList)) {
|
|
if (CollectionUtils.isNotEmpty(saveList)) {
|
|
|
dataStatMapperExt.deleteGzhDatastatTotal(dt);
|
|
dataStatMapperExt.deleteGzhDatastatTotal(dt);
|
|
|
dataStatMapperExt.batchInsertGzhDatastatTotal(saveList);
|
|
dataStatMapperExt.batchInsertGzhDatastatTotal(saveList);
|
|
@@ -271,12 +334,12 @@ public class ContentPlatformDatastatJob {
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
// 公众号自动回复数据统计
|
|
// 公众号自动回复数据统计
|
|
|
String sql = String.format(
|
|
String sql = String.format(
|
|
|
- "SELECT first_level.subchannel, first_level.first_uv, fission.split_uv " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.subchannel = fission.subchannel and first_level.dt = fission.dt " +
|
|
|
|
|
- "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '服务号代运营' and first_level.tag = '分投放渠道客户分账号去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.subchannel, first_level.first_uv, fission.split_uv " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.subchannel = fission.subchannel and first_level.dt = fission.dt " +
|
|
|
|
|
+ "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '服务号代运营' and first_level.tag = '分投放渠道客户分账号去重' ;", dt);
|
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
|
// 所有公众号
|
|
// 所有公众号
|
|
|
List<ContentPlatformGzhAccount> accountList = getAllGzhAccount();
|
|
List<ContentPlatformGzhAccount> accountList = getAllGzhAccount();
|
|
@@ -333,15 +396,15 @@ public class ContentPlatformDatastatJob {
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
// 公众号自动回复数据统计
|
|
// 公众号自动回复数据统计
|
|
|
String sql = String.format(
|
|
String sql = String.format(
|
|
|
- "SELECT first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
- "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
- "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
- "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '服务号代运营' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
+ "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
+ "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
+ "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '服务号代运营' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
|
// 所有公众号
|
|
// 所有公众号
|
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
@@ -418,12 +481,12 @@ public class ContentPlatformDatastatJob {
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
// 公众号自动回复数据统计
|
|
// 公众号自动回复数据统计
|
|
|
String sql = String.format(
|
|
String sql = String.format(
|
|
|
- "SELECT first_level.subchannel, first_level.first_uv, fission.split_uv " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.subchannel = fission.subchannel and first_level.dt = fission.dt " +
|
|
|
|
|
- "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '公众号推送' and first_level.tag = '分投放渠道客户分账号去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.subchannel, first_level.first_uv, fission.split_uv " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.subchannel = fission.subchannel and first_level.dt = fission.dt " +
|
|
|
|
|
+ "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '公众号推送' and first_level.tag = '分投放渠道客户分账号去重' ;", dt);
|
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
|
// 所有公众号
|
|
// 所有公众号
|
|
|
List<ContentPlatformGzhAccount> accountList = getAllGzhAccount();
|
|
List<ContentPlatformGzhAccount> accountList = getAllGzhAccount();
|
|
@@ -480,15 +543,15 @@ public class ContentPlatformDatastatJob {
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
// 公众号自动回复数据统计
|
|
// 公众号自动回复数据统计
|
|
|
String sql = String.format(
|
|
String sql = String.format(
|
|
|
- "SELECT first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
- "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
- "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
- "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '公众号推送' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
+ "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
+ "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
+ "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '公众号推送' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
List<Record> dataList = OdpsUtil.getOdpsData(sql);
|
|
|
// 所有公众号
|
|
// 所有公众号
|
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
@@ -586,25 +649,39 @@ public class ContentPlatformDatastatJob {
|
|
|
}
|
|
}
|
|
|
Map<Long, ContentPlatformQwPlan> planMap = qwPlanList.stream()
|
|
Map<Long, ContentPlatformQwPlan> planMap = qwPlanList.stream()
|
|
|
.collect(Collectors.toMap(ContentPlatformQwPlan::getId, plan -> plan));
|
|
.collect(Collectors.toMap(ContentPlatformQwPlan::getId, plan -> plan));
|
|
|
- List<ContentPlatformQwDataStat> existList = getQwDatastatCount(dt);
|
|
|
|
|
- List<String> existRootSourceIds = existList.stream().map(ContentPlatformQwDataStat::getRootSourceId)
|
|
|
|
|
- .collect(Collectors.toList());
|
|
|
|
|
Map<String, Long> rootSourceIdMap = qwPlanList.stream()
|
|
Map<String, Long> rootSourceIdMap = qwPlanList.stream()
|
|
|
.collect(Collectors.toMap(ContentPlatformQwPlan::getRootSourceId, ContentPlatformQwPlan::getId));
|
|
.collect(Collectors.toMap(ContentPlatformQwPlan::getRootSourceId, ContentPlatformQwPlan::getId));
|
|
|
List<Long> planIds = qwPlanList.stream().map(ContentPlatformQwPlan::getId).collect(Collectors.toList());
|
|
List<Long> planIds = qwPlanList.stream().map(ContentPlatformQwPlan::getId).collect(Collectors.toList());
|
|
|
- List<ContentPlatformQwPlanVideo> planVideoList = planService.getQwPlanVideoList(planIds);
|
|
|
|
|
|
|
+ List<ContentPlatformQwPlanVideo> planVideoList = new ArrayList<>();
|
|
|
|
|
+ List<List<Long>> partitionList = Lists.partition(planIds, 2000);
|
|
|
|
|
+ for (List<Long> partition : partitionList) {
|
|
|
|
|
+ planVideoList.addAll(planService.getQwPlanVideoList(partition));
|
|
|
|
|
+ }
|
|
|
Map<Long, Long> planVideoMap = planVideoList.stream()
|
|
Map<Long, Long> planVideoMap = planVideoList.stream()
|
|
|
.collect(Collectors.toMap(ContentPlatformQwPlanVideo::getPlanId, ContentPlatformQwPlanVideo::getVideoId));
|
|
.collect(Collectors.toMap(ContentPlatformQwPlanVideo::getPlanId, ContentPlatformQwPlanVideo::getVideoId));
|
|
|
List<Long> videoIds = planVideoList.stream().map(ContentPlatformQwPlanVideo::getVideoId).collect(Collectors.toList());
|
|
List<Long> videoIds = planVideoList.stream().map(ContentPlatformQwPlanVideo::getVideoId).collect(Collectors.toList());
|
|
|
- List<ContentPlatformVideoAgg> videoList = planService.getVideoContentAggListByVideoIds(videoIds);
|
|
|
|
|
|
|
+ List<ContentPlatformVideoAgg> videoList = new ArrayList<>();
|
|
|
|
|
+ List<List<Long>> videoIdPartitionList = Lists.partition(videoIds, 2000);
|
|
|
|
|
+ for (List<Long> partition : videoIdPartitionList) {
|
|
|
|
|
+ videoList.addAll(planService.getVideoContentAggListByVideoIds(partition));
|
|
|
|
|
+ }
|
|
|
Map<Long, Double> videoScoreMap = videoList.stream()
|
|
Map<Long, Double> videoScoreMap = videoList.stream()
|
|
|
.collect(Collectors.toMap(ContentPlatformVideoAgg::getVideoId, ContentPlatformVideoAgg::getScore, (a, b) -> a));
|
|
.collect(Collectors.toMap(ContentPlatformVideoAgg::getVideoId, ContentPlatformVideoAgg::getScore, (a, b) -> a));
|
|
|
List<ContentPlatformQwDataStat> saveList = new ArrayList<>();
|
|
List<ContentPlatformQwDataStat> saveList = new ArrayList<>();
|
|
|
List<String> rootSourceIds = qwPlanList.stream().map(ContentPlatformQwPlan::getRootSourceId).collect(Collectors.toList());
|
|
List<String> rootSourceIds = qwPlanList.stream().map(ContentPlatformQwPlan::getRootSourceId).collect(Collectors.toList());
|
|
|
- String outSql = String.format("SELECT * FROM loghubods.qw_out_touliu_behavior_detail WHERE dt=%s;", dt);
|
|
|
|
|
- List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
|
|
|
|
|
|
|
+ List<String> existRootSourceIds = new ArrayList<>();
|
|
|
Long now = System.currentTimeMillis();
|
|
Long now = System.currentTimeMillis();
|
|
|
- if (CollectionUtils.isNotEmpty(outDataList)) {
|
|
|
|
|
|
|
+ int pageSize = 5000;
|
|
|
|
|
+ int pageNum = 1;
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ Integer offset = (pageNum - 1) * pageSize;
|
|
|
|
|
+ String outSql = String.format("SELECT rootsourceid, 首层访问人数 " +
|
|
|
|
|
+ "FROM loghubods.qw_out_touliu_behavior_detail " +
|
|
|
|
|
+ "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, pageSize);
|
|
|
|
|
+ List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(outDataList)) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
for (Record record : outDataList) {
|
|
for (Record record : outDataList) {
|
|
|
ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
|
|
ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
|
|
|
String rootSourceId = (String) record.get(0);
|
|
String rootSourceId = (String) record.get(0);
|
|
@@ -614,7 +691,7 @@ public class ContentPlatformDatastatJob {
|
|
|
if (existRootSourceIds.contains(rootSourceId)) {
|
|
if (existRootSourceIds.contains(rootSourceId)) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
- int firstLevelCount = Integer.parseInt((String) record.get(8));
|
|
|
|
|
|
|
+ int firstLevelCount = Integer.parseInt((String) record.get(1));
|
|
|
if (firstLevelCount == 0) {
|
|
if (firstLevelCount == 0) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
@@ -633,12 +710,24 @@ public class ContentPlatformDatastatJob {
|
|
|
item.setFirstLevelCount(firstLevelCount);
|
|
item.setFirstLevelCount(firstLevelCount);
|
|
|
item.setCreateTimestamp(now);
|
|
item.setCreateTimestamp(now);
|
|
|
saveList.add(item);
|
|
saveList.add(item);
|
|
|
|
|
+ existRootSourceIds.add(rootSourceId);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (outDataList.size() < pageSize) {
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
|
|
+ pageNum++;
|
|
|
}
|
|
}
|
|
|
- String out2Sql = String.format("SELECT * FROM loghubods.qw_out2_touliu_behavior_detail WHERE dt=%s;", dt);
|
|
|
|
|
- List<Record> out2DataList = OdpsUtil.getOdpsData(out2Sql);
|
|
|
|
|
- if (CollectionUtils.isNotEmpty(out2DataList)) {
|
|
|
|
|
- for (Record record : out2DataList) {
|
|
|
|
|
|
|
+ pageNum = 1;
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ Integer offset = (pageNum - 1) * pageSize;
|
|
|
|
|
+ String out2Sql = String.format("SELECT rootsourceid, 首层访问人数 " +
|
|
|
|
|
+ "FROM loghubods.qw_out2_touliu_behavior_detail " +
|
|
|
|
|
+ "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, pageSize);
|
|
|
|
|
+ List<Record> outDataList = OdpsUtil.getOdpsData(out2Sql);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(outDataList)) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ for (Record record : outDataList) {
|
|
|
ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
|
|
ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
|
|
|
String rootSourceId = (String) record.get(0);
|
|
String rootSourceId = (String) record.get(0);
|
|
|
if (!rootSourceIds.contains(rootSourceId)) {
|
|
if (!rootSourceIds.contains(rootSourceId)) {
|
|
@@ -647,7 +736,7 @@ public class ContentPlatformDatastatJob {
|
|
|
if (existRootSourceIds.contains(rootSourceId)) {
|
|
if (existRootSourceIds.contains(rootSourceId)) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
- int firstLevelCount = Integer.parseInt((String) record.get(8));
|
|
|
|
|
|
|
+ int firstLevelCount = Integer.parseInt((String) record.get(1));
|
|
|
if (firstLevelCount == 0) {
|
|
if (firstLevelCount == 0) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
@@ -664,7 +753,12 @@ public class ContentPlatformDatastatJob {
|
|
|
item.setFirstLevelCount(firstLevelCount);
|
|
item.setFirstLevelCount(firstLevelCount);
|
|
|
item.setCreateTimestamp(now);
|
|
item.setCreateTimestamp(now);
|
|
|
saveList.add(item);
|
|
saveList.add(item);
|
|
|
|
|
+ existRootSourceIds.add(rootSourceId);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (outDataList.size() < pageSize) {
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
|
|
+ pageNum++;
|
|
|
}
|
|
}
|
|
|
if (CollectionUtils.isNotEmpty(saveList)) {
|
|
if (CollectionUtils.isNotEmpty(saveList)) {
|
|
|
dataStatMapperExt.deleteQwDatastat(dt);
|
|
dataStatMapperExt.deleteQwDatastat(dt);
|
|
@@ -686,15 +780,15 @@ public class ContentPlatformDatastatJob {
|
|
|
}
|
|
}
|
|
|
Map<String, ContentPlatformQwDataStatTotal> saveMap = new HashMap<>();
|
|
Map<String, ContentPlatformQwDataStatTotal> saveMap = new HashMap<>();
|
|
|
String outSql = String.format(
|
|
String outSql = String.format(
|
|
|
- "SELECT first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
- "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
- "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
- "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '企微外部' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.channel_shortname, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.channel_shortname = fission.channel_shortname and first_level.dt = fission.dt " +
|
|
|
|
|
+ "and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
+ "on first_level.channel_shortname = price.channel_shortname and first_level.dt = price.dt " +
|
|
|
|
|
+ "and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '企微外部' and first_level.tag = '投放渠道内去重' ;", dt);
|
|
|
List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
|
|
List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
|
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
|
Map<String, ContentPlatformAccount> accountMap = accountList.stream()
|
|
Map<String, ContentPlatformAccount> accountMap = accountList.stream()
|
|
@@ -838,15 +932,15 @@ public class ContentPlatformDatastatJob {
|
|
|
}
|
|
}
|
|
|
List<ContentPlatformQwDataStatSubChannel> saveList = new ArrayList<>();
|
|
List<ContentPlatformQwDataStatSubChannel> saveList = new ArrayList<>();
|
|
|
String outSql = String.format(
|
|
String outSql = String.format(
|
|
|
- "SELECT first_level.channel_shortname, first_level.subchannel, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
- "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
- "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
- "on first_level.channel_shortname = fission.channel_shortname and first_level.subchannel = fission.subchannel " +
|
|
|
|
|
- "and first_level.dt = fission.dt and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
- "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
- "on first_level.channel_shortname = price.channel_shortname and first_level.subchannel = price.subchannel " +
|
|
|
|
|
- "and first_level.dt = price.dt and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
- "WHERE first_level.dt = %s and first_level.type = '企微外部' and first_level.tag = '分投放渠道客户分账号去重' ;", dt);
|
|
|
|
|
|
|
+ "SELECT distinct first_level.channel_shortname, first_level.subchannel, first_level.first_uv, fission.split_uv, price.arpu " +
|
|
|
|
|
+ "FROM loghubods.out_channel_mid_first_total first_level " +
|
|
|
|
|
+ "left join loghubods.out_channel_mid_split_total fission " +
|
|
|
|
|
+ "on first_level.channel_shortname = fission.channel_shortname and first_level.subchannel = fission.subchannel " +
|
|
|
|
|
+ "and first_level.dt = fission.dt and first_level.type = fission.type and first_level.tag = fission.tag " +
|
|
|
|
|
+ "left join loghubods.wecom_cooperation_dynamic_unit_price price " +
|
|
|
|
|
+ "on first_level.channel_shortname = price.channel_shortname and first_level.subchannel = price.subchannel " +
|
|
|
|
|
+ "and first_level.dt = price.dt and first_level.type = price.type and first_level.tag = price.tag " +
|
|
|
|
|
+ "WHERE first_level.dt = %s and first_level.type = '企微外部' and first_level.tag = '分投放渠道客户分账号去重' ;", dt);
|
|
|
List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
|
|
List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
|
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
List<ContentPlatformAccount> accountList = getAllAccount();
|
|
|
Map<String, ContentPlatformAccount> accountMap = accountList.stream()
|
|
Map<String, ContentPlatformAccount> accountMap = accountList.stream()
|