# publishCategoryArticles.py
  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. from pandas import DataFrame
  8. from applications import aiditApi, log, bot
  9. from config import apolloConfig
  10. apollo = apolloConfig()
  11. class CategoryColdStartTask(object):
  12. """
  13. 品类冷启动发布任务
  14. """
  15. PUBLISHED_STATUS = 2
  16. INIT_STATUS = 1
  17. BAD_STATUS = 0
  18. def __init__(self, db_client):
  19. """
  20. :param db_client:
  21. """
  22. self.db_client = db_client
  23. self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
  24. self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
  25. self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
  26. self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
  27. self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
  28. log(
  29. task="category_publish_task",
  30. function="__init__",
  31. message="数据库初始化连接完成,apollo配置获取完成",
  32. status="success",
  33. data={
  34. "category": self.category_map,
  35. "threshold": self.category_cold_start_threshold
  36. }
  37. )
  38. def get_articles_from_meta_table(self, category):
  39. """
  40. 从长文 meta 库中获取冷启文章
  41. :return:
  42. """
  43. sql = f"""
  44. SELECT
  45. article_id, out_account_id, article_index, title, link, read_cnt
  46. FROM
  47. crawler_meta_article
  48. WHERE
  49. category = "{category}" and status = '{self.INIT_STATUS}';
  50. """
  51. article_list = self.db_client.select(sql)
  52. log(
  53. task="category_publish_task",
  54. function="get_articles_from_meta_table",
  55. message="获取品类文章总数",
  56. status="success",
  57. data={
  58. "total_articles": len(article_list),
  59. "category": category
  60. }
  61. )
  62. article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt'])
  63. return article_df
  64. def change_article_status(self, category):
  65. """
  66. 已经发布到生成计划中的 id,
  67. :return:
  68. """
  69. plan_id = self.category_map.get(category)
  70. if plan_id:
  71. sql = f"""
  72. SELECT
  73. account.wx_gh,
  74. content.title,
  75. content.content_link,
  76. content.view_count,
  77. content.like_count,
  78. from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
  79. from_unixtime(content.publish_timestamp / 1000) AS 发布时间
  80. FROM crawler_plan_result_rel cprr
  81. JOIN crawler_plan plan ON cprr.plan_id = plan.id
  82. JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
  83. JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
  84. WHERE plan_id IN (
  85. SELECT
  86. input_source_value
  87. FROM
  88. produce_plan_input_source
  89. WHERE plan_id = '{plan_id}'
  90. );
  91. """
  92. article_list = self.db_client.select(sql)
  93. title_list = [i[1] for i in article_list]
  94. if title_list:
  95. # update
  96. update_sql = f"""
  97. UPDATE
  98. crawler_meta_article
  99. SET
  100. status = %s
  101. WHERE
  102. title in %s and status = %s;
  103. """
  104. self.db_client.update(
  105. sql=update_sql,
  106. params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
  107. )
  108. else:
  109. return
  110. def change_article_status_while_publishing(self, article_id_list):
  111. """
  112. :param: article_id_list: 文章的唯一 id
  113. :return:
  114. """
  115. update_sql = f"""
  116. UPDATE
  117. crawler_meta_article
  118. SET
  119. status = %s
  120. WHERE
  121. article_id in %s and status = %s;
  122. """
  123. affect_rows = self.db_client.update(
  124. sql=update_sql,
  125. params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
  126. )
  127. if affect_rows != len(article_id_list):
  128. bot(
  129. title="品类冷启任务中,出现更新状文章状态失败异常",
  130. detail={
  131. "affected_rows": affect_rows,
  132. "task_rows": len(article_id_list)
  133. }
  134. )
  135. def publish_filter_articles(self, category, articles_df):
  136. """
  137. 过滤文章
  138. :param category:
  139. :param articles_df:
  140. :return:
  141. """
  142. articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
  143. articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
  144. filter_df = articles_df[
  145. (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
  146. & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
  147. & (articles_df['title'].str.len() > self.LIMIT_TITLE_LENGTH)
  148. & (~articles_df['title'].str.contains('农历'))
  149. & (~articles_df['title'].str.contains('太极'))
  150. & (~articles_df['title'].str.contains('节'))
  151. & (~articles_df['title'].str.contains('早上好'))
  152. & (~articles_df['title'].str.contains('赖清德'))
  153. & (~articles_df['title'].str.contains('普京'))
  154. & (~articles_df['title'].str.contains('俄'))
  155. & (~articles_df['title'].str.contains('南海'))
  156. & (~articles_df['title'].str.contains('台海'))
  157. & (~articles_df['title'].str.contains('解放军'))
  158. & (~articles_df['title'].str.contains('蔡英文'))
  159. & (~articles_df['title'].str.contains('中国'))
  160. ]
  161. url_list = filter_df['link'].values.tolist()
  162. log(
  163. task="category_publish_task",
  164. function="publish_filter_articles",
  165. message="过滤后文章总数",
  166. status="success",
  167. data={
  168. "total_articles": len(url_list),
  169. "category": category
  170. }
  171. )
  172. if url_list:
  173. crawler_plan_response = aiditApi.auto_create_crawler_task(
  174. plan_id=None,
  175. plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
  176. plan_tag="品类冷启动",
  177. url_list=url_list
  178. )
  179. log(
  180. task="category_publish_task",
  181. function="publish_filter_articles",
  182. message="成功创建抓取计划",
  183. status="success",
  184. data=crawler_plan_response
  185. )
  186. # auto bind to generate plan
  187. new_crawler_task_list = [
  188. {
  189. "contentType": 1,
  190. "inputSourceType": 2,
  191. "inputSourceSubType": None,
  192. "fieldName": None,
  193. "inputSourceValue": crawler_plan_response['data']['id'],
  194. "inputSourceLabel": crawler_plan_response['data']['name'],
  195. "inputSourceModal": 3,
  196. "inputSourceChannel": 5
  197. }
  198. ]
  199. generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
  200. crawler_task_list=new_crawler_task_list,
  201. generate_task_id=self.category_map[category]
  202. )
  203. log(
  204. task="category_publish_task",
  205. function="publish_filter_articles",
  206. message="成功绑定到生成计划",
  207. status="success",
  208. data=generate_plan_response
  209. )
  210. article_id_list = filter_df['article_id'].values.tolist()
  211. self.change_article_status_while_publishing(article_id_list=article_id_list)
  212. def do_job(self, category_list=None):
  213. """
  214. 执行任务
  215. :return:
  216. """
  217. if not category_list:
  218. category_list = self.category_map.keys()
  219. log(
  220. task="category_publish_task",
  221. function="do_job",
  222. message="开始自动创建品类文章抓取计划",
  223. data={
  224. "category_list": list(category_list)
  225. }
  226. )
  227. for category in category_list:
  228. try:
  229. category_df = self.get_articles_from_meta_table(category=category)
  230. self.publish_filter_articles(
  231. category=category,
  232. articles_df=category_df
  233. )
  234. except Exception as e:
  235. bot(
  236. title="品类冷启任务报错",
  237. detail={
  238. "category": category,
  239. "error": str(e),
  240. "function": "do_job"
  241. }
  242. )