"""HTTP client helpers for the AIGC admin system (crawler and produce-plan APIs)."""
  1. import json
  2. from typing import Optional, Dict, List, TypedDict
  3. from datetime import datetime, timedelta
  4. from app.infra.shared import AsyncHttpClient
# Default request headers for the AIGC admin API; the browser-like
# User-Agent/Origin mimic the admin web console.
HEADERS = {
    "Accept": "application/json",
    "Accept-Language": "zh,zh-CN;q=0.9",
    "Content-Type": "application/json",
    "Origin": "http://admin.cybertogether.net",
    "Proxy-Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
}
# Shared "baseInfo" object attached to every API payload.
# SECURITY NOTE(review): the auth token is hard-coded in source — consider
# loading it from configuration or the environment instead.
PERSON_COOKIE = {
    "token": "af54cdc404c3464d896745df389b2dce",
    "appType": 9,
    "platform": "pc",
    "appVersionCode": 1000,
    # The fields below look like placeholder values the backend presumably
    # ignores — TODO(review): confirm against the API contract.
    "clientTimestamp": 1,
    "fid": 1,
    "loginUid": 1,
    "pageSource": 1,
    "requestId": 1,
    "rid": 1,
    "uid": 1,
}
class RelationDict(TypedDict):
    """Relation linking a video-pool trace to a crawled channel content item.

    Keys use the API's camelCase naming so instances can be sent as-is.
    """

    videoPoolTraceId: str
    channelContentId: str
    platform: str
  30. async def delete_illegal_gzh_articles(gh_id: str, title: str):
  31. """
  32. Delete illegal gzh articles
  33. :param gh_id: gzh id
  34. :param title: article title
  35. """
  36. url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
  37. payload = {
  38. "title": title,
  39. "ghId": gh_id,
  40. }
  41. headers = {"Content-Type": "application/json;charset=UTF-8"}
  42. async with AsyncHttpClient(timeout=600) as client:
  43. res = await client.post(url=url, headers=headers, json=payload)
  44. return res
  45. async def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list, platform):
  46. """
  47. Create crawler task
  48. """
  49. match platform:
  50. case "weixin":
  51. channel = 5
  52. case "toutiao":
  53. channel = 6
  54. case _:
  55. raise RuntimeError(f"Unsupported platform: {platform}")
  56. url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
  57. payload = {
  58. "params": {
  59. "contentFilters": [],
  60. "accountFilters": [],
  61. "filterAccountMatchMode": 1,
  62. "filterContentMatchMode": 1,
  63. "selectModeValues": [],
  64. "searchModeValues": [],
  65. "contentModal": 3,
  66. "analyze": {},
  67. "crawlerComment": 0,
  68. "inputGroup": None,
  69. "inputSourceGroups": [],
  70. "modePublishTime": [],
  71. "planType": 2,
  72. "frequencyType": 2,
  73. "planTag": plan_tag,
  74. "tagPenetrateFlag": 0,
  75. "id": plan_id,
  76. "name": plan_name,
  77. "channel": channel,
  78. "crawlerMode": 5,
  79. "inputModeValues": url_list,
  80. "modePublishTimeStart": None,
  81. "modePublishTimeEnd": None,
  82. "executeRate": None,
  83. "executeDate": None,
  84. "executeWindowStart": None,
  85. "executeWindowEnd": None,
  86. "executeTimeInterval": None,
  87. "executeNum": None,
  88. "addModal": None,
  89. "addChannel": None,
  90. "fileUpload": None,
  91. "prompt": None,
  92. "acelFlag": None,
  93. "tasks": [],
  94. },
  95. "baseInfo": PERSON_COOKIE,
  96. }
  97. async with AsyncHttpClient(timeout=600) as client:
  98. res = await client.post(url=url, headers=HEADERS, json=payload)
  99. return res
async def add_to_crawler_task():
    """Add content to an existing crawler task.

    NOTE(review): not implemented yet — placeholder stub.
    """
    pass
async def get_crawler_task_detail():
    """Fetch the detail of a crawler task.

    NOTE(review): not implemented yet — placeholder stub.
    """
    pass
  104. async def auto_bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id):
  105. url = "http://aigc-api.cybertogether.net/aigc/produce/plan/save"
  106. plan_info = await get_generate_task_detail(generate_task_id)
  107. input_source_groups = plan_info.get("inputSourceGroups")
  108. existed_crawler_task = input_source_groups[0].get("inputSources")
  109. new_task_list = existed_crawler_task + crawler_task_list
  110. input_source_group_0 = input_source_groups[0]
  111. input_source_group_0["inputSources"] = new_task_list
  112. payload = json.dumps(
  113. {
  114. "params": {
  115. "contentFilters": [],
  116. "produceModal": plan_info.get("produceModal"),
  117. "inputModal": plan_info.get("inputModal"),
  118. "tasks": plan_info.get("tasks", []),
  119. "modules": [],
  120. "moduleGroups": plan_info.get("moduleGroups"),
  121. "inputSourceGroups": [input_source_group_0],
  122. "layoutType": plan_info.get("layoutType"),
  123. "activeManualReview": plan_info.get("activeManualReview"),
  124. "totalProduceNum": plan_info.get("totalProduceNum"),
  125. "dailyProduceNum": plan_info.get("dailyProduceNum"),
  126. "maxConcurrentNum": plan_info.get("maxConcurrentNum"),
  127. "id": generate_task_id,
  128. "name": plan_info.get("name"),
  129. "planTag": plan_info.get("planTag"),
  130. "tagPenetrateFlag": plan_info.get("tagPenetrateFlag"),
  131. "inputType": plan_info.get("inputType"),
  132. "inputChannel": plan_info.get("inputChannel"),
  133. "activeManualReviewCount": plan_info.get("activeManualReviewCount"),
  134. "autoComposite": plan_info.get("autoComposite"),
  135. },
  136. "baseInfo": PERSON_COOKIE,
  137. }
  138. )
  139. async with AsyncHttpClient(timeout=600) as client:
  140. response = await client.post(url=url, headers=HEADERS, data=payload)
  141. return response
  142. async def get_generate_task_detail(generate_task_id):
  143. """
  144. 通过生成计划的 id,获取该生成计划已有的抓取计划 list
  145. :param generate_task_id:
  146. :return:
  147. """
  148. url = "http://aigc-api.cybertogether.net/aigc/produce/plan/detail"
  149. payload = json.dumps(
  150. {"params": {"id": generate_task_id}, "baseInfo": PERSON_COOKIE}
  151. )
  152. async with AsyncHttpClient(timeout=600) as client:
  153. res = await client.post(url=url, headers=HEADERS, data=payload)
  154. if res["msg"] == "success":
  155. return res["data"]
  156. else:
  157. return {}
  158. async def insert_crawler_relation_to_aigc_system(
  159. relation_list: List[RelationDict],
  160. ) -> Optional[Dict]:
  161. url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
  162. payload = json.dumps({"params": {"relations": relation_list}})
  163. async with AsyncHttpClient(timeout=60) as client:
  164. res = await client.post(url=url, headers=HEADERS, data=payload)
  165. return res
  166. async def get_titles_from_produce_plan(pool, plan_id, threshold=None):
  167. if not threshold:
  168. fifteen_days_ago = datetime.now() - timedelta(days=30)
  169. threshold = fifteen_days_ago.strftime("%Y%m%d") + 15 * "0"
  170. query = f"""
  171. select distinct t2.title
  172. from produce_plan_input_source t3
  173. join crawler_plan_result_rel t1 on t3.input_source_value = t1.plan_id
  174. join crawler_content t2 on t1.channel_source_id = t2.channel_content_id
  175. where t3.plan_id = %s
  176. and t3.input_source_value > %s;
  177. """
  178. response = await pool.async_fetch(
  179. query=query, db_name="aigc", params=(plan_id, threshold)
  180. )
  181. return tuple([i["title"] for i in response])