basic.py

  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import datetime
  7. import pandas as pd
  8. import traceback
  9. from pandas import DataFrame
  10. from tqdm import tqdm
  11. from applications import log, aiditApi, bot
  12. from applications.const import ColdStartTaskConst
  13. from config import apolloConfig
  14. const = ColdStartTaskConst()
  15. config = apolloConfig()
  16. category_cold_start_threshold = json.loads(config.getConfigValue("category_cold_start_threshold"))
  17. READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
  18. READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000)
  19. LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
  20. TITLE_LENGTH_MAX = category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
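
# The Apollo key "category_cold_start_threshold" is expected to hold a JSON
# object with the keys read above. An illustrative value (matching the
# fallback defaults; the production config may carry different numbers):
#
#     {"READ_TIMES_THRESHOLD": 1.3, "READ_THRESHOLD": 5000,
#      "LIMIT_TITLE_LENGTH": 15, "TITLE_LENGTH_MAX": 50}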


def get_article_from_meta_table(db_client, category: str, platform: str) -> DataFrame:
    """
    get articles from the crawler meta table
    :param db_client: database connector
    :param category: article category
    :param platform: article platform
    :return: article dataframe
    """
    sql = f"""
        select article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
        from crawler_meta_article
        where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE}
        order by score desc;
    """
    article_list = db_client.fetch(sql)
    log(
        task="category_publish_task",
        function="get_articles_from_meta_table",
        message="获取品类文章总数",  # total number of category articles fetched
        data={
            "total_articles": len(article_list),
            "category": category
        }
    )
    article_df = pd.DataFrame(
        article_list,
        columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
                 'llm_sensitivity', 'score']
    )
    return article_df


def update_published_articles_status(db_client) -> None:
    """
    mark articles whose titles have already been published so they are filtered out later
    """
    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
    category_list = list(category_map.keys())
    processing_bar = tqdm(category_list, desc="filter_published_articles")
    for category in processing_bar:
        plan_id = category_map.get(category)
        if plan_id:
            article_list = aiditApi.get_generated_article_list(plan_id)
            title_list = [i[1] for i in article_list]
            if title_list:
                update_sql = f"""
                    update crawler_meta_article
                    set status = %s
                    where title in %s and status = %s;
                """
                affected_rows = db_client.save(
                    sql=update_sql,
                    params=(const.PUBLISHED_STATUS, tuple(title_list), const.INIT_STATUS)
                )
                processing_bar.set_postfix({"affected_rows": affected_rows})
        else:
            return


def filter_by_read_times(article_df: DataFrame) -> DataFrame:
    """
    filter by read times
    """
    # read_times = read_cnt / mean read_cnt of the same account (gh_id) and position
    article_df['average_read'] = article_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
    article_df['read_times'] = article_df['read_cnt'] / article_df['average_read']
    filter_df = article_df[article_df['read_times'] >= READ_TIMES_THRESHOLD]
    return filter_df


def filter_by_status(article_df: DataFrame) -> DataFrame:
    """
    filter by status
    """
    filter_df = article_df[article_df['status'] == const.INIT_STATUS]
    return filter_df


def filter_by_read_cnt(article_df: DataFrame) -> DataFrame:
    """
    filter by read cnt
    """
    filter_df = article_df[article_df['read_cnt'] >= READ_THRESHOLD]
    return filter_df


def filter_by_title_length(article_df: DataFrame) -> DataFrame:
    """
    filter by title length
    """
    filter_df = article_df[
        (article_df['title'].str.len() >= LIMIT_TITLE_LENGTH)
        & (article_df['title'].str.len() <= TITLE_LENGTH_MAX)
    ]
    return filter_df


def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame:
    """
    filter by sensitive words
    """
    filter_df = article_df[
        (~article_df['title'].str.contains('农历'))
        & (~article_df['title'].str.contains('太极'))
        & (~article_df['title'].str.contains('节'))
        & (~article_df['title'].str.contains('早上好'))
        & (~article_df['title'].str.contains('赖清德'))
        & (~article_df['title'].str.contains('普京'))
        & (~article_df['title'].str.contains('俄'))
        & (~article_df['title'].str.contains('南海'))
        & (~article_df['title'].str.contains('台海'))
        & (~article_df['title'].str.contains('解放军'))
        & (~article_df['title'].str.contains('蔡英文'))
        & (~article_df['title'].str.contains('中国'))
    ]
    return filter_df


def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame:
    """
    filter by similarity score
    """
    filter_df = article_df[article_df['score'] >= score]
    return filter_df
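

# --- Illustration (not part of the original module) --------------------------
# A minimal sketch of how the column filters above can be chained for one
# category. The helper name `apply_all_filters` and the filter order are
# assumptions made for illustration; the real pipeline may differ.
def apply_all_filters(article_df: DataFrame, similarity_score: float) -> DataFrame:
    """illustrative helper: apply the filters defined above in sequence"""
    article_df = filter_by_read_times(article_df.copy())  # adds average_read / read_times columns
    article_df = filter_by_status(article_df)             # keep rows still in INIT_STATUS
    article_df = filter_by_read_cnt(article_df)           # absolute read-count floor
    article_df = filter_by_title_length(article_df)       # title length window
    article_df = filter_by_sensitive_words(article_df)    # keyword blocklist on titles
    return filter_by_similarity_score(article_df, similarity_score)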


def insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_name, create_timestamp):
    """
    insert into article crawler plan
    """
    insert_sql = f"""
        INSERT INTO article_crawler_plan
            (crawler_plan_id, name, create_timestamp)
        values
            (%s, %s, %s)
    """
    try:
        db_client.save(
            query=insert_sql,
            params=(crawler_plan_id, crawler_plan_name, create_timestamp)
        )
    except Exception as e:
        # alert: "category cold-start task: failed to record the crawler plan id"
        bot(
            title="品类冷启任务,记录抓取计划id失败",
            detail={
                "error": str(e),
                "error_msg": traceback.format_exc(),
                "crawler_plan_id": crawler_plan_id,
                "crawler_plan_name": crawler_plan_name
            }
        )


def create_crawler_plan(db_client, url_list, plan_tag, platform, category, input_source_channel):
    """
    create a crawler plan for the filtered article urls, record it, and bind it
    to the generate plan of the given category
    """
    crawler_plan_response = aiditApi.auto_create_crawler_task(
        plan_id=None,
        # plan name reads "auto-bind - article association - <date> - <url count>"
        plan_name="自动绑定-文章联想--{}--{}".format(str(datetime.date.today()), len(url_list)),
        plan_tag=plan_tag,
        article_source=platform,
        url_list=url_list
    )
    log(
        task="category_publish_task",
        function="publish_filter_articles",
        message="成功创建抓取计划",  # crawler plan created successfully
        data=crawler_plan_response
    )
    # save the new crawler plan to db, then bind it to the generate plan
    create_timestamp = int(time.time()) * 1000
    crawler_plan_id = crawler_plan_response['data']['id']
    crawler_plan_name = crawler_plan_response['data']['name']
    insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_name, create_timestamp)
    bind_to_generate_plan(crawler_plan_id, crawler_plan_name, input_source_channel, category)


def bind_to_generate_plan(crawler_plan_id, crawler_plan_name, input_source_channel, category):
    """
    auto bind the crawler plan to the generate plan configured for the category
    """
    new_crawler_task_list = [
        {
            "contentType": 1,
            "inputSourceType": 2,
            "inputSourceSubType": None,
            "fieldName": None,
            "inputSourceValue": crawler_plan_id,
            "inputSourceLabel": crawler_plan_name,
            "inputSourceModal": 3,
            "inputSourceChannel": input_source_channel
        }
    ]
    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
    generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
        crawler_task_list=new_crawler_task_list,
        generate_task_id=category_map[category]
    )
    log(
        task="category_publish_task",
        function="publish_filter_articles",
        message="成功绑定到生成计划",  # successfully bound to the generate plan
        data=generate_plan_response
    )
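

# --- Illustration (not part of the original module) --------------------------
# A rough sketch of how the pieces above fit together for a single category.
# The driver name `run_category_cold_start`, its argument list, and the use of
# the illustrative `apply_all_filters` helper defined above are assumptions;
# the real task may orchestrate these steps differently.
def run_category_cold_start(db_client, category: str, platform: str, plan_tag: str,
                            similarity_score: float, input_source_channel) -> None:
    """illustrative driver: refresh statuses, fetch, filter, then create and bind a crawler plan"""
    update_published_articles_status(db_client)
    article_df = get_article_from_meta_table(db_client, category, platform)
    filtered_df = apply_all_filters(article_df, similarity_score)
    url_list = filtered_df['link'].tolist()
    if url_list:
        create_crawler_plan(
            db_client=db_client,
            url_list=url_list,
            plan_tag=plan_tag,
            platform=platform,
            category=category,
            input_source_channel=input_source_channel
        )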