piaoquan_crawler_mapper.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. from typing import List, Dict, Set
  2. from app.core.database import DatabaseManager
  3. class PiaoquanCrawlerDatabaseMapper:
  4. # 兜底修改发文时间戳, 使用相同群发的msgID
  5. @staticmethod
  6. async def fallback_mechanism_by_msg_id(pool: DatabaseManager) -> int:
  7. # 通过msgId 来修改publish_timestamp
  8. query = """
  9. UPDATE official_articles_v2 oav
  10. JOIN (
  11. SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp
  12. FROM official_articles_v2
  13. WHERE publish_timestamp > %s
  14. GROUP by ghId, appMsgId
  15. ) vv
  16. ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
  17. SET oav.publish_timestamp = vv.publish_timestamp
  18. WHERE oav.publish_timestamp <= %s;
  19. """
  20. return await pool.async_save(
  21. query=query, params=(0, 0), db_name="piaoquan_crawler"
  22. )
  23. # 兜底修改发文时间戳,使用 update_time
  24. @staticmethod
  25. async def fallback_mechanism_by_update_time(pool: DatabaseManager) -> int:
  26. query = """
  27. UPDATE official_articles_v2
  28. SET publish_timestamp = updateTime
  29. WHERE publish_timestamp < %s;
  30. """
  31. return await pool.async_save(
  32. query=query, params=(0,), db_name="piaoquan_crawler"
  33. )
  34. # 获取账号的历史发文
  35. @staticmethod
  36. async def get_published_articles(pool: DatabaseManager, gh_id: str) -> Set[str]:
  37. query = """
  38. SELECT title FROM official_articles_v2 WHERE ghId = %s AND ItemIndex = %s;
  39. """
  40. response = await pool.async_fetch(
  41. query=query, db_name="piaoquan_crawler", params=(gh_id, 1)
  42. )
  43. return set([i["title"] for i in response])