# publishCategoryArticles.py
"""
@author: luojunhui
Publish category articles into the cold-start layer of the aigc system.
"""
import datetime
import json
from pandas import DataFrame
from applications import aiditApi, log, bot
from config import apolloConfig

# Module-level Apollo configuration client, shared by CategoryColdStartTask below.
apollo = apolloConfig()
  11. class CategoryColdStartTask(object):
  12. """
  13. 品类冷启动发布任务
  14. """
  15. PUBLISHED_STATUS = 2
  16. INIT_STATUS = 1
  17. BAD_STATUS = 0
  18. def __init__(self, db_client):
  19. """
  20. :param db_client:
  21. """
  22. self.db_client = db_client
  23. self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
  24. self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
  25. self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
  26. self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
  27. self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
  28. log(
  29. task="category_publish_task",
  30. function="__init__",
  31. message="数据库初始化连接完成,apollo配置获取完成",
  32. data={
  33. "category": self.category_map,
  34. "threshold": self.category_cold_start_threshold
  35. }
  36. )
  37. def get_articles_from_meta_table(self, category):
  38. """
  39. 从长文 meta 库中获取冷启文章
  40. :return:
  41. """
  42. sql = f"""
  43. SELECT
  44. article_id, out_account_id, article_index, title, link, read_cnt, status
  45. FROM
  46. crawler_meta_article
  47. WHERE
  48. category = "{category}";
  49. """
  50. article_list = self.db_client.select(sql)
  51. log(
  52. task="category_publish_task",
  53. function="get_articles_from_meta_table",
  54. message="获取品类文章总数",
  55. data={
  56. "total_articles": len(article_list),
  57. "category": category
  58. }
  59. )
  60. article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status'])
  61. return article_df
  62. def change_article_status(self, category):
  63. """
  64. 已经发布到生成计划中的 id,
  65. :return:
  66. """
  67. plan_id = self.category_map.get(category)
  68. if plan_id:
  69. article_list = aiditApi.get_generated_article_list(plan_id)
  70. title_list = [i[1] for i in article_list]
  71. if title_list:
  72. # update
  73. update_sql = f"""
  74. UPDATE
  75. crawler_meta_article
  76. SET
  77. status = %s
  78. WHERE
  79. title in %s and status = %s;
  80. """
  81. self.db_client.update(
  82. sql=update_sql,
  83. params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
  84. )
  85. else:
  86. return
  87. def change_article_status_while_publishing(self, article_id_list):
  88. """
  89. :param: article_id_list: 文章的唯一 id
  90. :return:
  91. """
  92. update_sql = f"""
  93. UPDATE
  94. crawler_meta_article
  95. SET
  96. status = %s
  97. WHERE
  98. article_id in %s and status = %s;
  99. """
  100. affect_rows = self.db_client.update(
  101. sql=update_sql,
  102. params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
  103. )
  104. if affect_rows != len(article_id_list):
  105. bot(
  106. title="品类冷启任务中,出现更新状文章状态失败异常",
  107. detail={
  108. "affected_rows": affect_rows,
  109. "task_rows": len(article_id_list)
  110. }
  111. )
  112. def publish_filter_articles(self, category, articles_df):
  113. """
  114. 过滤文章
  115. :param category:
  116. :param articles_df:
  117. :return:
  118. """
  119. articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
  120. articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
  121. total_length = articles_df.shape[0]
  122. # 第0层过滤已经发布的文章
  123. zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
  124. zero_level_funnel_length = zero_level_funnel_df.shape[0]
  125. # 第一层漏斗通过阅读均值倍数过滤
  126. first_level_funnel_df = zero_level_funnel_df[zero_level_funnel_df['read_times'] >= self.READ_TIMES_THRESHOLD]
  127. first_level_funnel_length = first_level_funnel_df.shape[0]
  128. # 第二层漏斗通过阅读量过滤
  129. second_level_funnel_df = first_level_funnel_df[
  130. first_level_funnel_df['read_cnt'] >= self.READ_THRESHOLD
  131. ]
  132. second_level_funnel_length = second_level_funnel_df.shape[0]
  133. # 第三层漏斗通过标题长度过滤
  134. third_level_funnel_df = second_level_funnel_df[
  135. second_level_funnel_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH
  136. ]
  137. third_level_funnel_length = third_level_funnel_df.shape[0]
  138. # 最后一层通过敏感词过滤
  139. filter_df = third_level_funnel_df[
  140. (~third_level_funnel_df['title'].str.contains('农历'))
  141. & (~third_level_funnel_df['title'].str.contains('太极'))
  142. & (~third_level_funnel_df['title'].str.contains('节'))
  143. & (~third_level_funnel_df['title'].str.contains('早上好'))
  144. & (~third_level_funnel_df['title'].str.contains('赖清德'))
  145. & (~third_level_funnel_df['title'].str.contains('普京'))
  146. & (~third_level_funnel_df['title'].str.contains('俄'))
  147. & (~third_level_funnel_df['title'].str.contains('南海'))
  148. & (~third_level_funnel_df['title'].str.contains('台海'))
  149. & (~third_level_funnel_df['title'].str.contains('解放军'))
  150. & (~third_level_funnel_df['title'].str.contains('蔡英文'))
  151. & (~third_level_funnel_df['title'].str.contains('中国'))
  152. ]
  153. final_length = filter_df.shape[0]
  154. url_list = filter_df['link'].values.tolist()
  155. log(
  156. task="category_publish_task",
  157. function="publish_filter_articles",
  158. message="过滤后文章总数",
  159. data={
  160. "total_articles": final_length,
  161. "category": category
  162. }
  163. )
  164. bot(
  165. title="冷启任务发布通知",
  166. detail={
  167. "总文章数量": total_length,
  168. "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(total_length - zero_level_funnel_length, zero_level_funnel_length),
  169. "通过阅读均值倍数过滤": "过滤数量: {} 剩余数量: {}".format(zero_level_funnel_length - first_level_funnel_length, first_level_funnel_length),
  170. "通过阅读量过滤": "过滤数量: {} 剩余数量: {}".format(first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
  171. "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
  172. "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(third_level_funnel_length - final_length, final_length),
  173. "品类": category,
  174. "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
  175. "阅读量阈值": self.READ_THRESHOLD,
  176. "标题长度阈值": self.LIMIT_TITLE_LENGTH
  177. },
  178. mention=False
  179. )
  180. if url_list:
  181. crawler_plan_response = aiditApi.auto_create_crawler_task(
  182. plan_id=None,
  183. plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
  184. plan_tag="品类冷启动",
  185. url_list=url_list
  186. )
  187. log(
  188. task="category_publish_task",
  189. function="publish_filter_articles",
  190. message="成功创建抓取计划",
  191. data=crawler_plan_response
  192. )
  193. # auto bind to generate plan
  194. new_crawler_task_list = [
  195. {
  196. "contentType": 1,
  197. "inputSourceType": 2,
  198. "inputSourceSubType": None,
  199. "fieldName": None,
  200. "inputSourceValue": crawler_plan_response['data']['id'],
  201. "inputSourceLabel": crawler_plan_response['data']['name'],
  202. "inputSourceModal": 3,
  203. "inputSourceChannel": 5
  204. }
  205. ]
  206. generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
  207. crawler_task_list=new_crawler_task_list,
  208. generate_task_id=self.category_map[category]
  209. )
  210. log(
  211. task="category_publish_task",
  212. function="publish_filter_articles",
  213. message="成功绑定到生成计划",
  214. data=generate_plan_response
  215. )
  216. article_id_list = filter_df['article_id'].values.tolist()
  217. self.change_article_status_while_publishing(article_id_list=article_id_list)
  218. def do_job(self, category_list=None):
  219. """
  220. 执行任务
  221. :return:
  222. """
  223. if not category_list:
  224. category_list = self.category_map.keys()
  225. log(
  226. task="category_publish_task",
  227. function="do_job",
  228. message="开始自动创建品类文章抓取计划",
  229. data={
  230. "category_list": list(category_list)
  231. }
  232. )
  233. for category in category_list:
  234. try:
  235. category_df = self.get_articles_from_meta_table(category=category)
  236. self.publish_filter_articles(
  237. category=category,
  238. articles_df=category_df
  239. )
  240. except Exception as e:
  241. bot(
  242. title="品类冷启任务报错",
  243. detail={
  244. "category": category,
  245. "error": str(e),
  246. "function": "do_job"
  247. }
  248. )