publishCategoryArticles.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. from pandas import DataFrame
  8. from applications import aiditApi, log, bot
  9. from config import apolloConfig
# Module-level configuration handle, created once at import time.
# CategoryColdStartTask.__init__ reads its settings through apollo.getConfigValue().
apollo = apolloConfig()
  11. class CategoryColdStartTask(object):
  12. """
  13. 品类冷启动发布任务
  14. """
  15. PUBLISHED_STATUS = 2
  16. INIT_STATUS = 1
  17. BAD_STATUS = 0
  18. def __init__(self, db_client):
  19. """
  20. :param db_client:
  21. """
  22. self.db_client = db_client
  23. self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
  24. self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
  25. self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
  26. self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
  27. self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
  28. log(
  29. task="category_publish_task",
  30. function="__init__",
  31. message="数据库初始化连接完成,apollo配置获取完成",
  32. data={
  33. "category": self.category_map,
  34. "threshold": self.category_cold_start_threshold
  35. }
  36. )
  37. def get_articles_from_meta_table(self, category):
  38. """
  39. 从长文 meta 库中获取冷启文章
  40. :return:
  41. """
  42. sql = f"""
  43. SELECT
  44. article_id, out_account_id, article_index, title, link, read_cnt
  45. FROM
  46. crawler_meta_article
  47. WHERE
  48. category = "{category}" and status = '{self.INIT_STATUS}';
  49. """
  50. article_list = self.db_client.select(sql)
  51. log(
  52. task="category_publish_task",
  53. function="get_articles_from_meta_table",
  54. message="获取品类文章总数",
  55. data={
  56. "total_articles": len(article_list),
  57. "category": category
  58. }
  59. )
  60. article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt'])
  61. return article_df
  62. def change_article_status(self, category):
  63. """
  64. 已经发布到生成计划中的 id,
  65. :return:
  66. """
  67. plan_id = self.category_map.get(category)
  68. if plan_id:
  69. article_list = aiditApi.get_generated_article_list(plan_id)
  70. title_list = [i[1] for i in article_list]
  71. if title_list:
  72. # update
  73. update_sql = f"""
  74. UPDATE
  75. crawler_meta_article
  76. SET
  77. status = %s
  78. WHERE
  79. title in %s and status = %s;
  80. """
  81. self.db_client.update(
  82. sql=update_sql,
  83. params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
  84. )
  85. else:
  86. return
  87. def change_article_status_while_publishing(self, article_id_list):
  88. """
  89. :param: article_id_list: 文章的唯一 id
  90. :return:
  91. """
  92. update_sql = f"""
  93. UPDATE
  94. crawler_meta_article
  95. SET
  96. status = %s
  97. WHERE
  98. article_id in %s and status = %s;
  99. """
  100. affect_rows = self.db_client.update(
  101. sql=update_sql,
  102. params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
  103. )
  104. if affect_rows != len(article_id_list):
  105. bot(
  106. title="品类冷启任务中,出现更新状文章状态失败异常",
  107. detail={
  108. "affected_rows": affect_rows,
  109. "task_rows": len(article_id_list)
  110. }
  111. )
  112. def publish_filter_articles(self, category, articles_df):
  113. """
  114. 过滤文章
  115. :param category:
  116. :param articles_df:
  117. :return:
  118. """
  119. articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
  120. articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
  121. total_length = articles_df.shape[0]
  122. # 第一层漏斗通过阅读均值倍数过滤
  123. first_level_funnel_df = articles_df[articles_df['read_times'] >= self.READ_TIMES_THRESHOLD]
  124. first_level_funnel_length = first_level_funnel_df.shape[0]
  125. # 第二层漏斗通过阅读量过滤
  126. second_level_funnel_df = first_level_funnel_df[
  127. first_level_funnel_df['read_cnt'] >= self.READ_THRESHOLD
  128. ]
  129. second_level_funnel_length = second_level_funnel_df.shape[0]
  130. # 第三层漏斗通过标题长度过滤
  131. third_level_funnel_df = second_level_funnel_df[
  132. second_level_funnel_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH
  133. ]
  134. third_level_funnel_length = third_level_funnel_df.shape[0]
  135. # 最后一层通过敏感词过滤
  136. filter_df = third_level_funnel_df[
  137. (~third_level_funnel_df['title'].str.contains('农历'))
  138. & (~third_level_funnel_df['title'].str.contains('太极'))
  139. & (~third_level_funnel_df['title'].str.contains('节'))
  140. & (~third_level_funnel_df['title'].str.contains('早上好'))
  141. & (~third_level_funnel_df['title'].str.contains('赖清德'))
  142. & (~third_level_funnel_df['title'].str.contains('普京'))
  143. & (~third_level_funnel_df['title'].str.contains('俄'))
  144. & (~third_level_funnel_df['title'].str.contains('南海'))
  145. & (~third_level_funnel_df['title'].str.contains('台海'))
  146. & (~third_level_funnel_df['title'].str.contains('解放军'))
  147. & (~third_level_funnel_df['title'].str.contains('蔡英文'))
  148. & (~third_level_funnel_df['title'].str.contains('中国'))
  149. ]
  150. final_length = filter_df.shape[0]
  151. url_list = filter_df['link'].values.tolist()
  152. log(
  153. task="category_publish_task",
  154. function="publish_filter_articles",
  155. message="过滤后文章总数",
  156. data={
  157. "total_articles": final_length,
  158. "category": category
  159. }
  160. )
  161. bot(
  162. title="冷启任务发布通知",
  163. detail={
  164. "总文章数量": total_length,
  165. "通过阅读均值倍数过滤": "过滤数量: {} 剩余数量: {}".format(total_length - first_level_funnel_length, first_level_funnel_length),
  166. "通过阅读量过滤": "过滤数量: {} 剩余数量: {}".format(first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
  167. "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
  168. "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(third_level_funnel_length - final_length, final_length),
  169. "品类": category,
  170. "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
  171. "阅读量阈值": self.READ_THRESHOLD,
  172. "标题长度阈值": self.LIMIT_TITLE_LENGTH
  173. },
  174. mention=False
  175. )
  176. if url_list:
  177. crawler_plan_response = aiditApi.auto_create_crawler_task(
  178. plan_id=None,
  179. plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
  180. plan_tag="品类冷启动",
  181. url_list=url_list
  182. )
  183. log(
  184. task="category_publish_task",
  185. function="publish_filter_articles",
  186. message="成功创建抓取计划",
  187. data=crawler_plan_response
  188. )
  189. # auto bind to generate plan
  190. new_crawler_task_list = [
  191. {
  192. "contentType": 1,
  193. "inputSourceType": 2,
  194. "inputSourceSubType": None,
  195. "fieldName": None,
  196. "inputSourceValue": crawler_plan_response['data']['id'],
  197. "inputSourceLabel": crawler_plan_response['data']['name'],
  198. "inputSourceModal": 3,
  199. "inputSourceChannel": 5
  200. }
  201. ]
  202. generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
  203. crawler_task_list=new_crawler_task_list,
  204. generate_task_id=self.category_map[category]
  205. )
  206. log(
  207. task="category_publish_task",
  208. function="publish_filter_articles",
  209. message="成功绑定到生成计划",
  210. data=generate_plan_response
  211. )
  212. article_id_list = filter_df['article_id'].values.tolist()
  213. self.change_article_status_while_publishing(article_id_list=article_id_list)
  214. def do_job(self, category_list=None):
  215. """
  216. 执行任务
  217. :return:
  218. """
  219. if not category_list:
  220. category_list = self.category_map.keys()
  221. log(
  222. task="category_publish_task",
  223. function="do_job",
  224. message="开始自动创建品类文章抓取计划",
  225. data={
  226. "category_list": list(category_list)
  227. }
  228. )
  229. for category in category_list:
  230. try:
  231. category_df = self.get_articles_from_meta_table(category=category)
  232. self.publish_filter_articles(
  233. category=category,
  234. articles_df=category_df
  235. )
  236. except Exception as e:
  237. bot(
  238. title="品类冷启任务报错",
  239. detail={
  240. "category": category,
  241. "error": str(e),
  242. "function": "do_job"
  243. }
  244. )