updatePublishedMsgDaily.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. """
  2. @author: luojunhui
  3. @description: update daily information into official articles v2
  4. """
  5. import time
  6. import json
  7. import threading
  8. import requests
  9. import schedule
  10. from tqdm import tqdm
  11. from datetime import datetime
  12. from applications import PQMySQL
  13. from applications.decoratorApi import retryOnTimeout
  14. from applications import WeixinSpider
  15. from applications import Functions
  16. @retryOnTimeout()
  17. def bot(account_list):
  18. """
  19. 机器人
  20. """
  21. url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
  22. headers = {"Content-Type": "application/json"}
  23. payload = {
  24. "msg_type": "interactive",
  25. "card": {
  26. "elements": [
  27. {
  28. "tag": "div",
  29. "text": {
  30. "content": "存在文章更新失败<at id=all></at>\n",
  31. "tag": "lark_md",
  32. },
  33. },
  34. {
  35. "tag": "div",
  36. "text": {
  37. "content": json.dumps(
  38. account_list, ensure_ascii=False, indent=4
  39. ),
  40. "tag": "lark_md",
  41. },
  42. },
  43. ],
  44. "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
  45. },
  46. }
  47. requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
  48. def getAccounts():
  49. """
  50. 获取账号信息
  51. :return: [{}, {},...], [{}, {}, {}...]
  52. """
  53. with open("config/accountInfoV2.json", encoding="utf-8") as f:
  54. account_list = json.loads(f.read())
  55. subscription_account = [i for i in account_list if i['type'] == '订阅号']
  56. server_account = [i for i in account_list if i['type'] == '服务号']
  57. return subscription_account, server_account
  58. def insertEachMsg(db_client, gh_id, account_name, msg_list):
  59. """
  60. 把消息数据更新到数据库中
  61. :param db_client:
  62. :param account_name:
  63. :param gh_id:
  64. :param msg_list:
  65. :return:
  66. """
  67. for info in msg_list:
  68. baseInfo = info.get("BaseInfo", {})
  69. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  70. createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  71. updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  72. # if int(time.time()) - int(updateTime) <= 20 * 60 * 60:
  73. # continue
  74. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  75. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  76. if detail_article_list:
  77. for article in detail_article_list:
  78. title = article.get("Title", None)
  79. Digest = article.get("Digest", None)
  80. ItemIndex = article.get("ItemIndex", None)
  81. ContentUrl = article.get("ContentUrl", None)
  82. SourceUrl = article.get("SourceUrl", None)
  83. CoverImgUrl = article.get("CoverImgUrl", None)
  84. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  85. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  86. ItemShowType = article.get("ItemShowType", None)
  87. IsOriginal = article.get("IsOriginal", None)
  88. ShowDesc = article.get("ShowDesc", None)
  89. show_stat = Functions().show_desc_to_sta(ShowDesc)
  90. ori_content = article.get("ori_content", None)
  91. show_view_count = show_stat.get("show_view_count", 0)
  92. show_like_count = show_stat.get("show_like_count", 0)
  93. show_zs_count = show_stat.get("show_zs_count", 0)
  94. show_pay_count = show_stat.get("show_pay_count", 0)
  95. wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
  96. info_tuple = (
  97. gh_id,
  98. account_name,
  99. appMsgId,
  100. title,
  101. Type,
  102. createTime,
  103. updateTime,
  104. Digest,
  105. ItemIndex,
  106. ContentUrl,
  107. SourceUrl,
  108. CoverImgUrl,
  109. CoverImgUrl_1_1,
  110. CoverImgUrl_235_1,
  111. ItemShowType,
  112. IsOriginal,
  113. ShowDesc,
  114. ori_content,
  115. show_view_count,
  116. show_like_count,
  117. show_zs_count,
  118. show_pay_count,
  119. wx_sn,
  120. json.dumps(baseInfo, ensure_ascii=False)
  121. )
  122. # print(info_tuple)
  123. try:
  124. insert_sql = f"""
  125. INSERT INTO official_articles_v2
  126. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
  127. values
  128. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  129. """
  130. db_client.update(sql=insert_sql, params=info_tuple)
  131. print("插入成功")
  132. except Exception as e:
  133. try:
  134. update_sql = f"""
  135. UPDATE official_articles_v2
  136. SET show_view_count = %s, show_like_count=%s
  137. WHERE wx_sn = %s;
  138. """
  139. db_client.update(sql=update_sql,
  140. params=(show_view_count, show_like_count, wx_sn))
  141. print("更新成功")
  142. except Exception as e:
  143. print("失败-{}".format(e))
  144. continue
  145. def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor=None):
  146. """
  147. 更新每一个账号信息
  148. :param account_name:
  149. :param cursor:
  150. :param latest_update_time: 最新更新时间
  151. :param db_client: 数据库连接信息
  152. :param gh_id: 公众号 gh_id
  153. :return: None
  154. """
  155. response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
  156. msg_list = response.get("data", {}).get("data", {})
  157. if msg_list:
  158. # do
  159. last_article_in_this_msg = msg_list[-1]
  160. last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
  161. last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
  162. resdata = WeixinSpider().get_account_by_url(last_url)
  163. check_name = resdata['data'].get('data', {}).get('account_name')
  164. check_id = resdata['data'].get('data', {}).get('wx_gh')
  165. if check_id == gh_id:
  166. insertEachMsg(
  167. db_client=db_client,
  168. gh_id=gh_id,
  169. account_name=account_name,
  170. msg_list=msg_list
  171. )
  172. if last_time_stamp_in_this_msg > latest_update_time:
  173. next_cursor = response['data']['next_cursor']
  174. return updateEachAccount(
  175. db_client=db_client,
  176. gh_id=gh_id,
  177. account_name=check_name,
  178. latest_update_time=latest_update_time,
  179. cursor=next_cursor
  180. )
  181. else:
  182. return
  183. def checkAccountInfo(db_client, gh_id):
  184. """
  185. 通过 gh_id查询视频信息
  186. :param db_client:
  187. :param gh_id:
  188. :return:
  189. """
  190. sql = f"""
  191. select accountName, updateTime
  192. from official_articles_v2
  193. where ghId = '{gh_id}'
  194. order by updateTime DESC;
  195. """
  196. result = db_client.select(sql)
  197. if result:
  198. account_name, update_time = result[0]
  199. return {
  200. "account_name": account_name,
  201. "update_time": update_time,
  202. "account_type": "history"
  203. }
  204. else:
  205. return {
  206. "account_name": "",
  207. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  208. "account_type": "new"
  209. }
  210. def updateSingleJob(db_client, gh_id):
  211. """
  212. :param db_client:
  213. :param gh_id:
  214. :return:
  215. """
  216. account_info = checkAccountInfo(db_client, gh_id)
  217. account_name = account_info['account_name']
  218. update_time = account_info['update_time']
  219. updateEachAccount(
  220. db_client=db_client,
  221. gh_id=gh_id,
  222. account_name=account_name,
  223. latest_update_time=update_time
  224. )
  225. def checkSingleAccount(db_client, account_item):
  226. """
  227. 校验每个账号是否更新
  228. :param db_client:
  229. :param account_item:
  230. :return: True / False
  231. """
  232. gh_id = account_item['ghId']
  233. account_type = account_item['type']
  234. today_str = datetime.today().strftime("%Y-%m-%d")
  235. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  236. today_timestamp = today_date_time.timestamp()
  237. sql = f"""
  238. select updateTime
  239. from official_articles_v2
  240. where ghId = '{gh_id}'
  241. order by updateTime
  242. desc;
  243. """
  244. try:
  245. latest_update_time = db_client.select(sql)[0][0]
  246. # 判断该账号当天发布的文章是否被收集
  247. if account_type == "订阅号":
  248. if int(latest_update_time) > int(today_timestamp):
  249. return True
  250. else:
  251. return False
  252. else:
  253. if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
  254. return True
  255. else:
  256. return False
  257. except Exception as e:
  258. print("updateTime Error -- {}".format(e))
  259. return False
  260. def updateJob():
  261. """
  262. 更新任务
  263. :return:
  264. """
  265. db_client = PQMySQL()
  266. sub_accounts, server_accounts = getAccounts()
  267. for sub_item in tqdm(sub_accounts):
  268. # updateSingleJob(db_client, sub_item['ghId'])
  269. try:
  270. updateSingleJob(db_client, sub_item['ghId'])
  271. time.sleep(2)
  272. except Exception as e:
  273. print(e)
  274. print("订阅号更新完成")
  275. for sub_item in tqdm(server_accounts):
  276. try:
  277. updateSingleJob(db_client, sub_item['ghId'])
  278. time.sleep(2)
  279. except Exception as e:
  280. print(e)
  281. print("服务号更新完成")
  282. def checkJob():
  283. """
  284. 校验任务
  285. :return:
  286. """
  287. db_client = PQMySQL()
  288. sub_accounts, server_accounts = getAccounts()
  289. fail_list = []
  290. account_list = sub_accounts + server_accounts
  291. # check and rework if fail
  292. for sub_item in tqdm(account_list):
  293. res = checkSingleAccount(db_client, sub_item)
  294. if not res:
  295. updateSingleJob(db_client, sub_item['ghId'])
  296. # check whether success and bot if fails
  297. for sub_item in tqdm(account_list):
  298. res = checkSingleAccount(db_client, sub_item)
  299. if not res:
  300. fail_list.append(sub_item)
  301. if fail_list:
  302. try:
  303. bot(fail_list)
  304. except Exception as e:
  305. print("Timeout Error: {}".format(e))
  306. def job_with_thread(job_func):
  307. """
  308. 每个任务放到单个线程中
  309. :param job_func:
  310. :return:
  311. """
  312. job_thread = threading.Thread(target=job_func)
  313. job_thread.start()
  314. if __name__ == '__main__':
  315. schedule.every().day.at("20:50").do(job_with_thread, updateJob)
  316. schedule.every().day.at("21:30").do(job_with_thread, checkJob)
  317. while True:
  318. schedule.run_pending()
  319. time.sleep(1)