updatePublishedMsgDaily.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. """
  2. @author: luojunhui
  3. @description: update daily information into official articles v2
  4. """
  5. import time
  6. import json
  7. import threading
  8. import schedule
  9. from tqdm import tqdm
  10. from datetime import datetime
  11. from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi
# MySQL table that every query in this module reads from and writes to.
ARTICLE_TABLE = "official_articles_v2"
  13. def get_accounts_v1():
  14. """
  15. 获取账号信息
  16. :return: [{}, {},...], [{}, {}, {}...]
  17. """
  18. with open("config/accountInfoV0914.json", encoding="utf-8") as f:
  19. account_list = json.loads(f.read())
  20. subscription_account = [i for i in account_list if i['type'] == '订阅号']
  21. server_account = [i for i in account_list if i['type'] == '服务号']
  22. return subscription_account, server_account
  23. def get_accounts():
  24. """
  25. 从 aigc 数据库中获取目前处于发布状态的账号
  26. :return:
  27. "name": line[0],
  28. "ghId": line[1],
  29. "follower_count": line[2],
  30. "account_init_time": int(line[3] / 1000),
  31. "account_type": line[4],
  32. "account_auth": line[5]
  33. """
  34. account_list = aiditApi.get_publish_account_from_aigc()
  35. subscription_account = [i for i in account_list if i['account_type'] in {0, 1}]
  36. server_account = [i for i in account_list if i['account_type'] == 2]
  37. return subscription_account, server_account
  38. def insert_each_msg(db_client, item, account_name, msg_list):
  39. """
  40. 把消息数据更新到数据库中
  41. :param item:
  42. :param db_client:
  43. :param account_name:
  44. :param msg_list:
  45. :return:
  46. """
  47. gh_id = item['ghId']
  48. account_init_timestamp = item['account_init_timestamp']
  49. for info in msg_list:
  50. baseInfo = info.get("BaseInfo", {})
  51. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  52. createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  53. updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  54. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  55. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  56. if detail_article_list:
  57. for article in detail_article_list:
  58. title = article.get("Title", None)
  59. Digest = article.get("Digest", None)
  60. ItemIndex = article.get("ItemIndex", None)
  61. ContentUrl = article.get("ContentUrl", None)
  62. SourceUrl = article.get("SourceUrl", None)
  63. CoverImgUrl = article.get("CoverImgUrl", None)
  64. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  65. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  66. ItemShowType = article.get("ItemShowType", None)
  67. IsOriginal = article.get("IsOriginal", None)
  68. ShowDesc = article.get("ShowDesc", None)
  69. show_stat = Functions().show_desc_to_sta(ShowDesc)
  70. ori_content = article.get("ori_content", None)
  71. show_view_count = show_stat.get("show_view_count", 0)
  72. show_like_count = show_stat.get("show_like_count", 0)
  73. show_zs_count = show_stat.get("show_zs_count", 0)
  74. show_pay_count = show_stat.get("show_pay_count", 0)
  75. wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
  76. status = 1 if int(time.time()) - account_init_timestamp > 14 * 24 * 3600 else 0
  77. info_tuple = (
  78. gh_id,
  79. account_name,
  80. appMsgId,
  81. title,
  82. Type,
  83. createTime,
  84. updateTime,
  85. Digest,
  86. ItemIndex,
  87. ContentUrl,
  88. SourceUrl,
  89. CoverImgUrl,
  90. CoverImgUrl_1_1,
  91. CoverImgUrl_235_1,
  92. ItemShowType,
  93. IsOriginal,
  94. ShowDesc,
  95. ori_content,
  96. show_view_count,
  97. show_like_count,
  98. show_zs_count,
  99. show_pay_count,
  100. wx_sn,
  101. json.dumps(baseInfo, ensure_ascii=False),
  102. Functions().str_to_md5(title),
  103. status
  104. )
  105. try:
  106. insert_sql = f"""
  107. INSERT INTO {ARTICLE_TABLE}
  108. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
  109. values
  110. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  111. """
  112. db_client.update(sql=insert_sql, params=info_tuple)
  113. log(
  114. task="updatePublishedMsgDaily",
  115. function="insert_each_msg",
  116. message="插入文章数据成功",
  117. data={
  118. "info": info_tuple
  119. }
  120. )
  121. except Exception as e:
  122. try:
  123. update_sql = f"""
  124. UPDATE {ARTICLE_TABLE}
  125. SET show_view_count = %s, show_like_count=%s
  126. WHERE wx_sn = %s;
  127. """
  128. db_client.update(sql=update_sql,
  129. params=(show_view_count, show_like_count, wx_sn))
  130. log(
  131. task="updatePublishedMsgDaily",
  132. function="insert_each_msg",
  133. message="更新文章数据成功",
  134. data={
  135. "wxSn": wx_sn,
  136. "likeCount": show_like_count,
  137. "viewCount": show_view_count
  138. }
  139. )
  140. except Exception as e:
  141. log(
  142. task="updatePublishedMsgDaily",
  143. function="insert_each_msg",
  144. message="更新文章失败, 报错原因是: {}".format(e),
  145. status="fail"
  146. )
  147. continue
  148. def update_each_account(db_client, item, account_name, latest_update_time, cursor=None):
  149. """
  150. 更新每一个账号信息
  151. :param item:
  152. :param account_name:
  153. :param cursor:
  154. :param latest_update_time: 最新更新时间
  155. :param db_client: 数据库连接信息
  156. :return: None
  157. """
  158. gh_id = item['ghId']
  159. response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
  160. msg_list = response.get("data", {}).get("data", {})
  161. if msg_list:
  162. # do
  163. last_article_in_this_msg = msg_list[-1]
  164. last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
  165. last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
  166. resdata = WeixinSpider().get_account_by_url(last_url)
  167. check_id = resdata['data'].get('data', {}).get('wx_gh')
  168. if check_id == gh_id:
  169. insert_each_msg(
  170. db_client=db_client,
  171. item=item,
  172. account_name=account_name,
  173. msg_list=msg_list
  174. )
  175. if last_time_stamp_in_this_msg > latest_update_time:
  176. next_cursor = response['data']['next_cursor']
  177. return update_each_account(
  178. db_client=db_client,
  179. item=item,
  180. account_name=account_name,
  181. latest_update_time=latest_update_time,
  182. cursor=next_cursor
  183. )
  184. log(
  185. task="updatePublishedMsgDaily",
  186. function="update_each_account",
  187. message="账号文章更新成功",
  188. data=response
  189. )
  190. else:
  191. log(
  192. task="updatePublishedMsgDaily",
  193. function="update_each_account",
  194. message="账号文章更新失败",
  195. status="fail",
  196. data=response
  197. )
  198. return
  199. def check_account_info(db_client, gh_id):
  200. """
  201. 通过 gh_id查询视频信息
  202. :param db_client:
  203. :param gh_id:
  204. :return:
  205. """
  206. sql = f"""
  207. SELECT accountName, updateTime
  208. FROM {ARTICLE_TABLE}
  209. WHERE ghId = '{gh_id}'
  210. ORDER BY updateTime DESC;
  211. """
  212. result = db_client.select(sql)
  213. if result:
  214. account_name, update_time = result[0]
  215. return {
  216. "account_name": account_name,
  217. "update_time": update_time,
  218. "account_type": "history"
  219. }
  220. else:
  221. return {
  222. "account_name": "",
  223. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  224. "account_type": "new"
  225. }
  226. def update_single_account(db_client, sub_item):
  227. """
  228. :param sub_item:
  229. :param db_client:
  230. :return:
  231. """
  232. gh_id = sub_item['ghId']
  233. account_info = check_account_info(db_client, gh_id)
  234. account_name = account_info['account_name']
  235. update_time = account_info['update_time']
  236. update_each_account(
  237. db_client=db_client,
  238. item=sub_item,
  239. account_name=account_name,
  240. latest_update_time=update_time
  241. )
  242. def check_single_account(db_client, account_item):
  243. """
  244. 校验每个账号是否更新
  245. :param db_client:
  246. :param account_item:
  247. :return: True / False
  248. """
  249. gh_id = account_item['ghId']
  250. account_type = account_item['account_type']
  251. today_str = datetime.today().strftime("%Y-%m-%d")
  252. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  253. today_timestamp = today_date_time.timestamp()
  254. sql = f"""
  255. SELECT updateTime
  256. FROM {ARTICLE_TABLE}
  257. WHERE ghId = '{gh_id}'
  258. ORDER BY updateTime
  259. DESC;
  260. """
  261. try:
  262. latest_update_time = db_client.select(sql)[0][0]
  263. # 判断该账号当天发布的文章是否被收集
  264. if account_type in {0, 1}:
  265. if int(latest_update_time) > int(today_timestamp):
  266. return True
  267. else:
  268. return False
  269. else:
  270. if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
  271. return True
  272. else:
  273. return False
  274. except Exception as e:
  275. print("updateTime Error -- {}".format(e))
  276. return False
  277. def update_job():
  278. """
  279. 更新任务
  280. :return:
  281. """
  282. db_client = PQMySQL()
  283. sub_accounts, server_accounts = get_accounts()
  284. s_count = 0
  285. f_count = 0
  286. for sub_item in tqdm(sub_accounts):
  287. try:
  288. update_single_account(db_client, sub_item)
  289. s_count += 1
  290. time.sleep(5)
  291. except Exception as e:
  292. f_count += 1
  293. log(
  294. task="updatePublishedMsgDaily",
  295. function="update_job",
  296. message="单个账号文章更新失败, 报错信息是: {}".format(e),
  297. status="fail",
  298. )
  299. log(
  300. task="updatePublishedMsgDaily",
  301. function="update_job",
  302. message="订阅号更新完成",
  303. data={
  304. "success": s_count,
  305. "fail": f_count
  306. }
  307. )
  308. if f_count / (s_count + f_count) > 0.3:
  309. bot(
  310. title="订阅号超过 30% 的账号更新失败",
  311. detail={
  312. "success": s_count,
  313. "fail": f_count,
  314. "failRate": f_count / (s_count + f_count)
  315. }
  316. )
  317. for sub_item in tqdm(server_accounts):
  318. try:
  319. update_single_account(db_client, sub_item)
  320. time.sleep(5)
  321. except Exception as e:
  322. print(e)
  323. def check_job():
  324. """
  325. 校验任务
  326. :return:
  327. """
  328. db_client = PQMySQL()
  329. sub_accounts, server_accounts = get_accounts()
  330. fail_list = []
  331. # account_list = sub_accounts + server_accounts
  332. account_list = sub_accounts
  333. # check and rework if fail
  334. for sub_item in tqdm(account_list):
  335. res = check_single_account(db_client, sub_item)
  336. if not res:
  337. update_single_account(db_client, sub_item)
  338. # check whether success and bot if fails
  339. for sub_item in tqdm(account_list):
  340. res = check_single_account(db_client, sub_item)
  341. if not res:
  342. fail_list.append(sub_item)
  343. if fail_list:
  344. try:
  345. bot(
  346. title="日常报警, 存在账号更新失败",
  347. detail=fail_list
  348. )
  349. except Exception as e:
  350. print("Timeout Error: {}".format(e))
  351. def job_with_thread(job_func):
  352. """
  353. 每个任务放到单个线程中
  354. :param job_func:
  355. :return:
  356. """
  357. job_thread = threading.Thread(target=job_func)
  358. job_thread.start()
  359. if __name__ == '__main__':
  360. schedule.every().day.at("20:50").do(job_with_thread, update_job)
  361. schedule.every().day.at("21:45").do(job_with_thread, check_job)
  362. while True:
  363. schedule.run_pending()
  364. time.sleep(1)