# publishCategoryArticles.py

  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. import time
  8. import traceback
  9. from pandas import DataFrame
  10. from applications import aiditApi, log, bot, llm_sensitivity
  11. from config import apolloConfig
  12. apollo = apolloConfig()
  13. DAILY_CRAWLER_MAX_NUM = 100
  14. SIMILARITY_MIN_SCORE = 0.4
  15. TITLE_NOT_SENSITIVE = 0
class CategoryColdStartTask(object):
    """
    Category cold-start publish task.
    """
    PUBLISHED_STATUS = 2
    INIT_STATUS = 1
    BAD_STATUS = 0

    def __init__(self, db_client):
        """
        :param db_client: database client used for all reads and writes
        """
        self.db_client = db_client
        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
        self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
        self.article_category_list = json.loads(apollo.getConfigValue("category_list"))
        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
        self.TITLE_LENGTH_MAX = self.category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
        log(
            task="category_publish_task",
            function="__init__",
            message="数据库初始化连接完成,apollo配置获取完成",
            data={
                "category": self.category_map,
                "threshold": self.category_cold_start_threshold
            }
        )
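    # Example shape of the Apollo threshold config read above (illustrative only,
    # not taken from the live config; the keys and fallback values are the ones
    # used in __init__):
    #   category_cold_start_threshold = {
    #       "READ_THRESHOLD": 5000,
    #       "READ_TIMES_THRESHOLD": 1.3,
    #       "LIMIT_TITLE_LENGTH": 15,
    #       "TITLE_LENGTH_MAX": 50
    #   }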
    def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp):
        """
        Insert a crawler plan record into the database.
        :param create_timestamp:
        :param crawler_plan_id:
        :param crawler_plan_name:
        :return:
        """
        insert_sql = f"""
            INSERT INTO article_crawler_plan
                (crawler_plan_id, name, create_timestamp)
            VALUES
                (%s, %s, %s)
        """
        try:
            self.db_client.update(
                sql=insert_sql,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp)
            )
        except Exception as e:
            bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name
                }
            )

    def get_articles_from_meta_table(self, category, article_source):
        """
        Fetch cold-start candidate articles from the long-article meta table.
        :param category: article category
        :param article_source: source platform of the articles
        :return: DataFrame of candidate articles
        """
        sql = f"""
            SELECT
                article_id, title, link, llm_sensitivity, score, category_by_ai
            FROM crawler_meta_article t1
            JOIN crawler_meta_article_accounts_read_avg t2
                ON t1.out_account_id = t2.gh_id AND t1.article_index = t2.position
            WHERE category = '{category}'
                AND platform = '{article_source}'
                AND title_sensitivity = {TITLE_NOT_SENSITIVE}
                AND t1.status = {self.INIT_STATUS}
                AND t1.read_cnt / t2.read_avg >= {self.READ_TIMES_THRESHOLD}
                AND t1.read_cnt >= {self.READ_THRESHOLD}
            ORDER BY score DESC;
        """
        article_list = self.db_client.select(sql)
        log(
            task="category_publish_task",
            function="get_articles_from_meta_table",
            message="获取品类文章总数",
            data={
                "total_articles": len(article_list),
                "category": category
            }
        )
        article_df = DataFrame(
            article_list,
            columns=['article_id', 'title', 'link', 'llm_sensitivity', 'score', 'category_by_ai']
        )
        return article_df
    def filter_each_category(self, category):
        """
        Filter out articles that already appear in the generate plan of a single category.
        """
        plan_id = self.category_map.get(category)
        if plan_id:
            article_list = aiditApi.get_generated_article_list(plan_id)
            title_list = [i[1] for i in article_list]
            if title_list:
                # mark articles whose titles are already in the generate plan as published
                update_sql = f"""
                    UPDATE crawler_meta_article
                    SET status = %s
                    WHERE title IN %s AND status = %s;
                """
                affected_rows = self.db_client.update(
                    sql=update_sql,
                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
                )
                print(affected_rows)
        else:
            print("未获取到计划id")
            return

    def published_articles_title_filter(self):
        """
        Mark articles whose titles have already been published to a generate plan, for every configured category.
        :return:
        """
        category_list = list(self.category_map.keys())
        for category in category_list:
            try:
                self.filter_each_category(category)
            except Exception as e:
                log(
                    task="category_publish_task",
                    function="published_articles_title_filter",
                    message="过滤已发布文章失败",
                    data={
                        "error": str(e),
                        "error_msg": traceback.format_exc(),
                        "category": category
                    }
                )
    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark articles as published once they have been pushed into a crawler plan.
        :param article_id_list: unique ids of the articles
        :return:
        """
        update_sql = f"""
            UPDATE crawler_meta_article
            SET status = %s
            WHERE article_id IN %s AND status = %s;
        """
        affect_rows = self.db_client.update(
            sql=update_sql,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
        )
        # if affect_rows != len(article_id_list):
        #     bot(
        #         title="品类冷启任务中,出现更新文章状态失败异常",
        #         detail={
        #             "affected_rows": affect_rows,
        #             "task_rows": len(article_id_list)
        #         }
        #     )
    def filter_weixin_articles(self, articles_df, category):
        """
        Filter funnel for articles crawled from WeChat.
        """
        total_length = articles_df.shape[0]
        # funnel level 1: filter by title length
        filter_df = articles_df[
            (articles_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
            & (articles_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
        ]
        length_level1 = filter_df.shape[0]
        # funnel level 2: filter by sensitive keywords in the title
        filter_df = filter_df[
            (~filter_df['title'].str.contains('农历'))
            & (~filter_df['title'].str.contains('太极'))
            & (~filter_df['title'].str.contains('节'))
            & (~filter_df['title'].str.contains('早上好'))
            & (~filter_df['title'].str.contains('赖清德'))
            & (~filter_df['title'].str.contains('普京'))
            & (~filter_df['title'].str.contains('俄'))
            & (~filter_df['title'].str.contains('南海'))
            & (~filter_df['title'].str.contains('台海'))
            & (~filter_df['title'].str.contains('解放军'))
            & (~filter_df['title'].str.contains('蔡英文'))
            & (~filter_df['title'].str.contains('中国'))
        ]
        length_level2 = filter_df.shape[0]
        # funnel level 3: filter by LLM sensitivity flag
        filter_df = filter_df[
            ~(filter_df['llm_sensitivity'] > 0)
        ]
        length_level3 = filter_df.shape[0]
        # funnel level 4: filter by similarity score
        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
        length_level4 = filter_df.shape[0]
        log(
            task="category_publish_task",
            function="publish_filter_articles",
            message="过滤后文章总数",
            data={
                "total_articles": length_level4,
                "category": category
            }
        )
        bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - length_level1, length_level1
                ),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level1 - length_level2, length_level2
                ),
                "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level2 - length_level3, length_level3
                ),
                "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level3 - length_level4, length_level4
                ),
                "品类": category,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.LIMIT_TITLE_LENGTH
            },
            mention=False
        )
        return filter_df[:DAILY_CRAWLER_MAX_NUM]
    def filter_toutiao_articles(self, articles_df, category):
        """
        Filter funnel for Toutiao articles.
        """
        total_length = articles_df.shape[0]
        # funnel level 1: filter by article status; the meta-table query already
        # restricts rows to INIT_STATUS and does not return a status column, so
        # only filter again when the column is present
        if 'status' in articles_df.columns:
            zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        else:
            zero_level_funnel_df = articles_df
        zero_level_funnel_length = zero_level_funnel_df.shape[0]
        bot(
            title="账号冷启动---头条推荐流发布",
            detail={
                "category": category,
                "总文章数量": total_length,
                "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - zero_level_funnel_length, zero_level_funnel_length
                ),
            },
            mention=False
        )
        return zero_level_funnel_df
    def update_article_sensitive_status(self, article_id, status):
        """
        Update the LLM sensitivity status of an article.
        :return:
        """
        update_sql = f"""
            UPDATE crawler_meta_article
            SET llm_sensitivity = %s
            WHERE article_id = %s;
        """
        self.db_client.update(sql=update_sql, params=(status, article_id))
    def publish_filter_articles(self, category, articles_df, article_source):
        """
        Filter articles and publish them as crawler plans.
        :param category: article category
        :param articles_df: DataFrame of articles in this category
        :param article_source: source platform of the articles
        :return:
        """
        match article_source:
            case "weixin":
                filtered_articles_df = self.filter_weixin_articles(articles_df, category)
                input_source_channel = 5
            case "toutiao":
                filtered_articles_df = self.filter_toutiao_articles(articles_df, category)
                input_source_channel = 6
            case _:
                return

        success_titles = filtered_articles_df['title'].values.tolist()
        article_id_list = filtered_articles_df['article_id'].values.tolist()
        if success_titles:
            try:
                sensitive_results = llm_sensitivity.check_titles(success_titles)
                for article_id, sensitive_result in zip(article_id_list, sensitive_results):
                    self.update_article_sensitive_status(
                        article_id=article_id,
                        status=sensitive_result['hit_rule']
                    )
                    # drop titles that hit an LLM sensitivity rule
                    if sensitive_result['hit_rule'] > TITLE_NOT_SENSITIVE:
                        filtered_articles_df = filtered_articles_df[
                            filtered_articles_df['article_id'] != article_id
                        ]
            except Exception as e:
                print("failed to update sensitive status: {}".format(e))

        # split the remaining articles by AI category and create one crawler plan per category
        for ai_category in self.article_category_list:
            filter_category_df = filtered_articles_df[filtered_articles_df['category_by_ai'] == ai_category]
            url_list = filter_category_df['link'].values.tolist()
            if url_list:
                # create crawler plan
                crawler_plan_response = aiditApi.auto_create_crawler_task(
                    plan_id=None,
                    plan_name="自动绑定-{}-{}-{}--{}".format(
                        category, ai_category, str(datetime.date.today()), len(url_list)
                    ),
                    plan_tag="品类冷启动",
                    article_source=article_source,
                    url_list=url_list
                )
                log(
                    task="category_publish_task",
                    function="publish_filter_articles",
                    message="成功创建抓取计划",
                    data=crawler_plan_response
                )
                # save the crawler plan to the database
                create_timestamp = int(time.time()) * 1000
                crawler_plan_id = crawler_plan_response['data']['id']
                crawler_plan_name = crawler_plan_response['data']['name']
                self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
                # auto bind the crawler plan to the generate plan
                new_crawler_task_list = [
                    {
                        "contentType": 1,
                        "inputSourceType": 2,
                        "inputSourceSubType": None,
                        "fieldName": None,
                        "inputSourceValue": crawler_plan_id,
                        "inputSourceLabel": crawler_plan_name,
                        "inputSourceModal": 3,
                        "inputSourceChannel": input_source_channel
                    }
                ]
                generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                    crawler_task_list=new_crawler_task_list,
                    generate_task_id=self.category_map[category]
                )
                log(
                    task="category_publish_task",
                    function="publish_filter_articles",
                    message="成功绑定到生成计划",
                    data=generate_plan_response
                )
                # mark the published articles with PUBLISHED_STATUS
                article_id_list = filter_category_df['article_id'].values.tolist()
                self.change_article_status_while_publishing(article_id_list=article_id_list)
    def do_job(self, article_source, category_list=None):
        """
        Run the category cold-start publish task.
        :param article_source: source platform of the articles
        :param category_list: categories to process; defaults to all configured categories
        :return:
        """
        if not category_list:
            category_list = self.category_map.keys()
        log(
            task="category_publish_task",
            function="do_job",
            message="开始自动创建品类文章抓取计划",
            data={
                "category_list": list(category_list)
            }
        )
        for category in category_list:
            try:
                # de-duplicate against titles that have already been published to generate plans
                self.published_articles_title_filter()
                category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)
                self.publish_filter_articles(
                    category=category,
                    articles_df=category_df,
                    article_source=article_source
                )
            except Exception as e:
                bot(
                    title="品类冷启任务报错",
                    detail={
                        "category": category,
                        "error": str(e),
                        "function": "do_job",
                        "traceback": traceback.format_exc()
                    }
                )
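# Usage sketch (illustrative only, not part of the original task wiring). The real
# db_client comes from elsewhere in the project; `DatabaseConnector` below is a
# hypothetical stand-in that only needs the `select(sql)` and `update(sql, params)`
# methods used by this class.
if __name__ == "__main__":
    from applications.db import DatabaseConnector  # hypothetical import path

    db_client = DatabaseConnector()
    db_client.connect()  # assumed connection setup on the client
    task = CategoryColdStartTask(db_client=db_client)
    # run the cold-start publish flow for WeChat-sourced articles across all configured categories
    task.do_job(article_source="weixin")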