- import os
- import sys
- import json
- from typing import Any, Dict, List, Optional
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- from utils.logging_config import get_logger
- from utils.mysql_db import MysqlHelper
- from indentify.indentify import ContentIdentifier
- from structure import StructureProcessor
- logger = get_logger('AgentTools')
- class QueryDataTool:
- """查询 knowledge_crawl_content 获取 data 列表中的 crawl_data 字段"""
- @staticmethod
- def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
- sql = "SELECT * FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
- rows = MysqlHelper.get_values(sql, (request_id,))
- if not rows:
- logger.info(f"request_id={request_id} 未查询到数据,使用默认值")
- # 返回默认数据
- default_data = [{
- "request_id": request_id,
- "content_id": "1",
- "id": 1,
- "task_id": 1,
- "crawl_data": {
- "channel": 1,
- "channel_content_id": "684a789b000000002202a61b",
- "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
- "wx_sn": None,
- "title": "一个视频学会,5个剪辑工具,超详细教程",
- "content_type": "video",
- "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
- "location": "未知",
- "source_url": None,
- "mini_program": None,
- "topic_list": [],
- "image_url_list": [
- {
- "image_type": 2,
- "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg"
- }
- ],
- "video_url_list": [
- # {
- # "video_url": "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4",
- # "video_duration": 615
- # }
- ],
- "bgm_data": None,
- "ad_info": None,
- "is_original": False,
- "voice_data": None,
- "channel_account_id": "670a10ac000000001d0216ec",
- "channel_account_name": "小伍剪辑视频",
- "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
- "item_index": None,
- "view_count": None,
- "play_count": None,
- "like_count": 692,
- "collect_count": 996,
- "comment_count": 37,
- "share_count": None,
- "looking_count": None,
- "publish_timestamp": 1749711589000,
- "modify_timestamp": 1749711589000,
- "update_timestamp": 1755239186502
- }
- },{
- "request_id": request_id,
- "content_id": "2",
- "id": 2,
- "task_id": 2,
- "crawl_data": {
- "channel": 1,
- "channel_content_id": "684a789b000000002202a61b",
- "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
- "wx_sn": None,
- "title": "一个视频学会,5个剪辑工具,超详细教程",
- "content_type": "video",
- "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
- "location": "未知",
- "source_url": None,
- "mini_program": None,
- "topic_list": [],
- "image_url_list": [
- {
- "image_type": 2,
- "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg"
- }
- ],
- "video_url_list": [
- {
- "video_url": "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4",
- "video_duration": 615
- }
- ],
- "bgm_data": None,
- "ad_info": None,
- "is_original": False,
- "voice_data": None,
- "channel_account_id": "670a10ac000000001d0216ec",
- "channel_account_name": "小伍剪辑视频",
- "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
- "item_index": None,
- "view_count": None,
- "play_count": None,
- "like_count": 692,
- "collect_count": 996,
- "comment_count": 37,
- "share_count": None,
- "looking_count": None,
- "publish_timestamp": 1749711589000,
- "modify_timestamp": 1749711589000,
- "update_timestamp": 1755239186502
- }
- }]
- # Wrap the defaults so the return shape matches the DB path: {"crawl_data": ..., "raw": ...}
- return [{"crawl_data": item["crawl_data"], "raw": item} for item in default_data]
- results: List[Dict[str, Any]] = []
- for row in rows:
- data_cell = row[0]  # JSON text from the data column
- if not data_cell:
- continue
- try:
- parsed = json.loads(data_cell) if isinstance(data_cell, (str, bytes)) else data_cell
- if isinstance(parsed, list):
- for item in parsed:
- if isinstance(item, dict):
- crawl_data = item.get('crawl_data')
- if isinstance(crawl_data, (dict, list)):
- results.append({"crawl_data": crawl_data, "raw": item})
- else:
- results.append({"crawl_data": item, "raw": item})
- elif isinstance(parsed, dict):
- crawl_data = parsed.get('crawl_data')
- if isinstance(crawl_data, (dict, list)):
- results.append({"crawl_data": crawl_data, "raw": parsed})
- else:
- results.append({"crawl_data": parsed, "raw": parsed})
- else:
- logger.warning("data 字段非期望的 JSON 结构,已跳过一行")
- except Exception as e:
- logger.error(f"解析 data JSON 失败: {e}")
- logger.info(f"request_id={request_id} 提取 crawl_data 数量: {len(results)}")
- return results
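-
- # Illustrative usage sketch (the request id below is a made-up placeholder, not a real record):
- #
- #     items = QueryDataTool.fetch_crawl_data_list("req-demo-001")
- #     for item in items:
- #         crawl_data = item["crawl_data"]  # dict or list parsed from the data column
- #         raw = item["raw"]                # the original record it was extracted from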
- class IdentifyTool:
- """调用 indentify 内部能力,完成图像/视频识别"""
- def __init__(self) -> None:
- self.identifier = ContentIdentifier()
- def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
- try:
- formatted_content = self.identifier.parse_formatted_content(crawl_data)
- recognition_result = self.identifier.process_content_recognition(formatted_content)
- title = formatted_content.get('title') or ''
- content = formatted_content.get('body_text') or ''
- channel = formatted_content.get('channel') or ''
- author = formatted_content.get('channel_account_name') or ''
- like_count = formatted_content.get('like_count') or 0
- collect_count = formatted_content.get('collect_count') or 0
- comment_count = formatted_content.get('comment_count') or 0
- view_count = formatted_content.get('view_count') or 0
- publish_time = formatted_content.get('publish_time') or ''
- update_timestamp = formatted_content.get('update_timestamp') or ''
- content_link = formatted_content.get('content_link') or ''
- content_id = formatted_content.get('channel_content_id') or ''
- complete_result = {
- 'channel': channel,
- 'title': title,
- 'content': content,
- 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
- 'videos': recognition_result.get('video_analysis', {}),
- 'meta': {
- 'author': author,
- 'like_count': like_count,
- 'collect_count': collect_count,
- 'comment_count': comment_count,
- 'view_count': view_count,
- 'publish_time': publish_time,
- 'update_timestamp': update_timestamp,
- 'content_link': content_link,
- 'content_id': content_id,
- }
- }
- return complete_result
- except Exception as e:
- logger.error(f"识别失败: {e}")
- return {
- 'channel': '',
- 'title': '',
- 'content': '',
- 'images': [],
- 'videos': {},  # same shape as the success path, which returns a dict
- 'meta': {},
- 'error': str(e)
- }
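-
- # Illustrative usage sketch: run recognition on one crawl_data payload taken from
- # QueryDataTool above; "items" is assumed to be non-empty.
- #
- #     identify_tool = IdentifyTool()
- #     identify_result = identify_tool.run(items[0]["crawl_data"])
- #     print(identify_result["title"], identify_result["meta"].get("author"))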
- class UpdateDataTool:
- """
- 结构化工具:按照既定的结构将识别结果与原始 crawl_data 组合,
- 并存入 knowledge_parsing_content 表。
- """
-
- @staticmethod
- def store_indentify_result(request_id: str, crawl_raw: Dict[str, Any], identify_result: Dict[str, Any]) -> Optional[int]:
- """
- 存储解析结果到 knowledge_parsing_content 表
-
- Args:
- request_id: 请求ID
- crawl_raw: 原始爬取数据
- identify_result: 识别结果
-
- Returns:
- 插入的行ID,失败返回None
- """
- try:
- # Pull the required fields out of the raw crawl record
- content_id = crawl_raw.get('content_id') or ''
- task_id = crawl_raw.get('task_id') or ''  # fall back to an empty string when task_id is missing
-
- # Build the INSERT statement
-
- sql = (
- "INSERT INTO knowledge_parsing_content "
- "(content_id, request_id, task_id, indentify_data, create_time, status) "
- "VALUES (%s, %s, %s, %s, NOW(), %s)"
- )
-
- # Status 2 means recognition is finished
- status = 2
- params = (
- content_id,
- request_id,
- task_id,
- json.dumps(identify_result, ensure_ascii=False),
- status
- )
-
- result = MysqlHelper.insert_and_get_id(sql, params)
- if result:
- logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}, insert_id={result}")
- return result
-
- except Exception as e:
- logger.error(f"存储解析结果失败: request_id={request_id}, error={e}")
- return None
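-
- # Illustrative usage sketch: persist a recognition result produced by IdentifyTool;
- # "item" and "identify_result" follow the shapes used in the sketches above.
- #
- #     row_id = UpdateDataTool.store_indentify_result(
- #         request_id="req-demo-001",      # placeholder id
- #         crawl_raw=item["raw"],
- #         identify_result=identify_result,
- #     )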
- @staticmethod
- def store_parsing_result(request_id: str, crawl_raw: Dict[str, Any], parsing_result: Dict[str, Any]) -> Optional[int]:
- """
- 存储解析结果到 knowledge_parsing_content 表
-
- Args:
- request_id: 请求ID
- crawl_raw: 原始爬取数据
- parsing_result: 结构化的结果(建议传入 StructureTool.process_content_structure 的返回值)
-
- Returns:
- 受影响的行数,失败返回None
- """
- try:
- # Pull the required fields out of the raw crawl record
- content_id = crawl_raw.get('content_id') or ''
-
- # Keep only the result field if present, otherwise store the whole object
- structured_content = parsing_result.get('structured_content', {})
- if isinstance(structured_content, dict) and 'result' in structured_content:
- # structured_content is a dict carrying a result field, store just that field
- parsing_payload = structured_content['result']
- else:
- # Otherwise store the whole structured_content
- parsing_payload = structured_content
-
- # Update the existing parsing row (matched by content_id)
- sql = (
- "UPDATE knowledge_parsing_content "
- "SET parsing_data = %s, status = %s "
- "WHERE content_id = %s"
- )
-
- # Status 5 means structuring is finished
- status = 5
- params = (
- json.dumps(parsing_payload, ensure_ascii=False),
- status,
- content_id
- )
-
- result = MysqlHelper.update_values(sql, params)
- if result:
- logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}")
- return result
-
- except Exception as e:
- logger.error(f"存储解析结果失败: request_id={request_id}, error={e}")
- return None
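-
- # Illustrative usage sketch: after structuring the recognition result with the
- # StructureTool defined below, update the same row with the structured payload.
- #
- #     structured = StructureTool().process_content_structure(identify_result)
- #     affected = UpdateDataTool.store_parsing_result("req-demo-001", item["raw"], structured)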
- class StructureTool:
- """
- 内容结构化工具:调用tools/structure内部的方法进行内容结构化处理
- """
-
- def __init__(self):
- """初始化结构化工具"""
- self.structure_processor = StructureProcessor()
-
- def process_content_structure(self, content_data: Dict[str, Any]) -> Dict[str, Any]:
- """
- 处理内容结构化
-
- Args:
- content_data: 包含识别结果的内容数据,格式如下:
- {
- 'channel': str,
- 'title': str,
- 'content': str,
- 'images': List[str],
- 'videos': Dict,
- 'meta': Dict
- }
-
- Returns:
- Dict[str, Any]: 结构化处理后的结果
- """
- try:
- # Normalize the input for the structuring processor
- structure_input = {
- "title": content_data.get('title', ''),
- "body_text": content_data.get('content', ''),
- "images_comprehension": content_data.get('images', [])
- }
-
- # Invoke the structuring processor
- structured_content = self.structure_processor.process_content(structure_input)
-
- # Wrap the return value as-is, whether it is a string or a dict; do not access .result
- result = {
- 'original_data': content_data,
- 'structured_content': structured_content,
- 'structure_status': 'success',
- 'process_time': self._get_current_timestamp()
- }
-
- logger.info(f"内容结构化处理成功: title={content_data.get('title', '')}")
- return result
-
- except Exception as e:
- logger.error(f"内容结构化处理失败: {e}")
- return {
- 'original_data': content_data,
- 'structured_content': '',
- 'structure_status': 'failed',
- 'error': str(e),
- 'process_time': self._get_current_timestamp()
- }
-
- def _get_current_timestamp(self) -> str:
- """获取当前时间戳字符串"""
- from datetime import datetime
- return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
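-
-
- # Illustrative end-to-end sketch, not part of the original module: it assumes a reachable
- # MySQL instance and a request_id present in knowledge_crawl_content, and the helper name
- # run_example_pipeline is made up for this example.
- def run_example_pipeline(request_id: str) -> None:
-     """Chain the tools above: query -> identify -> store -> structure -> store."""
-     identify_tool = IdentifyTool()
-     structure_tool = StructureTool()
-     for item in QueryDataTool.fetch_crawl_data_list(request_id):
-         # Each item carries the parsed crawl_data plus the raw record it came from
-         raw = item.get("raw") or item
-         identify_result = identify_tool.run(item["crawl_data"])
-         UpdateDataTool.store_indentify_result(request_id, raw, identify_result)
-         structured = structure_tool.process_content_structure(identify_result)
-         UpdateDataTool.store_parsing_result(request_id, raw, structured)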