crawler_pipeline.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
from typing import ClassVar, Dict, Tuple

from pydantic import BaseModel

from app.infra.external import AsyncApolloApi
from .schemas import CrawlerMetaArticle, CrawlerMetaAccount
  5. class CrawlerPipeline:
  6. MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = {
  7. "article": (CrawlerMetaArticle, "crawler_meta_article"),
  8. "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"),
  9. # 如后续有新类型,直接在这里加即可
  10. }
  11. def __init__(self, pool, log_client, config):
  12. self.pool = pool
  13. self.log_client = log_client
  14. self.apollo_client = AsyncApolloApi(apollo_config=config, app_id=None, env=None)
  15. async def whether_title_sensitive(self, title: str) -> bool:
  16. sensitive_word_list = await self.apollo_client.get_config_value(
  17. "sensitive_word_list"
  18. )
  19. for word in sensitive_word_list:
  20. if word in title:
  21. return True
  22. return False
  23. async def whether_article_title_duplicate(self, title: str) -> bool:
  24. query = f"""select article_id from crawler_meta_article where title = %s;"""
  25. duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
  26. return True if duplicated_id else False
  27. async def whether_account_exist(self, account_id: str, media_type: str) -> bool:
  28. query = f"select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;"
  29. duplicated_id = await self.pool.async_fetch(
  30. query=query, params=(account_id, media_type)
  31. )
  32. return True if duplicated_id else False
  33. async def save_single_record(self, media_type: str, item: dict) -> None:
  34. try:
  35. model_cls, table_name = self.MODEL_TABLE_MAP[media_type]
  36. except KeyError:
  37. raise ValueError(f"Unknown media type: {media_type!r}")
  38. record = model_cls(**item).model_dump(mode="python")
  39. insert_data = {k: v for k, v in record.items() if v is not None}
  40. if not insert_data:
  41. raise ValueError("All fields are None, nothing to insert")
  42. columns = ", ".join(f"`{col}`" for col in insert_data)
  43. placeholders = ", ".join(["%s"] * len(insert_data))
  44. sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
  45. await self.pool.async_save(sql, tuple(insert_data.values()))
  46. async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
  47. """deal function"""
  48. item["trace_id"] = trace_id
  49. match media_type:
  50. case "video":
  51. await self.save_single_record(media_type, item)
  52. case "article":
  53. log_data = {
  54. "title": item["title"],
  55. "platform": item["platform"],
  56. "mode": item["mode"],
  57. "source": item["category"],
  58. }
  59. await self.log_client.log(
  60. contents={
  61. "task": "save_article",
  62. "data": log_data,
  63. "message": "start crawler article",
  64. "code": 1001,
  65. }
  66. )
  67. if await self.whether_article_title_duplicate(log_data["title"]):
  68. await self.log_client.log(
  69. contents={
  70. "task": "save_article",
  71. "data": log_data,
  72. "message": "duplicate article title",
  73. "code": 1002,
  74. }
  75. )
  76. return
  77. if await self.whether_title_sensitive(item["title"]):
  78. await self.log_client.log(
  79. contents={
  80. "task": "save_article",
  81. "data": log_data,
  82. "message": "title_sensitive",
  83. "code": 1003,
  84. }
  85. )
  86. item["title_sensitive"] = 1
  87. await self.save_single_record(media_type, item)
  88. await self.log_client.log(
  89. contents={
  90. "trace_id": trace_id,
  91. "function": "save_article",
  92. "data": item,
  93. "message": "save article successfully",
  94. }
  95. )
  96. case "account":
  97. if await self.whether_account_exist(
  98. item["account_id"], item["media_type"]
  99. ):
  100. return
  101. await self.save_single_record(media_type, item)
  102. await self.log_client.log(
  103. contents={
  104. "trace_id": trace_id,
  105. "function": "save_account",
  106. "data": item,
  107. "message": "save account successfully",
  108. }
  109. )
  110. case _:
  111. raise Exception("Unknown media type")