indentify.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 内容识别脚本
  5. 主要功能:
  6. 1. 从数据库中拉取一条 recognition_status = 0 的数据
  7. 2. 解析 formatted_content 中的图片和视频
  8. 3. 调用独立的图文识别和视频识别模块
  9. 4. 将识别结果更新到数据库
  10. """
  11. import os
  12. import json
  13. import time
  14. import sys
  15. import argparse
  16. from typing import Dict, Any, List, Optional
  17. from dotenv import load_dotenv
  18. from datetime import datetime
  19. # 导入自定义模块
  20. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  21. from utils.mysql_db import MysqlHelper
  22. from content_indentify.image_identifier import ImageIdentifier
  23. from content_indentify.video_identifier import VideoIdentifier
  24. from utils.logging_config import get_logger
  25. class ContentIdentifier:
  26. def __init__(self):
  27. # 加载环境变量
  28. load_dotenv()
  29. # 设置日志
  30. self.logger = get_logger('ContentIdentifier')
  31. # 初始化数据库连接
  32. self.db = MysqlHelper()
  33. # 初始化识别模块
  34. self.image_identifier = ImageIdentifier()
  35. self.video_identifier = VideoIdentifier()
  36. def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
  37. """从数据库获取一条未处理的数据"""
  38. sql = """
  39. SELECT id, formatted_content
  40. FROM knowledge_search_content
  41. WHERE recognition_status = 0
  42. LIMIT 1
  43. """
  44. try:
  45. result = self.db.get_values(sql)
  46. if result and len(result) > 0:
  47. record = result[0]
  48. # 检查返回的字段数量
  49. if len(record) >= 3:
  50. return {
  51. 'id': record[0],
  52. 'formatted_content': record[1],
  53. 'channel_content_id': record[2]
  54. }
  55. elif len(record) == 2:
  56. # 如果没有channel_content_id字段,使用id作为默认值
  57. return {
  58. 'id': record[0],
  59. 'formatted_content': record[1],
  60. 'channel_content_id': record[0] # 使用id作为默认值
  61. }
  62. else:
  63. self.logger.error(f"数据库返回字段数量异常: {len(record)}, 期望至少2个字段")
  64. return None
  65. return None
  66. except Exception as e:
  67. self.logger.error(f"获取未处理记录失败: {e}")
  68. return None
  69. def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
  70. """解析 formatted_content JSON 字符串"""
  71. try:
  72. if isinstance(formatted_content, str):
  73. return json.loads(formatted_content)
  74. elif isinstance(formatted_content, dict):
  75. return formatted_content
  76. else:
  77. raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
  78. except json.JSONDecodeError as e:
  79. self.logger.error(f"解析 formatted_content JSON 失败: {e}")
  80. raise
  81. def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
  82. """处理内容识别,调用独立的识别模块"""
  83. self.logger.info("开始内容识别处理...")
  84. # 图片识别
  85. image_result = self.image_identifier.process_images(formatted_content)
  86. # 视频识别
  87. video_result = self.video_identifier.process_videos(formatted_content)
  88. # 整合结果
  89. recognition_result = {
  90. 'image_analysis': image_result,
  91. 'video_analysis': video_result
  92. }
  93. self.logger.info(f"识别结果: {recognition_result}")
  94. return recognition_result
  95. def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:
  96. """更新数据库中的 multimodal_recognition 字段"""
  97. try:
  98. # 将结果转换为JSON字符串,并处理换行符问题
  99. result_json = json.dumps(recognition_result, ensure_ascii=False)
  100. # 将换行符替换为 \n 字符串,确保JSON可以被正确解析
  101. result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')
  102. # 构建更新SQL - 使用参数化查询避免换行符问题
  103. sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
  104. params = (result_json, record_id)
  105. # 执行更新
  106. result = self.db.update_values(sql, params)
  107. if result is not None:
  108. self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
  109. return True
  110. else:
  111. self.logger.error(f"更新记录 {record_id} 失败")
  112. return False
  113. except Exception as e:
  114. self.logger.error(f"更新数据库失败: {e}")
  115. return False
  116. def process_single_record(self) -> bool:
  117. """处理单条记录"""
  118. try:
  119. # 获取未处理的记录
  120. record = self.get_unprocessed_record()
  121. if not record:
  122. self.logger.warning("没有找到未处理的记录")
  123. return False
  124. self.logger.info(f"开始处理记录 ID: {record['id']}, 内容ID: {record['channel_content_id']}")
  125. # self.logger.info(f" 多模态识别: {record['multimodal_recognition'][:300]}...")
  126. # 先设置这条记录的 recognition_status = 1
  127. self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")
  128. # 解析 formatted_content
  129. formatted_content = self.parse_formatted_content(record['formatted_content'])
  130. # 提取基本信息,处理 null 值
  131. title = formatted_content.get('title') or ''
  132. content = formatted_content.get('body_text') or ''
  133. channel = formatted_content.get('channel') or ''
  134. images = formatted_content.get('image_url_list') or []
  135. videos = formatted_content.get('video_url_list') or []
  136. author = formatted_content.get('channel_account_name') or ''
  137. like_count = formatted_content.get('like_count') or 0
  138. collect_count = formatted_content.get('collect_count') or 0
  139. comment_count = formatted_content.get('comment_count') or 0
  140. view_count = formatted_content.get('view_count') or 0
  141. publish_time = formatted_content.get('publish_time') or ''
  142. update_timestamp = formatted_content.get('update_timestamp') or ''
  143. content_link = formatted_content.get('content_link') or ''
  144. content_id = formatted_content.get('channel_content_id') or ''
  145. # 调用内容识别处理
  146. recognition_result = self.process_content_recognition(formatted_content)
  147. # 构建完整的识别结果
  148. complete_result = {
  149. 'id': record['id'],
  150. 'channel': channel,
  151. 'title': title,
  152. 'content': content,
  153. 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
  154. 'videos': recognition_result.get('video_analysis', {}),
  155. 'meta': {
  156. 'author': author,
  157. 'like_count': like_count,
  158. 'collect_count': collect_count,
  159. 'comment_count': comment_count,
  160. 'view_count': view_count,
  161. 'publish_time': publish_time,
  162. 'update_timestamp': update_timestamp,
  163. 'content_link': content_link,
  164. 'content_id': content_id,
  165. }
  166. }
  167. # 更新数据库
  168. success = self.update_multimodal_recognition(record['id'], complete_result)
  169. if success:
  170. self.logger.info(f"记录 {record['id']} 处理完成")
  171. return True
  172. else:
  173. self.logger.error(f"记录 {record['id']} 处理失败")
  174. return False
  175. except Exception as e:
  176. self.logger.error(f"处理记录失败: {e}")
  177. return False
  178. def process_all_records(self, max_records: int = 10):
  179. """处理多条记录"""
  180. self.logger.info(f"开始批量处理,最多处理 {max_records} 条记录")
  181. processed_count = 0
  182. success_count = 0
  183. for i in range(max_records):
  184. self.logger.info(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")
  185. if self.process_single_record():
  186. success_count += 1
  187. else:
  188. self.logger.warning("没有更多记录需要处理,结束批量处理")
  189. break
  190. processed_count += 1
  191. # 添加延迟避免API限制
  192. time.sleep(2)
  193. self.logger.info(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")
  194. def process_continuous(self, max_records: int = None, delay_seconds: int = 2):
  195. """连续处理记录,直到没有更多记录或达到最大数量限制"""
  196. self.logger.info("启动连续处理模式...")
  197. self.logger.info("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
  198. self.logger.info(f"处理间隔: {delay_seconds} 秒")
  199. if max_records:
  200. self.logger.info(f"最大处理数量: {max_records} 条")
  201. else:
  202. self.logger.info("无数量限制,将处理所有可用记录")
  203. self.logger.info("按 Ctrl+C 可以随时停止处理")
  204. self.logger.info("-" * 60)
  205. processed_count = 0
  206. success_count = 0
  207. consecutive_failures = 0
  208. max_consecutive_failures = 3 # 连续失败3次后停止
  209. try:
  210. while True:
  211. # 检查是否达到最大数量限制
  212. if max_records and processed_count >= max_records:
  213. self.logger.info(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
  214. break
  215. self.logger.info(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
  216. self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  217. # 处理单条记录
  218. if self.process_single_record():
  219. success_count += 1
  220. consecutive_failures = 0 # 重置连续失败计数
  221. self.logger.info(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
  222. else:
  223. consecutive_failures += 1
  224. self.logger.warning(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
  225. # 检查连续失败次数
  226. if consecutive_failures >= max_consecutive_failures:
  227. self.logger.warning(f"\n⚠️ 连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
  228. self.logger.info("停止连续处理")
  229. break
  230. processed_count += 1
  231. # 检查是否还有更多记录
  232. remaining_records = self.get_remaining_records_count()
  233. if remaining_records == 0:
  234. self.logger.info(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
  235. break
  236. self.logger.info(f"剩余待处理记录: {remaining_records} 条")
  237. # 添加延迟避免API限制
  238. if delay_seconds > 0:
  239. self.logger.info(f"等待 {delay_seconds} 秒后处理下一条记录...")
  240. time.sleep(delay_seconds)
  241. except KeyboardInterrupt:
  242. self.logger.info(f"\n\n⏹️ 用户中断处理")
  243. self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
  244. except Exception as e:
  245. self.logger.error(f"\n\n💥 处理过程中发生错误: {e}")
  246. self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
  247. self.logger.info(f"\n📊 连续处理完成!")
  248. self.logger.info(f"总处理数量: {processed_count}")
  249. self.logger.info(f"成功数量: {success_count}")
  250. self.logger.info(f"失败数量: {processed_count - success_count}")
  251. if processed_count > 0:
  252. success_rate = (success_count / processed_count) * 100
  253. self.logger.info(f"成功率: {success_rate:.1f}%")
  254. def get_remaining_records_count(self) -> int:
  255. """获取剩余待处理记录数量"""
  256. try:
  257. sql = "SELECT COUNT(*) FROM knowledge_search_content WHERE recognition_status = 0"
  258. result = self.db.get_values(sql)
  259. if result and len(result) > 0:
  260. return result[0][0]
  261. return 0
  262. except Exception as e:
  263. self.logger.error(f"获取剩余记录数量失败: {e}")
  264. return 0
  265. def main():
  266. """主函数"""
  267. parser = argparse.ArgumentParser(description='内容识别脚本 - 分析图片和视频内容')
  268. parser.add_argument('--single', action='store_true', help='只处理一条记录')
  269. parser.add_argument('--batch', type=int, default=10, help='批量处理记录数量,默认10条')
  270. parser.add_argument('--continuous', action='store_true', help='连续处理模式,自动处理所有可用记录')
  271. parser.add_argument('--max-records', type=int, help='连续处理模式下的最大处理数量限制')
  272. parser.add_argument('--delay', type=int, default=2, help='处理间隔时间(秒),默认2秒')
  273. args = parser.parse_args()
  274. try:
  275. # 创建ContentIdentifier实例
  276. identifier = ContentIdentifier()
  277. if args.single:
  278. # 处理单条记录
  279. identifier.process_single_record()
  280. elif args.continuous:
  281. # 连续处理模式
  282. identifier.process_continuous(args.max_records, args.delay)
  283. else:
  284. # 批量处理记录
  285. identifier.process_all_records(args.batch)
  286. except Exception as e:
  287. sys.stderr.write(f"程序执行失败: {e}\n")
  288. sys.exit(1)
  289. if __name__ == '__main__':
  290. main()