update_published_articles_read_detail.py

  1. """
  2. @author: luojunhui
  3. @desc: 更新文章的阅读详情
  4. """
  5. import json
  6. import time
  7. import traceback
  8. import urllib.parse
  9. from datetime import datetime
  10. from typing import Dict, List
  11. from pymysql.cursors import DictCursor
  12. from tqdm import tqdm
  13. from applications import log
  14. from applications.api import FeishuBotApi
  15. from applications.db import DatabaseConnector
  16. from applications.utils import str_to_md5, show_desc_to_sta
  17. from cold_start.crawler.wechat import get_article_list_from_account
  18. from config import denet_config, long_articles_config, piaoquan_crawler_config
class UpdatePublishedArticlesTaskConst:
    """
    Constant configuration for the update-published-articles task
    """

    SUCCESS_CODE = 0
    # response codes returned by the crawler detail API
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_UNKNOWN_CODE = 10000
    # account-violation response code
    ACCOUNT_ILLEGAL_CODE = 25013
    UNKNOWN_SPIDER_ERROR_CODE = 20000

    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for unknown reasons
    UNKNOWN_STATUS = -3
    # article violates platform rules
    ILLEGAL_STATUS = -4

    # official-account types (subscription account vs. service account)
    # subscription accounts
    SUBSCRIBE_TYPE_SET = {0, 1}
    # service accounts
    SERVICE_TYPE = 2

    # monitoring window (seconds)
    MONITOR_PERIOD = 60 * 60 * 24 * 3
    # crawl window for newly added accounts (seconds)
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    # alert threshold for the crawl failure rate of subscription accounts
    TASK_FAIL_RATE_THRESHOLD = 0.3

    # article table name
    ARTICLE_TABLE_NAME = "official_articles_v2"

    # gh_ids of accounts flagged as illegal, excluded from crawling
    ILLEGAL_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_5bb79339a1f4",
    ]

class UpdatePublishedArticlesTaskBase(UpdatePublishedArticlesTaskConst):
    def __init__(self):
        self.crawler_client = DatabaseConnector(piaoquan_crawler_config)
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.denet_client = DatabaseConnector(denet_config)
        self.crawler_client.connect()
        self.long_articles_client.connect()
        self.denet_client.connect()
        self.feishu_bot_api = FeishuBotApi()

    def fetch_published_accounts(self) -> List[Dict]:
        """
        get published accounts from aigc
        """
        fetch_query = """
            SELECT DISTINCT
                t3.`name` AS account_name,
                t3.gh_id,
                t3.follower_count AS fans,
                t3.create_timestamp AS account_init_timestamp,
                t4.service_type_info AS account_type,
                t4.verify_type_info AS account_auth,
                t3.id AS account_id,
                GROUP_CONCAT(DISTINCT t5.remark) AS account_remark
            FROM
                publish_plan t1
                JOIN publish_plan_account t2 ON t1.id = t2.plan_id
                JOIN publish_account t3 ON t2.account_id = t3.id
                LEFT JOIN publish_account_wx_type t4 ON t3.id = t4.account_id
                LEFT JOIN publish_account_remark t5 ON t3.id = t5.publish_account_id
            WHERE
                t1.plan_status = 1
                AND t3.channel = 5
            GROUP BY t3.id;
        """
        fetch_response = self.denet_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        # drop auto-reply accounts and accounts flagged as illegal
        account_list = [
            i for i in fetch_response if "自动回复" not in str(i["account_remark"])
        ]
        account_list = [
            i for i in account_list if i["gh_id"] not in self.ILLEGAL_GH_IDS
        ]
        return account_list

    def fetch_account_experiment_status(self) -> Dict[str, str]:
        """
        map each account_id to its experiment group status
        """
        fetch_query = """
            SELECT t1.account_id, t2.status
            FROM wx_statistics_group_source_account t1
            JOIN wx_statistics_group_source t2
                ON t1.group_source_name = t2.account_source_name;
        """
        account_status_list = self.denet_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict

    def get_account_list(self) -> List[Dict]:
        """
        merge published accounts with their experiment status;
        using_status is 0 for accounts in the "实验" (experiment) group, otherwise 1
        """
        account_without_status = self.fetch_published_accounts()
        account_status_dict = self.fetch_account_experiment_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status_dict.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in account_without_status
        ]
        return account_list

    def fetch_account_max_publish_timestamp(self, gh_id: str) -> int:
        # get the latest publish timestamp recorded for this account; for accounts
        # with no articles yet, fall back to a NEW_ACCOUNT_CRAWL_PERIOD window
        fetch_query = f"""
            SELECT MAX(publish_timestamp) AS max_publish_timestamp
            FROM {self.ARTICLE_TABLE_NAME}
            WHERE ghId = %s;
        """
        fetch_response = self.crawler_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        default_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
        if fetch_response:
            # MAX() returns NULL when the account has no rows, so keep the fallback
            max_publish_timestamp = (
                fetch_response[0]["max_publish_timestamp"] or default_timestamp
            )
        else:
            max_publish_timestamp = default_timestamp
        return max_publish_timestamp

    def crawl_account_published_articles(self, account: Dict[str, str]):
        # max_publish_timestamp = self.fetch_account_max_publish_timestamp(
        #     account["gh_id"]
        # )
        cursor = None
        while True:
            crawl_response = get_article_list_from_account(
                account["gh_id"], index=cursor
            )
            crawl_response_code = crawl_response["code"]
            match crawl_response_code:
                # request succeeded
                case self.SUCCESS_CODE:
                    print("success")
                    break
                    # msg_list = crawl_response.get("data", {}).get("data", [])
                    # if not msg_list:
                    #     break
                    #
                    # self.record_each_msg(account, msg_list)
                    # earliest_msg = msg_list[-1]
                    # earliest_update_timestamp = earliest_msg["AppMsg"]["BaseInfo"][
                    #     "UpdateTime"
                    # ]
                    # if earliest_update_timestamp > max_publish_timestamp:
                    #     cursor = crawl_response["data"]["next_cursor"]
                    # else:
                    #     break

                # account flagged as violating platform rules
                case self.ACCOUNT_ILLEGAL_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="账号违规",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="公众号账号违规报警",
                        detail={
                            "账号名称": account["account_name"],
                            "账号ID": account["gh_id"],
                            "违规原因": crawl_response["msg"],
                            "粉丝数": account["fans"],
                            "利用状态": account["using_status"],
                        },
                        env="gzh_monitor_bot",
                        mention=False,
                    )
                    break

                # unknown spider error
                case self.UNKNOWN_SPIDER_ERROR_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="未知错误",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="接口请求失败报警",
                        detail={
                            "账号名称": account["account_name"],
                            "账号ID": account["gh_id"],
                            "违规原因": crawl_response["msg"],
                            "粉丝数": account["fans"],
                            "利用状态": account["using_status"],
                        },
                        env="dev",
                        mention=False,
                    )
                    break

                # any other response code
                case _:
                    print("unknown code:", crawl_response_code)
                    break

    def record_each_msg(self, account, msg_list):
        for msg in msg_list:
            base_info = msg.get("BaseInfo", {})
            app_msg_id = msg.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
            create_timestamp = (
                msg.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
            )
            update_timestamp = (
                msg.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
            )
            publish_type = msg.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
            detail_article_list = msg.get("AppMsg", {}).get("DetailInfo", [])
            if detail_article_list:
                for article in detail_article_list:
                    title = article.get("Title", None)
                    digest = article.get("Digest", None)
                    item_index = article.get("ItemIndex", None)
                    content_url = article.get("ContentUrl", None)
                    source_url = article.get("SourceUrl", None)
                    cover_img_url = article.get("CoverImgUrl", None)
                    cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
                    cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
                    item_show_type = article.get("ItemShowType", None)
                    is_original = article.get("IsOriginal", None)
                    show_desc = article.get("ShowDesc", None)
                    show_stat = show_desc_to_sta(show_desc)
                    ori_content = article.get("ori_content", None)
                    show_view_count = show_stat.get("show_view_count", 0)
                    show_like_count = show_stat.get("show_like_count", 0)
                    show_zs_count = show_stat.get("show_zs_count", 0)
                    show_pay_count = show_stat.get("show_pay_count", 0)
                    # wx_sn identifies a published article; taken from the sn query param
                    wx_sn = (
                        content_url.split("&sn=")[1].split("&")[0]
                        if content_url
                        else None
                    )
                    status = account["using_status"]
                    info_tuple = (
                        account["gh_id"],
                        account["account_name"],
                        app_msg_id,
                        title,
                        publish_type,
                        create_timestamp,
                        update_timestamp,
                        digest,
                        item_index,
                        content_url,
                        source_url,
                        cover_img_url,
                        cover_img_url_1_1,
                        cover_img_url_235_1,
                        item_show_type,
                        is_original,
                        show_desc,
                        ori_content,
                        show_view_count,
                        show_like_count,
                        show_zs_count,
                        show_pay_count,
                        wx_sn,
                        json.dumps(base_info, ensure_ascii=False),
                        str_to_md5(title),
                        status,
                    )
                    try:
                        insert_sql = f"""
                            INSERT INTO {self.ARTICLE_TABLE_NAME}
                            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                            VALUES
                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                        self.crawler_client.save(query=insert_sql, params=info_tuple)
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="插入文章数据成功",
                            data={"info": info_tuple},
                        )
                    except Exception:
                        # the insert most likely hit an existing row; refresh its read/like counts instead
                        try:
                            update_sql = f"""
                                UPDATE {self.ARTICLE_TABLE_NAME}
                                SET show_view_count = %s, show_like_count = %s
                                WHERE wx_sn = %s;
                            """
                            self.crawler_client.save(
                                query=update_sql,
                                params=(show_view_count, show_like_count, wx_sn),
                            )
                            log(
                                task="updatePublishedMsgDaily",
                                function="insert_each_msg",
                                message="更新文章数据成功",
                                data={
                                    "wxSn": wx_sn,
                                    "likeCount": show_like_count,
                                    "viewCount": show_view_count,
                                },
                            )
                        except Exception as e:
                            log(
                                task="updatePublishedMsgDaily",
                                function="insert_each_msg",
                                message="更新文章失败, 报错原因是: {}".format(e),
                                status="fail",
                            )
                            continue
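
    # A hedged sketch, not part of the original task: the same wx_sn could be extracted with
    # urllib.parse (already imported above) instead of splitting on "&sn=" directly. The helper
    # name _extract_wx_sn is hypothetical.
    @staticmethod
    def _extract_wx_sn(content_url):
        # parse the "sn" query parameter from a WeChat article URL; returns None when absent
        if not content_url:
            return None
        query = urllib.parse.urlparse(content_url).query
        return urllib.parse.parse_qs(query).get("sn", [None])[0]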


class UpdatePublishedArticlesTaskCollector(UpdatePublishedArticlesTaskBase):
    def deal(self):
        account_list = self.get_account_list()
        for account in tqdm(account_list, desc="抓取每个账号的文章信息"):
            try:
                self.crawl_account_published_articles(account)
            except Exception as e:
                log(
                    task="update_published_articles_collector",
                    function="crawl_account_published_articles",
                    message=f"抓取账号文章信息失败, 报错原因是: {e}",
                    status="fail",
                    data={
                        "account": account,
                        "error": traceback.format_exc(),
                    },
                )

        # self.feishu_bot_api.bot(
        #     title="更新每日发布文章任务完成通知",
        #     detail={
        #         "msg": "账号更新完成",
        #         "finish_time": datetime.today().__str__()
        #     },
        #     mention=False
        # )


class UpdatePublishedArticlesTaskChecker(UpdatePublishedArticlesTaskBase):
    def deal(self):
        pass


class UpdatePublishedArticlesTaskArticlesMonitor(UpdatePublishedArticlesTaskBase):
    pass
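

# A minimal usage sketch, not part of the original file: assuming this module is run as a
# script, the collector task could be invoked like this. The entry-point guard and the choice
# to run only the collector are assumptions.
if __name__ == "__main__":
    collector_task = UpdatePublishedArticlesTaskCollector()
    collector_task.deal()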