basic.py

"""
@author: luojunhui
"""
import json
import time
import datetime
import traceback

import pandas as pd
from pandas import DataFrame
from tqdm import tqdm

from applications import log, aiditApi, bot
from applications.const import ColdStartTaskConst
from config import apolloConfig

const = ColdStartTaskConst()
config = apolloConfig()

# Cold-start filtering thresholds, loaded from the Apollo config with fallback defaults.
category_cold_start_threshold = json.loads(
    config.getConfigValue("category_cold_start_threshold")
)
READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000)
LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
TITLE_LENGTH_MAX = category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
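
# Assumed shape of the "category_cold_start_threshold" config value, inferred
# from the keys and defaults read above (not taken from the live config):
# {"READ_TIMES_THRESHOLD": 1.3, "READ_THRESHOLD": 5000,
#  "LIMIT_TITLE_LENGTH": 15, "TITLE_LENGTH_MAX": 50}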

def get_article_from_meta_table(db_client, category: str, platform: str) -> DataFrame:
    """
    get article from meta data
    :param db_client: database connector
    :param category: article category
    :param platform: article platform
    :return: article dataframe
    """
    sql = f"""
        select article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
        from crawler_meta_article
        where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE}
        order by score desc;
    """
    article_list = db_client.fetch(sql)
    log(
        task="category_publish_task",
        function="get_article_from_meta_table",
        message="获取品类文章总数",  # total number of articles fetched for this category
        data={"total_articles": len(article_list), "category": category},
    )
    article_df = pd.DataFrame(
        article_list,
        columns=[
            "article_id",
            "gh_id",
            "position",
            "title",
            "link",
            "read_cnt",
            "status",
            "llm_sensitivity",
            "score",
        ],
    )
    return article_df


def update_published_articles_status(db_client) -> None:
    """
    filter published articles
    """
    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
    category_list = list(category_map.keys())
    processing_bar = tqdm(category_list, desc="filter_published_articles")
    for category in processing_bar:
        plan_id = category_map.get(category)
        if plan_id:
            article_list = aiditApi.get_generated_article_list(plan_id)
            title_list = [i[1] for i in article_list]
            if title_list:
                # mark articles whose titles already appear in the generated plan as published
                update_sql = f"""
                    update crawler_meta_article
                    set status = %s
                    where title in %s and status = %s;
                """
                affected_rows = db_client.save(
                    query=update_sql,
                    params=(
                        const.PUBLISHED_STATUS,
                        tuple(title_list),
                        const.INIT_STATUS,
                    ),
                )
                processing_bar.set_postfix(
                    {"category": category, "affected_rows": affected_rows}
                )
        else:
            # no generate plan configured for this category: stop processing
            return


def filter_by_read_times(article_df: DataFrame) -> DataFrame:
    """
    filter by read times
    """
    # read_times = read_cnt relative to the average read_cnt of the same (gh_id, position) group
    article_df["average_read"] = article_df.groupby(["gh_id", "position"])[
        "read_cnt"
    ].transform("mean")
    article_df["read_times"] = article_df["read_cnt"] / article_df["average_read"]
    filter_df = article_df[article_df["read_times"] >= READ_TIMES_THRESHOLD]
    return filter_df
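
# Worked example for filter_by_read_times (illustrative numbers, not from the data):
# for one (gh_id, position) group with read_cnt values [8000, 4000, 6000],
# average_read is 6000 and read_times is [1.33, 0.67, 1.0]; with the default
# READ_TIMES_THRESHOLD of 1.3 only the first article is kept.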

def filter_by_status(article_df: DataFrame) -> DataFrame:
    """
    filter by status
    """
    filter_df = article_df[article_df["status"] == const.INIT_STATUS]
    return filter_df


def filter_by_read_cnt(article_df: DataFrame) -> DataFrame:
    """
    filter by read cnt
    """
    filter_df = article_df[article_df["read_cnt"] >= READ_THRESHOLD]
    return filter_df


def filter_by_title_length(article_df: DataFrame) -> DataFrame:
    """
    filter by title length
    """
    filter_df = article_df[
        (article_df["title"].str.len() >= LIMIT_TITLE_LENGTH)
        & (article_df["title"].str.len() <= TITLE_LENGTH_MAX)
    ]
    return filter_df


def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame:
    """
    filter by sensitive words
    """
    sensitive_keywords = [
        "农历", "太极", "节", "早上好", "赖清德", "普京",
        "俄", "南海", "台海", "解放军", "蔡英文", "中国",
    ]
    filter_df = article_df[~article_df["title"].str.contains("|".join(sensitive_keywords))]
    return filter_df


def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame:
    """
    filter by similarity score
    """
    filter_df = article_df[article_df["score"] >= score]
    return filter_df

def insert_into_article_crawler_plan(
    db_client, crawler_plan_id, crawler_plan_name, create_timestamp
):
    """
    insert into article crawler plan
    """
    insert_sql = f"""
        insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
        values (%s, %s, %s);
    """
    try:
        db_client.save(
            query=insert_sql,
            params=(crawler_plan_id, crawler_plan_name, create_timestamp),
        )
    except Exception as e:
        # alert when recording the crawler plan id fails
        bot(
            title="品类冷启任务,记录抓取计划id失败",
            detail={
                "error": str(e),
                "error_msg": traceback.format_exc(),
                "crawler_plan_id": crawler_plan_id,
                "crawler_plan_name": crawler_plan_name,
            },
        )


def create_crawler_plan(url_list, plan_tag, platform) -> tuple:
    """
    create crawler plan
    """
    crawler_plan_response = aiditApi.auto_create_crawler_task(
        plan_id=None,
        plan_name="自动绑定-文章联想--{}--{}".format(
            datetime.date.today(), len(url_list)
        ),
        plan_tag=plan_tag,
        article_source=platform,
        url_list=url_list,
    )
    log(
        task="category_publish_task",
        function="publish_filter_articles",
        message="成功创建抓取计划",  # crawler plan created successfully
        data=crawler_plan_response,
    )
    # save to db
    create_timestamp = int(time.time()) * 1000  # millisecond timestamp
    crawler_plan_id = crawler_plan_response["data"]["id"]
    crawler_plan_name = crawler_plan_response["data"]["name"]
    return crawler_plan_id, crawler_plan_name, create_timestamp

def bind_to_generate_plan(category, crawler_plan_id, crawler_plan_name, platform):
    """
    auto bind to generate plan
    """
    # map the article platform to its input source channel (weixin=5, toutiao=6; default 5)
    match platform:
        case "weixin":
            input_source_channel = 5
        case "toutiao":
            input_source_channel = 6
        case _:
            input_source_channel = 5
    new_crawler_task_list = [
        {
            "contentType": 1,
            "inputSourceType": 2,
            "inputSourceSubType": None,
            "fieldName": None,
            "inputSourceValue": crawler_plan_id,
            "inputSourceLabel": crawler_plan_name,
            "inputSourceModal": 3,
            "inputSourceChannel": input_source_channel,
        }
    ]
    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
    generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
        crawler_task_list=new_crawler_task_list,
        generate_task_id=category_map[category],
    )
    log(
        task="category_publish_task",
        function="publish_filter_articles",
        message="成功绑定到生成计划",  # bound to generate plan successfully
        data=generate_plan_response,
    )


def update_article_status_after_publishing(db_client, article_id_list):
    """
    update article status after publishing
    """
    update_sql = f"""
        update crawler_meta_article
        set status = %s
        where article_id in %s and status = %s;
    """
    affect_rows = db_client.save(
        query=update_sql,
        params=(const.PUBLISHED_STATUS, tuple(article_id_list), const.INIT_STATUS),
    )
    if affect_rows != len(article_id_list):
        # alert when the number of updated rows does not match the number of articles
        bot(
            title="品类冷启任务中,出现更新文章状态失败异常",
            detail={"affected_rows": affect_rows, "task_rows": len(article_id_list)},
        )