_mapper.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. from app.core.database import DatabaseManager
  2. from ._const import I2IRecommendDataSyncConst
  3. class I2IRecommendDataSyncMapper(I2IRecommendDataSyncConst):
  4. def __init__(self, pool: DatabaseManager):
  5. self.pool = pool
  6. async def fetch_all_accounts(self):
  7. """查询有待激活数据的账号"""
  8. query = """
  9. SELECT gh_id FROM i2i_recommend WHERE status = %s GROUP BY gh_id;
  10. """
  11. return await self.pool.async_fetch(
  12. query=query,
  13. params=(self.VersionStatus.INIT,),
  14. )
  15. async def fetch_max_version_for_account(self, gh_id: str):
  16. """
  17. 查询单个账号的最大version及其status
  18. 返回: {max_version, status} or None
  19. """
  20. query = """
  21. SELECT version AS max_version, status
  22. FROM i2i_recommend
  23. WHERE gh_id = %s
  24. ORDER BY version DESC
  25. LIMIT 1;
  26. """
  27. rows = await self.pool.async_fetch(query=query, params=(gh_id,))
  28. return rows[0] if rows else None
  29. async def activate_new_version(self, gh_id: str, version: str):
  30. """将该账号最新version的数据 status 0 -> 1"""
  31. query = """
  32. UPDATE i2i_recommend
  33. SET status = %s
  34. WHERE gh_id = %s AND version = %s AND status = %s;
  35. """
  36. return await self.pool.async_save(
  37. query=query,
  38. params=(
  39. self.VersionStatus.ONLINE,
  40. gh_id,
  41. version,
  42. self.VersionStatus.INIT,
  43. ),
  44. )
  45. async def deactivate_old_versions(self, gh_id: str, latest_version: str):
  46. """将该账号旧version的数据 status 1 -> 2(归档)"""
  47. query = """
  48. UPDATE i2i_recommend
  49. SET status = %s
  50. WHERE gh_id = %s AND version != %s AND status = %s;
  51. """
  52. return await self.pool.async_save(
  53. query=query,
  54. params=(
  55. self.VersionStatus.ARCHIVED,
  56. gh_id,
  57. latest_version,
  58. self.VersionStatus.ONLINE,
  59. ),
  60. )
  61. __all__ = ["I2IRecommendDataSyncMapper"]