# update_published_articles_read_detail.py

  1. """
  2. @author: luojunhui
  3. @desc: 更新文章的阅读详情
  4. """
  5. import json
  6. import time
  7. import traceback
  8. import urllib.parse
  9. from datetime import datetime
  10. from typing import Dict, List
  11. from pymysql.cursors import DictCursor
  12. from tqdm import tqdm
  13. from applications import log
  14. from applications.api import FeishuBotApi
  15. from applications.db import DatabaseConnector
  16. from applications.utils import str_to_md5, show_desc_to_sta
  17. from cold_start.crawler.wechat import get_article_list_from_account
  18. from config import denet_config, long_articles_config, piaoquan_crawler_config


class UpdatePublishedArticlesTaskConst:
    """
    Constants for the task that updates already-published article messages
    """

    # response codes of the spider detail API
    SUCCESS_CODE = 0
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_UNKNOWN_CODE = 10000
    # account-level illegal status code
    ACCOUNT_ILLEGAL_CODE = 25013
    UNKNOWN_SPIDER_ERROR_CODE = 20000

    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no information returned, reason unknown
    UNKNOWN_STATUS = -3
    # article flagged as illegal
    ILLEGAL_STATUS = -4

    # official account type (subscription account or service account)
    # subscription accounts
    SUBSCRIBE_TYPE_SET = {0, 1}
    # service accounts
    SERVICE_TYPE = 2

    # monitoring window (seconds)
    MONITOR_PERIOD = 60 * 60 * 24 * 3
    # crawl look-back window for new accounts (seconds)
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    # alert threshold for the crawl failure rate of subscription accounts
    TASK_FAIL_RATE_THRESHOLD = 0.3

    # article table
    ARTICLE_TABLE_NAME = "official_articles_v2"


class UpdatePublishedArticlesTaskBase(UpdatePublishedArticlesTaskConst):
    def __init__(self):
        self.crawler_client = DatabaseConnector(piaoquan_crawler_config)
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.denet_client = DatabaseConnector(denet_config)
        self.crawler_client.connect()
        self.long_articles_client.connect()
        self.denet_client.connect()
        self.feishu_bot_api = FeishuBotApi()

    def fetch_published_accounts(self) -> List[Dict]:
        """
        get published accounts from aigc
        """
        fetch_query = """
            SELECT DISTINCT
                t3.`name` AS account_name,
                t3.gh_id,
                t3.follower_count AS fans,
                t3.create_timestamp AS account_init_timestamp,
                t4.service_type_info AS account_type,
                t4.verify_type_info AS account_auth,
                t3.id AS account_id,
                group_concat(DISTINCT t5.remark) AS account_remark
            FROM
                publish_plan t1
                JOIN publish_plan_account t2 ON t1.id = t2.plan_id
                JOIN publish_account t3 ON t2.account_id = t3.id
                LEFT JOIN publish_account_wx_type t4 ON t3.id = t4.account_id
                LEFT JOIN publish_account_remark t5 ON t3.id = t5.publish_account_id
            WHERE
                t1.plan_status = 1
                AND t3.channel = 5
            GROUP BY t3.id;
        """
        fetch_response = self.denet_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        # skip accounts whose remark contains "自动回复" (auto reply)
        account_list = [
            i for i in fetch_response if "自动回复" not in str(i["account_remark"])
        ]
        return account_list

    def fetch_account_experiment_status(self) -> Dict[str, str]:
        """
        map each account_id to its experiment group status
        """
        fetch_query = """
            SELECT t1.account_id, t2.status
            FROM wx_statistics_group_source_account t1
            JOIN wx_statistics_group_source t2
                ON t1.group_source_name = t2.account_source_name;
        """
        account_status_list = self.denet_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict

    def get_account_list(self) -> List[Dict]:
        """
        merge published accounts with their experiment status;
        using_status is 0 for accounts in the "实验" (experiment) group, else 1
        """
        account_without_status = self.fetch_published_accounts()
        account_status_dict = self.fetch_account_experiment_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status_dict.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in account_without_status
        ]
        return account_list

    def fetch_account_max_publish_timestamp(self, gh_id: str) -> int:
        # get the max published timestamp recorded for this account
        fetch_query = f"""
            SELECT MAX(publish_timestamp) AS max_publish_timestamp
            FROM {self.ARTICLE_TABLE_NAME}
            WHERE ghId = %s;
        """
        fetch_response = self.crawler_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        # MAX() yields a row with a NULL value when the account has no
        # articles yet, so guard against both an empty result and None
        max_publish_timestamp = (
            fetch_response[0]["max_publish_timestamp"] if fetch_response else None
        )
        if max_publish_timestamp is None:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
        return max_publish_timestamp

    def crawl_account_published_articles(self, account: Dict[str, str]):
        """
        page through an account's published article list, recording each
        message until reaching the latest publish timestamp already stored
        """
        max_publish_timestamp = self.fetch_account_max_publish_timestamp(
            account["gh_id"]
        )
        cursor = None
        while True:
            crawl_response = get_article_list_from_account(
                account["gh_id"], index=cursor
            )
            crawl_response_code = crawl_response["code"]
            match crawl_response_code:
                # request succeeded: record this page, then keep paging
                # backwards until the stored max publish timestamp is reached
                case self.SUCCESS_CODE:
                    msg_list = crawl_response.get("data", {}).get("data", [])
                    if not msg_list:
                        break

                    self.record_each_msg(account, msg_list)
                    earliest_msg = msg_list[-1]
                    earliest_update_timestamp = earliest_msg["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if earliest_update_timestamp > max_publish_timestamp:
                        cursor = crawl_response["data"]["next_cursor"]
                    else:
                        break
                # account flagged as illegal
                case self.ACCOUNT_ILLEGAL_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="account flagged as illegal",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="official account illegal-content alert",
                        detail={
                            "account_name": account["account_name"],
                            "gh_id": account["gh_id"],
                            "reason": crawl_response["msg"],
                            "fans": account["fans"],
                            "using_status": account["using_status"],
                        },
                        env="dev",
                        mention=False,
                    )
                    break
                case self.UNKNOWN_SPIDER_ERROR_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="unknown spider error",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="spider API request failure alert",
                        detail={
                            "account_name": account["account_name"],
                            "gh_id": account["gh_id"],
                            "reason": crawl_response["msg"],
                            "fans": account["fans"],
                            "using_status": account["using_status"],
                        },
                        env="dev",
                        mention=False,
                    )
                    break
                # any other code: stop paging for this account
                case _:
                    print("unknown code:", crawl_response_code)
                    break

    def record_each_msg(self, account, msg_list):
        """
        persist every article of each message; if the insert fails (the
        article is assumed to already exist), refresh its read/like counters
        """
        for msg in msg_list:
            base_info = msg.get("BaseInfo", {})
            app_msg_id = msg.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
            create_timestamp = (
                msg.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
            )
            update_timestamp = (
                msg.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
            )
            publish_type = msg.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
            detail_article_list = msg.get("AppMsg", {}).get("DetailInfo", [])
            if detail_article_list:
                for article in detail_article_list:
                    title = article.get("Title", None)
                    digest = article.get("Digest", None)
                    item_index = article.get("ItemIndex", None)
                    content_url = article.get("ContentUrl", None)
                    source_url = article.get("SourceUrl", None)
                    cover_img_url = article.get("CoverImgUrl", None)
                    cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
                    cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
                    item_show_type = article.get("ItemShowType", None)
                    is_original = article.get("IsOriginal", None)
                    show_desc = article.get("ShowDesc", None)
                    show_stat = show_desc_to_sta(show_desc)
                    ori_content = article.get("ori_content", None)
                    show_view_count = show_stat.get("show_view_count", 0)
                    show_like_count = show_stat.get("show_like_count", 0)
                    show_zs_count = show_stat.get("show_zs_count", 0)
                    show_pay_count = show_stat.get("show_pay_count", 0)
                    # extract the sn parameter from the article URL; parsing
                    # the query string is safer than splitting on "&sn=",
                    # which raises IndexError when the parameter is absent
                    wx_sn = None
                    if content_url:
                        url_query = urllib.parse.urlparse(content_url).query
                        wx_sn = urllib.parse.parse_qs(url_query).get("sn", [None])[0]
                    status = account["using_status"]
                    info_tuple = (
                        account["gh_id"],
                        account["account_name"],
                        app_msg_id,
                        title,
                        publish_type,
                        create_timestamp,
                        update_timestamp,
                        digest,
                        item_index,
                        content_url,
                        source_url,
                        cover_img_url,
                        cover_img_url_1_1,
                        cover_img_url_235_1,
                        item_show_type,
                        is_original,
                        show_desc,
                        ori_content,
                        show_view_count,
                        show_like_count,
                        show_zs_count,
                        show_pay_count,
                        wx_sn,
                        json.dumps(base_info, ensure_ascii=False),
                        str_to_md5(title),
                        status,
                    )
                    try:
                        # NOTE: the CoverImgUrl_255_1 column name is kept as-is
                        # to match the existing table schema, although the
                        # source field is CoverImgUrl_235_1
                        insert_sql = f"""
                            INSERT INTO {self.ARTICLE_TABLE_NAME}
                            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                            VALUES
                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                        self.crawler_client.save(query=insert_sql, params=info_tuple)
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="article inserted",
                            data={"info": info_tuple},
                        )
                    except Exception:
                        # insert failed, most likely a duplicate article:
                        # fall back to updating the read/like counters
                        try:
                            update_sql = f"""
                                UPDATE {self.ARTICLE_TABLE_NAME}
                                SET show_view_count = %s, show_like_count = %s
                                WHERE wx_sn = %s;
                            """
                            self.crawler_client.save(
                                query=update_sql,
                                params=(show_view_count, show_like_count, wx_sn),
                            )
                            log(
                                task="updatePublishedMsgDaily",
                                function="insert_each_msg",
                                message="article counters updated",
                                data={
                                    "wxSn": wx_sn,
                                    "likeCount": show_like_count,
                                    "viewCount": show_view_count,
                                },
                            )
                        except Exception as update_error:
                            log(
                                task="updatePublishedMsgDaily",
                                function="insert_each_msg",
                                message="article update failed: {}".format(
                                    update_error
                                ),
                                status="fail",
                            )
                            continue


class UpdatePublishedArticlesTaskCollector(UpdatePublishedArticlesTaskBase):
    def deal(self):
        account_list = self.get_account_list()
        for account in tqdm(account_list, desc="crawl each account's articles"):
            try:
                self.crawl_account_published_articles(account)
            except Exception as e:
                log(
                    task="update_published_articles_collector",
                    function="crawl_account_published_articles",
                    message=f"failed to crawl account articles: {e}",
                    status="fail",
                    data=account,
                )

        # completion notification, currently disabled:
        # self.feishu_bot_api.bot(
        #     title="daily published-articles update finished",
        #     detail={
        #         "msg": "account update finished",
        #         "finish_time": str(datetime.today()),
        #     },
        #     mention=False,
        # )


class UpdatePublishedArticlesTaskChecker(UpdatePublishedArticlesTaskBase):
    def deal(self):
        # not implemented yet
        pass


class UpdatePublishedArticlesTaskArticlesMonitor(UpdatePublishedArticlesTaskBase):
    # not implemented yet
    pass
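

# A minimal entry-point sketch (assumed usage, not part of the original
# module): run the collector once over every published account.
if __name__ == "__main__":
    task = UpdatePublishedArticlesTaskCollector()
    task.deal()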