updatePublishedMsgDaily.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. """
  2. @author: luojunhui
  3. @description: update daily information into official articles v2
  4. """
  5. import time
  6. import json
  7. import threading
  8. import schedule
  9. from tqdm import tqdm
  10. from datetime import datetime
  11. from applications import PQMySQL, WeixinSpider, Functions, log, bot
  12. def getAccounts():
  13. """
  14. 获取账号信息
  15. :return: [{}, {},...], [{}, {}, {}...]
  16. """
  17. with open("config/accountInfoV0914.json", encoding="utf-8") as f:
  18. account_list = json.loads(f.read())
  19. subscription_account = [i for i in account_list if i['type'] == '订阅号']
  20. server_account = [i for i in account_list if i['type'] == '服务号']
  21. return subscription_account, server_account
  22. def insertEachMsg(db_client, gh_id, account_name, msg_list):
  23. """
  24. 把消息数据更新到数据库中
  25. :param db_client:
  26. :param account_name:
  27. :param gh_id:
  28. :param msg_list:
  29. :return:
  30. """
  31. for info in msg_list:
  32. baseInfo = info.get("BaseInfo", {})
  33. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  34. createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  35. updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  36. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  37. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  38. if detail_article_list:
  39. for article in detail_article_list:
  40. title = article.get("Title", None)
  41. Digest = article.get("Digest", None)
  42. ItemIndex = article.get("ItemIndex", None)
  43. ContentUrl = article.get("ContentUrl", None)
  44. SourceUrl = article.get("SourceUrl", None)
  45. CoverImgUrl = article.get("CoverImgUrl", None)
  46. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  47. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  48. ItemShowType = article.get("ItemShowType", None)
  49. IsOriginal = article.get("IsOriginal", None)
  50. ShowDesc = article.get("ShowDesc", None)
  51. show_stat = Functions().show_desc_to_sta(ShowDesc)
  52. ori_content = article.get("ori_content", None)
  53. show_view_count = show_stat.get("show_view_count", 0)
  54. show_like_count = show_stat.get("show_like_count", 0)
  55. show_zs_count = show_stat.get("show_zs_count", 0)
  56. show_pay_count = show_stat.get("show_pay_count", 0)
  57. wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
  58. info_tuple = (
  59. gh_id,
  60. account_name,
  61. appMsgId,
  62. title,
  63. Type,
  64. createTime,
  65. updateTime,
  66. Digest,
  67. ItemIndex,
  68. ContentUrl,
  69. SourceUrl,
  70. CoverImgUrl,
  71. CoverImgUrl_1_1,
  72. CoverImgUrl_235_1,
  73. ItemShowType,
  74. IsOriginal,
  75. ShowDesc,
  76. ori_content,
  77. show_view_count,
  78. show_like_count,
  79. show_zs_count,
  80. show_pay_count,
  81. wx_sn,
  82. json.dumps(baseInfo, ensure_ascii=False),
  83. Functions().str_to_md5(title)
  84. )
  85. try:
  86. insert_sql = f"""
  87. INSERT INTO official_articles_v2
  88. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5)
  89. values
  90. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  91. """
  92. db_client.update(sql=insert_sql, params=info_tuple)
  93. log(
  94. task="updatePublishedMsgDaily",
  95. function="insertEachMsg",
  96. message="插入文章数据成功",
  97. data={
  98. "info": info_tuple
  99. }
  100. )
  101. except Exception as e:
  102. try:
  103. update_sql = f"""
  104. UPDATE official_articles_v2
  105. SET show_view_count = %s, show_like_count=%s
  106. WHERE wx_sn = %s;
  107. """
  108. db_client.update(sql=update_sql,
  109. params=(show_view_count, show_like_count, wx_sn))
  110. log(
  111. task="updatePublishedMsgDaily",
  112. function="insertEachMsg",
  113. message="更新文章数据成功",
  114. data={
  115. "wxSn": wx_sn,
  116. "likeCount": show_like_count,
  117. "viewCount": show_view_count
  118. }
  119. )
  120. except Exception as e:
  121. log(
  122. task="updatePublishedMsgDaily",
  123. function="insertEachMsg",
  124. message="更新文章失败, 报错原因是: {}".format(e),
  125. status="fail"
  126. )
  127. continue
  128. def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor=None):
  129. """
  130. 更新每一个账号信息
  131. :param account_name:
  132. :param cursor:
  133. :param latest_update_time: 最新更新时间
  134. :param db_client: 数据库连接信息
  135. :param gh_id: 公众号 gh_id
  136. :return: None
  137. """
  138. response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
  139. msg_list = response.get("data", {}).get("data", {})
  140. if msg_list:
  141. # do
  142. last_article_in_this_msg = msg_list[-1]
  143. last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
  144. last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
  145. resdata = WeixinSpider().get_account_by_url(last_url)
  146. check_id = resdata['data'].get('data', {}).get('wx_gh')
  147. if check_id == gh_id:
  148. insertEachMsg(
  149. db_client=db_client,
  150. gh_id=gh_id,
  151. account_name=account_name,
  152. msg_list=msg_list
  153. )
  154. if last_time_stamp_in_this_msg > latest_update_time:
  155. next_cursor = response['data']['next_cursor']
  156. return updateEachAccount(
  157. db_client=db_client,
  158. gh_id=gh_id,
  159. account_name=account_name,
  160. latest_update_time=latest_update_time,
  161. cursor=next_cursor
  162. )
  163. log(
  164. task="updatePublishedMsgDaily",
  165. function="updateEachAccount",
  166. message="账号文章更新成功",
  167. data=response
  168. )
  169. else:
  170. log(
  171. task="updatePublishedMsgDaily",
  172. function="updateEachAccount",
  173. message="账号文章更新失败",
  174. status="fail",
  175. data=response
  176. )
  177. return
  178. def checkAccountInfo(db_client, gh_id):
  179. """
  180. 通过 gh_id查询视频信息
  181. :param db_client:
  182. :param gh_id:
  183. :return:
  184. """
  185. sql = f"""
  186. select accountName, updateTime
  187. from official_articles_v2
  188. where ghId = '{gh_id}'
  189. order by updateTime DESC;
  190. """
  191. result = db_client.select(sql)
  192. if result:
  193. account_name, update_time = result[0]
  194. return {
  195. "account_name": account_name,
  196. "update_time": update_time,
  197. "account_type": "history"
  198. }
  199. else:
  200. return {
  201. "account_name": "",
  202. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  203. "account_type": "new"
  204. }
  205. def updateSingleJob(db_client, gh_id):
  206. """
  207. :param db_client:
  208. :param gh_id:
  209. :return:
  210. """
  211. account_info = checkAccountInfo(db_client, gh_id)
  212. account_name = account_info['account_name']
  213. update_time = account_info['update_time']
  214. updateEachAccount(
  215. db_client=db_client,
  216. gh_id=gh_id,
  217. account_name=account_name,
  218. latest_update_time=update_time
  219. )
  220. def checkSingleAccount(db_client, account_item):
  221. """
  222. 校验每个账号是否更新
  223. :param db_client:
  224. :param account_item:
  225. :return: True / False
  226. """
  227. gh_id = account_item['ghId']
  228. account_type = account_item['type']
  229. today_str = datetime.today().strftime("%Y-%m-%d")
  230. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  231. today_timestamp = today_date_time.timestamp()
  232. sql = f"""
  233. select updateTime
  234. from official_articles_v2
  235. where ghId = '{gh_id}'
  236. order by updateTime
  237. desc;
  238. """
  239. try:
  240. latest_update_time = db_client.select(sql)[0][0]
  241. # 判断该账号当天发布的文章是否被收集
  242. if account_type == "订阅号":
  243. if int(latest_update_time) > int(today_timestamp):
  244. return True
  245. else:
  246. return False
  247. else:
  248. if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
  249. return True
  250. else:
  251. return False
  252. except Exception as e:
  253. print("updateTime Error -- {}".format(e))
  254. return False
  255. def updateJob():
  256. """
  257. 更新任务
  258. :return:
  259. """
  260. db_client = PQMySQL()
  261. sub_accounts, server_accounts = getAccounts()
  262. s_count = 0
  263. f_count = 0
  264. for sub_item in tqdm(sub_accounts):
  265. try:
  266. updateSingleJob(db_client, sub_item['ghId'])
  267. s_count += 1
  268. time.sleep(5)
  269. except Exception as e:
  270. f_count += 1
  271. log(
  272. task="updatePublishedMsgDaily",
  273. function="updateJob",
  274. message="单个账号文章更新失败, 报错信息是: {}".format(e),
  275. status="fail",
  276. )
  277. log(
  278. task="updatePublishedMsgDaily",
  279. function="updateJob",
  280. message="订阅号更新完成",
  281. data={
  282. "success": s_count,
  283. "fail": f_count
  284. }
  285. )
  286. if f_count / (s_count + f_count) > 0.3:
  287. bot(
  288. title="订阅号超过 30% 的账号更新失败",
  289. detail={
  290. "success": s_count,
  291. "fail": f_count,
  292. "failRate": f_count / (s_count + f_count)
  293. }
  294. )
  295. for sub_item in tqdm(server_accounts):
  296. try:
  297. updateSingleJob(db_client, sub_item['ghId'])
  298. time.sleep(5)
  299. except Exception as e:
  300. print(e)
  301. def checkJob():
  302. """
  303. 校验任务
  304. :return:
  305. """
  306. db_client = PQMySQL()
  307. sub_accounts, server_accounts = getAccounts()
  308. fail_list = []
  309. # account_list = sub_accounts + server_accounts
  310. account_list = sub_accounts
  311. # check and rework if fail
  312. for sub_item in tqdm(account_list):
  313. res = checkSingleAccount(db_client, sub_item)
  314. if not res:
  315. updateSingleJob(db_client, sub_item['ghId'])
  316. # check whether success and bot if fails
  317. for sub_item in tqdm(account_list):
  318. res = checkSingleAccount(db_client, sub_item)
  319. if not res:
  320. fail_list.append(sub_item)
  321. if fail_list:
  322. try:
  323. bot(
  324. title="日常报警, 存在账号更新失败",
  325. detail=fail_list
  326. )
  327. except Exception as e:
  328. print("Timeout Error: {}".format(e))
  329. def job_with_thread(job_func):
  330. """
  331. 每个任务放到单个线程中
  332. :param job_func:
  333. :return:
  334. """
  335. job_thread = threading.Thread(target=job_func)
  336. job_thread.start()
  337. if __name__ == '__main__':
  338. schedule.every().day.at("20:50").do(job_with_thread, updateJob)
  339. schedule.every().day.at("21:45").do(job_with_thread, checkJob)
  340. while True:
  341. schedule.run_pending()
  342. time.sleep(1)
  343. # log(
  344. # task="updatePublishedMsgDaily",
  345. # function="main",
  346. # message="更新公众号文章信息任务正常执行"
  347. # )