article_detail_stat.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. import json
  2. import time
  3. from datetime import datetime, timedelta
  4. from applications.api import feishu_robot
  5. from applications.crawler.wechat import get_gzh_stat_daily
  6. from applications.crawler.wechat import get_access_token
  7. from applications.utils import run_tasks_with_asyncio_task_group
  8. class ArticleDetailStatConst:
  9. # Task Status
  10. INIT_STATUS = 0
  11. PROCESSING_STATUS = 1
  12. SUCCESS_STATUS = 2
  13. FAILED_STATUS = 99
  14. # Account Status
  15. ACCOUNT_VALID_STATUS = 1
  16. ACCOUNT_INVALID_STATUS = 0
  17. # Cookie Status
  18. COOKIE_VALID_STATUS = 1
  19. COOKIE_INVALID_STATUS = 0
  20. # Gap Time
  21. GAP_DURATION = 300
  22. # 小数点保留位数
  23. DECIMAL_PRECISION = 7
  24. class ArticleDetailStatMapper(ArticleDetailStatConst):
  25. def __init__(self, pool, log_client):
  26. self.pool = pool
  27. self.log_client = log_client
  28. # 获取账号信息
  29. async def fetch_monitor_accounts(self):
  30. query = """
  31. SELECT gh_id, account_name, app_id, app_secret
  32. FROM gzh_account_info WHERE status = %s;
  33. """
  34. return await self.pool.async_fetch(
  35. query=query, params=(self.ACCOUNT_VALID_STATUS,)
  36. )
  37. # 更新 access_token
  38. async def set_access_token_for_each_account(
  39. self, gh_id, access_token, expire_timestamp
  40. ):
  41. query = """
  42. UPDATE gzh_cookie_info
  43. SET access_token = %s, access_token_status = %s, expire_timestamp = %s
  44. WHERE gh_id = %s;
  45. """
  46. return await self.pool.async_save(
  47. query=query,
  48. params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
  49. )
  50. # 从数据库获取 access_token
  51. async def get_access_token_from_database(self, gh_id):
  52. query = """
  53. SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
  54. """
  55. return await self.pool.async_fetch(query=query, params=(gh_id,))
  56. # 从official_articles_v2 获取文章详情
  57. async def fetch_article_detail(self, gh_id, msg_id):
  58. query = """
  59. SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5
  60. FROM official_articles_v2
  61. WHERE ghId = %s AND appMsgId = %s;
  62. """
  63. return await self.pool.async_fetch(
  64. query=query, db_name="piaoquan_crawler", params=(gh_id, msg_id)
  65. )
  66. # 存储文章的各个表现
  67. async def save_article_performance(self, params: tuple):
  68. query = """
  69. INSERT INTO article_daily_performance
  70. (
  71. account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title, content_url, stat_date,
  72. read_user, share_user, read_subscribe_user, read_delivery_rate, read_finish_rate,
  73. read_avg_activetime, zaikan_user, like_user, comment_count, collection_user
  74. ) VALUES (
  75. %s, %s, %s, %s, %s, %s, %s, %s, %s,
  76. %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
  77. )
  78. ON DUPLICATE KEY UPDATE
  79. read_user = VALUES(read_user),
  80. share_user = VALUES(share_user),
  81. read_subscribe_user = VALUES(read_subscribe_user),
  82. read_delivery_rate = VALUES(read_delivery_rate),
  83. read_finish_rate = VALUES(read_finish_rate),
  84. read_avg_activetime = VALUES(read_avg_activetime),
  85. zaikan_user = VALUES(zaikan_user),
  86. like_user = VALUES(like_user),
  87. comment_count = VALUES(comment_count),
  88. collection_user = VALUES(collection_user),
  89. updated_at = CURRENT_TIMESTAMP;
  90. """
  91. return await self.pool.async_save(query=query, params=params)
  92. # 存储文章的阅读分布信息
  93. async def save_article_read_distribution(self, params: tuple):
  94. query = """
  95. INSERT INTO article_read_distribution
  96. (
  97. wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate, jump_pos_4_rate, jump_pos_5_rate,
  98. src_mp_message_user, src_mp_home_user, src_chat_user, src_moments_user, src_recommend_user,
  99. src_search_user, src_other_user, src_total_user
  100. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  101. ON DUPLICATE KEY UPDATE
  102. jump_pos_1_rate = VALUES(jump_pos_1_rate),
  103. jump_pos_2_rate = VALUES(jump_pos_2_rate),
  104. jump_pos_3_rate = VALUES(jump_pos_3_rate),
  105. jump_pos_4_rate = VALUES(jump_pos_4_rate),
  106. jump_pos_5_rate = VALUES(jump_pos_5_rate),
  107. src_mp_message_user = VALUES(src_mp_message_user),
  108. src_mp_home_user = VALUES(src_mp_home_user),
  109. src_chat_user = VALUES(src_chat_user),
  110. src_moments_user = VALUES(src_moments_user),
  111. src_recommend_user = VALUES(src_recommend_user),
  112. src_search_user = VALUES(src_search_user),
  113. src_other_user = VALUES(src_other_user),
  114. src_total_user = VALUES(src_total_user),
  115. updated_at = CURRENT_TIMESTAMP;
  116. """
  117. return await self.pool.async_save(query=query, params=params)
  118. class ArticleDetailStat(ArticleDetailStatMapper):
  119. def __init__(self, pool, log_client):
  120. super().__init__(pool, log_client)
  121. # 存储账号信息
  122. async def save_account_details(self, account, fetch_response):
  123. article_list = fetch_response.get("list")
  124. if not article_list:
  125. return 0
  126. is_delay = fetch_response["is_delay"]
  127. if is_delay:
  128. print("数据延迟,重新处理")
  129. return 0
  130. async def _process_article_detail(_article_list):
  131. _msg_id_set = set()
  132. for _article in _article_list:
  133. _msg_id = _article["msgid"].split("_")[0]
  134. _msg_id_set.add(_msg_id)
  135. _article_mapper = {}
  136. for _msg_id in _msg_id_set:
  137. _publish_articles = await self.fetch_article_detail(
  138. gh_id=account["gh_id"], msg_id=_msg_id
  139. )
  140. for _publish_article in _publish_articles:
  141. _key = f"{_msg_id}_{_publish_article['ItemIndex']}"
  142. _value = {
  143. "wx_sn": _publish_article["wx_sn"].decode("utf-8"),
  144. "title": _publish_article["title"],
  145. "content_url": _publish_article["ContentUrl"],
  146. }
  147. _article_mapper[_key] = _value
  148. return _article_mapper
  149. account_published_articles = await _process_article_detail(article_list)
  150. async def _update_single_article(_article):
  151. # 文章基本信息
  152. app_msg_id = _article["msgid"]
  153. msg_id = app_msg_id.split("_")[0]
  154. position = app_msg_id.split("_")[1]
  155. wx_sn = account_published_articles.get(app_msg_id)["wx_sn"]
  156. # 文章表现
  157. article_performance = _article["detail_list"][0]
  158. await self.save_article_performance(
  159. params=(
  160. account["account_name"],
  161. account["gh_id"],
  162. wx_sn,
  163. app_msg_id,
  164. msg_id,
  165. position,
  166. account_published_articles.get(app_msg_id)["title"],
  167. account_published_articles.get(app_msg_id)["content_url"],
  168. article_performance["stat_date"],
  169. article_performance["read_user"],
  170. article_performance["share_user"],
  171. article_performance["read_subscribe_user"],
  172. round(
  173. article_performance["read_delivery_rate"],
  174. self.DECIMAL_PRECISION,
  175. ),
  176. round(
  177. article_performance["read_finish_rate"], self.DECIMAL_PRECISION
  178. ),
  179. round(
  180. article_performance["read_avg_activetime"],
  181. self.DECIMAL_PRECISION,
  182. ),
  183. article_performance["zaikan_user"],
  184. article_performance["like_user"],
  185. article_performance["comment_count"],
  186. article_performance["collection_user"],
  187. )
  188. )
  189. # 表现分布信息
  190. jump_rate_map = {
  191. i["position"]: round(i["rate"], self.DECIMAL_PRECISION)
  192. for i in article_performance["read_jump_position"]
  193. }
  194. source_map = {
  195. i["scene_desc"]: i["user_count"]
  196. for i in article_performance["read_user_source"]
  197. }
  198. await self.save_article_read_distribution(
  199. params=(
  200. wx_sn,
  201. jump_rate_map[1],
  202. jump_rate_map[2],
  203. jump_rate_map[3],
  204. jump_rate_map[4],
  205. jump_rate_map[5],
  206. source_map["公众号消息"],
  207. source_map["公众号主页"],
  208. source_map["聊天会话"],
  209. source_map["朋友圈"],
  210. source_map["推荐"],
  211. source_map["搜一搜"],
  212. source_map["其他"],
  213. source_map["全部"],
  214. )
  215. )
  216. return await run_tasks_with_asyncio_task_group(
  217. task_list=article_list,
  218. handler=_update_single_article,
  219. description="批量更新文章阅读信息",
  220. unit="article",
  221. )
  222. # 处理单个账号
  223. async def process_single_account(self, account: dict):
  224. gh_id = account["gh_id"]
  225. token_info = await self.get_access_token_from_database(gh_id)
  226. if not token_info:
  227. return
  228. # 更新 token
  229. async def update_token(_new_token_info):
  230. _access_token = _new_token_info["access_token"]
  231. _expires_in = _new_token_info["expires_in"]
  232. await self.set_access_token_for_each_account(
  233. gh_id=account["gh_id"],
  234. access_token=_access_token,
  235. expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
  236. )
  237. print(f"{account['account_name']} access_token updated to database")
  238. expire_timestamp = token_info[0]["expire_timestamp"] or 0
  239. if int(time.time()) >= expire_timestamp:
  240. print(f"{account['account_name']} access_token expired")
  241. new_token_info = await get_access_token(
  242. account["app_id"], account["app_secret"]
  243. )
  244. access_token = new_token_info["access_token"]
  245. await update_token(_new_token_info=new_token_info)
  246. else:
  247. access_token = token_info[0]["access_token"]
  248. # yesterday_string = datetime.strftime(datetime.now() - timedelta(days=5), "%Y-%m-%d")
  249. dt_list = [
  250. (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
  251. for i in range(1, 31)
  252. ]
  253. for dt in dt_list:
  254. print(f"{account['account_name']} crawl {dt} read_data")
  255. fetch_response = await get_gzh_stat_daily(
  256. access_token=access_token, date_string=dt
  257. )
  258. # 请求失败的情况
  259. if fetch_response.get("errcode") == 40001:
  260. fetch_new_token_info = await get_access_token(
  261. account["app_id"], account["app_secret"]
  262. )
  263. await update_token(_new_token_info=fetch_new_token_info)
  264. break
  265. # 处理并且落表
  266. await self.save_account_details(account, fetch_response)
  267. # 入口函数
  268. async def deal(self):
  269. accounts = await self.fetch_monitor_accounts()
  270. for account in accounts:
  271. await self.process_single_account(account)