cooperate_accounts_monitor.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. import asyncio
  2. import json
  3. import random
  4. import traceback
  5. from tqdm import tqdm
  6. from datetime import datetime, timedelta
  7. from urllib.parse import unquote, parse_qs, urlparse
  8. from applications.utils import fetch_from_odps
  9. from applications.crawler.wechat import get_article_list_from_account
  10. from applications.crawler.wechat import get_article_detail
  11. class CooperateAccountsMonitorTaskConst:
  12. INVALID_STATUS = 0
  13. VALID_STATUS = 1
  14. INIT_STATUS = 0
  15. PROCESSING_STATUS = 1
  16. SUCCESS_STATUS = 2
  17. FAIL_STATUS = 99
  18. HAS_MINI_PROGRAM = 1
  19. DONT_HAS_MINI_PROGRAM = 0
  20. ARTICLE_NUM = 200
  21. class CooperateAccountsMonitorTaskUtils(CooperateAccountsMonitorTaskConst):
  22. @staticmethod
  23. def get_monitor_account_list():
  24. # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
  25. week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
  26. query = f"""
  27. SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
  28. FROM loghubods.opengid_base_data
  29. WHERE dt = MAX_PT('loghubods.opengid_base_data')
  30. AND hotsencetype = 1058
  31. AND usersharedepth = 0
  32. AND channel = '公众号合作-即转-稳定'
  33. AND 点击时间 >= '{week_ago}'
  34. GROUP BY 公众号名, ghid
  35. ORDER BY uv DESC
  36. ;
  37. """
  38. result = fetch_from_odps(query)
  39. return result
  40. @staticmethod
  41. def extract_page_path(page_path):
  42. # 解析外层 URL
  43. parsed_url = urlparse(page_path)
  44. outer_params = parse_qs(parsed_url.query)
  45. # 取出并解码 jumpPage
  46. jump_page = outer_params.get("jumpPage", [""])[0]
  47. if not jump_page:
  48. return None, None
  49. decoded_jump_page = unquote(jump_page)
  50. # 解析 jumpPage 内层参数
  51. inner_query = urlparse(decoded_jump_page).query
  52. inner_params = parse_qs(inner_query)
  53. video_id = inner_params.get("id", [None])[0]
  54. root_source_id = inner_params.get("rootSourceId", [None])[0]
  55. return video_id, root_source_id
  56. @staticmethod
  57. def extract_wx_sn(content_url):
  58. if not content_url:
  59. return None
  60. query = urlparse(content_url).query
  61. return parse_qs(query).get("sn", [None])[0]
  62. class CooperateAccountsMonitorMapper(CooperateAccountsMonitorTaskUtils):
  63. def __init__(self, pool, log_client):
  64. self.pool = pool
  65. self.log_client = log_client
  66. # 从 growth 数据库获取账号
  67. async def fetch_monitor_accounts(self):
  68. query = """
  69. SELECT t2.name AS partner_name, t2.channel AS partner_id,
  70. t1.name AS account_name, t1.gh_id
  71. FROM content_platform_gzh_account t1 JOIN content_platform_account t2
  72. ON t1.create_account_id = t2.id
  73. WHERE t1.status = 1;
  74. """
  75. return await self.pool.async_fetch(query=query, db_name="growth")
  76. # 获取 gh_id 的兜底逻辑
  77. async def fetch_gh_id(self, account_name):
  78. query = """
  79. SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s;
  80. """
  81. fetch_response = await self.pool.async_fetch(
  82. query=query, db_name="growth", params=(account_name, self.VALID_STATUS)
  83. )
  84. return fetch_response[0].get("gh_id", None) if fetch_response else None
  85. # 修改 fetch 状态
  86. async def update_fetch_status(self, wx_sn, ori_status, new_status):
  87. query = """
  88. UPDATE cooperate_accounts_daily_detail SET fetch_status = %s WHERE wx_sn = %s AND fetch_status = %s;
  89. """
  90. return await self.pool.async_save(
  91. query=query, params=(new_status, wx_sn, ori_status)
  92. )
  93. # 存储账号并且维护账号状态
  94. async def save_account(self, account):
  95. query = """
  96. INSERT IGNORE INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
  97. VALUES (%s, %s, %s, %s);
  98. """
  99. return await self.pool.async_save(
  100. query=query,
  101. params=(
  102. account["partner_name"],
  103. account["partner_id"],
  104. account["account_name"],
  105. account["gh_id"],
  106. ),
  107. )
  108. # 修改账号状态
  109. async def update_account_status(self, gh_id, ori_status, new_status, remark):
  110. query = """
  111. UPDATE cooperate_accounts SET status = %s, remark = %s WHERE gh_id = %s AND status = %s;
  112. """
  113. return await self.pool.async_save(
  114. query=query, params=(new_status, remark, gh_id, ori_status)
  115. )
  116. # 获取账号状态
  117. async def get_account_status(self, gh_id):
  118. query = """
  119. SELECT status FROM cooperate_accounts WHERE gh_id = %s;
  120. """
  121. fetch_response = await self.pool.async_fetch(query=query, params=(gh_id,))
  122. return fetch_response
  123. class CooperateAccountsMonitorTask(CooperateAccountsMonitorMapper):
  124. def __init__(self, pool, log_client):
  125. super().__init__(pool, log_client)
  126. # 更新文章详情
  127. async def set_article_detail(self, article):
  128. wx_sn = article["wx_sn"]
  129. article_link = article["article_link"]
  130. # acquire lock
  131. acquire_lock = await self.update_fetch_status(
  132. wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS
  133. )
  134. if not acquire_lock:
  135. print("锁抢占失败")
  136. return acquire_lock
  137. article_detail = await get_article_detail(
  138. article_link, is_count=True, is_cache=False
  139. )
  140. if not article_detail:
  141. return await self.update_fetch_status(
  142. wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS
  143. )
  144. # 更新文章信息
  145. code = article_detail.get("code", None)
  146. match code:
  147. case 0:
  148. try:
  149. article = article_detail.get("data", {}).get("data", {})
  150. body_text = article.get("body_text", None)
  151. images = article.get("image_url_list", [])
  152. mini_program = article.get("mini_program", [])
  153. has_mini_program = (
  154. self.HAS_MINI_PROGRAM
  155. if mini_program
  156. else self.DONT_HAS_MINI_PROGRAM
  157. )
  158. read_cnt = article.get("view_count", None)
  159. like_cnt = article.get("like_count", None)
  160. share_cnt = article.get("share_count", None)
  161. looking_cnt = article.get("looking_count", None)
  162. publish_timestamp = article.get("publish_timestamp", None)
  163. await self.store_mini_program(mini_program, wx_sn)
  164. query = """
  165. UPDATE cooperate_accounts_daily_detail SET
  166. article_text = %s,
  167. article_images = %s,
  168. read_cnt = %s,
  169. like_cnt = %s,
  170. share_cnt = %s,
  171. looking_cnt = %s,
  172. publish_timestamp = %s,
  173. fetch_status = %s,
  174. has_mini_program = %s
  175. WHERE wx_sn = %s AND fetch_status = %s;
  176. """
  177. return await self.pool.async_save(
  178. query=query,
  179. params=(
  180. body_text,
  181. json.dumps(images, ensure_ascii=False),
  182. read_cnt,
  183. like_cnt,
  184. share_cnt,
  185. looking_cnt,
  186. int(publish_timestamp / 1000),
  187. self.SUCCESS_STATUS,
  188. has_mini_program,
  189. wx_sn,
  190. self.PROCESSING_STATUS,
  191. ),
  192. )
  193. except Exception as e:
  194. print(f"更新文章详情失败-{article_link}-{e}")
  195. return await self.update_fetch_status(
  196. wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
  197. )
  198. case _:
  199. return await self.update_fetch_status(
  200. wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
  201. )
  202. # 存储小程序信息
  203. async def store_mini_program(self, mini_program, wx_sn):
  204. for card_index, i in enumerate(mini_program, 1):
  205. try:
  206. video_id, root_source_id = self.extract_page_path(i["path"])
  207. card_title = i["title"]
  208. card_cover = i["image_url"]
  209. mini_name = i["nike_name"]
  210. query = """
  211. INSERT INTO cooperate_accounts_daily_mini_info
  212. (wx_sn, card_title, card_cover, video_id, root_source_id, mini_program_name, card_index)
  213. VALUES
  214. (%s, %s, %s, %s, %s, %s, %s);
  215. """
  216. await self.pool.async_save(
  217. query=query,
  218. params=(
  219. wx_sn,
  220. card_title,
  221. card_cover,
  222. video_id,
  223. root_source_id,
  224. mini_name,
  225. card_index,
  226. ),
  227. )
  228. except Exception as e:
  229. print(e)
  230. continue
  231. # 存储文章
  232. async def store_articles(self, gh_id, account_name, article_list):
  233. params = []
  234. for group_article in article_list:
  235. base_info = group_article["AppMsg"]["BaseInfo"]
  236. detail_info = group_article["AppMsg"]["DetailInfo"]
  237. for single_article in detail_info:
  238. single_param = (
  239. gh_id,
  240. account_name,
  241. base_info["AppMsgId"],
  242. base_info["Type"],
  243. single_article["ItemIndex"],
  244. single_article["Title"],
  245. single_article["ContentUrl"],
  246. single_article["CoverImgUrl"],
  247. single_article["Digest"],
  248. single_article["send_time"],
  249. self.extract_wx_sn(single_article["ContentUrl"]),
  250. )
  251. params.append(single_param)
  252. query = """
  253. INSERT IGNORE INTO cooperate_accounts_daily_detail
  254. (gh_id, account_name, app_msg_id, publish_type, position, article_title, article_link, article_cover, article_desc, publish_timestamp, wx_sn)
  255. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  256. """
  257. await self.pool.async_save(query=query, params=params, batch=True)
  258. # 存储单个账号
  259. async def store_single_accounts(self, account):
  260. account_name = account["account_name"]
  261. gh_id = account["gh_id"]
  262. account_status = await self.get_account_status(gh_id)
  263. if not account_status:
  264. # 账号没存在长文数据库
  265. affected_row = await self.save_account(account)
  266. print(f"账号{account_name}存储到数据库")
  267. if not affected_row:
  268. return
  269. account_status = await self.get_account_status(gh_id)
  270. account_using_status = account_status[0].get("status", None)
  271. if not account_using_status:
  272. print("账号违规")
  273. return
  274. # 只抓最新的文章
  275. crawl_response = await get_article_list_from_account(gh_id)
  276. await asyncio.sleep(random.randint(1, 3))
  277. if not crawl_response:
  278. return
  279. code = crawl_response.get("code")
  280. match code:
  281. case 0:
  282. article_list = crawl_response.get("data", {} or {}).get("data", [])
  283. # 将文章存储到库中
  284. await self.store_articles(gh_id, account_name, article_list)
  285. case 25013:
  286. msg = crawl_response.get("msg")
  287. await self.update_account_status(
  288. gh_id, self.VALID_STATUS, self.INVALID_STATUS, msg
  289. )
  290. case _:
  291. print(crawl_response["msg"])
  292. pass
  293. # 获取待处理的文章
  294. async def get_article_list(self):
  295. query = """
  296. SELECT wx_sn, article_link FROM cooperate_accounts_daily_detail
  297. WHERE fetch_status = %s ORDER BY position LIMIT %s;
  298. """
  299. return await self.pool.async_fetch(
  300. query=query, params=(self.INIT_STATUS, self.ARTICLE_NUM)
  301. )
  302. # 入口函数
  303. async def deal(self, task_name):
  304. match task_name:
  305. case "save_articles":
  306. account_list = await self.fetch_monitor_accounts()
  307. for account in tqdm(account_list):
  308. print(f"开始处理账号:{account['account_name']}")
  309. try:
  310. await self.store_single_accounts(account)
  311. except Exception as e:
  312. print(f"获取账号文章失败--{account['account_name']}--{e}")
  313. print(traceback.format_exc())
  314. case "get_detail":
  315. article_list = await self.get_article_list()
  316. for article in tqdm(article_list, desc="处理文章详情"):
  317. try:
  318. await self.set_article_detail(article)
  319. except Exception as e:
  320. print(f"获取文章详情失败-{article['article_link']}-{e}")