# indentify.py (16 KB)
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
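"""
Content recognition script.

Main steps:
1. Fetch one row with recognition_status = 0 from the database.
2. Parse the images and videos referenced in formatted_content.
3. Call the standalone image and video recognition modules.
4. Write the recognition result back to the database.
"""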
  11. import os
  12. import json
  13. import time
  14. import sys
  15. import argparse
  16. from typing import Dict, Any, List, Optional
  17. from dotenv import load_dotenv
  18. from datetime import datetime
  19. # 导入自定义模块
  20. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  21. from utils.mysql_db import MysqlHelper
  22. from content_indentify.image_identifier import ImageIdentifier
  23. from content_indentify.video_identifier import VideoIdentifier
  24. from utils.logging_config import get_logger
  25. class ContentIdentifier:
  26. def __init__(self):
  27. # 加载环境变量
  28. load_dotenv()
  29. # 设置日志
  30. self.logger = get_logger('ContentIdentifier')
  31. # 初始化数据库连接
  32. self.db = MysqlHelper()
  33. # 初始化识别模块
  34. self.image_identifier = ImageIdentifier()
  35. self.video_identifier = VideoIdentifier()
  36. def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
  37. """从数据库获取一条未处理的数据
  38. 先从 knowledge_content_query 表中选取 category_id = 0 的所有 query_word,
  39. 然后用这些 query_word 去 knowledge_search_content 表中匹配,
  40. 找出 recognition_status = 0 的一条开始处理
  41. """
  42. try:
  43. # 第一步:获取 category_id = 0 的所有 query_word
  44. query_sql = """
  45. SELECT query_word
  46. FROM knowledge_content_query
  47. WHERE category_id = 0
  48. """
  49. query_result = self.db.get_values(query_sql)
  50. if not query_result:
  51. self.logger.warning("未找到 category_id = 0 的 query_word")
  52. return None
  53. query_words = [row[0] for row in query_result]
  54. self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
  55. # 第二步:用这些 query_word 去匹配 knowledge_search_content 表
  56. # 使用 IN 查询来匹配多个 query_word
  57. if len(query_words) > 0:
  58. # 构建带引号的查询条件,因为 query_word 是字符串
  59. quoted_words = [f"'{word}'" for word in query_words]
  60. placeholders = ','.join(quoted_words)
  61. content_sql = f"""
  62. SELECT id, formatted_content
  63. FROM knowledge_search_content
  64. WHERE recognition_status = 0
  65. AND query_word IN ({placeholders})
  66. LIMIT 1
  67. """
  68. self.logger.info(f"执行查询: {content_sql}")
  69. # 不需要传递参数,因为SQL已经包含了具体的值
  70. result = self.db.get_values(content_sql)
  71. else:
  72. self.logger.warning("没有可用的 query_word 进行匹配")
  73. return None
  74. if result and len(result) > 0:
  75. record = result[0]
  76. # 检查返回的字段数量
  77. return {
  78. 'id': record[0],
  79. 'formatted_content': record[1]
  80. }
  81. else:
  82. self.logger.info("未找到匹配 query_word 且 recognition_status = 0 的记录")
  83. return None
  84. except Exception as e:
  85. self.logger.error(f"获取未处理记录失败: {e}")
  86. return None
  87. def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
  88. """解析 formatted_content JSON 字符串"""
  89. try:
  90. if isinstance(formatted_content, str):
  91. return json.loads(formatted_content)
  92. elif isinstance(formatted_content, dict):
  93. return formatted_content
  94. else:
  95. raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
  96. except json.JSONDecodeError as e:
  97. self.logger.error(f"解析 formatted_content JSON 失败: {e}")
  98. raise
  99. def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
  100. """处理内容识别,调用独立的识别模块"""
  101. self.logger.info("开始内容识别处理...")
  102. # 图片识别
  103. image_result = self.image_identifier.process_images(formatted_content)
  104. # 视频识别
  105. video_result = self.video_identifier.process_videos(formatted_content)
  106. # 整合结果
  107. recognition_result = {
  108. 'image_analysis': image_result,
  109. 'video_analysis': video_result
  110. }
  111. self.logger.info(f"识别结果: {recognition_result}")
  112. return recognition_result
  113. def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:
  114. """更新数据库中的 multimodal_recognition 字段"""
  115. try:
  116. # 将结果转换为JSON字符串,并处理换行符问题
  117. result_json = json.dumps(recognition_result, ensure_ascii=False)
  118. # 将换行符替换为 \n 字符串,确保JSON可以被正确解析
  119. result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')
  120. # 构建更新SQL - 使用参数化查询避免换行符问题
  121. sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
  122. params = (result_json, record_id)
  123. # 执行更新
  124. result = self.db.update_values(sql, params)
  125. if result is not None:
  126. self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
  127. return True
  128. else:
  129. self.logger.error(f"更新记录 {record_id} 失败")
  130. return False
  131. except Exception as e:
  132. self.logger.error(f"更新数据库失败: {e}")
  133. return False
  134. def process_single_record(self) -> bool:
  135. """处理单条记录"""
  136. try:
  137. # 获取未处理的记录
  138. record = self.get_unprocessed_record()
  139. if not record:
  140. self.logger.warning("没有找到未处理的记录")
  141. return False
  142. self.logger.info(f"开始处理记录 ID: {record['id']}")
  143. # self.logger.info(f" 多模态识别: {record['multimodal_recognition'][:300]}...")
  144. # 先设置这条记录的 recognition_status = 1
  145. self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")
  146. # 解析 formatted_content
  147. formatted_content = self.parse_formatted_content(record['formatted_content'])
  148. # 提取基本信息,处理 null 值
  149. title = formatted_content.get('title') or ''
  150. content = formatted_content.get('body_text') or ''
  151. channel = formatted_content.get('channel') or ''
  152. images = formatted_content.get('image_url_list') or []
  153. videos = formatted_content.get('video_url_list') or []
  154. author = formatted_content.get('channel_account_name') or ''
  155. like_count = formatted_content.get('like_count') or 0
  156. collect_count = formatted_content.get('collect_count') or 0
  157. comment_count = formatted_content.get('comment_count') or 0
  158. view_count = formatted_content.get('view_count') or 0
  159. publish_time = formatted_content.get('publish_time') or ''
  160. update_timestamp = formatted_content.get('update_timestamp') or ''
  161. content_link = formatted_content.get('content_link') or ''
  162. content_id = formatted_content.get('channel_content_id') or ''
  163. # 调用内容识别处理
  164. recognition_result = self.process_content_recognition(formatted_content)
  165. # 构建完整的识别结果
  166. complete_result = {
  167. 'id': record['id'],
  168. 'channel': channel,
  169. 'title': title,
  170. 'content': content,
  171. 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
  172. 'videos': recognition_result.get('video_analysis', {}),
  173. 'meta': {
  174. 'author': author,
  175. 'like_count': like_count,
  176. 'collect_count': collect_count,
  177. 'comment_count': comment_count,
  178. 'view_count': view_count,
  179. 'publish_time': publish_time,
  180. 'update_timestamp': update_timestamp,
  181. 'content_link': content_link,
  182. 'content_id': content_id,
  183. }
  184. }
  185. # 更新数据库
  186. success = self.update_multimodal_recognition(record['id'], complete_result)
  187. if success:
  188. self.logger.info(f"记录 {record['id']} 处理完成")
  189. return True
  190. else:
  191. self.logger.error(f"记录 {record['id']} 处理失败")
  192. return False
  193. except Exception as e:
  194. self.logger.error(f"处理记录失败: {e}")
  195. return False
  196. def process_all_records(self, max_records: int = 10):
  197. """处理多条记录"""
  198. self.logger.info(f"开始批量处理,最多处理 {max_records} 条记录")
  199. processed_count = 0
  200. success_count = 0
  201. for i in range(max_records):
  202. self.logger.info(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")
  203. if self.process_single_record():
  204. success_count += 1
  205. else:
  206. self.logger.warning("没有更多记录需要处理,结束批量处理")
  207. break
  208. processed_count += 1
  209. # 添加延迟避免API限制
  210. time.sleep(2)
  211. self.logger.info(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")
  212. def process_continuous(self, max_records: int = None, delay_seconds: int = 2):
  213. """连续处理记录,直到没有更多记录或达到最大数量限制"""
  214. self.logger.info("启动连续处理模式...")
  215. self.logger.info("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
  216. self.logger.info(f"处理间隔: {delay_seconds} 秒")
  217. if max_records:
  218. self.logger.info(f"最大处理数量: {max_records} 条")
  219. else:
  220. self.logger.info("无数量限制,将处理所有可用记录")
  221. self.logger.info("按 Ctrl+C 可以随时停止处理")
  222. self.logger.info("-" * 60)
  223. processed_count = 0
  224. success_count = 0
  225. consecutive_failures = 0
  226. max_consecutive_failures = 3 # 连续失败3次后停止
  227. try:
  228. while True:
  229. # 检查是否达到最大数量限制
  230. if max_records and processed_count >= max_records:
  231. self.logger.info(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
  232. break
  233. self.logger.info(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
  234. self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  235. # 处理单条记录
  236. if self.process_single_record():
  237. success_count += 1
  238. consecutive_failures = 0 # 重置连续失败计数
  239. self.logger.info(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
  240. else:
  241. consecutive_failures += 1
  242. self.logger.warning(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
  243. # 检查连续失败次数
  244. if consecutive_failures >= max_consecutive_failures:
  245. self.logger.warning(f"\n⚠️ 连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
  246. self.logger.info("停止连续处理")
  247. break
  248. processed_count += 1
  249. # 检查是否还有更多记录
  250. remaining_records = self.get_remaining_records_count()
  251. if remaining_records == 0:
  252. self.logger.info(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
  253. break
  254. self.logger.info(f"剩余待处理记录: {remaining_records} 条")
  255. # 添加延迟避免API限制
  256. if delay_seconds > 0:
  257. self.logger.info(f"等待 {delay_seconds} 秒后处理下一条记录...")
  258. time.sleep(delay_seconds)
  259. except KeyboardInterrupt:
  260. self.logger.info(f"\n\n⏹️ 用户中断处理")
  261. self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
  262. except Exception as e:
  263. self.logger.error(f"\n\n💥 处理过程中发生错误: {e}")
  264. self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
  265. self.logger.info(f"\n📊 连续处理完成!")
  266. self.logger.info(f"总处理数量: {processed_count}")
  267. self.logger.info(f"成功数量: {success_count}")
  268. self.logger.info(f"失败数量: {processed_count - success_count}")
  269. if processed_count > 0:
  270. success_rate = (success_count / processed_count) * 100
  271. self.logger.info(f"成功率: {success_rate:.1f}%")
  272. def get_remaining_records_count(self) -> int:
  273. """获取剩余待处理记录数量"""
  274. try:
  275. sql = "SELECT COUNT(*) FROM knowledge_search_content WHERE recognition_status = 0"
  276. result = self.db.get_values(sql)
  277. if result and len(result) > 0:
  278. return result[0][0]
  279. return 0
  280. except Exception as e:
  281. self.logger.error(f"获取剩余记录数量失败: {e}")
  282. return 0
  283. def main():
  284. """主函数"""
  285. parser = argparse.ArgumentParser(description='内容识别脚本 - 分析图片和视频内容')
  286. parser.add_argument('--single', action='store_true', help='只处理一条记录')
  287. parser.add_argument('--batch', type=int, default=10, help='批量处理记录数量,默认10条')
  288. parser.add_argument('--continuous', action='store_true', help='连续处理模式,自动处理所有可用记录')
  289. parser.add_argument('--max-records', type=int, help='连续处理模式下的最大处理数量限制')
  290. parser.add_argument('--delay', type=int, default=2, help='处理间隔时间(秒),默认2秒')
  291. args = parser.parse_args()
  292. try:
  293. # 创建ContentIdentifier实例
  294. identifier = ContentIdentifier()
  295. if args.single:
  296. # 处理单条记录
  297. identifier.process_single_record()
  298. elif args.continuous:
  299. # 连续处理模式
  300. identifier.process_continuous(args.max_records, args.delay)
  301. else:
  302. # 批量处理记录
  303. identifier.process_all_records(args.batch)
  304. except Exception as e:
  305. sys.stderr.write(f"程序执行失败: {e}\n")
  306. sys.exit(1)
  307. if __name__ == '__main__':
  308. main()