article_detail_stat.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. import time
  2. import traceback
  3. from datetime import datetime, timedelta
  4. from app.infra.crawler.wechat import get_gzh_stat_daily
  5. from app.infra.crawler.wechat import get_access_token
  6. from app.infra.shared import run_tasks_with_asyncio_task_group
  7. class ArticleDetailStatConst:
  8. # Task Status
  9. INIT_STATUS = 0
  10. PROCESSING_STATUS = 1
  11. SUCCESS_STATUS = 2
  12. FAILED_STATUS = 99
  13. # Account Status
  14. ACCOUNT_VALID_STATUS = 1
  15. ACCOUNT_INVALID_STATUS = 0
  16. # Cookie Status
  17. COOKIE_VALID_STATUS = 1
  18. COOKIE_INVALID_STATUS = 0
  19. # Gap Time
  20. GAP_DURATION = 300
  21. # 小数点保留位数
  22. DECIMAL_PRECISION = 7
  23. class ArticleDetailStatMapper(ArticleDetailStatConst):
  24. def __init__(self, pool, log_service):
  25. self.pool = pool
  26. self.log_service = log_service
  27. # 获取账号信息
  28. async def fetch_monitor_accounts(self):
  29. query = """
  30. SELECT gh_id, account_name, app_id, app_secret
  31. FROM gzh_account_info WHERE status = %s;
  32. """
  33. return await self.pool.async_fetch(
  34. query=query, params=(self.ACCOUNT_VALID_STATUS,)
  35. )
  36. # 更新 access_token
  37. async def set_access_token_for_each_account(
  38. self, gh_id, access_token, expire_timestamp
  39. ):
  40. query = """
  41. UPDATE gzh_cookie_info
  42. SET access_token = %s, access_token_status = %s, expire_timestamp = %s
  43. WHERE gh_id = %s;
  44. """
  45. return await self.pool.async_save(
  46. query=query,
  47. params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
  48. )
  49. # 从数据库获取 access_token
  50. async def get_access_token_from_database(self, gh_id):
  51. query = """
  52. SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
  53. """
  54. return await self.pool.async_fetch(query=query, params=(gh_id,))
  55. # 从official_articles_v2 获取文章详情
  56. async def fetch_article_detail(self, gh_id, msg_id):
  57. query = """
  58. SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5
  59. FROM official_articles_v2
  60. WHERE ghId = %s AND appMsgId = %s;
  61. """
  62. return await self.pool.async_fetch(
  63. query=query, db_name="piaoquan_crawler", params=(gh_id, msg_id)
  64. )
  65. # 存储文章的各个表现
  66. async def save_article_performance(self, params: tuple):
  67. query = """
  68. INSERT INTO article_daily_performance
  69. (
  70. account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title, content_url, stat_date,
  71. read_user, share_user, read_subscribe_user, read_delivery_rate, read_finish_rate,
  72. read_avg_activetime, zaikan_user, like_user, comment_count, collection_user
  73. ) VALUES (
  74. %s, %s, %s, %s, %s, %s, %s, %s, %s,
  75. %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
  76. )
  77. ON DUPLICATE KEY UPDATE
  78. read_user = VALUES(read_user),
  79. share_user = VALUES(share_user),
  80. read_subscribe_user = VALUES(read_subscribe_user),
  81. read_delivery_rate = VALUES(read_delivery_rate),
  82. read_finish_rate = VALUES(read_finish_rate),
  83. read_avg_activetime = VALUES(read_avg_activetime),
  84. zaikan_user = VALUES(zaikan_user),
  85. like_user = VALUES(like_user),
  86. comment_count = VALUES(comment_count),
  87. collection_user = VALUES(collection_user),
  88. updated_at = CURRENT_TIMESTAMP;
  89. """
  90. return await self.pool.async_save(query=query, params=params)
  91. # 存储文章的阅读分布信息
  92. async def save_article_read_distribution(self, params: tuple):
  93. query = """
  94. INSERT INTO article_read_distribution
  95. (
  96. wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate, jump_pos_4_rate, jump_pos_5_rate,
  97. src_mp_message_user, src_mp_home_user, src_chat_user, src_moments_user, src_recommend_user,
  98. src_search_user, src_other_user, src_total_user
  99. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  100. ON DUPLICATE KEY UPDATE
  101. jump_pos_1_rate = VALUES(jump_pos_1_rate),
  102. jump_pos_2_rate = VALUES(jump_pos_2_rate),
  103. jump_pos_3_rate = VALUES(jump_pos_3_rate),
  104. jump_pos_4_rate = VALUES(jump_pos_4_rate),
  105. jump_pos_5_rate = VALUES(jump_pos_5_rate),
  106. src_mp_message_user = VALUES(src_mp_message_user),
  107. src_mp_home_user = VALUES(src_mp_home_user),
  108. src_chat_user = VALUES(src_chat_user),
  109. src_moments_user = VALUES(src_moments_user),
  110. src_recommend_user = VALUES(src_recommend_user),
  111. src_search_user = VALUES(src_search_user),
  112. src_other_user = VALUES(src_other_user),
  113. src_total_user = VALUES(src_total_user),
  114. updated_at = CURRENT_TIMESTAMP;
  115. """
  116. return await self.pool.async_save(query=query, params=params)
  117. class ArticleDetailStat(ArticleDetailStatMapper):
  118. def __init__(self, pool, log_service):
  119. super().__init__(pool, log_service)
  120. # 存储账号信息
  121. async def save_account_details(self, account, fetch_response):
  122. article_list = fetch_response.get("list")
  123. if not article_list:
  124. return 0
  125. is_delay = fetch_response["is_delay"]
  126. if is_delay:
  127. print("数据延迟,重新处理")
  128. return 0
  129. async def _process_article_detail(_article_list):
  130. _msg_id_set = set()
  131. for _article in _article_list:
  132. _msg_id = _article["msgid"].split("_")[0]
  133. _msg_id_set.add(_msg_id)
  134. _article_mapper = {}
  135. for _msg_id in _msg_id_set:
  136. _publish_articles = await self.fetch_article_detail(
  137. gh_id=account["gh_id"], msg_id=_msg_id
  138. )
  139. for _publish_article in _publish_articles:
  140. _key = f"{_msg_id}_{_publish_article['ItemIndex']}"
  141. _value = {
  142. "wx_sn": _publish_article["wx_sn"].decode("utf-8"),
  143. "title": _publish_article["title"],
  144. "content_url": _publish_article["ContentUrl"],
  145. }
  146. _article_mapper[_key] = _value
  147. return _article_mapper
  148. account_published_articles = await _process_article_detail(article_list)
  149. async def _update_single_article(_article):
  150. # 文章基本信息
  151. app_msg_id = _article["msgid"]
  152. msg_id = app_msg_id.split("_")[0]
  153. position = app_msg_id.split("_")[1]
  154. wx_sn = account_published_articles.get(app_msg_id)["wx_sn"]
  155. # 文章表现
  156. article_performance = _article["detail_list"][0]
  157. await self.save_article_performance(
  158. params=(
  159. account["account_name"],
  160. account["gh_id"],
  161. wx_sn,
  162. app_msg_id,
  163. msg_id,
  164. position,
  165. account_published_articles.get(app_msg_id)["title"],
  166. account_published_articles.get(app_msg_id)["content_url"],
  167. article_performance["stat_date"],
  168. article_performance["read_user"],
  169. article_performance["share_user"],
  170. article_performance["read_subscribe_user"],
  171. round(
  172. article_performance["read_delivery_rate"],
  173. self.DECIMAL_PRECISION,
  174. ),
  175. round(
  176. article_performance["read_finish_rate"], self.DECIMAL_PRECISION
  177. ),
  178. round(
  179. article_performance["read_avg_activetime"],
  180. self.DECIMAL_PRECISION,
  181. ),
  182. article_performance["zaikan_user"],
  183. article_performance["like_user"],
  184. article_performance["comment_count"],
  185. article_performance["collection_user"],
  186. )
  187. )
  188. # 表现分布信息
  189. jump_rate_map = {
  190. i["position"]: round(i["rate"], self.DECIMAL_PRECISION)
  191. for i in article_performance["read_jump_position"]
  192. }
  193. source_map = {
  194. i["scene_desc"]: i["user_count"]
  195. for i in article_performance["read_user_source"]
  196. }
  197. await self.save_article_read_distribution(
  198. params=(
  199. wx_sn,
  200. jump_rate_map[1],
  201. jump_rate_map[2],
  202. jump_rate_map[3],
  203. jump_rate_map[4],
  204. jump_rate_map[5],
  205. source_map["公众号消息"],
  206. source_map["公众号主页"],
  207. source_map["聊天会话"],
  208. source_map["朋友圈"],
  209. source_map["推荐"],
  210. source_map["搜一搜"],
  211. source_map["其他"],
  212. source_map["全部"],
  213. )
  214. )
  215. return await run_tasks_with_asyncio_task_group(
  216. task_list=article_list,
  217. handler=_update_single_article,
  218. description="批量更新文章阅读信息",
  219. unit="article",
  220. )
  221. # 处理单个账号
  222. async def process_single_account(self, account: dict):
  223. gh_id = account["gh_id"]
  224. token_info = await self.get_access_token_from_database(gh_id)
  225. if not token_info:
  226. return
  227. # 更新 token
  228. async def update_token(_new_token_info):
  229. _access_token = _new_token_info["access_token"]
  230. _expires_in = _new_token_info["expires_in"]
  231. await self.set_access_token_for_each_account(
  232. gh_id=account["gh_id"],
  233. access_token=_access_token,
  234. expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
  235. )
  236. print(f"{account['account_name']} access_token updated to database")
  237. expire_timestamp = token_info[0]["expire_timestamp"] or 0
  238. if int(time.time()) >= expire_timestamp:
  239. print(f"{account['account_name']} access_token expired")
  240. new_token_info = await get_access_token(
  241. account["app_id"], account["app_secret"]
  242. )
  243. access_token = new_token_info["access_token"]
  244. await update_token(_new_token_info=new_token_info)
  245. else:
  246. access_token = token_info[0]["access_token"]
  247. # yesterday_string = datetime.strftime(datetime.now() - timedelta(days=5), "%Y-%m-%d")
  248. dt_list = [
  249. (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
  250. for i in range(1, 2)
  251. ]
  252. for dt in dt_list:
  253. print(f"{account['account_name']} crawl {dt} read_data")
  254. fetch_response = await get_gzh_stat_daily(
  255. access_token=access_token, date_string=dt
  256. )
  257. # 请求失败的情况
  258. if fetch_response.get("errcode") == 40001:
  259. fetch_new_token_info = await get_access_token(
  260. account["app_id"], account["app_secret"]
  261. )
  262. await update_token(_new_token_info=fetch_new_token_info)
  263. break
  264. # 处理并且落表
  265. await self.save_account_details(account, fetch_response)
  266. # 入口函数
  267. async def deal(self):
  268. accounts = await self.fetch_monitor_accounts()
  269. if not accounts:
  270. return
  271. for account in accounts:
  272. try:
  273. await self.process_single_account(account)
  274. await self.log_service.log(
  275. contents={
  276. "task": "article_detail_stat",
  277. "account_name": account["account_name"],
  278. "status": "success",
  279. }
  280. )
  281. except Exception as e:
  282. await self.log_service.log(
  283. contents={
  284. "task": "article_detail_stat",
  285. "account_name": account["account_name"],
  286. "error": str(e),
  287. "traceback": traceback.format_exc(),
  288. "status": "fail",
  289. }
  290. )