crawler_pipeline.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. from typing import Dict, Tuple
  2. from pydantic import BaseModel
  3. from app.infra.utils import CrawlerMetaArticle
  4. from app.infra.utils import CrawlerMetaAccount
  5. from app.core.dependency import apollo_client
  6. class CrawlerPipeline:
  7. MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = {
  8. "article": (CrawlerMetaArticle, "crawler_meta_article"),
  9. "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"),
  10. # 如后续有新类型,直接在这里加即可
  11. }
  12. def __init__(self, pool, log_client):
  13. self.pool = pool
  14. self.log_client = log_client
  15. @staticmethod
  16. async def whether_title_sensitive(title: str) -> bool:
  17. sensitive_word_list = await apollo_client.get_config_value("sensitive_word_list")
  18. for word in sensitive_word_list:
  19. if word in title:
  20. return True
  21. return False
  22. async def whether_article_title_duplicate(self, title: str) -> bool:
  23. query = f"""select article_id from crawler_meta_article where title = %s;"""
  24. duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
  25. return True if duplicated_id else False
  26. async def whether_account_exist(self, account_id: str, media_type: str) -> bool:
  27. query = f"select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;"
  28. duplicated_id = await self.pool.async_fetch(
  29. query=query, params=(account_id, media_type)
  30. )
  31. return True if duplicated_id else False
  32. async def save_single_record(self, media_type: str, item: dict) -> None:
  33. try:
  34. model_cls, table_name = self.MODEL_TABLE_MAP[media_type]
  35. except KeyError:
  36. raise ValueError(f"Unknown media type: {media_type!r}")
  37. record = model_cls(**item).model_dump(mode="python")
  38. insert_data = {k: v for k, v in record.items() if v is not None}
  39. if not insert_data:
  40. raise ValueError("All fields are None, nothing to insert")
  41. columns = ", ".join(f"`{col}`" for col in insert_data)
  42. placeholders = ", ".join(["%s"] * len(insert_data))
  43. sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
  44. await self.pool.async_save(sql, tuple(insert_data.values()))
  45. async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
  46. """deal function"""
  47. item["trace_id"] = trace_id
  48. match media_type:
  49. case "video":
  50. await self.save_single_record(media_type, item)
  51. case "article":
  52. log_data = {
  53. "title": item["title"],
  54. "platform": item["platform"],
  55. "mode": item["mode"],
  56. "source": item["category"],
  57. }
  58. await self.log_client.log(
  59. contents={
  60. "task": "save_article",
  61. "data": log_data,
  62. "message": "start crawler article",
  63. "code": 1001,
  64. }
  65. )
  66. if await self.whether_article_title_duplicate(log_data["title"]):
  67. await self.log_client.log(
  68. contents={
  69. "task": "save_article",
  70. "data": log_data,
  71. "message": "duplicate article title",
  72. "code": 1002,
  73. }
  74. )
  75. return
  76. if await self.whether_title_sensitive(item["title"]):
  77. await self.log_client.log(
  78. contents={
  79. "task": "save_article",
  80. "data": log_data,
  81. "message": "title_sensitive",
  82. "code": 1003,
  83. }
  84. )
  85. item["title_sensitive"] = 1
  86. await self.save_single_record(media_type, item)
  87. await self.log_client.log(
  88. contents={
  89. "trace_id": trace_id,
  90. "function": "save_article",
  91. "data": item,
  92. "message": "save article successfully",
  93. }
  94. )
  95. case "account":
  96. if await self.whether_account_exist(
  97. item["account_id"], item["media_type"]
  98. ):
  99. return
  100. await self.save_single_record(media_type, item)
  101. await self.log_client.log(
  102. contents={
  103. "trace_id": trace_id,
  104. "function": "save_account",
  105. "data": item,
  106. "message": "save account successfully",
  107. }
  108. )
  109. case _:
  110. raise Exception("Unknown media type")