123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- import os
- import sys
- import json
- from typing import Any, Dict, List, Optional
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- from utils.logging_config import get_logger
- from utils.mysql_db import MysqlHelper
- from indentify.indentify import ContentIdentifier
- logger = get_logger('AgentTools')
class QueryDataTool:
    """Query knowledge_crawl_content and extract the crawl_data field from the JSON stored in the data column."""

    @staticmethod
    def _default_item() -> Dict[str, Any]:
        """Return the single hard-coded fallback entry used when no DB rows exist.

        Keeps downstream stages working even when a request_id has no crawled rows.
        """
        return {
            "crawl_data": {
                "channel": 1,
                "channel_content_id": "684a789b000000002202a61b",
                "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
                "wx_sn": None,
                "title": "一个视频学会,5个剪辑工具,超详细教程",
                "content_type": "video",
                "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
                "location": "未知",
                "source_url": None,
                "mini_program": None,
                "topic_list": [],
                "image_url_list": [
                    {
                        "image_type": 2,
                        "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg"
                    }
                ],
                "video_url_list": [
                    {
                        "video_url": "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4",
                        "video_duration": 615
                    }
                ],
                "bgm_data": None,
                "ad_info": None,
                "is_original": False,
                "voice_data": None,
                "channel_account_id": "670a10ac000000001d0216ec",
                "channel_account_name": "小伍剪辑视频",
                "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
                "item_index": None,
                "view_count": None,
                "play_count": None,
                "like_count": 692,
                "collect_count": 996,
                "comment_count": 37,
                "share_count": None,
                "looking_count": None,
                "publish_timestamp": 1749711589000,
                "modify_timestamp": 1749711589000,
                "update_timestamp": 1755239186502
            }
        }

    @staticmethod
    def _wrap(item: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize one parsed dict into the {"crawl_data": ..., "raw": ...} shape.

        If the dict has a 'crawl_data' key holding a dict/list, expose that value;
        otherwise the whole dict is treated as the crawl_data payload.
        """
        crawl_data = item.get('crawl_data')
        if not isinstance(crawl_data, (dict, list)):
            crawl_data = item
        return {"crawl_data": crawl_data, "raw": item}

    @staticmethod
    def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
        """Return a list of {"crawl_data", "raw"} dicts for the given request_id.

        Falls back to a built-in sample item when no rows are found. Rows whose
        data column is empty or fails JSON parsing are skipped (and logged).
        """
        sql = "SELECT data FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if not rows:
            logger.info(f"request_id={request_id} 未查询到数据,使用默认值")
            return [QueryDataTool._default_item()]

        results: List[Dict[str, Any]] = []
        for row in rows:
            data_cell = row[0]
            if not data_cell:
                continue
            try:
                # The driver may already hand back a decoded object; only parse str/bytes.
                parsed = json.loads(data_cell) if isinstance(data_cell, (str, bytes)) else data_cell
                if isinstance(parsed, list):
                    # Non-dict list elements are silently ignored (same as before).
                    results.extend(QueryDataTool._wrap(item) for item in parsed if isinstance(item, dict))
                elif isinstance(parsed, dict):
                    results.append(QueryDataTool._wrap(parsed))
                else:
                    logger.warning("data 字段非期望的 JSON 结构,已跳过一行")
            except Exception as e:
                logger.error(f"解析 data JSON 失败: {e}")
        logger.info(f"request_id={request_id} 提取 crawl_data 数量: {len(results)}")
        return results
class IdentifyTool:
    """Thin wrapper around ContentIdentifier that runs image/video recognition on crawl data."""

    def __init__(self) -> None:
        self.identifier = ContentIdentifier()

    def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse crawl_data, run recognition, and assemble a flat result dict.

        On any failure the error is logged and an empty-shaped result carrying
        an 'error' key is returned instead of raising.
        """
        try:
            formatted = self.identifier.parse_formatted_content(crawl_data)
            recognition = self.identifier.process_content_recognition(formatted)

            def pick(key: str, fallback: Any) -> Any:
                # Falsy values (None, '', 0) collapse to the fallback, matching `get(...) or default`.
                return formatted.get(key) or fallback

            return {
                'channel': pick('channel', ''),
                'title': pick('title', ''),
                'content': pick('body_text', ''),
                'images': recognition.get('image_analysis', {}).get('images_comprehension', []),
                'videos': recognition.get('video_analysis', {}),
                'meta': {
                    'author': pick('channel_account_name', ''),
                    'like_count': pick('like_count', 0),
                    'collect_count': pick('collect_count', 0),
                    'comment_count': pick('comment_count', 0),
                    'view_count': pick('view_count', 0),
                    'publish_time': pick('publish_time', ''),
                    'update_timestamp': pick('update_timestamp', ''),
                    'content_link': pick('content_link', ''),
                    'content_id': pick('channel_content_id', ''),
                },
            }
        except Exception as e:
            logger.error(f"识别失败: {e}")
            return {
                'channel': '',
                'title': '',
                'content': '',
                'images': [],
                'videos': [],
                'meta': {},
                'error': str(e)
            }
class StructureTool:
    """
    Structuring tool: combine the recognition result with the original
    crawl_data and persist the pair into the knowledge_parsing_content table.
    """

    @staticmethod
    def _build_payload(crawl_raw: Dict[str, Any], identify_result: Dict[str, Any]) -> str:
        """Serialize the stored JSON payload; non-ASCII text is kept readable."""
        return json.dumps(
            {'crawl_raw': crawl_raw, 'identify_result': identify_result},
            ensure_ascii=False,
        )

    @staticmethod
    def store_parsing_result(
        request_id: str,
        crawl_raw: Dict[str, Any],
        identify_result: Dict[str, Any],
        task_id: int = 0,
        status: int = 2,
    ) -> Optional[int]:
        """
        Store a parsing result into knowledge_parsing_content.

        Args:
            request_id: Request identifier.
            crawl_raw: Original crawled data.
            identify_result: Recognition output for the same content.
            task_id: Owning task id (previously hard-coded to 0; default keeps
                old behavior).
            status: Row status code; 2 means processing finished (previous
                hard-coded value).

        Returns:
            The inserted row id, or None on failure (errors are logged, never raised).
        """
        try:
            # content_id comes from the top level of the raw crawl payload.
            content_id = crawl_raw.get('channel_content_id') or ''

            sql = (
                "INSERT INTO knowledge_parsing_content "
                "(content_id, request_id, task_id, parsing_data, create_time, status) "
                "VALUES (%s, %s, %s, %s, NOW(), %s)"
            )
            params = (
                content_id,
                request_id,
                task_id,
                StructureTool._build_payload(crawl_raw, identify_result),
                status,
            )

            result = MysqlHelper.insert_and_get_id(sql, params)
            if result:
                logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}, insert_id={result}")
                return result
            # Falsy insert id: keep the original behavior of returning None.
            return None

        except Exception as e:
            logger.error(f"存储解析结果失败: request_id={request_id}, error={e}")
            return None
|