WeComUserDataJob.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package com.tzld.piaoquan.wecom.job;
  2. import com.alibaba.fastjson.JSONArray;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.tzld.piaoquan.wecom.common.constant.TimeConstant;
  5. import com.tzld.piaoquan.wecom.dao.mapper.StaffMapper;
  6. import com.tzld.piaoquan.wecom.dao.mapper.StaffWithUserMapper;
  7. import com.tzld.piaoquan.wecom.dao.mapper.UserMapper;
  8. import com.tzld.piaoquan.wecom.model.bo.XxlJobParam;
  9. import com.tzld.piaoquan.wecom.model.po.*;
  10. import com.tzld.piaoquan.wecom.service.AccessTokenService;
  11. import com.tzld.piaoquan.wecom.utils.HttpClientUtil;
  12. import com.tzld.piaoquan.wecom.utils.HttpPoolClient;
  13. import com.tzld.piaoquan.wecom.utils.page.Page;
  14. import com.xxl.job.core.biz.model.ReturnT;
  15. import com.xxl.job.core.handler.annotation.XxlJob;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.apache.commons.lang3.ObjectUtils;
  18. import org.apache.commons.lang3.StringUtils;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.stereotype.Component;
  21. import org.springframework.util.CollectionUtils;
  22. import java.io.IOException;
  23. import java.util.ArrayList;
  24. import java.util.List;
  25. import static com.tzld.piaoquan.wecom.common.constant.WeComConstant.*;
  26. @Slf4j
  27. @Component
  28. public class WeComUserDataJob {
  29. private static final HttpPoolClient httpPoolClientDefault = HttpClientUtil.create(30000, 30000, 2000, 5000, 5, 30000);
  30. final int size = 100;
  31. @Autowired
  32. private UserMapper userMapper;
  33. @Autowired
  34. private AccessTokenService accessTokenService;
  35. @Autowired
  36. private StaffMapper staffMapper;
  37. @Autowired
  38. private StaffWithUserMapper staffWithUserMapper;
  39. @Autowired
  40. private WeComUserDataJob1 weComUserDataJob1;
  41. @XxlJob("insertStaffWithUserJob")
  42. public ReturnT<String> insertStaffWithUser(String param) {
  43. XxlJobParam xxlJobParam = new XxlJobParam();
  44. if (StringUtils.isNotEmpty(param)) {
  45. xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class);
  46. }
  47. if (xxlJobParam.getStartTime() == null) {
  48. xxlJobParam.setStartTime(1720540800L);
  49. }
  50. if (xxlJobParam.getEndTime() == null) {
  51. xxlJobParam.setEndTime(System.currentTimeMillis() / 1000);
  52. }
  53. StaffExample example = new StaffExample();
  54. StaffExample.Criteria criteria = example.createCriteria();
  55. if (xxlJobParam.getStaffId() != null) {
  56. criteria.andIdEqualTo(xxlJobParam.getStaffId());
  57. }
  58. List<Staff> staffList = staffMapper.selectByExample(example);
  59. for (Staff staff : staffList) {
  60. insertAllUser(xxlJobParam.getStartTime(), xxlJobParam.getEndTime(), staff);
  61. }
  62. return ReturnT.SUCCESS;
  63. }
  64. //初始化用户使用此任务
  65. private void insertAllUser(long startTime, long endTime, Staff staff) {
  66. try {
  67. Integer total = getUserTotal(startTime, endTime, staff.getStaffExtId());
  68. if (total == null || total == 0) {
  69. return;
  70. }
  71. int page = total / size + 1;
  72. int sum = 0;
  73. for (int i = 0; i < page; i++) {
  74. String res = getUser(size, i * size, startTime, endTime, staff.getStaffExtId());
  75. log.info("insertAllUser size={}, i={}, staffExtId = {}, startTime={}, endTime={}, res={}", size, i, staff.getStaffExtId(), startTime, endTime, res);
  76. if (ObjectUtils.isEmpty(res)) {
  77. continue;
  78. }
  79. JSONObject jsonObject = JSONObject.parseObject(res);
  80. JSONArray jsonArray = jsonObject.getJSONArray("external_user_list");
  81. List<StaffWithUser> insertStaffWithUserList = new ArrayList<>();
  82. for (int j = 0; j < jsonArray.size(); j++) {
  83. String externalUserId3rdParty = (String) jsonArray.getJSONObject(j).get("id");
  84. Long userId = userMapper.selectIdByExternalUserId3rdParty(externalUserId3rdParty);
  85. if (userId == null) {
  86. jsonArray.getJSONObject(j).put("id", null);
  87. User user = jsonArray.getJSONObject(j).toJavaObject(User.class);
  88. user.setExternalUserId3rdParty(externalUserId3rdParty);
  89. userMapper.insert(user);
  90. userId = user.getId();
  91. }
  92. if (userId == null) {
  93. System.out.println("插入数据异常:" + jsonArray.getJSONObject(j));
  94. continue;
  95. }
  96. StaffWithUserExample staffWithUserExample = new StaffWithUserExample();
  97. staffWithUserExample.createCriteria().andStaffIdEqualTo(staff.getId()).andUserIdEqualTo(userId);
  98. List<StaffWithUser> staffWithUserList = staffWithUserMapper.selectByExample(staffWithUserExample);
  99. if (CollectionUtils.isEmpty(staffWithUserList)) {
  100. StaffWithUser staffWithUser = new StaffWithUser();
  101. staffWithUser.setUserId(userId);
  102. staffWithUser.setStaffId(staff.getId());
  103. insertStaffWithUserList.add(staffWithUser);
  104. }
  105. sum++;
  106. }
  107. if (!CollectionUtils.isEmpty(insertStaffWithUserList)) {
  108. staffWithUserMapper.insertList(insertStaffWithUserList);
  109. }
  110. if (jsonArray.size() < size) {
  111. break;
  112. }
  113. }
  114. if (total > sum) {
  115. log.error("insertAllUser插入数量不足 total = {}, sum={}", total, sum);
  116. }
  117. } catch (Exception e) {
  118. log.error("insertAllUser error", e);
  119. }
  120. }
  121. private Integer getUserTotal(Long startTime, Long endTime, String staffId) throws IOException {
  122. String res = getUser(1, 0, startTime, endTime, staffId);
  123. JSONObject jsonObject = JSONObject.parseObject(res);
  124. return jsonObject.getInteger("total");
  125. }
  126. private String getUser(Integer limit, Integer offset, Long startTime, Long endTime, String staffExtId) throws IOException {
  127. String accessToken = accessTokenService.getAccessToken();
  128. String url = GET_USER_URL
  129. + "?access_token=" + accessToken
  130. + "&limit=" + limit + "&offset=" + offset + "&start_time=" + startTime + "&end_time=" + endTime;
  131. if (StringUtils.isNotEmpty(staffExtId)) {
  132. url = url + "&staff_id=" + staffExtId;
  133. }
  134. return httpPoolClientDefault.get(url);
  135. }
  136. @XxlJob("updateStaffWithUserJob")
  137. public ReturnT<String> updateStaffWithUser(String param) {
  138. XxlJobParam xxlJobParam = new XxlJobParam();
  139. if (StringUtils.isNotEmpty(param)) {
  140. xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class);
  141. }
  142. if (xxlJobParam.getStartTime() == null) {
  143. UserExample userExample = new UserExample();
  144. userExample.setOrderByClause("create_time desc");
  145. userExample.setPage(new Page<>(1, 1));
  146. List<User> userList = userMapper.selectByExample(userExample);
  147. xxlJobParam.setStartTime(userList.get(0).getCreateTime().getTime() / 1000 - 2L * TimeConstant.HOUR);
  148. }
  149. if (xxlJobParam.getEndTime() == null) {
  150. xxlJobParam.setEndTime(System.currentTimeMillis() / 1000);
  151. }
  152. StaffExample example = new StaffExample();
  153. StaffExample.Criteria criteria = example.createCriteria();
  154. if (xxlJobParam.getStaffId() != null) {
  155. criteria.andIdEqualTo(xxlJobParam.getStaffId());
  156. }
  157. List<Staff> staffList = staffMapper.selectByExample(example);
  158. for (Staff staff : staffList) {
  159. updateUser(xxlJobParam.getStartTime(), xxlJobParam.getEndTime(), staff);
  160. }
  161. return ReturnT.SUCCESS;
  162. }
  163. private void updateUser(long startTime, long endTime, Staff staff) {
  164. try {
  165. Integer total = getUpdateUserTotal(startTime, endTime, staff.getStaffExtId());
  166. if (total == null || total == 0) {
  167. return;
  168. }
  169. int page = total / size + 1;
  170. int sum = 0;
  171. for (int i = 0; i < page; i++) {
  172. String res = getUpdateUser(size, i * size, startTime, endTime, staff.getStaffExtId());
  173. log.info("updateUser size={}, i={}, staffExtId = {}, startTime={}, endTime={}, res={}",
  174. size, i, staff.getStaffExtId(), startTime, endTime, res);
  175. if (ObjectUtils.isEmpty(res)) {
  176. continue;
  177. }
  178. JSONObject jsonObject = JSONObject.parseObject(res);
  179. JSONArray jsonArray = jsonObject.getJSONArray("external_user_list");
  180. for (int j = 0; j < jsonArray.size(); j++) {
  181. String id = (String) jsonArray.getJSONObject(j).get("id");
  182. jsonArray.getJSONObject(j).put("id", null);
  183. User user = jsonArray.getJSONObject(j).toJavaObject(User.class);
  184. user.setExternalUserId3rdParty(id);
  185. UserExample example = new UserExample();
  186. example.createCriteria().andExternalUserId3rdPartyEqualTo(user.getExternalUserId3rdParty());
  187. List<User> list = userMapper.selectByExample(example);
  188. Long userId;
  189. if (CollectionUtils.isEmpty(list)) {
  190. //没有用户,走插入逻辑
  191. String externalUserId = weComUserDataJob1.getExternalUserId(user.getExternalUserId3rdParty());
  192. user.setExternalUserId(externalUserId);
  193. userMapper.insert(user);
  194. userId = user.getId();
  195. } else {
  196. User oldUser = list.get(0);
  197. user.setId(oldUser.getId());
  198. userMapper.updateByPrimaryKeySelective(user);
  199. userId = oldUser.getId();
  200. }
  201. if (userId == null) {
  202. continue;
  203. }
  204. StaffWithUserExample staffWithUserExample = new StaffWithUserExample();
  205. staffWithUserExample.createCriteria().andStaffIdEqualTo(staff.getId()).andUserIdEqualTo(userId);
  206. List<StaffWithUser> staffWithUserList = staffWithUserMapper.selectByExample(staffWithUserExample);
  207. if (CollectionUtils.isEmpty(staffWithUserList)) {
  208. StaffWithUser staffWithUser = new StaffWithUser();
  209. staffWithUser.setStaffId(staff.getId());
  210. staffWithUser.setUserId(userId);
  211. staffWithUserMapper.insert(staffWithUser);
  212. }
  213. sum++;
  214. }
  215. if (jsonArray.size() < size) {
  216. break;
  217. }
  218. }
  219. if (total > sum) {
  220. log.error("updateUser插入数量不足 total = {}, sum={}", total, sum);
  221. }
  222. } catch (IOException e) {
  223. log.error("updateUser error", e);
  224. }
  225. }
  226. private Integer getUpdateUserTotal(Long startTime, Long endTime, String staffExtId) throws IOException {
  227. String res = getUpdateUser(1, 0, startTime, endTime, staffExtId);
  228. JSONObject jsonObject = JSONObject.parseObject(res);
  229. return jsonObject.getInteger("total");
  230. }
  231. private String getUpdateUser(Integer limit, Integer offset, Long startTime, Long endTime, String staffExtId) throws IOException {
  232. String accessToken = accessTokenService.getAccessToken();
  233. String url = UPDATE_USER_URL
  234. + "?access_token=" + accessToken
  235. + "&limit=" + limit + "&offset=" + offset + "&start_update_time=" + startTime + "&end_update_time=" + endTime
  236. + "&source=external_user";
  237. if (StringUtils.isNotEmpty(staffExtId)) {
  238. url = url + "&staff_id=" + staffExtId;
  239. }
  240. return httpPoolClientDefault.get(url);
  241. }
  242. @XxlJob("deleteUserJob")
  243. public ReturnT<String> deleteUserJob(String param) {
  244. XxlJobParam xxlJobParam = new XxlJobParam();
  245. if (StringUtils.isNotEmpty(param)) {
  246. xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class);
  247. }
  248. if (xxlJobParam.getStartTime() == null) {
  249. UserExample userExample = new UserExample();
  250. userExample.createCriteria().andIsDeleteEqualTo(1);
  251. userExample.setOrderByClause("deleted_at desc");
  252. userExample.setPage(new Page<>(1, 1));
  253. List<User> userList = userMapper.selectByExample(userExample);
  254. if (CollectionUtils.isEmpty(userList)) {
  255. xxlJobParam.setStartTime(1720540800L);
  256. } else {
  257. xxlJobParam.setStartTime(userList.get(0).getDeletedAt());
  258. }
  259. }
  260. if (xxlJobParam.getEndTime() == null) {
  261. xxlJobParam.setEndTime(System.currentTimeMillis() / 1000);
  262. }
  263. deleteUser(xxlJobParam.getStartTime(), xxlJobParam.getEndTime());
  264. return ReturnT.SUCCESS;
  265. }
  266. //查询删除用户并更新
  267. private void deleteUser(long startTime, long endTime) {
  268. try {
  269. Integer total = getDeleteUserTotal(startTime, endTime);
  270. if (total == null || total == 0) {
  271. return;
  272. }
  273. int page = total / size + 1;
  274. int sum = 0;
  275. for (int i = 0; i < page; i++) {
  276. String res = getDeleteUser(size, i * size, startTime, endTime);
  277. log.info("deleteUser size={}, i={}, startTime={}, endTime={}, res={}", size, i, startTime, endTime, res);
  278. if (ObjectUtils.isEmpty(res)) {
  279. continue;
  280. }
  281. JSONObject jsonObject = JSONObject.parseObject(res);
  282. JSONArray jsonArray = jsonObject.getJSONArray("external_user_list");
  283. for (int j = 0; j < jsonArray.size(); j++) {
  284. JSONObject staffRelation = jsonArray.getJSONObject(j).getJSONObject("staff_relation");
  285. Long deletedAt = staffRelation.getLong("deleted_at");
  286. String externalUserId3rdParty = staffRelation.getString("external_user_ext_id");
  287. Long userId = userMapper.selectIdByExternalUserId3rdParty(externalUserId3rdParty);
  288. User updateUser = new User();
  289. updateUser.setId(userId);
  290. updateUser.setIsDelete(1);
  291. updateUser.setDeletedAt(deletedAt);
  292. userMapper.updateByPrimaryKeySelective(updateUser);
  293. sum++;
  294. }
  295. }
  296. if (total > sum) {
  297. log.error("deleteUser数量不足 total = {}, sum={}", total, sum);
  298. }
  299. } catch (Exception e) {
  300. log.error("deleteUser error", e);
  301. }
  302. }
  303. private Integer getDeleteUserTotal(Long startTime, Long endTime) throws IOException {
  304. String res = getDeleteUser(1, 0, startTime, endTime);
  305. JSONObject jsonObject = JSONObject.parseObject(res);
  306. return jsonObject.getInteger("total_count");
  307. }
  308. private String getDeleteUser(Integer limit, Integer offset, Long startTime, Long endTime) throws IOException {
  309. String accessToken = accessTokenService.getAccessToken();
  310. String url = GET_DELETE_USER_URL
  311. + "?access_token=" + accessToken
  312. + "&limit=" + limit + "&offset=" + offset + "&start_time=" + startTime + "&end_time=" + endTime;
  313. return httpPoolClientDefault.get(url);
  314. }
  315. }