# article_pool_cold_start.py
from __future__ import annotations

import time
import asyncio
import datetime
import traceback
from typing import List

from pandas import DataFrame
from tqdm.asyncio import tqdm

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.config import cold_start_category_map, input_source_map
from applications.utils import get_titles_from_produce_plan
from applications.tasks.cold_start_tasks.article_pool import (
    ArticlePoolColdStartStrategy,
    ArticlePoolFilterStrategy,
)
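# ArticlePoolColdStart drives the article-pool cold start: it pulls candidate
# articles from the crawler meta table, filters them with the configured
# strategy, creates crawler plans on the aigc side, binds those plans to the
# target produce (generate) plans, and updates article status along the way.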
class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)
    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str, category: str | None
    ) -> DataFrame:
        """
        @param platform: platform the articles were crawled from
        @param crawl_method: crawl method used for the articles
        @param strategy: supply (cold-start) strategy
        @param category: optional article category
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy, category
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy, category
                )
            case _:
                raise ValueError("Invalid platform")
        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )
    async def filter_published_titles(self, plan_id):
        """
        Mark articles whose titles were already added to the aigc produce plan as published.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        if not published_title_tuple:
            return 0
        update_query = """
            update crawler_meta_article set status = %s where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows
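    # Persist the id/name of a newly created crawler plan for later tracing;
    # failures are reported via the Feishu bot instead of being re-raised.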
    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )
    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return: number of affected rows
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows
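    # Create a crawler plan in aigc for the given url_list, record it locally,
    # then bind it as an input source of the target produce (generate) plan.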
    async def create_crawler_plan_and_bind_to_produce_plan(
        self,
        strategy: str,
        crawl_method: str,
        category: str,
        platform: str,
        url_list: List[str],
        plan_id: str,
    ):
        # create_crawler_plan
        crawler_plan_response = await auto_create_crawler_task(
            plan_id=None,
            plan_name=f"冷启动-{strategy}-{category}-{datetime.date.today()}-{len(url_list)}",
            plan_tag="品类冷启动",
            platform=platform,
            url_list=url_list,
        )
        # save to db
        create_timestamp = int(time.time()) * 1000
        crawler_plan_id = crawler_plan_response["data"]["id"]
        crawler_plan_name = crawler_plan_response["data"]["name"]
        await self.insert_crawler_plan_into_database(
            crawler_plan_id, crawler_plan_name, create_timestamp
        )
        # auto bind to generate plan
        new_crawler_task_list = [
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": crawler_plan_name,
                "inputSourceModal": 3,
                "inputSourceChannel": input_source_map[platform],
            }
        ]
        generate_plan_response = await auto_bind_crawler_task_to_generate_task(
            crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "绑定至生成计划成功",
                "data": generate_plan_response,
            }
        )
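    # Assemble the cold-start plan for one platform/plan id: load candidate
    # articles, filter them, then create and bind crawler plans (per AI
    # category for strategy_v1, as a single batch for strategy_v2) and mark
    # the published articles accordingly.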
    async def create_cold_start_plan(
        self,
        platform,
        plan_id,
        strategy="strategy_v1",
        category=None,
        crawl_method=None,
    ):
        # get article data_frame from meta article
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy, category
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        filter_article_df = await self.article_pool_filter(
            strategy, platform, article_dataframe, crawl_method, category
        )
        match strategy:
            case "strategy_v1":
                # split article into each category
                category_list = await task_apollo.get_config_value(key="category_list")
                for ai_category in category_list:
                    filter_category_df = filter_article_df[
                        filter_article_df["category_by_ai"] == ai_category
                    ]
                    url_list = filter_category_df["link"].values.tolist()
                    if url_list:
                        await self.create_crawler_plan_and_bind_to_produce_plan(
                            strategy, crawl_method, ai_category, platform, url_list, plan_id
                        )
                        # change article status
                        article_id_list = filter_category_df["article_id"].values.tolist()
                        await self.change_article_status_while_publishing(
                            article_id_list=article_id_list
                        )
            case "strategy_v2":
                url_list = filter_article_df["link"].values.tolist()
                await self.create_crawler_plan_and_bind_to_produce_plan(
                    strategy, crawl_method, category, platform, url_list, plan_id
                )
                # change article status
                article_id_list = filter_article_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )
    async def deal(
        self,
        platform: str,
        strategy="strategy_v1",
        crawl_methods=None,
        category_list=None,
    ) -> None:
        """Execute the cold start task with the selected strategy."""
        match strategy:
            case "strategy_v1":
                if not crawl_methods:
                    crawl_methods = self.DEFAULT_CRAWLER_METHODS
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_methods": crawl_methods,
                        "status": "success",
                        "trace_id": self.trace_id,
                    }
                )
                crawl_methods_map = await task_apollo.get_config_value(
                    key="category_cold_start_map"
                )
                for crawl_method in crawl_methods:
                    try:
                        plan_id = crawl_methods_map[crawl_method]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "crawl_method": crawl_method,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "通过已抓取标题修改文章状态",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            plan_id=plan_id,
                            crawl_method=crawl_method,
                        )
                    except Exception as e:
                        await feishu_robot.bot(
                            title="文章冷启动异常",
                            detail={
                                "crawl_method": crawl_method,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )
            case "strategy_v2":
                if not category_list:
                    category_list = list(cold_start_category_map.keys())
                for category in tqdm(category_list):
                    try:
                        plan_id = cold_start_category_map[category]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "category": category,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "通过已抓取标题修改文章状态",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            strategy=strategy,
                            plan_id=plan_id,
                            category=category,
                        )
                        await asyncio.sleep(120)
                    except Exception as e:
                        await feishu_robot.bot(
                            title="文章冷启动异常",
                            detail={
                                "category": category,
                                "strategy": strategy,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )
                if self.cold_start_records:
                    columns = [
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="plain_text",
                            sheet_name="category",
                            display_name="文章品类",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="cold_start_num",
                            display_name="本次冷启数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="total_length",
                            display_name="总文章剩余数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_title_length",
                            display_name="标题长度过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_sensitivity",
                            display_name="敏感词过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_llm_sensitity",
                            display_name="经过大模型判断敏感过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_score",
                            display_name="经过相关性分过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_avg_threshold",
                            display_name="阅读均值倍数阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_threshold",
                            display_name="阅读量阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="title_length_threshold",
                            display_name="标题长度阈值",
                        ),
                    ]
                    await feishu_robot.bot(
                        title="长文文章路冷启动发布",
                        detail={
                            "columns": columns,
                            "rows": self.cold_start_records,
                        },
                        table=True,
                        mention=False,
                    )
            case _:
                raise Exception(f"error strategy {strategy}")
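# Usage sketch (illustrative only): how this task might be driven by a caller.
# The concrete `pool` (database client), `log_client`, and `trace_id` come from
# the surrounding application and are assumed here.
#
#     cold_start = ArticlePoolColdStart(pool, log_client, trace_id)
#     # strategy_v1: iterate the configured crawl methods for the platform
#     await cold_start.deal(platform="weixin", strategy="strategy_v1")
#     # strategy_v2: iterate the categories from cold_start_category_map
#     await cold_start.deal(platform="toutiao", strategy="strategy_v2")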