indentify.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
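"""
Content recognition script.

Main steps:
1. Fetch one row with recognition_status = 0 from the database.
2. Parse the images and videos referenced in formatted_content.
3. Call the standalone image and video recognition modules.
4. Write the recognition result back to the database.
"""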
  11. import os
  12. import json
  13. import time
  14. import sys
  15. import argparse
  16. from typing import Dict, Any, List, Optional
  17. from dotenv import load_dotenv
  18. from datetime import datetime
  19. # 导入自定义模块
  20. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  21. from utils.mysql_db import MysqlHelper
  22. from indentify.image_identifier import ImageIdentifier
  23. from indentify.video_identifier import VideoIdentifier
  24. from utils.logging_config import get_logger
  25. class ContentIdentifier:
  26. def __init__(self):
  27. # 加载环境变量
  28. load_dotenv()
  29. # 设置日志
  30. self.logger = get_logger('ContentIdentifier')
  31. # 初始化数据库连接
  32. self.db = MysqlHelper()
  33. # 初始化识别模块
  34. self.image_identifier = ImageIdentifier()
  35. self.video_identifier = VideoIdentifier()
  36. def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
  37. """从数据库获取一条未处理的数据
  38. 先从 knowledge_content_query 表中选取 category_id = 0 的所有 query_word,
  39. 然后用这些 query_word 去 knowledge_search_content 表中匹配,
  40. 找出 recognition_status = 0 的一条开始处理
  41. """
  42. try:
  43. # 第一步:获取 category_id = 0 的所有 query_word
  44. query_sql = """
  45. SELECT query_word
  46. FROM knowledge_content_query
  47. WHERE category_id = 0
  48. """
  49. query_result = self.db.get_values(query_sql)
  50. if not query_result:
  51. self.logger.warning("未找到 category_id = 0 的 query_word")
  52. return None
  53. query_words = [row[0] for row in query_result]
  54. self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
  55. # 第二步:用这些 query_word 去匹配 knowledge_search_content 表
  56. # 使用 IN 查询来匹配多个 query_word
  57. if len(query_words) > 0:
  58. # 构建带引号的查询条件,因为 query_word 是字符串
  59. quoted_words = [f"'{word}'" for word in query_words]
  60. placeholders = ','.join(quoted_words)
  61. content_sql = f"""
  62. SELECT id, formatted_content
  63. FROM knowledge_search_content
  64. WHERE recognition_status = 0
  65. AND query_word IN ({placeholders})
  66. LIMIT 1
  67. """
  68. # 不需要传递参数,因为SQL已经包含了具体的值
  69. result = self.db.get_values(content_sql)
  70. else:
  71. self.logger.warning("没有可用的 query_word 进行匹配")
  72. return None
  73. if result and len(result) > 0:
  74. record = result[0]
  75. # 检查返回的字段数量
  76. return {
  77. 'id': record[0],
  78. 'formatted_content': record[1]
  79. }
  80. else:
  81. self.logger.info("未找到匹配 query_word 且 recognition_status = 0 的记录")
  82. return None
  83. except Exception as e:
  84. self.logger.error(f"获取未处理记录失败: {e}")
  85. return None
  86. def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
  87. """解析 formatted_content JSON 字符串"""
  88. try:
  89. if isinstance(formatted_content, str):
  90. return json.loads(formatted_content)
  91. elif isinstance(formatted_content, dict):
  92. return formatted_content
  93. else:
  94. raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
  95. except json.JSONDecodeError as e:
  96. self.logger.error(f"解析 formatted_content JSON 失败: {e}")
  97. raise
  98. def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
  99. """处理内容识别,调用独立的识别模块"""
  100. self.logger.info("开始内容识别处理...")
  101. # 图片识别
  102. image_result = self.image_identifier.process_images(formatted_content)
  103. # 视频识别
  104. video_result = self.video_identifier.process_videos(formatted_content)
  105. # 整合结果
  106. recognition_result = {
  107. 'image_analysis': image_result,
  108. 'video_analysis': video_result
  109. }
  110. self.logger.info(f"识别结果: {recognition_result}")
  111. return recognition_result
  112. def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:
  113. """更新数据库中的 multimodal_recognition 字段"""
  114. try:
  115. # 将结果转换为JSON字符串,并处理换行符问题
  116. result_json = json.dumps(recognition_result, ensure_ascii=False)
  117. # 将换行符替换为 \n 字符串,确保JSON可以被正确解析
  118. result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')
  119. # 构建更新SQL - 使用参数化查询避免换行符问题
  120. sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
  121. params = (result_json, record_id)
  122. # 执行更新
  123. result = self.db.update_values(sql, params)
  124. if result is not None:
  125. self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
  126. return True
  127. else:
  128. self.logger.error(f"更新记录 {record_id} 失败")
  129. return False
  130. except Exception as e:
  131. self.logger.error(f"更新数据库失败: {e}")
  132. return False
  133. def process_single_record(self) -> bool:
  134. """处理单条记录"""
  135. try:
  136. # 获取未处理的记录
  137. record = self.get_unprocessed_record()
  138. if not record:
  139. self.logger.warning("没有找到未处理的记录")
  140. return False
  141. self.logger.info(f"开始处理记录 ID: {record['id']}")
  142. # self.logger.info(f" 多模态识别: {record['multimodal_recognition'][:300]}...")
  143. # 先设置这条记录的 recognition_status = 1
  144. self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 1 WHERE id = {record['id']}")
  145. # 解析 formatted_content
  146. formatted_content = self.parse_formatted_content(record['formatted_content'])
  147. # 提取基本信息,处理 null 值
  148. title = formatted_content.get('title') or ''
  149. content = formatted_content.get('body_text') or ''
  150. channel = formatted_content.get('channel') or ''
  151. images = formatted_content.get('image_url_list') or []
  152. videos = formatted_content.get('video_url_list') or []
  153. author = formatted_content.get('channel_account_name') or ''
  154. like_count = formatted_content.get('like_count') or 0
  155. collect_count = formatted_content.get('collect_count') or 0
  156. comment_count = formatted_content.get('comment_count') or 0
  157. view_count = formatted_content.get('view_count') or 0
  158. publish_time = formatted_content.get('publish_time') or ''
  159. update_timestamp = formatted_content.get('update_timestamp') or ''
  160. content_link = formatted_content.get('content_link') or ''
  161. content_id = formatted_content.get('channel_content_id') or ''
  162. # 调用内容识别处理
  163. recognition_result = self.process_content_recognition(formatted_content)
  164. # 判断识别是否成功:如果视频任一项的 asr_content 包含“视频上传失败”或者包含“ASR分析失败”,则标记失败
  165. video_analysis = recognition_result.get('video_analysis', {})
  166. video_upload_failed = False
  167. if isinstance(video_analysis, list):
  168. for video_item in video_analysis:
  169. if isinstance(video_item, dict) and 'asr_content' in video_item:
  170. if '视频上传失败' in (video_item.get('asr_content') or '') or 'ASR分析失败' in (video_item.get('asr_content') or ''):
  171. video_upload_failed = True
  172. break
  173. elif isinstance(video_analysis, dict):
  174. if 'asr_content' in video_analysis and ('视频上传失败' in (video_analysis.get('asr_content') or '') or 'ASR分析失败' in (video_analysis.get('asr_content') or '')):
  175. video_upload_failed = True
  176. if video_upload_failed:
  177. self.logger.info(f"记录 {record['id']} 识别失败,将 recognition_status 设置为 3")
  178. self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")
  179. return False
  180. # 构建完整的识别结果
  181. complete_result = {
  182. 'id': record['id'],
  183. 'channel': channel,
  184. 'title': title,
  185. 'content': content,
  186. 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
  187. 'videos': recognition_result.get('video_analysis', {}),
  188. 'meta': {
  189. 'author': author,
  190. 'like_count': like_count,
  191. 'collect_count': collect_count,
  192. 'comment_count': comment_count,
  193. 'view_count': view_count,
  194. 'publish_time': publish_time,
  195. 'update_timestamp': update_timestamp,
  196. 'content_link': content_link,
  197. 'content_id': content_id,
  198. }
  199. }
  200. # 更新数据库
  201. success = self.update_multimodal_recognition(record['id'], complete_result)
  202. if success:
  203. self.logger.info(f"记录 {record['id']} 处理完成")
  204. return True
  205. else:
  206. self.logger.error(f"记录 {record['id']} 处理失败")
  207. return False
  208. except Exception as e:
  209. self.logger.error(f"处理记录失败: {e}")
  210. return False
  211. def main():
  212. """主函数"""
  213. identifier = ContentIdentifier()
  214. identifier.process_single_record()
  215. if __name__ == '__main__':
  216. main()