# article_detail_stat.py
import json
import time
import traceback
from datetime import datetime, timedelta

from applications.api import feishu_robot
from applications.crawler.wechat import get_gzh_stat_daily
from applications.crawler.wechat import get_access_token
from applications.utils import run_tasks_with_asyncio_task_group


  9. class ArticleDetailStatConst:
  10. # Task Status
  11. INIT_STATUS = 0
  12. PROCESSING_STATUS = 1
  13. SUCCESS_STATUS = 2
  14. FAILED_STATUS = 99
  15. # Account Status
  16. ACCOUNT_VALID_STATUS = 1
  17. ACCOUNT_INVALID_STATUS = 0
  18. # Cookie Status
  19. COOKIE_VALID_STATUS = 1
  20. COOKIE_INVALID_STATUS = 0
  21. # Gap Time
  22. GAP_DURATION = 300
  23. # 小数点保留位数
  24. DECIMAL_PRECISION = 7
  25. class ArticleDetailStatMapper(ArticleDetailStatConst):
  26. def __init__(self, pool, log_service):
  27. self.pool = pool
  28. self.log_service = log_service
  29. # 获取账号信息
  30. async def fetch_monitor_accounts(self):
  31. query = """
  32. SELECT gh_id, account_name, app_id, app_secret
  33. FROM gzh_account_info WHERE status = %s;
  34. """
  35. return await self.pool.async_fetch(
  36. query=query, params=(self.ACCOUNT_VALID_STATUS,)
  37. )
  38. # 更新 access_token
  39. async def set_access_token_for_each_account(
  40. self, gh_id, access_token, expire_timestamp
  41. ):
  42. query = """
  43. UPDATE gzh_cookie_info
  44. SET access_token = %s, access_token_status = %s, expire_timestamp = %s
  45. WHERE gh_id = %s;
  46. """
  47. return await self.pool.async_save(
  48. query=query,
  49. params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
  50. )
  51. # 从数据库获取 access_token
  52. async def get_access_token_from_database(self, gh_id):
  53. query = """
  54. SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
  55. """
  56. return await self.pool.async_fetch(query=query, params=(gh_id,))
  57. # 从official_articles_v2 获取文章详情
  58. async def fetch_article_detail(self, gh_id, msg_id):
  59. query = """
  60. SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5
  61. FROM official_articles_v2
  62. WHERE ghId = %s AND appMsgId = %s;
  63. """
  64. return await self.pool.async_fetch(
  65. query=query, db_name="piaoquan_crawler", params=(gh_id, msg_id)
  66. )
  67. # 存储文章的各个表现
  68. async def save_article_performance(self, params: tuple):
  69. query = """
  70. INSERT INTO article_daily_performance
  71. (
  72. account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title, content_url, stat_date,
  73. read_user, share_user, read_subscribe_user, read_delivery_rate, read_finish_rate,
  74. read_avg_activetime, zaikan_user, like_user, comment_count, collection_user
  75. ) VALUES (
  76. %s, %s, %s, %s, %s, %s, %s, %s, %s,
  77. %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
  78. )
  79. ON DUPLICATE KEY UPDATE
  80. read_user = VALUES(read_user),
  81. share_user = VALUES(share_user),
  82. read_subscribe_user = VALUES(read_subscribe_user),
  83. read_delivery_rate = VALUES(read_delivery_rate),
  84. read_finish_rate = VALUES(read_finish_rate),
  85. read_avg_activetime = VALUES(read_avg_activetime),
  86. zaikan_user = VALUES(zaikan_user),
  87. like_user = VALUES(like_user),
  88. comment_count = VALUES(comment_count),
  89. collection_user = VALUES(collection_user),
  90. updated_at = CURRENT_TIMESTAMP;
  91. """
  92. return await self.pool.async_save(query=query, params=params)
  93. # 存储文章的阅读分布信息
  94. async def save_article_read_distribution(self, params: tuple):
  95. query = """
  96. INSERT INTO article_read_distribution
  97. (
  98. wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate, jump_pos_4_rate, jump_pos_5_rate,
  99. src_mp_message_user, src_mp_home_user, src_chat_user, src_moments_user, src_recommend_user,
  100. src_search_user, src_other_user, src_total_user
  101. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  102. ON DUPLICATE KEY UPDATE
  103. jump_pos_1_rate = VALUES(jump_pos_1_rate),
  104. jump_pos_2_rate = VALUES(jump_pos_2_rate),
  105. jump_pos_3_rate = VALUES(jump_pos_3_rate),
  106. jump_pos_4_rate = VALUES(jump_pos_4_rate),
  107. jump_pos_5_rate = VALUES(jump_pos_5_rate),
  108. src_mp_message_user = VALUES(src_mp_message_user),
  109. src_mp_home_user = VALUES(src_mp_home_user),
  110. src_chat_user = VALUES(src_chat_user),
  111. src_moments_user = VALUES(src_moments_user),
  112. src_recommend_user = VALUES(src_recommend_user),
  113. src_search_user = VALUES(src_search_user),
  114. src_other_user = VALUES(src_other_user),
  115. src_total_user = VALUES(src_total_user),
  116. updated_at = CURRENT_TIMESTAMP;
  117. """
  118. return await self.pool.async_save(query=query, params=params)
  119. class ArticleDetailStat(ArticleDetailStatMapper):
  120. def __init__(self, pool, log_service):
  121. super().__init__(pool, log_service)
  122. # 存储账号信息
  123. async def save_account_details(self, account, fetch_response):
  124. article_list = fetch_response.get("list")
  125. if not article_list:
  126. return 0
  127. is_delay = fetch_response["is_delay"]
  128. if is_delay:
  129. print("数据延迟,重新处理")
  130. return 0
  131. async def _process_article_detail(_article_list):
  132. _msg_id_set = set()
  133. for _article in _article_list:
  134. _msg_id = _article["msgid"].split("_")[0]
  135. _msg_id_set.add(_msg_id)
  136. _article_mapper = {}
  137. for _msg_id in _msg_id_set:
  138. _publish_articles = await self.fetch_article_detail(
  139. gh_id=account["gh_id"], msg_id=_msg_id
  140. )
  141. for _publish_article in _publish_articles:
  142. _key = f"{_msg_id}_{_publish_article['ItemIndex']}"
  143. _value = {
  144. "wx_sn": _publish_article["wx_sn"].decode("utf-8"),
  145. "title": _publish_article["title"],
  146. "content_url": _publish_article["ContentUrl"],
  147. }
  148. _article_mapper[_key] = _value
  149. return _article_mapper
  150. account_published_articles = await _process_article_detail(article_list)
  151. async def _update_single_article(_article):
  152. # 文章基本信息
  153. app_msg_id = _article["msgid"]
  154. msg_id = app_msg_id.split("_")[0]
  155. position = app_msg_id.split("_")[1]
  156. wx_sn = account_published_articles.get(app_msg_id)["wx_sn"]
  157. # 文章表现
  158. article_performance = _article["detail_list"][0]
  159. await self.save_article_performance(
  160. params=(
  161. account["account_name"],
  162. account["gh_id"],
  163. wx_sn,
  164. app_msg_id,
  165. msg_id,
  166. position,
  167. account_published_articles.get(app_msg_id)["title"],
  168. account_published_articles.get(app_msg_id)["content_url"],
  169. article_performance["stat_date"],
  170. article_performance["read_user"],
  171. article_performance["share_user"],
  172. article_performance["read_subscribe_user"],
  173. round(
  174. article_performance["read_delivery_rate"],
  175. self.DECIMAL_PRECISION,
  176. ),
  177. round(
  178. article_performance["read_finish_rate"], self.DECIMAL_PRECISION
  179. ),
  180. round(
  181. article_performance["read_avg_activetime"],
  182. self.DECIMAL_PRECISION,
  183. ),
  184. article_performance["zaikan_user"],
  185. article_performance["like_user"],
  186. article_performance["comment_count"],
  187. article_performance["collection_user"],
  188. )
  189. )
  190. # 表现分布信息
  191. jump_rate_map = {
  192. i["position"]: round(i["rate"], self.DECIMAL_PRECISION)
  193. for i in article_performance["read_jump_position"]
  194. }
  195. source_map = {
  196. i["scene_desc"]: i["user_count"]
  197. for i in article_performance["read_user_source"]
  198. }
  199. await self.save_article_read_distribution(
  200. params=(
  201. wx_sn,
  202. jump_rate_map[1],
  203. jump_rate_map[2],
  204. jump_rate_map[3],
  205. jump_rate_map[4],
  206. jump_rate_map[5],
  207. source_map["公众号消息"],
  208. source_map["公众号主页"],
  209. source_map["聊天会话"],
  210. source_map["朋友圈"],
  211. source_map["推荐"],
  212. source_map["搜一搜"],
  213. source_map["其他"],
  214. source_map["全部"],
  215. )
  216. )
  217. return await run_tasks_with_asyncio_task_group(
  218. task_list=article_list,
  219. handler=_update_single_article,
  220. description="批量更新文章阅读信息",
  221. unit="article",
  222. )
  223. # 处理单个账号
  224. async def process_single_account(self, account: dict):
  225. gh_id = account["gh_id"]
  226. token_info = await self.get_access_token_from_database(gh_id)
  227. if not token_info:
  228. return
  229. # 更新 token
  230. async def update_token(_new_token_info):
  231. _access_token = _new_token_info["access_token"]
  232. _expires_in = _new_token_info["expires_in"]
  233. await self.set_access_token_for_each_account(
  234. gh_id=account["gh_id"],
  235. access_token=_access_token,
  236. expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
  237. )
  238. print(f"{account['account_name']} access_token updated to database")
  239. expire_timestamp = token_info[0]["expire_timestamp"] or 0
  240. if int(time.time()) >= expire_timestamp:
  241. print(f"{account['account_name']} access_token expired")
  242. new_token_info = await get_access_token(
  243. account["app_id"], account["app_secret"]
  244. )
  245. access_token = new_token_info["access_token"]
  246. await update_token(_new_token_info=new_token_info)
  247. else:
  248. access_token = token_info[0]["access_token"]
  249. # yesterday_string = datetime.strftime(datetime.now() - timedelta(days=5), "%Y-%m-%d")
  250. dt_list = [
  251. (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
  252. for i in range(1, 2)
  253. ]
  254. for dt in dt_list:
  255. print(f"{account['account_name']} crawl {dt} read_data")
  256. fetch_response = await get_gzh_stat_daily(
  257. access_token=access_token, date_string=dt
  258. )
  259. # 请求失败的情况
  260. if fetch_response.get("errcode") == 40001:
  261. fetch_new_token_info = await get_access_token(
  262. account["app_id"], account["app_secret"]
  263. )
  264. await update_token(_new_token_info=fetch_new_token_info)
  265. break
  266. # 处理并且落表
  267. await self.save_account_details(account, fetch_response)
  268. # 入口函数
  269. async def deal(self):
  270. accounts = await self.fetch_monitor_accounts()
  271. if not accounts:
  272. return
  273. for account in accounts:
  274. try:
  275. await self.process_single_account(account)
  276. await self.log_service.log(
  277. contents={
  278. "task": "article_detail_stat",
  279. "account_name": account["account_name"],
  280. "status": "success"
  281. }
  282. )
  283. except Exception as e:
  284. await self.log_service.log(
  285. contents={
  286. "task": "article_detail_stat",
  287. "account_name": account["account_name"],
  288. "error": str(e),
  289. "traceback": traceback.format_exc(),
  290. "status": "fail"
  291. }
  292. )