|
@@ -0,0 +1,249 @@
|
|
|
+package com.tzld.piaoquan.risk.control.service.sync.impl;
|
|
|
+
|
|
|
+import com.tzld.piaoquan.risk.control.model.po.QywxCorpBlacklistUser;
|
|
|
+import com.tzld.piaoquan.risk.control.model.qywx.RoomBlacklistResponse;
|
|
|
+import com.tzld.piaoquan.risk.control.service.impl.RiskUserOperateService;
|
|
|
+import com.tzld.piaoquan.risk.control.service.sync.BlacklistSyncService;
|
|
|
+import com.tzld.piaoquan.risk.control.service.sync.data.BlacklistDataService;
|
|
|
+import com.tzld.piaoquan.risk.control.service.sync.lock.RedisDistributedLockService;
|
|
|
+import com.tzld.piaoquan.risk.control.service.sync.strategy.SyncDecision;
|
|
|
+import com.tzld.piaoquan.risk.control.service.sync.strategy.SyncTimeStrategy;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 黑名单同步服务实现类
|
|
|
+ *
|
|
|
+ * @author 风控系统开发团队
|
|
|
+ * @since 1.0.0
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class BlacklistSyncServiceImpl implements BlacklistSyncService {
|
|
|
+
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(BlacklistSyncServiceImpl.class);
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SyncTimeStrategy syncTimeStrategy;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private BlacklistDataService blacklistDataService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RiskUserOperateService riskUserOperateService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RedisDistributedLockService lockService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void intelligentSyncCheck() {
|
|
|
+ // 1. 获取所有企业ID
|
|
|
+ List<Long> corpIds = blacklistDataService.getAllCorpIds();
|
|
|
+ log.info("开始智能同步检查,共{}个企业", corpIds.size());
|
|
|
+
|
|
|
+ int syncCount = 0;
|
|
|
+ int skipCount = 0;
|
|
|
+ int lockFailedCount = 0;
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 2. 遍历企业ID,进行智能决策
|
|
|
+ for (Long corpId : corpIds) {
|
|
|
+ try {
|
|
|
+ // 3. 判断是否需要同步
|
|
|
+ SyncDecision decision = syncTimeStrategy.shouldSync(corpId);
|
|
|
+
|
|
|
+ log.info("企业{}同步决策: {}, 原因: {}", corpId,
|
|
|
+ decision.isShouldSync() ? "执行同步" : "跳过同步",
|
|
|
+ decision.getReason());
|
|
|
+
|
|
|
+ if (decision.isShouldSync()) {
|
|
|
+ // 4. 尝试获取企业级分布式锁
|
|
|
+ String lockKey = "corp_sync_" + corpId;
|
|
|
+
|
|
|
+ if (lockService.tryLock(lockKey, 300)) { // 锁定5分钟
|
|
|
+ try {
|
|
|
+ log.info("企业{}获取分布式锁成功,开始执行同步", corpId);
|
|
|
+
|
|
|
+ // 5. 执行同步
|
|
|
+ boolean success = executeSyncForCorp(corpId);
|
|
|
+ if (success) {
|
|
|
+ // 6. 更新同步时间
|
|
|
+ syncTimeStrategy.updateLastSyncTime(corpId);
|
|
|
+ syncCount++;
|
|
|
+ log.info("企业{}同步成功", corpId);
|
|
|
+ } else {
|
|
|
+ log.error("企业{}同步失败", corpId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ // 7. 释放企业级锁
|
|
|
+ lockService.releaseLock(lockKey);
|
|
|
+ log.debug("企业{}释放分布式锁", corpId);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ lockFailedCount++;
|
|
|
+ log.info("企业{}获取分布式锁失败,其他实例正在同步该企业", corpId);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ skipCount++;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("企业{}同步检查异常", corpId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ long duration = endTime - startTime;
|
|
|
+
|
|
|
+ log.info("智能同步检查完成,耗时{}ms,执行同步: {}个企业,跳过同步: {}个企业,锁冲突: {}个企业",
|
|
|
+ duration, syncCount, skipCount, lockFailedCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean forceSyncCorp(Long corpId) {
|
|
|
+ if (corpId == null) {
|
|
|
+ log.warn("强制同步企业时企业ID为空");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("开始强制同步企业: {}", corpId);
|
|
|
+
|
|
|
+ // 尝试获取企业级分布式锁
|
|
|
+ String lockKey = "corp_sync_" + corpId;
|
|
|
+
|
|
|
+ if (lockService.tryLock(lockKey, 300)) { // 锁定5分钟
|
|
|
+ try {
|
|
|
+ log.info("企业{}获取分布式锁成功,开始强制同步", corpId);
|
|
|
+
|
|
|
+ boolean success = executeSyncForCorp(corpId);
|
|
|
+ if (success) {
|
|
|
+ // 更新同步时间
|
|
|
+ syncTimeStrategy.updateLastSyncTime(corpId);
|
|
|
+ log.info("企业{}强制同步成功", corpId);
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ log.error("企业{}强制同步失败", corpId);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ lockService.releaseLock(lockKey);
|
|
|
+ log.debug("企业{}释放分布式锁", corpId);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("企业{}获取分布式锁失败,其他实例正在同步该企业", corpId);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行企业同步
|
|
|
+ *
|
|
|
+ * @param corpId 企业ID
|
|
|
+ * @return true表示同步成功,false表示同步失败
|
|
|
+ */
|
|
|
+ private boolean executeSyncForCorp(Long corpId) {
|
|
|
+ try {
|
|
|
+ // 获取登录用户UUID
|
|
|
+ String uuid = blacklistDataService.getLoginUserUuid(corpId);
|
|
|
+ if (uuid == null) {
|
|
|
+ log.warn("企业{}未找到登录用户", corpId);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("企业{}开始同步,使用UUID: {}", corpId, uuid);
|
|
|
+
|
|
|
+ // 调用API获取黑名单数据
|
|
|
+ RoomBlacklistResponse response = riskUserOperateService.getRoomBlacklist(uuid);
|
|
|
+
|
|
|
+ if (response != null && response.getErrcode() == 0) {
|
|
|
+ // 清理旧数据
|
|
|
+ int deletedCount = blacklistDataService.clearCorpBlacklist(corpId);
|
|
|
+ log.debug("企业{}清理旧黑名单数据完成,删除{}条记录", corpId, deletedCount);
|
|
|
+
|
|
|
+ // 插入新数据
|
|
|
+ if (response.getData() != null && response.getData().getList() != null) {
|
|
|
+ List<QywxCorpBlacklistUser> users = convertToCorpBlacklistUsers(
|
|
|
+ response.getData().getList(), corpId);
|
|
|
+
|
|
|
+ if (!users.isEmpty()) {
|
|
|
+ int insertedCount = blacklistDataService.batchInsertBlacklist(users);
|
|
|
+ log.info("企业{}同步完成,更新{}条黑名单记录", corpId, insertedCount);
|
|
|
+ } else {
|
|
|
+ log.info("企业{}同步完成,黑名单为空", corpId);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.info("企业{}同步完成,API返回数据为空", corpId);
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ log.error("企业{}API调用失败,错误码: {}, 错误信息: {}",
|
|
|
+ corpId, response != null ? response.getErrcode() : -1,
|
|
|
+ response != null ? response.getErrmsg() : "未知错误");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("企业{}同步执行异常", corpId, e);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 转换API响应数据为数据库实体
|
|
|
+ *
|
|
|
+ * @param items API响应的黑名单项列表
|
|
|
+ * @param corpId 企业ID
|
|
|
+ * @return 数据库实体列表
|
|
|
+ */
|
|
|
+ private List<QywxCorpBlacklistUser> convertToCorpBlacklistUsers(
|
|
|
+ List<RoomBlacklistResponse.BlacklistItem> items, Long corpId) {
|
|
|
+
|
|
|
+ List<QywxCorpBlacklistUser> users = new ArrayList<>();
|
|
|
+ Date now = new Date();
|
|
|
+
|
|
|
+ for (RoomBlacklistResponse.BlacklistItem item : items) {
|
|
|
+ if (item.getBlacklist_vid() != null) {
|
|
|
+ QywxCorpBlacklistUser user = new QywxCorpBlacklistUser();
|
|
|
+ user.setVid(item.getBlacklist_vid());
|
|
|
+ user.setCorpId(corpId);
|
|
|
+ // 判断时间戳长度,如果不是13位才补齐
|
|
|
+ long timestamp = item.getCreate_time();
|
|
|
+ if (String.valueOf(timestamp).length() == 10) {
|
|
|
+ timestamp = timestamp * 1000; // 10位时间戳转13位
|
|
|
+ }
|
|
|
+ user.setBlackTime(new Date(timestamp));
|
|
|
+ user.setCreateTime(now);
|
|
|
+ users.add(user);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return users;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getSyncStatistics() {
|
|
|
+ try {
|
|
|
+ List<Long> corpIds = blacklistDataService.getAllCorpIds();
|
|
|
+ int totalCorps = corpIds.size();
|
|
|
+ int totalBlacklistUsers = 0;
|
|
|
+
|
|
|
+ for (Long corpId : corpIds) {
|
|
|
+ totalBlacklistUsers += blacklistDataService.getCorpBlacklistCount(corpId);
|
|
|
+ }
|
|
|
+
|
|
|
+ return String.format("同步统计: 企业总数=%d, 黑名单用户总数=%d", totalCorps, totalBlacklistUsers);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("获取同步统计信息异常", e);
|
|
|
+ return "同步统计信息获取失败: " + e.getMessage();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|