aigc_mapper.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from typing import List, Dict
  2. from app.core.database import DatabaseManager
  3. class AigcDatabaseMapper:
  4. # 从 aigc 数据库查询 channel_content_id && channel 信息
  5. @staticmethod
  6. async def fetch_channel_info(pool: DatabaseManager, content_id: str) -> List[Dict]:
  7. query = """
  8. SELECT t1.channel_content_id, t2.channel
  9. FROM produce_plan_exe_record t1 JOIN crawler_content t2 ON t1.channel_content_id = t2.channel_content_id
  10. WHERE plan_exe_id = %s;
  11. """
  12. return await pool.async_fetch(query=query, db_name="aigc", params=(content_id,))
  13. # 从 aigc 如何查询文章封面信息
  14. @staticmethod
  15. async def fetch_aigc_cover(
  16. pool: DatabaseManager, channel_content_id: str
  17. ) -> List[Dict]:
  18. """
  19. use channel_content_id to find article cover
  20. """
  21. COVER_TYPE = 2
  22. query = """
  23. SELECT image_url, oss_object_key
  24. FROM crawler_content_image
  25. WHERE channel_content_id = %s AND image_type = %s;
  26. """
  27. return await pool.async_fetch(
  28. query=query, db_name="aigc", params=(channel_content_id, COVER_TYPE)
  29. )
  30. # 从 AIGC 数据查询Daily发文账号信息
  31. @staticmethod
  32. async def get_daily_publish_accounts(pool: DatabaseManager) -> List[Dict]:
  33. query = """
  34. select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
  35. t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
  36. group_concat(distinct t5.remark) as account_remark
  37. from
  38. publish_plan t1
  39. join publish_plan_account t2 on t1.id = t2.plan_id
  40. join publish_account t3 on t2.account_id = t3.id
  41. left join publish_account_wx_type t4 on t3.id = t4.account_id
  42. left join publish_account_remark t5 on t3.id = t5.publish_account_id
  43. where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
  44. group by t3.id;
  45. """
  46. account_list = await pool.async_fetch(query, db_name="aigc")
  47. return [i for i in account_list if "自动回复" not in str(i["account_remark"])]