# update_msg.py
"""
@author: luojunhui
"""
  4. import time
  5. import json
  6. import threading
  7. import requests
  8. import schedule
  9. from tqdm import tqdm
  10. from datetime import datetime
  11. from config import accountBaseInfo
  12. from applications import PQMySQL, WeixinSpider, Functions
  13. from applications.decoratorApi import retryOnTimeout
  14. class UpdateMsgDaily(object):
  15. """
  16. 日常更新文章
  17. """
  18. db_client = PQMySQL()
  19. spider = WeixinSpider()
  20. functions = Functions()
  21. @classmethod
  22. def getAccountIdDict(cls):
  23. """
  24. 获取全部内部账号的id
  25. :return:
  26. """
  27. gh_id_dict = {}
  28. for key in accountBaseInfo:
  29. gh_id = accountBaseInfo[key]["ghId"]
  30. name = accountBaseInfo[key]["accountName"]
  31. gh_id_dict[gh_id] = name
  32. return gh_id_dict
  33. @classmethod
  34. @retryOnTimeout()
  35. def bot(cls, account_list):
  36. """
  37. 机器人
  38. """
  39. url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
  40. headers = {"Content-Type": "application/json"}
  41. payload = {
  42. "msg_type": "interactive",
  43. "card": {
  44. "elements": [
  45. {
  46. "tag": "div",
  47. "text": {
  48. "content": "存在文章更新失败<at id=all></at>\n",
  49. "tag": "lark_md",
  50. },
  51. },
  52. {
  53. "tag": "div",
  54. "text": {
  55. "content": json.dumps(
  56. account_list, ensure_ascii=False, indent=4
  57. ),
  58. "tag": "lark_md",
  59. },
  60. },
  61. ],
  62. "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
  63. },
  64. }
  65. requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
  66. @classmethod
  67. def findAccountLatestUpdateTime(cls, gh_id):
  68. """
  69. 获取账号的最近更新id
  70. :param gh_id:
  71. :return:
  72. """
  73. sql = f"""
  74. select accountName, updateTime
  75. from official_articles_v2
  76. where ghId = '{gh_id}'
  77. order by updateTime DESC;
  78. """
  79. result = cls.db_client.select(sql)
  80. if result:
  81. account_name, update_time = result[0]
  82. return {"update_time": update_time, "account_type": "history"}
  83. else:
  84. return {
  85. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  86. "account_type": "new",
  87. }
  88. @classmethod
  89. def updateMsgList(cls, gh_id, account_name, msg_list):
  90. """
  91. 把消息数据更新到数据库中
  92. :param account_name:
  93. :param gh_id:
  94. :param msg_list:
  95. :return:
  96. """
  97. for info in msg_list:
  98. baseInfo = info.get("BaseInfo", {})
  99. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  100. createTime = (
  101. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  102. )
  103. updateTime = (
  104. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  105. )
  106. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  107. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  108. if detail_article_list:
  109. for article in detail_article_list:
  110. title = article.get("Title", None)
  111. Digest = article.get("Digest", None)
  112. ItemIndex = article.get("ItemIndex", None)
  113. ContentUrl = article.get("ContentUrl", None)
  114. SourceUrl = article.get("SourceUrl", None)
  115. CoverImgUrl = article.get("CoverImgUrl", None)
  116. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  117. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  118. ItemShowType = article.get("ItemShowType", None)
  119. IsOriginal = article.get("IsOriginal", None)
  120. ShowDesc = article.get("ShowDesc", None)
  121. show_stat = cls.functions.show_desc_to_sta(ShowDesc)
  122. ori_content = article.get("ori_content", None)
  123. show_view_count = show_stat.get("show_view_count", 0)
  124. show_like_count = show_stat.get("show_like_count", 0)
  125. show_zs_count = show_stat.get("show_zs_count", 0)
  126. show_pay_count = show_stat.get("show_pay_count", 0)
  127. wx_sn = (
  128. ContentUrl.split("&sn=")[1].split("&")[0]
  129. if ContentUrl
  130. else None
  131. )
  132. info_tuple = (
  133. gh_id,
  134. account_name,
  135. appMsgId,
  136. title,
  137. Type,
  138. createTime,
  139. updateTime,
  140. Digest,
  141. ItemIndex,
  142. ContentUrl,
  143. SourceUrl,
  144. CoverImgUrl,
  145. CoverImgUrl_1_1,
  146. CoverImgUrl_235_1,
  147. ItemShowType,
  148. IsOriginal,
  149. ShowDesc,
  150. ori_content,
  151. show_view_count,
  152. show_like_count,
  153. show_zs_count,
  154. show_pay_count,
  155. wx_sn,
  156. json.dumps(baseInfo, ensure_ascii=False),
  157. )
  158. try:
  159. insert_sql = f"""
  160. INSERT INTO official_articles_v2
  161. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
  162. values
  163. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  164. """
  165. cls.db_client.update(sql=insert_sql, params=info_tuple)
  166. except Exception as e:
  167. try:
  168. update_sql = f"""
  169. UPDATE official_articles_v2
  170. SET show_view_count = %s, show_like_count=%s
  171. WHERE wx_sn = %s;
  172. """
  173. cls.db_client.update(
  174. sql=update_sql,
  175. params=(show_view_count, show_like_count, wx_sn),
  176. )
  177. except Exception as e:
  178. print("失败-{}".format(e))
  179. continue
  180. @classmethod
  181. def getAccountArticleList(cls, gh_id, last_update_time, cursor=None):
  182. """
  183. 输入ghid获取账号的文章list
  184. :return:
  185. """
  186. try:
  187. response = cls.spider.update_msg_list(ghId=gh_id, index=cursor)
  188. except Exception as e:
  189. response = {
  190. "error": str(e),
  191. "info": "更新文章接口请求失败",
  192. "gh_id": gh_id,
  193. "time": datetime.now().__str__()
  194. }
  195. # 之后可以考虑抛出阿里云日志
  196. print(response)
  197. return
  198. msg_list = response.get("data", {}).get("data")
  199. if msg_list:
  200. last_article_in_this_msg = msg_list[-1]
  201. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"][
  202. "BaseInfo"
  203. ]["UpdateTime"]
  204. last_url = last_article_in_this_msg["AppMsg"]["DetailInfo"][0]["ContentUrl"]
  205. # 校验是否抓到的是同一个账号
  206. try:
  207. resdata = cls.spider.get_account_by_url(last_url)
  208. except Exception as e:
  209. resdata = {
  210. "error": str(e),
  211. "info": "通过链接获取账号信息失败",
  212. "gh_id": gh_id,
  213. "time": datetime.now().__str__()
  214. }
  215. print(resdata)
  216. return
  217. check_name = resdata["data"].get("data", {}).get("account_name")
  218. check_id = resdata["data"].get("data", {}).get("wx_gh")
  219. if check_id == gh_id:
  220. cls.updateMsgList(gh_id, check_name, msg_list)
  221. if last_time_stamp_in_this_msg > last_update_time:
  222. next_cursor = response["data"]["next_cursor"]
  223. return cls.getAccountArticleList(
  224. gh_id=gh_id,
  225. last_update_time=last_update_time,
  226. cursor=next_cursor,
  227. )
  228. else:
  229. response = {
  230. "code": 1002,
  231. "info": "抓取时候账号校验失败",
  232. "error": None,
  233. "gh_id": gh_id,
  234. "time_stamp": datetime.now().__str__(),
  235. }
  236. print(response)
  237. else:
  238. response = {
  239. "code": 1003,
  240. "info": "账号为抓取到内容",
  241. "error": None,
  242. "gh_id": gh_id,
  243. "time_stamp": datetime.now().__str__(),
  244. }
  245. print(response)
  246. @classmethod
  247. def checkEachAccount(cls, gh_id):
  248. """
  249. 验证单个账号是否当天有更新
  250. :param gh_id:
  251. :return:
  252. """
  253. today_str = datetime.today().strftime("%Y-%m-%d")
  254. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  255. today_timestamp = today_date_time.timestamp()
  256. sql = f"""
  257. select updateTime
  258. from official_articles_v2
  259. where ghId = '{gh_id}'
  260. order by updateTime
  261. desc
  262. """
  263. latest_update_time = cls.db_client.select(sql)[0][0]
  264. # 判断该账号当天发布的文章是否被收集
  265. if int(latest_update_time) > int(today_timestamp):
  266. return True
  267. else:
  268. return False
  269. @classmethod
  270. def updateJob(cls):
  271. """
  272. 更新文章任务
  273. :return:
  274. """
  275. account_list = cls.getAccountIdDict()
  276. for account_id in tqdm(account_list):
  277. account_info = cls.findAccountLatestUpdateTime(account_id)
  278. latest_time = account_info["update_time"]
  279. try:
  280. cls.getAccountArticleList(
  281. gh_id=account_id, last_update_time=latest_time
  282. )
  283. except Exception as e:
  284. response = {
  285. "code": 1001,
  286. "info": "单个账号更新失败",
  287. "error": str(e),
  288. "time_stamp": datetime.now().__str__(),
  289. }
  290. print(response)
  291. @classmethod
  292. def checkJob(cls):
  293. """
  294. 验证所有账号是否已经有更新数据
  295. :return:
  296. todo: 被封禁账号&&服务号需要做区分
  297. """
  298. account_dict = cls.getAccountIdDict()
  299. error_account_list = []
  300. for account_id in tqdm(account_dict):
  301. if cls.checkEachAccount(account_id):
  302. continue
  303. else:
  304. name = account_dict[account_id]
  305. error_account_list.append(name)
  306. if error_account_list:
  307. try:
  308. cls.bot(error_account_list)
  309. except Exception as e:
  310. print("Timeout Error: {}".format(e))
  311. def job_with_thread(job_func):
  312. """
  313. 每个任务放到单个线程中
  314. :param job_func:
  315. :return:
  316. """
  317. job_thread = threading.Thread(target=job_func)
  318. job_thread.start()
  319. if __name__ == "__main__":
  320. UMD = UpdateMsgDaily()
  321. schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob)
  322. schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob)
  323. while True:
  324. schedule.run_pending()
  325. time.sleep(1)