# update_msg.py
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import json
  6. import requests
  7. import schedule
  8. from tqdm import tqdm
  9. from datetime import datetime
  10. from config import accountBaseInfo
  11. from applications import PQMySQL, WeixinSpider, Functions
  12. class UpdateMsgDaily(object):
  13. """
  14. 日常更新文章
  15. """
  16. db_client = PQMySQL()
  17. spider = WeixinSpider()
  18. functions = Functions()
  19. @classmethod
  20. def getAccountIdDict(cls):
  21. """
  22. 获取全部内部账号的id
  23. :return:
  24. """
  25. gh_id_dict = {}
  26. for key in accountBaseInfo:
  27. gh_id = accountBaseInfo[key]["ghId"]
  28. name = accountBaseInfo[key]["accountName"]
  29. gh_id_dict[gh_id] = name
  30. return gh_id_dict
  31. @classmethod
  32. def bot(cls, account_list):
  33. """
  34. 机器人
  35. """
  36. url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
  37. headers = {"Content-Type": "application/json"}
  38. payload = {
  39. "msg_type": "interactive",
  40. "card": {
  41. "elements": [
  42. {
  43. "tag": "div",
  44. "text": {
  45. "content": "存在文章更新失败<at id=all></at>\n",
  46. "tag": "lark_md",
  47. },
  48. },
  49. {
  50. "tag": "div",
  51. "text": {
  52. "content": json.dumps(
  53. account_list, ensure_ascii=False, indent=4
  54. ),
  55. "tag": "lark_md",
  56. },
  57. },
  58. ],
  59. "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
  60. },
  61. }
  62. requests.request("POST", url=url, headers=headers, data=json.dumps(payload))
  63. @classmethod
  64. def findAccountLatestUpdateTime(cls, gh_id):
  65. """
  66. 获取账号的最近更新id
  67. :param gh_id:
  68. :return:
  69. """
  70. sql = f"""
  71. select accountName, updateTime
  72. from official_articles_v2
  73. where ghId = '{gh_id}'
  74. order by updateTime DESC;
  75. """
  76. result = cls.db_client.select(sql)
  77. if result:
  78. account_name, update_time = result[0]
  79. return {"update_time": update_time, "account_type": "history"}
  80. else:
  81. return {
  82. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  83. "account_type": "new",
  84. }
  85. @classmethod
  86. def updateMsgList(cls, gh_id, account_name, msg_list):
  87. """
  88. 把消息数据更新到数据库中
  89. :param account_name:
  90. :param gh_id:
  91. :param msg_list:
  92. :return:
  93. """
  94. for info in msg_list:
  95. baseInfo = info.get("BaseInfo", {})
  96. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  97. createTime = (
  98. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  99. )
  100. updateTime = (
  101. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  102. )
  103. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  104. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  105. if detail_article_list:
  106. for article in detail_article_list:
  107. title = article.get("Title", None)
  108. Digest = article.get("Digest", None)
  109. ItemIndex = article.get("ItemIndex", None)
  110. ContentUrl = article.get("ContentUrl", None)
  111. SourceUrl = article.get("SourceUrl", None)
  112. CoverImgUrl = article.get("CoverImgUrl", None)
  113. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  114. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  115. ItemShowType = article.get("ItemShowType", None)
  116. IsOriginal = article.get("IsOriginal", None)
  117. ShowDesc = article.get("ShowDesc", None)
  118. show_stat = cls.functions.show_desc_to_sta(ShowDesc)
  119. ori_content = article.get("ori_content", None)
  120. show_view_count = show_stat.get("show_view_count", 0)
  121. show_like_count = show_stat.get("show_like_count", 0)
  122. show_zs_count = show_stat.get("show_zs_count", 0)
  123. show_pay_count = show_stat.get("show_pay_count", 0)
  124. wx_sn = (
  125. ContentUrl.split("&sn=")[1].split("&")[0]
  126. if ContentUrl
  127. else None
  128. )
  129. info_tuple = (
  130. gh_id,
  131. account_name,
  132. appMsgId,
  133. title,
  134. Type,
  135. createTime,
  136. updateTime,
  137. Digest,
  138. ItemIndex,
  139. ContentUrl,
  140. SourceUrl,
  141. CoverImgUrl,
  142. CoverImgUrl_1_1,
  143. CoverImgUrl_235_1,
  144. ItemShowType,
  145. IsOriginal,
  146. ShowDesc,
  147. ori_content,
  148. show_view_count,
  149. show_like_count,
  150. show_zs_count,
  151. show_pay_count,
  152. wx_sn,
  153. json.dumps(baseInfo, ensure_ascii=False),
  154. )
  155. try:
  156. insert_sql = f"""
  157. INSERT INTO official_articles_v2
  158. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
  159. values
  160. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  161. """
  162. cls.db_client.update(sql=insert_sql, params=info_tuple)
  163. except Exception as e:
  164. try:
  165. update_sql = f"""
  166. UPDATE official_articles_v2
  167. SET show_view_count = %s, show_like_count=%s
  168. WHERE wx_sn = %s;
  169. """
  170. cls.db_client.update(
  171. sql=update_sql,
  172. params=(show_view_count, show_like_count, wx_sn),
  173. )
  174. except Exception as e:
  175. print("失败-{}".format(e))
  176. continue
  177. @classmethod
  178. def getAccountArticleList(cls, gh_id, last_update_time, cursor=None):
  179. """
  180. 输入ghid获取账号的文章list
  181. :return:
  182. """
  183. response = cls.spider.update_msg_list(ghId=gh_id, index=cursor)
  184. msg_list = response.get("data", {}).get("data")
  185. if msg_list:
  186. last_article_in_this_msg = msg_list[-1]
  187. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"][
  188. "BaseInfo"
  189. ]["UpdateTime"]
  190. last_url = last_article_in_this_msg["AppMsg"]["DetailInfo"][0]["ContentUrl"]
  191. # 校验是否抓到的是同一个账号
  192. resdata = cls.spider.get_account_by_url(last_url)
  193. check_name = resdata["data"].get("data", {}).get("account_name")
  194. check_id = resdata["data"].get("data", {}).get("wx_gh")
  195. if check_id == gh_id:
  196. cls.updateMsgList(gh_id, check_name, msg_list)
  197. if last_time_stamp_in_this_msg > last_update_time:
  198. next_cursor = response["data"]["next_cursor"]
  199. return cls.getAccountArticleList(
  200. gh_id=gh_id,
  201. last_update_time=last_update_time,
  202. cursor=next_cursor,
  203. )
  204. else:
  205. response = {
  206. "code": 1002,
  207. "info": "抓取时候账号校验失败",
  208. "error": None,
  209. "gh_id": gh_id,
  210. "time_stamp": datetime.now().__str__(),
  211. }
  212. print(response)
  213. else:
  214. response = {
  215. "code": 1003,
  216. "info": "账号为抓取到内容",
  217. "error": None,
  218. "gh_id": gh_id,
  219. "time_stamp": datetime.now().__str__(),
  220. }
  221. print(response)
  222. @classmethod
  223. def checkEachAccount(cls, gh_id):
  224. """
  225. 验证单个账号是否当天有更新
  226. :param gh_id:
  227. :return:
  228. """
  229. today_str = datetime.today().strftime("%Y-%m-%d")
  230. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  231. today_timestamp = today_date_time.timestamp()
  232. sql = f"""
  233. select updateTime
  234. from official_articles_v2
  235. where ghId = '{gh_id}'
  236. order by updateTime
  237. desc
  238. """
  239. latest_update_time = cls.db_client.select(sql)[0][0]
  240. # 判断该账号当天发布的文章是否被收集
  241. if int(latest_update_time) > int(today_timestamp):
  242. return True
  243. else:
  244. return False
  245. @classmethod
  246. def updateJob(cls):
  247. """
  248. 更新文章任务
  249. :return:
  250. """
  251. account_list = cls.getAccountIdDict()
  252. for account_id in tqdm(account_list):
  253. account_info = cls.findAccountLatestUpdateTime(account_id)
  254. latest_time = account_info["update_time"]
  255. try:
  256. cls.getAccountArticleList(
  257. gh_id=account_id, last_update_time=latest_time
  258. )
  259. except Exception as e:
  260. response = {
  261. "code": 1001,
  262. "info": "单个账号更新失败",
  263. "error": str(e),
  264. "time_stamp": datetime.now().__str__(),
  265. }
  266. print(response)
  267. @classmethod
  268. def checkJob(cls):
  269. """
  270. 验证所有账号是否已经有更新数据
  271. :return:
  272. todo: 被封禁账号&&服务号需要做区分
  273. """
  274. account_dict = cls.getAccountIdDict()
  275. error_account_list = []
  276. for account_id in tqdm(account_dict):
  277. if cls.checkEachAccount(account_id):
  278. continue
  279. else:
  280. name = account_dict[account_id]
  281. error_account_list.append(name)
  282. if error_account_list:
  283. cls.bot(error_account_list)
  284. if __name__ == "__main__":
  285. UMD = UpdateMsgDaily()
  286. schedule.every().day.at("21:00").do(UMD.updateJob)
  287. schedule.every().day.at("21:30").do(UMD.checkJob)
  288. while True:
  289. schedule.run_pending()
  290. time.sleep(1)