import os import time import datetime import requests from applications.db import DatabaseConnector def get_status_field_by_task(task: str) -> tuple[str, str]: match task: case "upload": status = "upload_status" update_timestamp = "upload_status_ts" case "extract": status = "extract_status" update_timestamp = "extract_status_ts" case "get_cover": status = "get_cover_status" update_timestamp = "get_cover_status_ts" case _: raise ValueError(f"Unexpected task: {task}") return status, update_timestamp def roll_back_lock_tasks( db_client: DatabaseConnector, task: str, max_process_time: int, init_status: int, processing_status: int ) -> int: """ rollback tasks which have been locked for a long time """ status, update_timestamp = get_status_field_by_task(task) now_timestamp = int(time.time()) timestamp_threshold = now_timestamp - max_process_time update_query = f""" update long_articles_new_video_cover set {status} = %s where {status} = %s and {update_timestamp} < %s; """ rollback_rows = db_client.save( query=update_query, params=(init_status, processing_status, timestamp_threshold) ) return rollback_rows def download_file(task_id, oss_path): """ 下载视频文件 """ video_url = "https://rescdn.yishihui.com/" + oss_path file_name = "static/{}.mp4".format(task_id) if os.path.exists(file_name): return file_name proxies = {"http": None, "https": None} with open(file_name, "wb") as f: response = requests.get(video_url, proxies=proxies) f.write(response.content) return file_name def generate_summary_prompt(text): prompt = f""" 你是1个优秀的公众号文章写作大师,我对你有以下要求 视频总结:{text} 第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。 句子段落之间以悬念承接,可以吸引读者往下读第二句。 第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。 你最终输出一段总结内容,将第一段和第二段之间空格一行。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。 """ return prompt def update_task_queue_status( db_client: DatabaseConnector, task_id: int, task: str, ori_status: int, new_status: int) -> int: # update task queue status status, update_timestamp = get_status_field_by_task(task) update_query = f""" update long_articles_new_video_cover set {status} = %s, {update_timestamp} = %s where {status} = %s and id = %s; """ update_rows = db_client.save( query=update_query, params=( new_status, datetime.datetime.now(), ori_status, task_id, ), ) return update_rows