# crawler_pipeline.py 5.0 KB
import time
from typing import Any, Callable, ClassVar, Dict, Tuple

from pydantic import BaseModel

from applications.api import AsyncApolloApi
from applications.utils import CrawlerMetaArticle
from applications.utils import CrawlerMetaAccount
  7. class CrawlerPipeline(AsyncApolloApi):
  8. MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = {
  9. "article": (CrawlerMetaArticle, "crawler_meta_article"),
  10. "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"),
  11. # 如后续有新类型,直接在这里加即可
  12. }
  13. def __init__(self, pool, log_client):
  14. super().__init__()
  15. self.pool = pool
  16. self.log_client = log_client
  17. async def whether_title_sensitive(self, title: str) -> bool:
  18. sensitive_word_list = await self.get_config_value("sensitive_word_list")
  19. for word in sensitive_word_list:
  20. if word in title:
  21. return True
  22. return False
  23. async def whether_article_title_duplicate(self, title: str) -> bool:
  24. query = f"""select article_id from crawler_meta_article where title = %s;"""
  25. duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
  26. return True if duplicated_id else False
  27. async def whether_account_exist(self, account_id: str, media_type: str) -> bool:
  28. query = f"select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;"
  29. duplicated_id = await self.pool.async_fetch(
  30. query=query, params=(account_id, media_type)
  31. )
  32. return True if duplicated_id else False
  33. async def save_single_record(self, media_type: str, item: dict) -> None:
  34. try:
  35. model_cls, table_name = self.MODEL_TABLE_MAP[media_type]
  36. except KeyError:
  37. raise ValueError(f"Unknown media type: {media_type!r}")
  38. record = model_cls(**item).model_dump(mode="python")
  39. insert_data = {k: v for k, v in record.items() if v is not None}
  40. if not insert_data:
  41. raise ValueError("All fields are None, nothing to insert")
  42. columns = ", ".join(f"`{col}`" for col in insert_data)
  43. placeholders = ", ".join(["%s"] * len(insert_data))
  44. sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
  45. await self.pool.async_save(sql, tuple(insert_data.values()))
  46. async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
  47. """deal function"""
  48. item["trace_id"] = trace_id
  49. match media_type:
  50. case "video":
  51. await self.save_single_record(media_type, item)
  52. case "article":
  53. log_data = {
  54. "title": item["title"],
  55. "platform": item["platform"],
  56. "mode": item["mode"],
  57. "source": item["category"],
  58. }
  59. await self.log_client.log(
  60. contents={
  61. "task": "save_article",
  62. "data": log_data,
  63. "message": "start crawler article",
  64. "code": 1001,
  65. }
  66. )
  67. if await self.whether_article_title_duplicate(log_data["title"]):
  68. await self.log_client.log(
  69. contents={
  70. "task": "save_article",
  71. "data": log_data,
  72. "message": "duplicate article title",
  73. "code": 1002,
  74. }
  75. )
  76. return
  77. if await self.whether_title_sensitive(item["title"]):
  78. await self.log_client.log(
  79. contents={
  80. "task": "save_article",
  81. "data": log_data,
  82. "message": "title_sensitive",
  83. "code": 1003,
  84. }
  85. )
  86. item["title_sensitive"] = 1
  87. await self.save_single_record(media_type, item)
  88. await self.log_client.log(
  89. contents={
  90. "trace_id": trace_id,
  91. "function": "save_article",
  92. "data": item,
  93. "message": "save article successfully",
  94. }
  95. )
  96. case "account":
  97. if await self.whether_account_exist(
  98. item["account_id"], item["media_type"]
  99. ):
  100. return
  101. await self.save_single_record(media_type, item)
  102. await self.log_client.log(
  103. contents={
  104. "trace_id": trace_id,
  105. "function": "save_account",
  106. "data": item,
  107. "message": "save account successfully",
  108. }
  109. )
  110. case _:
  111. raise Exception("Unknown media type")