process_title.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835
  1. import json
  2. import time
  3. import traceback
  4. from typing import Optional, List, Dict, Tuple
  5. from applications.api import fetch_deepseek_completion
  6. from applications.utils import yield_batch
  7. from applications.tasks.llm_tasks.prompts import extract_article_features
  8. from applications.tasks.llm_tasks.prompts import extract_article_category
  9. from tqdm.asyncio import tqdm
  10. class Const:
  11. # title rewrite status
  12. TITLE_REWRITE_INIT_STATUS = 0
  13. TITLE_REWRITE_SUCCESS_STATUS = 1
  14. TITLE_REWRITE_FAIL_STATUS = 99
  15. TITLE_REWRITE_LOCK_STATUS = 101
  16. # article status
  17. ARTICLE_AUDIT_PASSED_STATUS = 1
  18. ARTICLE_POSITIVE_STATUS = 0
  19. # title useful status
  20. TITLE_USEFUL_STATUS = 1
  21. # prompt version
  22. PROMPT_VERSION = "xx_250228" # 信欣2025-02-28提供
  23. # block expire time 1h
  24. TITLE_REWRITE_LOCK_TIME = 60 * 60
  25. # task status
  26. INIT_STATUS = 0
  27. PROCESSING_STATUS = 1
  28. SUCCESS_STATUS = 2
  29. FAIL_STATUS = 99
  30. # max processing time
  31. MAX_PROCESSING_TIME = 3600
  32. # article_status
  33. ARTICLE_INIT_STATUS = 1
  34. ARTICLE_PUBLISHED_STATUS = 2
  35. ARTICLE_BAD_STATUS = 0
  36. # limit score
  37. LIMIT_SCORE = 0.4
  38. BATCH_SIZE = 20
  39. PROCESS_NUM = 1000
  40. class TitleProcess(Const):
  41. def __init__(self, pool, aliyun_log, trace_id):
  42. self.pool = pool
  43. self.aliyun_log = aliyun_log
  44. self.trace_id = trace_id
  45. @staticmethod
  46. def generate_title_rewrite_prompt(ori_title: str) -> str:
  47. """
  48. 生成prompt
  49. """
  50. prompt = f"""
  51. 请将以下标题改写成适合公众号中小程序点击和传播的文章标题,文章标题的写作规范如下,请学习后进行文章标题的编写。直接输出最终的文章标题,文章标题撰写规范如下:
  52. 1. 标题结构:要点前置,信息明确
  53. 核心信息前置:标题开头直接点出文章的核心内容或亮点,吸引读者注意。例如:
  54. “我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?”
  55. “亩产7000斤,被误认成萝卜却曾是‘救命粮’,如今成我国出口名蔬”。
  56. 简洁明了:标题通常在20字以内,信息集中且易于理解。
  57. 悬念前置结构:前半句设置反常/冲突场景(如"刑满释放蹬三轮")+后半句用结果反转制造悬念("政府领导登门分配工作")
  58. 多要素拼接:通过冒号/逗号分隔不同叙事主体(地域+人物冲突+权威评价),如"辽宁女子住高档小区被敲门,法院判决意外"
  59. 2. 情绪表达:激发共鸣,引发好奇
  60. 情感共鸣:通过情感化的语言触动读者,泪崩/守护/抱头痛哭等情感冲击词,配合家庭伦理场景
  61. 例如:
  62. “老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了”。
  63. “儿子卖车卖房给母亲治病,母亲去世后儿媳收拾房间,打开床底柜,儿子突然痛哭”。
  64. 悬念与好奇心:通过提问或制造悬念,激发读者点击欲望。例如:
  65. “你知道是哪五家吗?”
  66. “打开床底柜,儿子突然痛哭”。
  67. 冲突性情绪词:拍桌大骂/气愤不已/眼红不已/算计等强对抗性词汇
  68. 结果反差刺激:用"风光善终/价值过亿/判决意外"等违反预期的结果
  69. 3. 语言风格:口语化、接地气
  70. 口语化表达:使用通俗易懂的语言,贴近读者生活。
  71. 刻意使用"赶都赶不走/各吃各的/我就知道你在家"等市井化用语。
  72. 例如:
  73. “狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石”。
  74. “聪明的女人,不会帮婆家3种忙,而蠢女人才一再插手”。
  75. 接地气的词汇:使用“狗屎运”“蠢女人”等口语化词汇,增强亲切感。
  76. 身份反差构建:突出人物命运转折(老农→亿万富翁/囚犯→政府帮扶对象)
  77. 权威背书暗示:"专家气愤/法院判决/网友评价"等第三方视角增强可信度
  78. 4. 标点运用:增强语气,突出重点
  79. 问号与感叹号:通过问号制造悬念,感叹号强化情感。
  80. 在关键转折点使用("太气人了!/赔不了!")
  81. 问号制造互动:如"容嬷嬷是校花?"激发读者验证心理
  82. 例如:
  83. “你知道是哪五家吗?”
  84. “太无耻了!湖南,一名厨师被公司派到云南‘出差’被拒……”
  85. 引号与冒号:用于突出关键词或转折点。
  86. 破折号递进:用"——"引导关键信息("吃不完最好扔掉——")
  87. 例如:
  88. “被误认成萝卜却曾是‘救命粮’”。
  89. “女子归还后,失主拒绝支付报酬,还说:要有格局”。
  90. 5. 热点与话题性:结合社会热点或争议
  91. 社会热点:结合当前热点事件或争议话题,吸引关注。例如:
  92. “上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!”
  93. 争议性话题:通过争议性内容引发讨论。例如:
  94. “李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔”。
  95. 6. 数字与具体细节:增强可信度与吸引力
  96. 数字的运用:通过具体数字增强标题的可信度和吸引力。例如:
  97. “亩产7000斤”。
  98. “22年河南男子跳河救人,体力耗尽留遗言”。
  99. 细节描述:通过细节让标题更具画面感。例如:
  100. “打开床底柜,儿子突然痛哭”。
  101. “扒开后捡到鸡蛋大小的青鱼石”。
  102. 7. 价值诉求:传递实用信息或情感价值
  103. 实用信息:提供对读者有价值的信息。例如:
  104. “我国存款最安全的五大银行,永远都不会倒闭”。
  105. “72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花”。
  106. 情感价值:通过情感故事或人生哲理打动读者。例如:
  107. “父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待”。
  108. 8. 名人效应与历史情怀:增强吸引力
  109. 名人效应:提及名人或历史事件,吸引关注。例如:
  110. “难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美”。
  111. “1975年‘下馆子’的老照片,2元能吃些什么,勾起那段最难忘的时光”。
  112. 9.隐藏传播逻辑:通过标题中暗含的、能触发人性弱点(如猎奇、贪婪、同情)或社会痛点的心理机制,通过潜意识刺激读者点击欲望
  113. 人性弱点触发:贪婪(200万保单)、猎奇(林彪密件)、窥私(家庭算计)
  114. 生存焦虑关联:医疗(脑瘫儿)、养老(子女不孝)、食品安全(二次加热)
  115. 身份代入设计:选择"老太太/外甥女/退休母亲"等易引发群体共鸣的角色
  116. 输入的标题是: '{ori_title}'
  117. """
  118. return prompt
  119. @staticmethod
  120. def category_generation_from_title(title_list: List[Tuple[str, str]]) -> str:
  121. """
  122. generate prompt category for given title
  123. """
  124. prompt = f"""
  125. 请帮我完成以下任务:输入为文章的标题,根据标题判断其内容所属的类目,输出为文章标题及其对应的类目。
  126. 类目需从以下15个品类内选择:
  127. 1. 知识科普
  128. 定义:以通俗易懂的方式普及科学、技术、健康、安全、生活常识、财产保护、医保政策、为人处事方式等内容,旨在提高公众的知识水平和认知能力。内容通常具有教育性和实用性,涵盖自然、社会、文化等多个领域。
  129. 标题示例:
  130. 我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?
  131. 借条上不要写“这3个字”,不然变成一张废纸,否则用法律也没用
  132. 不能二次加热的3种食物!再次提醒:这3种食物吃不完最好扔掉
  133. 2. 军事历史
  134. 定义:聚焦于历史上的军事事件、战争故事、军事策略、英雄人物等内容,旨在还原战争场景、探讨军事决策、揭示历史真相,并展现战争中的人物命运与历史影响。内容通常以叙事、分析或回忆的形式呈现,兼具历史深度和故事性。
  135. 标题示例:
  136. 对越作战永远失踪的332人,陵园没有墓碑,没有名字,只有烈士证
  137. 淮海大战丢失阵地,师长带头冲锋!最后出一口恶气:活捉敌最高指挥官
  138. 抗战时,一村民被敌拉去带路,半道回头忽发现:后面跟个游击队员
  139. 3. 家长里短
  140. 定义:围绕家庭成员之间的关系、矛盾、情感、道德、等展开的故事或讨论,内容常涉及婚姻、亲子、婆媳、兄弟姐妹等关系,或是人情往来、金钱纠纷、情感变化等内容,反映家庭生活中的温情、冲突与人性。
  141. 标题示例:
  142. 父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待
  143. 老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了
  144. 我花150一天雇了阿姨,两天后上班回来给她300,阿姨说我账算错了
  145. 4. 社会法治
  146. 定义:聚焦社会事件、法律纠纷、法院判决、社会现象等内容,通常涉及道德、法律、公平正义等议题,旨在揭示社会问题、探讨法律规则或反映人性与社会现实。
  147. 标题示例:
  148. 山东,女子在小区捡到16万天价项链,业主悬赏3万找回,女子归还后,失主拒绝支付报酬,还说:要有格局,女子认为被骗,将失主告上法庭
  149. 陕西,女子22万买26层房,2年后,楼盘24层就已经封顶!开发商:你闹事造成100万损失,道歉才给赔偿!
  150. 上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!
  151. 5. 奇闻趣事
  152. 定义:以猎奇、娱乐为主,涵盖罕见、奇特、有趣的事件、发现或故事,内容通常具有趣味性和话题性,能够引发读者的好奇心和讨论。
  153. 标题示例:
  154. 狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石,网友:起码值几千!
  155. 内蒙古小伙河边捡到金牌,拒绝上交将其熔成金手镯,专家气愤不已
  156. 男子买了一辆废弃坦克,拆油箱时,他发现了一根又一根的金条……
  157. 6. 名人八卦
  158. 定义:围绕名人的生活、言论、事件、八卦等内容展开,通常涉及娱乐圈、政界、历史人物等,旨在满足公众对名人隐私和动态的好奇心。
  159. 标题示例:
  160. 难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美
  161. 心狠手辣的容嬷嬷年轻时是校花?看了照片后,网友直接闭嘴了!
  162. 李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔
  163. 7. 健康养生
  164. 定义:关注健康、养生、疾病预防、生活习惯等方面的知识和建议,内容通常具有实用性和指导性,旨在帮助读者改善生活质量、提升健康水平。
  165. 标题示例:
  166. 72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花
  167. 40岁女子每天吃水煮蛋,一年后去体检,检查报告令医生都羡慕不已
  168. 2024年血糖新标准已公布,不再是3.9~6.1,你的血糖还不算高吗?
  169. 8. 情感故事
  170. 定义:以人与人之间的情感交流、感人故事、情感经历为主题,内容通常充满温情、感动或反思,旨在引发读者的情感共鸣和思考。
  171. 标题示例:
  172. 男孩饭店吃饭,发现陌生女子和去世母亲很像,走过去说:我妈妈去世了,能抱一下我吗?
  173. 河南一女子直播时,被失散 32 年的父亲认出:闺女等着爸爸接你回家
  174. 1987年,江苏男子借好友一千元,25年后朋友成富豪还他1000万报恩
  175. 流浪狗跟着骑行夫妻跑了一百多公里,一直守护在女主身边,赶都赶不走,当男主得知原因后竟抱着狗狗大哭起来
  176. 9. 国家大事
  177. 定义:涉及国家实力、科技发展、资源发现、国际合作等内容,通常以宏观视角展现国家的综合实力、科技成就或国际影响力,体现国家的崛起与发展。
  178. 标题示例:
  179. 我国在南极发现“海上粮仓”,储量高达10亿吨,世界各国眼红不已
  180. 我国贵州发现7000万吨宝藏,价值高达上万亿,多国求合作被拒绝
  181. 距我国3000公里,塞班岛明明归美国管辖,为何岛上大多是中国人?
  182. 10. 现代人物
  183. 定义:聚焦活跃在21世纪后具有传奇色彩或巨大贡献的人物、事迹、成就等,内容通常充满戏剧性和启发性,旨在展现人物的非凡经历或历史贡献。
  184. 标题示例:
  185. 她曾狂贪国家上百亿,被发现时已经移居美国,最终还风光一时得善终
  186. 山东女子因坐月子无聊,破译美国2套绝密系统的密码,国家:奖励711万!
  187. 牺牲太大了!航天女英雄刘洋:结婚8年未生子,回地面后“消失”的她怎样了?
  188. 11. 怀旧时光
  189. 定义:以回忆和怀旧为主题,涉及过去的历史、文化、生活、照片等内容,旨在唤起读者对过去时光的情感共鸣和怀念。
  190. 标题示例:
  191. 1975年“下馆子”的老照片,2元能吃些什么,勾起那段最难忘的时光
  192. 82年,北京老人捡回两张“破椅子”,遭家人数落,29年后拍出2300万
  193. 这张老照片第一次看到,邓颖超和李讷的罕见合影!
  194. 12. 政治新闻
  195. 定义:聚焦政治事件、领导人动态、国际关系等内容,通常以新闻或分析的形式呈现,旨在揭示政治局势、政策变化或国际关系的动态。
  196. 标题示例:
  197. 中方外长行程有变,提前结束访欧匆匆回国,带回来一个好消息
  198. 宋庆龄在北京逝世后,远在美国的宋美龄只说了7个字,字字揪心!
  199. 庐山会议后,叶帅去劝彭德怀认个错,哭着说了一句心里话
  200. 13. 历史人物
  201. 定义:聚焦于21世纪前具有重要影响的人物,包括他们的生平、事迹、成就、性格、趣事及其对历史进程的贡献。内容通常以传记、回忆录或历史分析的形式呈现,旨在还原人物的真实面貌并探讨其历史意义。
  202. 标题示例:
  203. 林彪去世后,蒋介石收到林彪与戴笠的一份密谈文件,看后拍桌大骂
  204. 张学良软禁时的一张实拍照片,头发秃顶,两眼无光,像个中年老头
  205. 1912年,孙中山和两个女儿罕见留影,面对镜头父女三人看起来很幸福
  206. 14. 社会现象
  207. 定义:关注社会中出现的普遍现象、趋势或问题,通常涉及文化、经济、教育、民生等领域。内容以观察、分析或评论为主,旨在揭示现象背后的原因、影响及社会意义,引发公众的思考和讨论。
  208. 标题示例:
  209. 22年河南男子跳河救人,体力耗尽留遗言,被救女子猛然抓住他:一起走
  210. 浙江一老人刑满释放,靠蹬三轮为生,6年后,政府领导登门拜访:我们帮您分配工作
  211. 儿子收到清华通知书,父亲花5万请全村吃席,镇长看一眼竟说:这是假的
  212. 15.财经科技
  213. 定义:聚焦于经济、金融、投资及行业发展的分析与预测,涵盖未来经济趋势、资产价值变化、行业变革及个人理财策略等内容。可以提供前瞻性的财经视角和实用的理财建议,帮助其把握经济动态、优化财务规划并应对行业变化。
  214. 标题示例:
  215. 未来10年,现金和房子都将贬值,只有2样东西最值钱
  216. 外卖时代将被终结?一个全新行业正悄悄取代外卖,你准备好了吗?
  217. 准备存款的一定要知道,今明两年,定期存款要记住“4不存”
  218. 输入是一个 LIST, LIST 中的每个元素是一个元组,元组的第一个元素是文章的 ID,第二个元素是文章的标题。
  219. 最后输出结果请用JSON格式输出,key为ID,value为品类,仅输出JSON,不要markdown格式,不要任何其他内容
  220. 如果标题中包含半角双引号,则进行转义
  221. 输入的 LIST 是 {title_list}
  222. 检查你的输出,输出的品类一定需要是输入的 15个品类,只需要输出品类的中文汉字
  223. """
  224. return prompt
  225. async def _roll_back_lock_tasks(self, table_name: str) -> int:
  226. query = f"""
  227. update {table_name}
  228. set category_status = %s
  229. where category_status = %s and category_status_update_ts <= %s;
  230. """
  231. return await self.pool.async_save(
  232. query=query,
  233. params=(
  234. self.INIT_STATUS,
  235. self.PROCESSING_STATUS,
  236. int(time.time()) - self.MAX_PROCESSING_TIME,
  237. ),
  238. )
  239. async def process_single_article(
  240. self, content_type: str, article: Dict
  241. ) -> Optional[Dict]:
  242. match content_type:
  243. case "video":
  244. article_id = article["id"]
  245. title = article["article_title"]
  246. case "article":
  247. article_id = article["article_id"]
  248. title = article["title"]
  249. case _:
  250. raise ValueError("content type is not supported")
  251. title_batch = [(article_id, title)]
  252. prompt = self.category_generation_from_title(title_batch)
  253. try:
  254. completion = fetch_deepseek_completion(
  255. model="DeepSeek-V3", prompt=prompt, output_type="json"
  256. )
  257. return completion
  258. except Exception as e:
  259. await self.aliyun_log.log(
  260. contents={
  261. "trace_id": self.trace_id,
  262. "data": {
  263. "article_id": article_id,
  264. "error": str(e),
  265. "traceback": traceback.format_exc(),
  266. },
  267. }
  268. )
  269. return None
  270. class VideoPoolCategoryGeneration:
  271. pass
  272. class ArticlePoolCategoryGeneration(TitleProcess):
  273. def __init__(self, pool, aliyun_log, trace_id):
  274. super().__init__(pool, aliyun_log, trace_id)
  275. async def lock_task(self, article_id_tuple: tuple[int, ...]) -> int:
  276. update_query = f"""
  277. update long_articles.crawler_meta_article
  278. set category_status = %s, category_status_update_ts = %s
  279. where article_id in %s and category_status = %s;
  280. """
  281. return await self.pool.async_save(
  282. query=update_query,
  283. params=(
  284. self.PROCESSING_STATUS,
  285. int(time.time()),
  286. article_id_tuple,
  287. self.INIT_STATUS,
  288. ),
  289. )
  290. async def get_task_list(self, limit):
  291. query = f"""
  292. select article_id, title from crawler_meta_article
  293. where category_status = %s and status = %s and score > %s
  294. order by score desc limit %s;
  295. """
  296. return await self.pool.async_fetch(
  297. query=query,
  298. params=(
  299. self.INIT_STATUS,
  300. self.ARTICLE_INIT_STATUS,
  301. self.LIMIT_SCORE,
  302. limit,
  303. ),
  304. )
  305. async def set_category_status_as_success(
  306. self, article_id: int, category: str
  307. ) -> int:
  308. update_query = f"""
  309. update long_articles.crawler_meta_article
  310. set category_by_ai = %s, category_status = %s, category_status_update_ts = %s
  311. where article_id = %s and category_status = %s;
  312. """
  313. return await self.pool.async_save(
  314. query=update_query,
  315. params=(
  316. category,
  317. self.SUCCESS_STATUS,
  318. int(time.time()),
  319. article_id,
  320. self.PROCESSING_STATUS,
  321. ),
  322. )
  323. async def set_category_status_as_fail(self, article_id: int) -> int:
  324. update_query = f"""
  325. update long_articles.crawler_meta_article
  326. set category_status = %s, category_status_update_ts = %s
  327. where article_id = %s and category_status = %s;
  328. """
  329. return await self.pool.async_save(
  330. query=update_query,
  331. params=(
  332. self.FAIL_STATUS,
  333. int(time.time()),
  334. article_id,
  335. self.PROCESSING_STATUS,
  336. ),
  337. )
  338. async def process_each_batch(self, task_batch):
  339. title_batch = [(i["article_id"], i["title"]) for i in task_batch]
  340. id_tuple = tuple([int(i["article_id"]) for i in task_batch])
  341. if await self.lock_task(id_tuple):
  342. prompt = self.category_generation_from_title(title_batch)
  343. try:
  344. completion = fetch_deepseek_completion(
  345. model="DeepSeek-V3", prompt=prompt, output_type="json"
  346. )
  347. for article in title_batch:
  348. article_id = article[0]
  349. category = completion.get(str(article_id))
  350. if category:
  351. await self.set_category_status_as_success(article_id, category)
  352. else:
  353. await self.set_category_status_as_fail(article_id)
  354. except Exception as e:
  355. await self.aliyun_log.log(
  356. contents={
  357. "task": "ArticlePoolCategoryGeneration",
  358. "function": "process_each_batch",
  359. "message": "batch 中存在敏感词,AI 拒绝返回",
  360. "status": "fail",
  361. "trace_id": self.trace_id,
  362. "data": {
  363. "article_id": id_tuple,
  364. "error": str(e),
  365. "traceback": traceback.format_exc(),
  366. },
  367. }
  368. )
  369. for article in task_batch:
  370. single_completion = await self.process_single_article(
  371. content_type="article", article=article
  372. )
  373. article_id = article["article_id"]
  374. if single_completion:
  375. category = single_completion.get(str(article_id))
  376. if category:
  377. # set as success
  378. await self.set_category_status_as_success(
  379. article_id, category
  380. )
  381. else:
  382. await self.set_category_status_as_fail(article_id)
  383. else:
  384. # set as fail
  385. await self.set_category_status_as_fail(article_id)
  386. return
  387. else:
  388. return
  389. async def deal(self, limit):
  390. # await self._roll_back_lock_tasks(table_name="crawler_meta_article")
  391. if not limit:
  392. limit = self.PROCESS_NUM
  393. task_list = await self.get_task_list(limit=limit)
  394. print(type(task_list))
  395. await self.aliyun_log.log(
  396. contents={
  397. "task": "ArticlePoolCategoryGeneration",
  398. "function": "deal",
  399. "trace_id": self.trace_id,
  400. "message": f"总共获取{len(task_list)}条文章",
  401. }
  402. )
  403. print(task_list)
  404. task_batch_list = yield_batch(data=task_list, batch_size=self.BATCH_SIZE)
  405. batch_index = 0
  406. for task_batch in task_batch_list:
  407. batch_index += 1
  408. try:
  409. await self.process_each_batch(task_batch)
  410. print(f"batch :{batch_index} 处理成功")
  411. except Exception as e:
  412. await self.aliyun_log.log(
  413. contents={
  414. "task": "ArticlePoolCategoryGeneration",
  415. "function": "deal",
  416. "message": f"batch {batch_index} processed failed",
  417. "status": "fail",
  418. "trace_id": self.trace_id,
  419. "data": {
  420. "error": str(e),
  421. "traceback": traceback.format_exc(),
  422. },
  423. }
  424. )
  425. class TitleRewrite(TitleProcess):
  426. async def roll_back_blocked_tasks(self):
  427. """
  428. rollback blocked tasks
  429. """
  430. query = f"""
  431. select id, title_rewrite_status_update_timestamp
  432. from publish_single_video_source
  433. where title_rewrite_status = %s;
  434. """
  435. article_list = await self.pool.async_fetch(
  436. query=query, params=(self.TITLE_REWRITE_LOCK_STATUS,)
  437. )
  438. if article_list:
  439. blocked_id_list = [
  440. i["id"]
  441. for i in article_list
  442. if (int(time.time()) - i["title_rewrite_status_update_timestamp"])
  443. > self.TITLE_REWRITE_LOCK_TIME
  444. ]
  445. if blocked_id_list:
  446. update_query = """
  447. update publish_single_video_source
  448. set title_rewrite_status = %s
  449. where id in %s and title_rewrite_status = %s;
  450. """
  451. await self.pool.async_save(
  452. query=update_query,
  453. params=(
  454. self.TITLE_REWRITE_INIT_STATUS,
  455. tuple(blocked_id_list),
  456. self.TITLE_REWRITE_LOCK_STATUS,
  457. ),
  458. )
  459. async def get_articles_batch(self, batch_size=1000):
  460. query = f"""
  461. select content_trace_id, article_title
  462. from publish_single_video_source
  463. where bad_status = {self.ARTICLE_POSITIVE_STATUS}
  464. and audit_status = {self.ARTICLE_AUDIT_PASSED_STATUS}
  465. and title_rewrite_status = {self.TITLE_REWRITE_INIT_STATUS}
  466. and platform in ('hksp', 'sph')
  467. limit {batch_size};
  468. """
  469. return await self.pool.async_fetch(query=query, db_name="long_articles")
  470. async def update_title_rewrite_status(
  471. self, content_trace_id, ori_status, new_status
  472. ):
  473. query = f"""
  474. update publish_single_video_source
  475. set title_rewrite_status = %s, title_rewrite_status_update_timestamp = %s
  476. where content_trace_id = %s and title_rewrite_status= %s;
  477. """
  478. affected_rows = await self.pool.async_save(
  479. query=query,
  480. params=(new_status, int(time.time()), content_trace_id, ori_status),
  481. )
  482. return affected_rows
  483. async def insert_into_rewrite_table(self, content_trace_id, new_title):
  484. """
  485. insert into rewrite_table
  486. """
  487. insert_sql = f"""
  488. insert into video_title_rewrite
  489. (content_trace_id, new_title, status, prompt_version)
  490. values (%s, %s, %s, %s);
  491. """
  492. await self.pool.async_save(
  493. query=insert_sql,
  494. params=(
  495. content_trace_id,
  496. new_title,
  497. self.TITLE_USEFUL_STATUS,
  498. self.PROMPT_VERSION,
  499. ),
  500. )
  501. async def rewrite_each_article(self, article):
  502. """
  503. rewrite each article
  504. """
  505. content_trace_id = article["content_trace_id"]
  506. article_title = article["article_title"]
  507. # lock each task
  508. affected_rows = await self.update_title_rewrite_status(
  509. content_trace_id=content_trace_id,
  510. ori_status=self.TITLE_REWRITE_INIT_STATUS,
  511. new_status=self.TITLE_REWRITE_LOCK_STATUS,
  512. )
  513. if not affected_rows:
  514. return
  515. try:
  516. prompt = self.generate_title_rewrite_prompt(article_title)
  517. new_title = fetch_deepseek_completion(model="default", prompt=prompt)
  518. # insert into rewrite table
  519. await self.insert_into_rewrite_table(
  520. content_trace_id=content_trace_id, new_title=new_title
  521. )
  522. # unlock
  523. await self.update_title_rewrite_status(
  524. content_trace_id=content_trace_id,
  525. ori_status=self.TITLE_REWRITE_LOCK_STATUS,
  526. new_status=self.TITLE_REWRITE_SUCCESS_STATUS,
  527. )
  528. except Exception as e:
  529. await self.aliyun_log.log(
  530. contents={
  531. "task": "title rewrite task",
  532. "function": "rewrite_each_article",
  533. "message": content_trace_id,
  534. "status": "fail",
  535. "data": {
  536. "error_message": str(e),
  537. "error_type": type(e).__name__,
  538. "traceback": traceback.format_exc(),
  539. },
  540. }
  541. )
  542. await self.update_title_rewrite_status(
  543. content_trace_id=content_trace_id,
  544. ori_status=self.TITLE_REWRITE_LOCK_STATUS,
  545. new_status=self.TITLE_REWRITE_FAIL_STATUS,
  546. )
  547. async def deal(self):
  548. """title rewrite task deal"""
  549. await self.roll_back_blocked_tasks()
  550. task_list = await self.get_articles_batch()
  551. for article in task_list:
  552. await self.rewrite_each_article(article)
  553. class ExtractTitleFeatures(Const):
  554. def __init__(self, pool, aliyun_log, trace_id):
  555. self.pool = pool
  556. self.aliyun_log = aliyun_log
  557. self.trace_id = trace_id
  558. async def get_tasks(self, version: int, batch_size=100):
  559. query = """
  560. select id, title
  561. from title_features
  562. where status = %s and version = %s
  563. limit %s;
  564. """
  565. return await self.pool.async_fetch(
  566. query=query, params=(self.INIT_STATUS, version, batch_size)
  567. )
  568. async def update_status(self, title_id, ori_status, new_status):
  569. query = """
  570. UPDATE title_features
  571. SET status = %s
  572. WHERE id = %s AND status = %s;
  573. """
  574. return await self.pool.async_save(
  575. query=query,
  576. params=(new_status, title_id, ori_status),
  577. )
  578. async def update_status_batch(self, title_id_list, ori_status, new_status):
  579. query = """
  580. UPDATE title_features
  581. SET status = %s
  582. WHERE id in %s
  583. AND status = %s;
  584. """
  585. return await self.pool.async_save(
  586. query=query,
  587. params=(new_status, tuple(title_id_list), ori_status),
  588. )
  589. async def set_feature_for_each_title(self, title_id, feature_dict):
  590. query = """
  591. UPDATE title_features
  592. SET category = %s, era = %s, characters = %s, emotions = %s,
  593. is_strong_reverse = %s, is_person_comment = %s, title_type = %s,
  594. status = %s
  595. WHERE id = %s and status = %s;
  596. """
  597. return await self.pool.async_save(
  598. query=query,
  599. params=(
  600. feature_dict["category"],
  601. feature_dict["era"],
  602. json.dumps(feature_dict["characters"], ensure_ascii=False),
  603. json.dumps(feature_dict["emotions"], ensure_ascii=False),
  604. 1 if feature_dict.get("structure", {}).get("is_strong_reverse") else 0,
  605. 1 if feature_dict.get("structure", {}).get("is_person_comment") else 0,
  606. feature_dict["structure"]["title_type"],
  607. self.SUCCESS_STATUS,
  608. title_id,
  609. self.PROCESSING_STATUS,
  610. ),
  611. )
  612. async def set_category_for_each_title(self, title_id, category):
  613. query = """
  614. UPDATE title_features
  615. SET category = %s, status = %s
  616. WHERE id = %s and status = %s;
  617. """
  618. return await self.pool.async_save(
  619. query=query,
  620. params=(category, self.SUCCESS_STATUS, title_id, self.PROCESSING_STATUS),
  621. )
  622. async def get_title_features(self, task_list: list):
  623. title_list = [i["title"] for i in task_list]
  624. id_list = [i["id"] for i in task_list]
  625. title_id_map = {i["title"]: i["id"] for i in task_list}
  626. prompt = extract_article_features(title_list)
  627. # 设置状态为处理中
  628. await self.update_status_batch(
  629. id_list, self.INIT_STATUS, self.PROCESSING_STATUS
  630. )
  631. try:
  632. feature_dict = fetch_deepseek_completion(
  633. model="default", prompt=prompt, output_type="json"
  634. )
  635. except Exception as e:
  636. await self.aliyun_log.log(
  637. contents={
  638. "task": "extract_title_features",
  639. "function": "deal",
  640. "message": "fetch deepseek completion failed",
  641. "status": "fail",
  642. "data": {
  643. "error_message": str(e),
  644. "error_type": type(e).__name__,
  645. "traceback": traceback.format_exc(),
  646. },
  647. }
  648. )
  649. await self.update_status_batch(
  650. id_list, self.PROCESSING_STATUS, self.FAIL_STATUS
  651. )
  652. return
  653. if not feature_dict:
  654. await self.aliyun_log.log(
  655. contents={
  656. "task": "extract_title_features",
  657. "function": "deal",
  658. "message": "fetch deepseek completion return empty",
  659. "status": "fail",
  660. "data": {
  661. "error_message": "fetch deepseek completion return empty",
  662. "error_type": "EmptyResponseError",
  663. "traceback": traceback.format_exc(),
  664. },
  665. }
  666. )
  667. await self.update_status_batch(
  668. id_list, self.PROCESSING_STATUS, self.FAIL_STATUS
  669. )
  670. return
  671. for title in tqdm(title_list):
  672. features = feature_dict.get(title, {})
  673. if not features:
  674. continue
  675. title_id = title_id_map[title]
  676. await self.set_feature_for_each_title(title_id, features)
  677. async def get_title_category(self, task_list: list):
  678. title_list = [i["title"] for i in task_list]
  679. id_list = [i["id"] for i in task_list]
  680. title_id_map = {i["title"]: i["id"] for i in task_list}
  681. generate_category_prompt = extract_article_category(title_list)
  682. # 设置状态为处理中
  683. await self.update_status_batch(
  684. id_list, self.INIT_STATUS, self.PROCESSING_STATUS
  685. )
  686. try:
  687. category_dict = fetch_deepseek_completion(
  688. model="DeepSeek-V3", prompt=generate_category_prompt, output_type="json"
  689. )
  690. except Exception as e:
  691. await self.aliyun_log.log(
  692. contents={
  693. "task": "extract_title_category",
  694. "function": "deal",
  695. "message": "fetch deepseek completion failed",
  696. "status": "fail",
  697. "data": {
  698. "error_message": str(e),
  699. "error_type": type(e).__name__,
  700. "traceback": traceback.format_exc(),
  701. },
  702. }
  703. )
  704. await self.update_status_batch(
  705. id_list, self.PROCESSING_STATUS, self.FAIL_STATUS
  706. )
  707. return
  708. if not category_dict:
  709. await self.aliyun_log.log(
  710. contents={
  711. "task": "extract_title_category",
  712. "function": "deal",
  713. "message": "fetch deepseek completion return empty",
  714. "status": "fail",
  715. "data": {
  716. "error_message": "fetch deepseek completion return empty",
  717. "error_type": "EmptyResponseError",
  718. "traceback": traceback.format_exc(),
  719. },
  720. }
  721. )
  722. await self.update_status_batch(
  723. id_list, self.PROCESSING_STATUS, self.FAIL_STATUS
  724. )
  725. return
  726. for title in tqdm(title_list):
  727. category = category_dict.get(title, {})
  728. if not category:
  729. continue
  730. title_id = title_id_map[title]
  731. await self.set_category_for_each_title(title_id, category)
  732. async def deal(self, data):
  733. batch_size = data.get("batch_size", 50)
  734. version = data.get("version", 1)
  735. task_list = await self.get_tasks(version=version, batch_size=batch_size)
  736. match version:
  737. case 1:
  738. await self.get_title_features(task_list)
  739. case 2:
  740. await self.get_title_category(task_list)
  741. case _:
  742. await self.aliyun_log.log(
  743. contents={
  744. "task": "extract_title_features",
  745. "function": "deal",
  746. "message": "version not supported",
  747. "status": "fail",
  748. "data": {
  749. "error_message": "version not supported",
  750. "error_type": "VersionNotSupportedError",
  751. "traceback": traceback.format_exc(),
  752. },
  753. }
  754. )