aigc_system.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import json
  2. from typing import Optional, Dict, List, TypedDict
  3. from datetime import datetime, timedelta
  4. from app.infra.shared import AsyncHttpClient
# Shared HTTP headers for all requests against the AIGC admin API.
HEADERS = {
    "Accept": "application/json",
    "Accept-Language": "zh,zh-CN;q=0.9",
    "Content-Type": "application/json",
    "Origin": "http://admin.cybertogether.net",
    "Proxy-Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
}
# Identity/session payload sent as the "baseInfo" field of AIGC API requests.
# NOTE(review): the token is hard-coded in source control — consider moving it
# to configuration or a secrets store.
PERSON_COOKIE = {
    "token": "af54cdc404c3464d896745df389b2dce",
    "appType": 9,
    "platform": "pc",
    "appVersionCode": 1000,
    "clientTimestamp": 1,
    "fid": 1,
    "loginUid": 1,
    "pageSource": 1,
    "requestId": 1,
    "rid": 1,
    "uid": 1,
}
class RelationDict(TypedDict):
    """Relation between a video-pool trace and a crawled channel content item."""

    # presumably ids issued by the video pool and the crawler channel — TODO confirm
    videoPoolTraceId: str
    channelContentId: str
    platform: str
  30. async def delete_illegal_gzh_articles(gh_id: str, title: str, delete_flag: int = 0):
  31. """
  32. Delete illegal gzh articles
  33. :param gh_id: gzh id
  34. :param title: article title
  35. :param delete_flag: 0: 违规检查删除; 1:html手动删除; 2: 文章限流删除
  36. """
  37. url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
  38. payload = {
  39. "title": title,
  40. "ghId": gh_id,
  41. "deleteFlag": delete_flag,
  42. }
  43. headers = {"Content-Type": "application/json;charset=UTF-8"}
  44. async with AsyncHttpClient(timeout=600) as client:
  45. res = await client.post(url=url, headers=headers, json=payload)
  46. return res
  47. async def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list, platform):
  48. """
  49. Create crawler task
  50. """
  51. match platform:
  52. case "weixin":
  53. channel = 5
  54. case "toutiao":
  55. channel = 6
  56. case _:
  57. raise RuntimeError(f"Unsupported platform: {platform}")
  58. url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
  59. payload = {
  60. "params": {
  61. "contentFilters": [],
  62. "accountFilters": [],
  63. "filterAccountMatchMode": 1,
  64. "filterContentMatchMode": 1,
  65. "selectModeValues": [],
  66. "searchModeValues": [],
  67. "contentModal": 3,
  68. "analyze": {},
  69. "crawlerComment": 0,
  70. "inputGroup": None,
  71. "inputSourceGroups": [],
  72. "modePublishTime": [],
  73. "planType": 2,
  74. "frequencyType": 2,
  75. "planTag": plan_tag,
  76. "tagPenetrateFlag": 0,
  77. "id": plan_id,
  78. "name": plan_name,
  79. "channel": channel,
  80. "crawlerMode": 5,
  81. "inputModeValues": url_list,
  82. "modePublishTimeStart": None,
  83. "modePublishTimeEnd": None,
  84. "executeRate": None,
  85. "executeDate": None,
  86. "executeWindowStart": None,
  87. "executeWindowEnd": None,
  88. "executeTimeInterval": None,
  89. "executeNum": None,
  90. "addModal": None,
  91. "addChannel": None,
  92. "fileUpload": None,
  93. "prompt": None,
  94. "acelFlag": None,
  95. "tasks": [],
  96. },
  97. "baseInfo": PERSON_COOKIE,
  98. }
  99. async with AsyncHttpClient(timeout=600) as client:
  100. res = await client.post(url=url, headers=HEADERS, json=payload)
  101. return res
async def add_to_crawler_task():
    """Placeholder — not implemented yet."""
    pass
async def get_crawler_task_detail():
    """Placeholder — not implemented yet."""
    pass
  106. async def auto_bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id):
  107. url = "http://aigc-api.cybertogether.net/aigc/produce/plan/save"
  108. plan_info = await get_generate_task_detail(generate_task_id)
  109. input_source_groups = plan_info.get("inputSourceGroups")
  110. existed_crawler_task = input_source_groups[0].get("inputSources")
  111. new_task_list = existed_crawler_task + crawler_task_list
  112. input_source_group_0 = input_source_groups[0]
  113. input_source_group_0["inputSources"] = new_task_list
  114. payload = json.dumps(
  115. {
  116. "params": {
  117. "contentFilters": [],
  118. "produceModal": plan_info.get("produceModal"),
  119. "inputModal": plan_info.get("inputModal"),
  120. "tasks": plan_info.get("tasks", []),
  121. "modules": [],
  122. "moduleGroups": plan_info.get("moduleGroups"),
  123. "inputSourceGroups": [input_source_group_0],
  124. "layoutType": plan_info.get("layoutType"),
  125. "activeManualReview": plan_info.get("activeManualReview"),
  126. "totalProduceNum": plan_info.get("totalProduceNum"),
  127. "dailyProduceNum": plan_info.get("dailyProduceNum"),
  128. "maxConcurrentNum": plan_info.get("maxConcurrentNum"),
  129. "id": generate_task_id,
  130. "name": plan_info.get("name"),
  131. "planTag": plan_info.get("planTag"),
  132. "tagPenetrateFlag": plan_info.get("tagPenetrateFlag"),
  133. "inputType": plan_info.get("inputType"),
  134. "inputChannel": plan_info.get("inputChannel"),
  135. "activeManualReviewCount": plan_info.get("activeManualReviewCount"),
  136. "autoComposite": plan_info.get("autoComposite"),
  137. },
  138. "baseInfo": PERSON_COOKIE,
  139. }
  140. )
  141. async with AsyncHttpClient(timeout=600) as client:
  142. response = await client.post(url=url, headers=HEADERS, data=payload)
  143. return response
  144. async def get_generate_task_detail(generate_task_id):
  145. """
  146. 通过生成计划的 id,获取该生成计划已有的抓取计划 list
  147. :param generate_task_id:
  148. :return:
  149. """
  150. url = "http://aigc-api.cybertogether.net/aigc/produce/plan/detail"
  151. payload = json.dumps(
  152. {"params": {"id": generate_task_id}, "baseInfo": PERSON_COOKIE}
  153. )
  154. async with AsyncHttpClient(timeout=600) as client:
  155. res = await client.post(url=url, headers=HEADERS, data=payload)
  156. if res["msg"] == "success":
  157. return res["data"]
  158. else:
  159. return {}
  160. async def insert_crawler_relation_to_aigc_system(
  161. relation_list: List[RelationDict],
  162. ) -> Optional[Dict]:
  163. url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
  164. payload = json.dumps({"params": {"relations": relation_list}})
  165. async with AsyncHttpClient(timeout=60) as client:
  166. res = await client.post(url=url, headers=HEADERS, data=payload)
  167. return res
  168. async def get_titles_from_produce_plan(pool, plan_id, threshold=None):
  169. if not threshold:
  170. fifteen_days_ago = datetime.now() - timedelta(days=30)
  171. threshold = fifteen_days_ago.strftime("%Y%m%d") + 15 * "0"
  172. query = f"""
  173. select distinct t2.title
  174. from produce_plan_input_source t3
  175. join crawler_plan_result_rel t1 on t3.input_source_value = t1.plan_id
  176. join crawler_content t2 on t1.channel_source_id = t2.channel_content_id
  177. where t3.plan_id = %s
  178. and t3.input_source_value > %s;
  179. """
  180. response = await pool.async_fetch(
  181. query=query, db_name="aigc", params=(plan_id, threshold)
  182. )
  183. return tuple([i["title"] for i in response])