# crawler_pipeline.py

import time

from applications.api import AsyncApolloApi
from applications.utils import CrawlerMetaArticle


class CrawlerPipeline(AsyncApolloApi):
    def __init__(self, pool, log_client):
        super().__init__()
        self.pool = pool
        self.log_client = log_client

    async def whether_title_sensitive(self, title: str) -> bool:
        """Return True if the title contains any configured sensitive word."""
        sensitive_word_list = await self.get_config_value("sensitive_word_list")
        return any(word in title for word in sensitive_word_list)

    async def whether_article_title_duplicate(self, title: str) -> bool:
        """Return True if an article with this title is already stored."""
        query = "select article_id from crawler_meta_article where title = %s;"
        duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
        return bool(duplicated_id)

    async def save_article(self, article_item: dict) -> None:
        """Save an article with an explicit column list and per-field defaults."""
        query = """
            insert into crawler_meta_article
                (platform, mode, category, out_account_id, article_index, title, link,
                 read_cnt, like_cnt, description, publish_time, crawler_time, score,
                 status, unique_index, source_article_title, source_account,
                 title_sensitivity)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        await self.pool.async_save(
            query=query,
            params=(
                article_item.get("platform", "weixin"),
                article_item.get("mode"),
                article_item.get("category"),
                article_item.get("out_account_id"),
                article_item.get("article_index"),
                article_item.get("title"),
                article_item.get("link"),
                article_item.get("read_cnt", 0),
                article_item.get("like_cnt", 0),
                article_item.get("description"),
                article_item.get("publish_time"),
                article_item.get("crawler_time", int(time.time())),
                article_item.get("score"),
                article_item.get("status", 1),
                article_item.get("unique_index"),
                article_item.get("source_article_title"),
                article_item.get("source_account"),
                article_item.get("title_sensitivity", 0),
            ),
        )

    async def save_article_v2(self, article_item: dict) -> None:
        """Save an article, building the insert statement from the model's fields."""
        new_article = CrawlerMetaArticle(**article_item)
        new_article_dict = new_article.model_dump()
        # insert only the columns that carry a value; table defaults cover the rest
        insert_data = {k: v for k, v in new_article_dict.items() if v is not None}
        columns = ", ".join(insert_data.keys())
        values = ", ".join(["%s"] * len(insert_data))
        query = f"insert into crawler_meta_article ({columns}) values ({values});"
        await self.pool.async_save(query=query, params=tuple(insert_data.values()))
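
    # Illustrative sketch of what save_article_v2 builds (the field values here
    # are assumptions for demonstration, not project data): for an item where
    # only platform, title, and link are set, the generated statement is roughly
    #   insert into crawler_meta_article (platform, title, link) values (%s, %s, %s);
    # with params ("weixin", "some title", "https://example.com/a"); unset fields
    # fall back to the table's column defaults.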

    async def save_video(self, video_item: dict) -> None:
        """Stub: video persistence is not implemented yet."""

    async def save_item_to_database(self, media_type: str, item: dict):
        """Dispatch a crawled item to the matching persistence path."""
        match media_type:
            case "video":
                await self.save_video(item)
            case "article":
                log_data = {
                    "title": item["title"],
                    "platform": item["platform"],
                    "mode": item["mode"],
                    "source": item["category"],
                }
                await self.log_client.log(
                    contents={
                        "task": "save_article",
                        "data": log_data,
                        "message": "start crawler article",
                        "code": 1001,
                    }
                )
                # skip articles whose title already exists in the table
                if await self.whether_article_title_duplicate(log_data["title"]):
                    await self.log_client.log(
                        contents={
                            "task": "save_article",
                            "data": log_data,
                            "message": "duplicate article title",
                            "code": 1002,
                        }
                    )
                    return
                # flag sensitive titles but still persist the article; the key must
                # match the title_sensitivity column so save_article_v2 picks it up
                if await self.whether_title_sensitive(item["title"]):
                    await self.log_client.log(
                        contents={
                            "task": "save_article",
                            "data": log_data,
                            "message": "title_sensitive",
                            "code": 1003,
                        }
                    )
                    item["title_sensitivity"] = 1
                # save article
                await self.save_article_v2(item)
            case _:
                raise ValueError(f"unknown media type: {media_type}")
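
# Minimal usage sketch (hypothetical wiring: `pool` and `log_client` stand in
# for the project's real async MySQL pool and log client, and the item fields
# shown are assumptions for illustration):
#
#   pipeline = CrawlerPipeline(pool=pool, log_client=log_client)
#   await pipeline.save_item_to_database(
#       media_type="article",
#       item={
#           "platform": "weixin",
#           "mode": "account",
#           "category": "tech",
#           "title": "some title",
#           "link": "https://example.com/article",
#       },
#   )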