article_pool_cold_start.py

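"""
Article-pool cold-start task.

Pulls candidate articles from the crawler meta table, filters them (title
length, sensitivity, relevance score), creates crawler plans through internal
APIs, binds those plans to AIGC generation plans, and reports results to
Feishu.
"""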
from __future__ import annotations

import time
import asyncio
import datetime
import traceback
from typing import List

from pandas import DataFrame
from tqdm.asyncio import tqdm

from app.infra.external import feishu_robot
from app.infra.external import AsyncApolloApi
from app.infra.internal import auto_create_crawler_task
from app.infra.internal import auto_bind_crawler_task_to_generate_task
from app.infra.internal import get_titles_from_produce_plan
from app.domains.cold_start_tasks.article_pool import (
    ArticlePoolColdStartStrategy,
    ArticlePoolFilterStrategy,
)


class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
    def __init__(self, pool, log_client, trace_id, global_config):
        super().__init__(pool, log_client, trace_id)
        self.config = global_config.cold_start
        self.apollo_client = AsyncApolloApi(
            apollo_config=global_config.apollo, app_id=None, env=None
        )
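
    # NOTE: INIT_STATUS, PUBLISHED_STATUS and DEFAULT_CRAWLER_METHODS are used
    # below but not defined in this file; they are presumably class attributes
    # inherited from ArticlePoolColdStartStrategy / ArticlePoolFilterStrategy.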

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str, category: str | None
    ) -> DataFrame:
        """
        Fetch cold-start candidate articles from the crawler meta table.

        @param platform: platform the articles were crawled from
        @param crawl_method: article crawl mode
        @param strategy: supply strategy
        @param category: optional category filter
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy, category
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy, category
                )
            case _:
                raise ValueError("Invalid platform")
        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )
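
    # The platform switch above is intentionally exhaustive: any platform other
    # than "weixin" / "toutiao" fails fast with a ValueError rather than
    # silently returning an empty frame.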

    async def filter_published_titles(self, plan_id):
        """
        Mark articles whose titles have already been added to the aigc system,
        so they are not published again.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        if not published_title_tuple:
            return 0
        update_query = """
            update crawler_meta_article set status = %s where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows
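
    # With PyMySQL-style parameter substitution (assumed here), a tuple bound
    # to "in %s" expands to a parenthesised value list, e.g.
    #   where title in %s  ->  where title in ('t1', 't2', ...)
    # The early return above guards against the empty tuple, which would
    # otherwise render as the invalid SQL "in ()".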

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="category cold-start task: failed to record crawler plan id",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )
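
    # A failed insert is reported to Feishu but not re-raised, so a bookkeeping
    # failure does not abort the crawl/bind pipeline driven by
    # create_crawler_plan_and_bind_to_produce_plan below.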

    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles to mark as published
        :return: number of rows updated
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows
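
    # Unlike filter_published_titles, this method does not short-circuit on an
    # empty id list; callers are expected to pass a non-empty article_id_list
    # (an empty tuple would render as invalid "in ()" SQL under the assumed
    # driver).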

    async def create_crawler_plan_and_bind_to_produce_plan(
        self,
        strategy: str,
        crawl_method: str,
        category: str,
        platform: str,
        url_list: List[str],
        plan_id: str,
    ):
        # create the crawler plan
        crawler_plan_response = await auto_create_crawler_task(
            plan_id=None,
            plan_name=f"cold_start-{strategy}-{crawl_method}-{category}-{datetime.date.today()}-{len(url_list)}",
            plan_tag="category cold start",
            platform=platform,
            url_list=url_list,
        )
        # persist the plan record
        create_timestamp = int(time.time()) * 1000  # millisecond timestamp (second precision)
        crawler_plan_id = crawler_plan_response["data"]["id"]
        crawler_plan_name = crawler_plan_response["data"]["name"]
        await self.insert_crawler_plan_into_database(
            crawler_plan_id, crawler_plan_name, create_timestamp
        )
        # auto-bind the crawler plan to the generation plan
        new_crawler_task_list = [
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": crawler_plan_name,
                "inputSourceModal": 3,
                "inputSourceChannel": self.config.input_source_map[platform],
            }
        ]
        generate_plan_response = await auto_bind_crawler_task_to_generate_task(
            crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "bound crawler plan to generation plan",
                "data": generate_plan_response,
            }
        )
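
    # The crawler-task API is assumed to reply with a payload shaped like
    #   {"data": {"id": <crawler_plan_id>, "name": <crawler_plan_name>}};
    # only those two fields are consumed above.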

    async def create_cold_start_plan(
        self,
        platform,
        plan_id,
        strategy="strategy_v1",
        category=None,
        crawl_method=None,
    ):
        # fetch candidate articles from the meta table
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy, category
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "fetched articles successfully",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        filter_article_df = await self.article_pool_filter(
            strategy, platform, article_dataframe, crawl_method, category
        )
        match strategy:
            case "strategy_v1":
                # split the articles by AI-assigned category
                category_list = await self.apollo_client.get_config_value(
                    key="category_list"
                )
                for ai_category in category_list:
                    filter_category_df = filter_article_df[
                        filter_article_df["category_by_ai"] == ai_category
                    ]
                    url_list = filter_category_df["link"].values.tolist()
                    if url_list:
                        await self.create_crawler_plan_and_bind_to_produce_plan(
                            strategy,
                            crawl_method,
                            ai_category,
                            platform,
                            url_list,
                            plan_id,
                        )
                        # mark these articles as published
                        article_id_list = filter_category_df[
                            "article_id"
                        ].values.tolist()
                        await self.change_article_status_while_publishing(
                            article_id_list=article_id_list
                        )
            case "strategy_v2" | "strategy_v3":
                # both strategies publish the whole filtered pool as one plan
                url_list = filter_article_df["link"].values.tolist()
                await self.create_crawler_plan_and_bind_to_produce_plan(
                    strategy, crawl_method, category, platform, url_list, plan_id
                )
                # mark these articles as published
                article_id_list = filter_article_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )

    async def cold_start_by_category(self, category_list, platform, strategy):
        if not category_list:
            category_list = list(self.config.cold_start_category_map.keys())
        for category in tqdm(category_list):
            try:
                plan_id = self.config.cold_start_category_map[category]
                affected_rows = await self.filter_published_titles(plan_id)
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "category": category,
                        "status": "success",
                        "trace_id": self.trace_id,
                        "message": "updated article status from already-crawled titles",
                        "data": {"affected_rows": affected_rows},
                    }
                )
                await self.create_cold_start_plan(
                    platform=platform,
                    strategy=strategy,
                    plan_id=plan_id,
                    category=category,
                )
                # pace the external APIs: wait two minutes between categories
                await asyncio.sleep(120)
            except Exception as e:
                await feishu_robot.bot(
                    title="article cold-start exception",
                    detail={
                        "category": category,
                        "strategy": strategy,
                        "error": str(e),
                        "function": "cold_start_by_category",
                        "traceback": traceback.format_exc(),
                    },
                )
        # cold_start_records is presumably populated by article_pool_filter
        # (ArticlePoolFilterStrategy); it carries per-category filter statistics.
        if self.cold_start_records:
            columns = [
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="category",
                    display_name="article category",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="cold_start_num",
                    display_name="articles cold-started this run",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="total_length",
                    display_name="total remaining articles",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="filter_by_title_length",
                    display_name="filtered by title length",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="filter_by_sensitivity",
                    display_name="filtered by sensitive words",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="filter_by_llm_sensitivity",
                    display_name="filtered by LLM sensitivity check",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="filter_by_score",
                    display_name="filtered by relevance score",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="read_avg_threshold",
                    display_name="read-average multiple threshold",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="read_threshold",
                    display_name="read-count threshold",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="title_length_threshold",
                    display_name="title length threshold",
                ),
            ]
            await feishu_robot.bot(
                title="long-article pool cold-start publishing",
                detail={
                    "columns": columns,
                    "rows": self.cold_start_records,
                },
                table=True,
                mention=False,
            )

    async def deal(
        self,
        platform: str,
        strategy="strategy_v1",
        crawl_methods=None,
        category_list=None,
    ) -> None:
        """Execute the cold-start task under the chosen strategy."""
        match strategy:
            case "strategy_v1":
                if not crawl_methods:
                    crawl_methods = self.DEFAULT_CRAWLER_METHODS
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_methods": crawl_methods,
                        "status": "success",
                        "trace_id": self.trace_id,
                    }
                )
                crawl_methods_map = await self.apollo_client.get_config_value(
                    key="category_cold_start_map"
                )
                for crawl_method in crawl_methods:
                    try:
                        plan_id = crawl_methods_map[crawl_method]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "crawl_method": crawl_method,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "updated article status from already-crawled titles",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            plan_id=plan_id,
                            crawl_method=crawl_method,
                        )
                    except Exception as e:
                        await feishu_robot.bot(
                            title="article cold-start exception",
                            detail={
                                "crawl_method": crawl_method,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )
            case "strategy_v2" | "strategy_v3":
                # both strategies are driven per category
                await self.cold_start_by_category(
                    category_list=category_list,
                    platform=platform,
                    strategy=strategy,
                )
            case _:
                raise ValueError(f"unsupported strategy: {strategy}")
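

# --- Hypothetical usage sketch (illustrative only, not part of the module) ---
# A minimal sketch of how this task might be driven, assuming pool, log_client
# and global_config objects that expose the attributes used above; every name
# in this block is a placeholder, not a real project API.
#
# async def run_cold_start(pool, log_client, global_config):
#     task = ArticlePoolColdStart(
#         pool=pool,
#         log_client=log_client,
#         trace_id="article-pool-cold-start-demo",
#         global_config=global_config,
#     )
#     # strategy_v1 groups by crawl method; v2/v3 group by category
#     await task.deal(platform="weixin", strategy="strategy_v1")
#
# if __name__ == "__main__":
#     asyncio.run(run_cold_start(pool, log_client, global_config))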