import time import traceback from tqdm import tqdm from applications.api import fetch_deepseek_completion from applications.utils import yield_batch class Const: # title rewrite status TITLE_REWRITE_INIT_STATUS = 0 TITLE_REWRITE_SUCCESS_STATUS = 1 TITLE_REWRITE_FAIL_STATUS = 99 TITLE_REWRITE_LOCK_STATUS = 101 # article status ARTICLE_AUDIT_PASSED_STATUS = 1 ARTICLE_POSITIVE_STATUS = 0 # title useful status TITLE_USEFUL_STATUS = 1 # prompt version PROMPT_VERSION = "xx_250228" # 信欣2025-02-28提供 # block expire time 1h TITLE_REWRITE_LOCK_TIME = 60 * 60 # task status INIT_STATUS = 0 PROCESSING_STATUS = 1 SUCCESS_STATUS = 2 FAIL_STATUS = 99 # max processing time MAX_PROCESSING_TIME = 3600 # article_status ARTICLE_INIT_STATUS = 1 ARTICLE_PUBLISHED_STATUS = 2 ARTICLE_BAD_STATUS = 0 # limit score LIMIT_SCORE = 0.4 BATCH_SIZE = 20 class TitleProcess(Const): def __init__(self, pool, aliyun_log, trace_id): self.pool = pool self.aliyun_log = aliyun_log self.trace_id = trace_id @staticmethod def generate_title_rewrite_prompt(ori_title): """ 生成prompt """ prompt = f""" 请将以下标题改写成适合公众号中小程序点击和传播的文章标题,文章标题的写作规范如下,请学习后进行文章标题的编写。直接输出最终的文章标题,文章标题撰写规范如下: 1. 标题结构:要点前置,信息明确 核心信息前置:标题开头直接点出文章的核心内容或亮点,吸引读者注意。例如: “我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?” “亩产7000斤,被误认成萝卜却曾是‘救命粮’,如今成我国出口名蔬”。 简洁明了:标题通常在20字以内,信息集中且易于理解。 悬念前置结构:前半句设置反常/冲突场景(如"刑满释放蹬三轮")+后半句用结果反转制造悬念("政府领导登门分配工作") 多要素拼接:通过冒号/逗号分隔不同叙事主体(地域+人物冲突+权威评价),如"辽宁女子住高档小区被敲门,法院判决意外" 2. 情绪表达:激发共鸣,引发好奇 情感共鸣:通过情感化的语言触动读者,泪崩/守护/抱头痛哭等情感冲击词,配合家庭伦理场景 例如: “老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了”。 “儿子卖车卖房给母亲治病,母亲去世后儿媳收拾房间,打开床底柜,儿子突然痛哭”。 悬念与好奇心:通过提问或制造悬念,激发读者点击欲望。例如: “你知道是哪五家吗?” “打开床底柜,儿子突然痛哭”。 冲突性情绪词:拍桌大骂/气愤不已/眼红不已/算计等强对抗性词汇 结果反差刺激:用"风光善终/价值过亿/判决意外"等违反预期的结果 3. 语言风格:口语化、接地气 口语化表达:使用通俗易懂的语言,贴近读者生活。 刻意使用"赶都赶不走/各吃各的/我就知道你在家"等市井化用语。 例如: “狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石”。 “聪明的女人,不会帮婆家3种忙,而蠢女人才一再插手”。 接地气的词汇:使用“狗屎运”“蠢女人”等口语化词汇,增强亲切感。 身份反差构建:突出人物命运转折(老农→亿万富翁/囚犯→政府帮扶对象) 权威背书暗示:"专家气愤/法院判决/网友评价"等第三方视角增强可信度 4. 标点运用:增强语气,突出重点 问号与感叹号:通过问号制造悬念,感叹号强化情感。 在关键转折点使用("太气人了!/赔不了!") 问号制造互动:如"容嬷嬷是校花?"激发读者验证心理 例如: “你知道是哪五家吗?” “太无耻了!湖南,一名厨师被公司派到云南‘出差’被拒……” 引号与冒号:用于突出关键词或转折点。 破折号递进:用"——"引导关键信息("吃不完最好扔掉——") 例如: “被误认成萝卜却曾是‘救命粮’”。 “女子归还后,失主拒绝支付报酬,还说:要有格局”。 5. 热点与话题性:结合社会热点或争议 社会热点:结合当前热点事件或争议话题,吸引关注。例如: “上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!” 争议性话题:通过争议性内容引发讨论。例如: “李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔”。 6. 数字与具体细节:增强可信度与吸引力 数字的运用:通过具体数字增强标题的可信度和吸引力。例如: “亩产7000斤”。 “22年河南男子跳河救人,体力耗尽留遗言”。 细节描述:通过细节让标题更具画面感。例如: “打开床底柜,儿子突然痛哭”。 “扒开后捡到鸡蛋大小的青鱼石”。 7. 价值诉求:传递实用信息或情感价值 实用信息:提供对读者有价值的信息。例如: “我国存款最安全的五大银行,永远都不会倒闭”。 “72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花”。 情感价值:通过情感故事或人生哲理打动读者。例如: “父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待”。 8. 名人效应与历史情怀:增强吸引力 名人效应:提及名人或历史事件,吸引关注。例如: “难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美”。 “1975年‘下馆子’的老照片,2元能吃些什么,勾起那段最难忘的时光”。 9.隐藏传播逻辑:通过标题中暗含的、能触发人性弱点(如猎奇、贪婪、同情)或社会痛点的心理机制,通过潜意识刺激读者点击欲望 人性弱点触发:贪婪(200万保单)、猎奇(林彪密件)、窥私(家庭算计) 生存焦虑关联:医疗(脑瘫儿)、养老(子女不孝)、食品安全(二次加热) 身份代入设计:选择"老太太/外甥女/退休母亲"等易引发群体共鸣的角色 输入的标题是: '{ori_title}' """ return prompt @staticmethod def category_generation_from_title(title_list): """ generate prompt category for given title """ prompt = f""" 请帮我完成以下任务:输入为文章的标题,根据标题判断其内容所属的类目,输出为文章标题及其对应的类目。 类目需从以下15个品类内选择: 1. 知识科普 定义:以通俗易懂的方式普及科学、技术、健康、安全、生活常识、财产保护、医保政策、为人处事方式等内容,旨在提高公众的知识水平和认知能力。内容通常具有教育性和实用性,涵盖自然、社会、文化等多个领域。 标题示例: 我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗? 借条上不要写“这3个字”,不然变成一张废纸,否则用法律也没用 不能二次加热的3种食物!再次提醒:这3种食物吃不完最好扔掉 2. 军事历史 定义:聚焦于历史上的军事事件、战争故事、军事策略、英雄人物等内容,旨在还原战争场景、探讨军事决策、揭示历史真相,并展现战争中的人物命运与历史影响。内容通常以叙事、分析或回忆的形式呈现,兼具历史深度和故事性。 标题示例: 对越作战永远失踪的332人,陵园没有墓碑,没有名字,只有烈士证 淮海大战丢失阵地,师长带头冲锋!最后出一口恶气:活捉敌最高指挥官 抗战时,一村民被敌拉去带路,半道回头忽发现:后面跟个游击队员 3. 家长里短 定义:围绕家庭成员之间的关系、矛盾、情感、道德、等展开的故事或讨论,内容常涉及婚姻、亲子、婆媳、兄弟姐妹等关系,或是人情往来、金钱纠纷、情感变化等内容,反映家庭生活中的温情、冲突与人性。 标题示例: 父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待 老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了 我花150一天雇了阿姨,两天后上班回来给她300,阿姨说我账算错了 4. 社会法治 定义:聚焦社会事件、法律纠纷、法院判决、社会现象等内容,通常涉及道德、法律、公平正义等议题,旨在揭示社会问题、探讨法律规则或反映人性与社会现实。 标题示例: 山东,女子在小区捡到16万天价项链,业主悬赏3万找回,女子归还后,失主拒绝支付报酬,还说:要有格局,女子认为被骗,将失主告上法庭 陕西,女子22万买26层房,2年后,楼盘24层就已经封顶!开发商:你闹事造成100万损失,道歉才给赔偿! 上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了! 5. 奇闻趣事 定义:以猎奇、娱乐为主,涵盖罕见、奇特、有趣的事件、发现或故事,内容通常具有趣味性和话题性,能够引发读者的好奇心和讨论。 标题示例: 狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石,网友:起码值几千! 内蒙古小伙河边捡到金牌,拒绝上交将其熔成金手镯,专家气愤不已 男子买了一辆废弃坦克,拆油箱时,他发现了一根又一根的金条…… 6. 名人八卦 定义:围绕名人的生活、言论、事件、八卦等内容展开,通常涉及娱乐圈、政界、历史人物等,旨在满足公众对名人隐私和动态的好奇心。 标题示例: 难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美 心狠手辣的容嬷嬷年轻时是校花?看了照片后,网友直接闭嘴了! 李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔 7. 健康养生 定义:关注健康、养生、疾病预防、生活习惯等方面的知识和建议,内容通常具有实用性和指导性,旨在帮助读者改善生活质量、提升健康水平。 标题示例: 72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花 40岁女子每天吃水煮蛋,一年后去体检,检查报告令医生都羡慕不已 2024年血糖新标准已公布,不再是3.9~6.1,你的血糖还不算高吗? 8. 情感故事 定义:以人与人之间的情感交流、感人故事、情感经历为主题,内容通常充满温情、感动或反思,旨在引发读者的情感共鸣和思考。 标题示例: 男孩饭店吃饭,发现陌生女子和去世母亲很像,走过去说:我妈妈去世了,能抱一下我吗? 河南一女子直播时,被失散 32 年的父亲认出:闺女等着爸爸接你回家 1987年,江苏男子借好友一千元,25年后朋友成富豪还他1000万报恩 流浪狗跟着骑行夫妻跑了一百多公里,一直守护在女主身边,赶都赶不走,当男主得知原因后竟抱着狗狗大哭起来 9. 国家大事 定义:涉及国家实力、科技发展、资源发现、国际合作等内容,通常以宏观视角展现国家的综合实力、科技成就或国际影响力,体现国家的崛起与发展。 标题示例: 我国在南极发现“海上粮仓”,储量高达10亿吨,世界各国眼红不已 我国贵州发现7000万吨宝藏,价值高达上万亿,多国求合作被拒绝 距我国3000公里,塞班岛明明归美国管辖,为何岛上大多是中国人? 10. 现代人物 定义:聚焦活跃在21世纪后具有传奇色彩或巨大贡献的人物、事迹、成就等,内容通常充满戏剧性和启发性,旨在展现人物的非凡经历或历史贡献。 标题示例: 她曾狂贪国家上百亿,被发现时已经移居美国,最终还风光一时得善终 山东女子因坐月子无聊,破译美国2套绝密系统的密码,国家:奖励711万! 牺牲太大了!航天女英雄刘洋:结婚8年未生子,回地面后“消失”的她怎样了? 11. 怀旧时光 定义:以回忆和怀旧为主题,涉及过去的历史、文化、生活、照片等内容,旨在唤起读者对过去时光的情感共鸣和怀念。 标题示例: 1975年“下馆子”的老照片,2元能吃些什么,勾起那段最难忘的时光 82年,北京老人捡回两张“破椅子”,遭家人数落,29年后拍出2300万 这张老照片第一次看到,邓颖超和李讷的罕见合影! 12. 政治新闻 定义:聚焦政治事件、领导人动态、国际关系等内容,通常以新闻或分析的形式呈现,旨在揭示政治局势、政策变化或国际关系的动态。 标题示例: 中方外长行程有变,提前结束访欧匆匆回国,带回来一个好消息 宋庆龄在北京逝世后,远在美国的宋美龄只说了7个字,字字揪心! 庐山会议后,叶帅去劝彭德怀认个错,哭着说了一句心里话 13. 历史人物 定义:聚焦于21世纪前具有重要影响的人物,包括他们的生平、事迹、成就、性格、趣事及其对历史进程的贡献。内容通常以传记、回忆录或历史分析的形式呈现,旨在还原人物的真实面貌并探讨其历史意义。 标题示例: 林彪去世后,蒋介石收到林彪与戴笠的一份密谈文件,看后拍桌大骂 张学良软禁时的一张实拍照片,头发秃顶,两眼无光,像个中年老头 1912年,孙中山和两个女儿罕见留影,面对镜头父女三人看起来很幸福 14. 社会现象 定义:关注社会中出现的普遍现象、趋势或问题,通常涉及文化、经济、教育、民生等领域。内容以观察、分析或评论为主,旨在揭示现象背后的原因、影响及社会意义,引发公众的思考和讨论。 标题示例: 22年河南男子跳河救人,体力耗尽留遗言,被救女子猛然抓住他:一起走 浙江一老人刑满释放,靠蹬三轮为生,6年后,政府领导登门拜访:我们帮您分配工作 儿子收到清华通知书,父亲花5万请全村吃席,镇长看一眼竟说:这是假的 15.财经科技 定义:聚焦于经济、金融、投资及行业发展的分析与预测,涵盖未来经济趋势、资产价值变化、行业变革及个人理财策略等内容。可以提供前瞻性的财经视角和实用的理财建议,帮助其把握经济动态、优化财务规划并应对行业变化。 标题示例: 未来10年,现金和房子都将贬值,只有2样东西最值钱 外卖时代将被终结?一个全新行业正悄悄取代外卖,你准备好了吗? 准备存款的一定要知道,今明两年,定期存款要记住“4不存” 输入是一个 LIST, LIST 中的每个元素是一个元组,元组的第一个元素是文章的 ID,第二个元素是文章的标题。 最后输出结果请用JSON格式输出,key为ID,value为品类,仅输出JSON,不要markdown格式,不要任何其他内容 如果标题中包含半角双引号,则进行转义 输入的 LIST 是 {title_list} 检查你的输出,输出的品类一定需要是输入的 15个品类,只需要输出品类的中文汉字 """ return prompt async def _roll_back_lock_tasks(self, table_name): query = f""" update {table_name} set category_status = %s where category_status = %s and category_status_update_ts <= %s; """ await self.pool.async_save( query=query, params=( self.INIT_STATUS, self.PROCESSING_STATUS, int(time.time()) - self.MAX_PROCESSING_TIME, ) ) async def process_single_article(self, content_type, article): match content_type: case "video": article_id = article["id"] title = article["article_title"] case "article": article_id = article["article_id"] title = article["title"] case _: raise ValueError("content type is not supported") title_batch = [(article_id, title)] prompt = self.category_generation_from_title(title_batch) try: completion = fetch_deepseek_completion( model="DeepSeek-V3", prompt=prompt, output_type="json" ) return completion except Exception as e: await self.aliyun_log.log( contents={ "trace_id": self.trace_id, "data": { "article_id": article_id, "error": str(e), "traceback": traceback.format_exc(), } } ) return None class TitleRewrite(TitleProcess): async def roll_back_blocked_tasks(self): """ rollback blocked tasks """ query = f""" select id, title_rewrite_status_update_timestamp from publish_single_video_source where title_rewrite_status = {self.TITLE_REWRITE_LOCK_STATUS}; """ article_list = await self.pool.async_fetch( query=query, db_name="long_articles", ) if article_list: blocked_id_list = [ i["id"] for i in article_list if (int(time.time()) - i["title_rewrite_status_update_timestamp"]) > self.TITLE_REWRITE_LOCK_TIME ] if blocked_id_list: update_query = f""" update publish_single_video_source set title_rewrite_status = %s where id in %s and title_rewrite_status = %s; """ await self.pool.async_save( query=update_query, params=( self.TITLE_REWRITE_INIT_STATUS, tuple(blocked_id_list), self.TITLE_REWRITE_LOCK_STATUS, ), ) async def get_articles_batch(self, batch_size=1000): query = f""" select content_trace_id, article_title from publish_single_video_source where bad_status = {self.ARTICLE_POSITIVE_STATUS} and audit_status = {self.ARTICLE_AUDIT_PASSED_STATUS} and title_rewrite_status = {self.TITLE_REWRITE_INIT_STATUS} and platform in ('hksp', 'sph') limit {batch_size}; """ return await self.pool.async_fetch(query=query, db_name="long_articles") async def update_title_rewrite_status( self, content_trace_id, ori_status, new_status ): query = f""" update publish_single_video_source set title_rewrite_status = %s, title_rewrite_status_update_timestamp = %s where content_trace_id = %s and title_rewrite_status= %s; """ affected_rows = await self.pool.async_save( query=query, params=(new_status, int(time.time()), content_trace_id, ori_status), ) return affected_rows async def insert_into_rewrite_table(self, content_trace_id, new_title): """ insert into rewrite_table """ insert_sql = f""" insert into video_title_rewrite (content_trace_id, new_title, status, prompt_version) values (%s, %s, %s, %s); """ await self.pool.async_save( query=insert_sql, params=( content_trace_id, new_title, self.TITLE_USEFUL_STATUS, self.PROMPT_VERSION, ), ) async def rewrite_each_article(self, article): """ rewrite each article """ content_trace_id = article["content_trace_id"] article_title = article["article_title"] # lock each task affected_rows = await self.update_title_rewrite_status( content_trace_id=content_trace_id, ori_status=self.TITLE_REWRITE_INIT_STATUS, new_status=self.TITLE_REWRITE_LOCK_STATUS, ) if not affected_rows: return try: prompt = self.generate_title_rewrite_prompt(article_title) new_title = fetch_deepseek_completion(model="default", prompt=prompt) # insert into rewrite table await self.insert_into_rewrite_table( content_trace_id=content_trace_id, new_title=new_title ) # unlock await self.update_title_rewrite_status( content_trace_id=content_trace_id, ori_status=self.TITLE_REWRITE_LOCK_STATUS, new_status=self.TITLE_REWRITE_SUCCESS_STATUS, ) except Exception as e: await self.aliyun_log.log( contents={ "task": "title rewrite task", "function": "rewrite_each_article", "message": content_trace_id, "status": "fail", "data": { "error_message": str(e), "error_type": type(e).__name__, "traceback": traceback.format_exc(), }, } ) await self.update_title_rewrite_status( content_trace_id=content_trace_id, ori_status=self.TITLE_REWRITE_LOCK_STATUS, new_status=self.TITLE_REWRITE_FAIL_STATUS, ) async def deal(self): """title rewrite task deal""" await self.roll_back_blocked_tasks() task_list = await self.get_articles_batch() bar = tqdm(task_list, desc="title rewrite task") for article in bar: await self.rewrite_each_article(article) bar.set_description("title rewrite task") class VideoPoolCategoryGeneration: pass class ArticlePoolCategoryGeneration(TitleProcess): def __init__(self, pool, aliyun_log, trace_id): super().__init__(pool, aliyun_log, trace_id) async def lock_task(self, article_id_tuple: tuple[int, ...]) -> int: update_query = f""" update long_articles.crawler_meta_article set category_status = %s, category_status_update_ts = %s where article_id in %s and category_status = %s; """ return await self.pool.async_save( query=update_query, params=( self.PROCESSING_STATUS, int(time.time()), article_id_tuple, self.INIT_STATUS, ), ) async def get_task_list(self, limit=1000): query = f""" select article_id, title from long_articles.crawler_meta_article where category_status = %s and status = %s and score > %s order by score desc limit %s; """ return await self.pool.async_fetch( query=query, params=(self.INIT_STATUS, self.ARTICLE_INIT_STATUS, self.LIMIT_SCORE, limit), ) async def set_category_status_as_success(self,article_id: int, category: str) -> int: update_query = f""" update long_articles.crawler_meta_article set category_by_ai = %s, category_status = %s, category_status_update_ts = %s where article_id = %s and category_status = %s; """ return await self.pool.async_save( query=update_query, params=( category, self.SUCCESS_STATUS, int(time.time()), article_id, self.PROCESSING_STATUS, ), ) async def set_category_status_as_fail(self, article_id: int) -> int: update_query = f""" update long_articles.crawler_meta_article set category_status = %s, category_status_update_ts = %s where article_id = %s and category_status = %s; """ return await self.pool.async_save( query=update_query, params=( self.FAIL_STATUS, int(time.time()), article_id, self.PROCESSING_STATUS, ), ) async def process_each_batch(self, task_batch): title_batch = [(i["article_id"], i["title"]) for i in task_batch] id_tuple = tuple([int(i["article_id"]) for i in task_batch]) if await self.lock_task(id_tuple): prompt = self.category_generation_from_title(title_batch) try: completion = fetch_deepseek_completion( model="DeepSeek-V3", prompt=prompt, output_type="json" ) for article in title_batch: article_id = article[0] category = completion.get(str(article_id)) if category: await self.set_category_status_as_success(article_id, category) else: await self.set_category_status_as_fail(article_id) except Exception as e: await self.aliyun_log.log( contents={ "task": "ArticlePoolCategoryGeneration", "function": "process_each_batch", "message": "batch 中存在敏感词,AI 拒绝返回", "status": "fail", "trace_id": self.trace_id, "data": { "article_id": id_tuple, "error": str(e), "traceback": traceback.format_exc(), } } ) for article in tqdm(title_batch): single_completion = await self.process_single_article(content_type="article", article=article) article_id = article[0] if single_completion: category = single_completion.get(str(article_id)) if category: # set as success await self.set_category_status_as_success(article_id, category) else: await self.set_category_status_as_fail(article_id) else: # set as fail await self.set_category_status_as_fail(article_id) return else: return async def deal(self): await self._roll_back_lock_tasks(table_name="crawler_meta_article") task_list = await self.get_task_list() task_batch_list = yield_batch(data=task_list, batch_size=self.BATCH_SIZE) for task_batch in task_batch_list: await self.process_each_batch(task_batch)