# agent_tools.py
import os
import sys
import json
from typing import Any, Dict, List, Optional

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper
from indentify.indentify import ContentIdentifier
from structure import StructureProcessor

logger = get_logger('AgentTools')


class QueryDataTool:
    """Query knowledge_crawl_content and extract the crawl_data field from each row of the data list."""

    @staticmethod
    def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
        sql = "SELECT content_id, crawl_data FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if not rows:
            logger.info(f"request_id={request_id}: no rows found, using default data")
            # Fall back to built-in sample data
            default_data = [{
                "request_id": request_id,
                "content_id": "1",
                "id": 1,
                "task_id": 1,
                "crawl_data": {
                    "channel": 1,
                    "channel_content_id": "684a789b000000002202a61b",
                    "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
                    "wx_sn": None,
                    "title": "一个视频学会,5个剪辑工具,超详细教程",
                    "content_type": "video",
                    "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
                    "location": "未知",
                    "source_url": None,
                    "mini_program": None,
                    "topic_list": [],
                    "image_url_list": [
                        {
                            "image_type": 2,
                            "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg"
                        }
                    ],
                    "video_url_list": [
                        # {
                        #     "video_url": "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4",
                        #     "video_duration": 615
                        # }
                    ],
                    "bgm_data": None,
                    "ad_info": None,
                    "is_original": False,
                    "voice_data": None,
                    "channel_account_id": "670a10ac000000001d0216ec",
                    "channel_account_name": "小伍剪辑视频",
                    "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
                    "item_index": None,
                    "view_count": None,
                    "play_count": None,
                    "like_count": 692,
                    "collect_count": 996,
                    "comment_count": 37,
                    "share_count": None,
                    "looking_count": None,
                    "publish_timestamp": 1749711589000,
                    "modify_timestamp": 1749711589000,
                    "update_timestamp": 1755239186502
                }
            }, {
                "request_id": request_id,
                "content_id": "2",
                "id": 2,
                "task_id": 2,
                "crawl_data": {
                    "channel": 1,
                    "channel_content_id": "684a789b000000002202a61b",
                    "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
                    "wx_sn": None,
                    "title": "一个视频学会,5个剪辑工具,超详细教程",
                    "content_type": "video",
                    "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
                    "location": "未知",
                    "source_url": None,
                    "mini_program": None,
                    "topic_list": [],
                    "image_url_list": [
                        {
                            "image_type": 2,
                            "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg"
                        }
                    ],
                    "video_url_list": [
                        {
                            "video_url": "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4",
                            "video_duration": 615
                        }
                    ],
                    "bgm_data": None,
                    "ad_info": None,
                    "is_original": False,
                    "voice_data": None,
                    "channel_account_id": "670a10ac000000001d0216ec",
                    "channel_account_name": "小伍剪辑视频",
                    "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
                    "item_index": None,
                    "view_count": None,
                    "play_count": None,
                    "like_count": 692,
                    "collect_count": 996,
                    "comment_count": 37,
                    "share_count": None,
                    "looking_count": None,
                    "publish_timestamp": 1749711589000,
                    "modify_timestamp": 1749711589000,
                    "update_timestamp": 1755239186502
                }
            }]
            return default_data
        results: List[Dict[str, Any]] = []
        for row in rows:
            data_cell = row
            if not data_cell:
                continue
            try:
                parsed = json.loads(data_cell) if isinstance(data_cell, (str, bytes)) else data_cell
                # logger.info(f"parsed: {parsed}")
                # Handle tuple rows (raw database query results)
                if isinstance(parsed, tuple) and len(parsed) >= 2:
                    # Column order follows the SELECT: (content_id, crawl_data)
                    content_id = parsed[0]
                    json_str = parsed[1]
                    if isinstance(json_str, str):
                        try:
                            json_data = json.loads(json_str)
                            if isinstance(json_data, dict):
                                results.append({"crawl_data": json_data, "content_id": content_id, "raw": parsed})
                            elif isinstance(json_data, list):
                                for item in json_data:
                                    if isinstance(item, dict):
                                        results.append({"crawl_data": item, "content_id": content_id, "raw": parsed})
                        except json.JSONDecodeError:
                            logger.warning(f"crawl_data column is not valid JSON: {json_str}")
                    else:
                        logger.warning(f"crawl_data column is not a string: {type(json_str)}")
                # Handle list rows
                elif isinstance(parsed, list):
                    for item in parsed:
                        if isinstance(item, dict):
                            crawl_data = item.get('crawl_data')
                            content_id = item.get('content_id')
                            if isinstance(crawl_data, (dict, list)):
                                results.append({"crawl_data": crawl_data, "content_id": content_id, "raw": item})
                            else:
                                results.append({"crawl_data": item, "content_id": content_id, "raw": item})
                # Handle dict rows
                elif isinstance(parsed, dict):
                    crawl_data = parsed.get('crawl_data')
                    content_id = parsed.get('content_id')
                    if isinstance(crawl_data, (dict, list)):
                        results.append({"crawl_data": crawl_data, "content_id": content_id, "raw": parsed})
                    else:
                        results.append({"crawl_data": parsed, "content_id": content_id, "raw": parsed})
                else:
                    logger.warning(f"Unexpected structure in data column: {type(parsed)}, row skipped")
            except Exception as e:
                logger.error(f"Failed to parse data JSON: {e}")
        logger.info(f"request_id={request_id}: extracted {len(results)} crawl_data items")
        return results
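
# Usage sketch for QueryDataTool (a minimal illustration, not production code:
# the request_id below is hypothetical, and MysqlHelper must be configured
# against a database that has a knowledge_crawl_content table):
#
#     items = QueryDataTool.fetch_crawl_data_list("req-demo-001")
#     for item in items:
#         print(item["content_id"], item["crawl_data"].get("title"))
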
class IdentifyTool:
    """Runs image/video recognition through the internal indentify module."""

    def __init__(self) -> None:
        self.identifier = ContentIdentifier()

    def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            formatted_content = self.identifier.parse_formatted_content(crawl_data)
            recognition_result = self.identifier.process_content_recognition(formatted_content)
            title = formatted_content.get('title') or ''
            content = formatted_content.get('body_text') or ''
            channel = formatted_content.get('channel') or ''
            author = formatted_content.get('channel_account_name') or ''
            like_count = formatted_content.get('like_count') or 0
            collect_count = formatted_content.get('collect_count') or 0
            comment_count = formatted_content.get('comment_count') or 0
            view_count = formatted_content.get('view_count') or 0
            publish_time = formatted_content.get('publish_time') or ''
            update_timestamp = formatted_content.get('update_timestamp') or ''
            content_link = formatted_content.get('content_link') or ''
            content_id = formatted_content.get('channel_content_id') or ''
            complete_result = {
                'channel': channel,
                'title': title,
                'content': content,
                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
                'videos': recognition_result.get('video_analysis', {}),
                'meta': {
                    'author': author,
                    'like_count': like_count,
                    'collect_count': collect_count,
                    'comment_count': comment_count,
                    'view_count': view_count,
                    'publish_time': publish_time,
                    'update_timestamp': update_timestamp,
                    'content_link': content_link,
                    'content_id': content_id,
                }
            }
            return complete_result
        except Exception as e:
            logger.error(f"Recognition failed: {e}")
            return {
                'channel': '',
                'title': '',
                'content': '',
                'images': [],
                'videos': {},  # same type as the success path, which returns a dict here
                'meta': {},
                'error': str(e)
            }
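
# Usage sketch for IdentifyTool (illustrative only; assumes ContentIdentifier
# and its underlying recognition backends are configured, and that `items`
# comes from the QueryDataTool sketch above):
#
#     identify_tool = IdentifyTool()
#     identified = identify_tool.run(items[0]["crawl_data"])
#     print(identified["title"], len(identified["images"]))
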
class UpdateDataTool:
    """
    Persistence tool: combines the recognition result with the original
    crawl_data in the agreed structure and stores it in the
    knowledge_parsing_content table.
    """

    @staticmethod
    def store_indentify_result(request_id: str, crawl_raw: Dict[str, Any], identify_result: Dict[str, Any]) -> Optional[int]:
        """
        Store the recognition result in the knowledge_parsing_content table.

        Args:
            request_id: request ID
            crawl_raw: original crawl data
            identify_result: recognition result

        Returns:
            ID of the inserted or updated row, or None on failure
        """
        try:
            logger.info(f"Storing recognition result: request_id={request_id}, crawl_raw={crawl_raw}, identify_result={identify_result}")
            # Extract the required fields from the original data
            content_id = crawl_raw.get('content_id') or ''
            task_id = crawl_raw.get('task_id') or ''  # default task ID; adjust as needed
            # Check whether a record with the same request_id + content_id already exists
            check_sql = "SELECT id, status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s LIMIT 1"
            exists = MysqlHelper.get_values(check_sql, (request_id, content_id))
            # Status 2: recognition completed
            status = 2
            serialized_identify = json.dumps(identify_result, ensure_ascii=False)
            if exists:
                # A record already exists: update it rather than inserting a new one
                existing_id = exists[0][0] if isinstance(exists, list) and len(exists) > 0 else None
                update_sql = (
                    "UPDATE knowledge_parsing_content "
                    "SET indentify_data = %s, task_id = %s, status = %s "
                    "WHERE request_id = %s AND content_id = %s"
                )
                update_params = (
                    serialized_identify,
                    task_id,
                    status,
                    request_id,
                    content_id
                )
                updated = MysqlHelper.update_values(update_sql, update_params)
                if updated is not None:
                    logger.info(f"Recognition result updated: request_id={request_id}, content_id={content_id}, id={existing_id}")
                    return existing_id
            else:
                # No record yet: insert one
                insert_sql = (
                    "INSERT INTO knowledge_parsing_content "
                    "(content_id, request_id, task_id, indentify_data, create_time, status) "
                    "VALUES (%s, %s, %s, %s, NOW(), %s)"
                )
                insert_params = (
                    content_id,
                    request_id,
                    task_id,
                    serialized_identify,
                    status
                )
                result = MysqlHelper.insert_and_get_id(insert_sql, insert_params)
                if result:
                    logger.info(f"Recognition result stored: request_id={request_id}, content_id={content_id}, insert_id={result}")
                    return result
        except Exception as e:
            logger.error(f"Failed to store recognition result: request_id={request_id}, error={e}")
            return None
    @staticmethod
    def store_parsing_result(request_id: str, crawl_raw: Dict[str, Any], parsing_result: Dict[str, Any]) -> Optional[int]:
        """
        Store the structured parsing result in the knowledge_parsing_content table.

        Args:
            request_id: request ID
            crawl_raw: original crawl data
            parsing_result: structured result (ideally the return value of
                StructureTool.process_content_structure)

        Returns:
            Number of affected rows, or None on failure
        """
        try:
            # Extract the required fields from the original data
            content_id = crawl_raw.get('content_id') or ''
            # Keep only the result field if present; otherwise store the whole object
            structured_content = parsing_result.get('structured_content', {})
            if isinstance(structured_content, dict) and 'result' in structured_content:
                # structured_content is a dict with a result field; store just that field
                parsing_payload = structured_content['result']
            else:
                # Otherwise store the entire structured_content
                parsing_payload = structured_content
            # Update the record
            sql = (
                "UPDATE knowledge_parsing_content "
                "SET parsing_data = %s, status = %s "
                "WHERE content_id = %s AND id = %s"
            )
            # Status 5: structuring completed
            status = 5
            # Serialize to a string when necessary
            if isinstance(parsing_payload, (dict, list)):
                parsing_payload = json.dumps(parsing_payload, ensure_ascii=False)
            params = (
                parsing_payload,
                status,
                content_id,
                crawl_raw.get('id') or ''
            )
            result = MysqlHelper.update_values(sql, params)
            if result:
                logger.info(f"Parsing result stored: request_id={request_id}, content_id={content_id}")
                return result
        except Exception as e:
            logger.error(f"Failed to store parsing result: request_id={request_id}, error={e}")
            return None
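
# Usage sketch for UpdateDataTool (illustrative values; assumes the
# knowledge_parsing_content table exists and MysqlHelper is configured; the
# full flow is shown in the end-to-end sketch at the bottom of this module):
#
#     row_id = UpdateDataTool.store_indentify_result("req-demo-001", items[0], identified)
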
class StructureTool:
    """
    Content structuring tool: delegates to the tools/structure module to
    structure recognized content.
    """

    def __init__(self):
        """Initialize the structuring tool."""
        self.structure_processor = StructureProcessor()

    def process_content_structure(self, content_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Structure recognized content.

        Args:
            content_data: content data carrying recognition results, shaped like:
                {
                    'channel': str,
                    'title': str,
                    'content': str,
                    'images': List[str],
                    'videos': Dict,
                    'meta': Dict
                }

        Returns:
            Dict[str, Any]: the structured result
        """
        try:
            # Normalize the structuring input
            structure_input = {
                "title": content_data.get('title', ''),
                "body_text": content_data.get('content', ''),
                "images_comprehension": content_data.get('images', [])
            }
            # Invoke the structure processor
            structured_content = self.structure_processor.process_content(structure_input)
            # The return value may be a string or a dict; wrap it as-is without accessing .result
            result = {
                'original_data': content_data,
                'structured_content': structured_content,
                'structure_status': 'success',
                'process_time': self._get_current_timestamp()
            }
            logger.info(f"Content structured successfully: title={content_data.get('title', '')}")
            return result
        except Exception as e:
            logger.error(f"Content structuring failed: {e}")
            return {
                'original_data': content_data,
                'structured_content': '',
                'structure_status': 'failed',
                'error': str(e),
                'process_time': self._get_current_timestamp()
            }

    def _get_current_timestamp(self) -> str:
        """Return the current timestamp as a formatted string."""
        from datetime import datetime
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
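
# Minimal end-to-end sketch of how these tools compose
# (fetch -> identify -> structure -> store). The request_id is hypothetical,
# and this assumes a configured database plus working recognition and
# structuring backends; it is an illustration, not the production entry point.
if __name__ == '__main__':
    demo_request_id = "req-demo-001"  # hypothetical ID, for illustration only
    identify_tool = IdentifyTool()
    structure_tool = StructureTool()
    for item in QueryDataTool.fetch_crawl_data_list(demo_request_id):
        # Run recognition on the raw crawl payload
        identified = identify_tool.run(item["crawl_data"])
        # Persist the recognition result (status 2)
        UpdateDataTool.store_indentify_result(demo_request_id, item, identified)
        # Structure the recognized content and persist it (status 5)
        structured = structure_tool.process_content_structure(identified)
        UpdateDataTool.store_parsing_result(demo_request_id, item, structured)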