zhangyong 2 tháng trước cách đây
mục cha
commit
14da4e1ea8
6 tập tin đã thay đổi với 289 bổ sung228 xóa
  1. 16 7
      carry_data_redis.py
  2. 4 4
      carry_nrfx_data_handle.py
  3. 191 216
      carry_video/nrfx_carry_video.py
  4. 46 0
      common/odps_data.py
  5. 5 1
      common/redis.py
  6. 27 0
      data_channel/piaoquan.py

+ 16 - 7
carry_data_redis.py

@@ -17,23 +17,32 @@ def bot_carry_data():
         logger.info(f"[+] 开始获取{NAME},时区为{dt}")
         count = insert_carry_data(dt, REDIS_NAME,FS_SHEET, NAME)
         logger.info(f"[+] {NAME},时区为{dt}共获取{count}条")
-        time.sleep(3)
-        nrfx_count = insert_carry_data(dt, "task:carry_redis_nrfx",FS_SHEET, NAME)
-        logger.info(f"[+] 内容分析时区为{dt}共获取{nrfx_count}条")
     except Exception as e:
         logger.error(f"[+] 获取{NAME},时区为{dt}失败,失败信息{e}")
 
 
+def bot_nrfx_carry_data():
+    try:
+        logger.info(f"[+] 开始获取内容分析")
+        count = insert_carry_data(0, "task:carry_redis_by_nrfx",FS_SHEET, "内容分析")
+        logger.info(f"[+] 开始获取内容分析共获取{count}条")
+    except Exception as e:
+        logger.error(f"[+] 获取内容分析失败,失败信息{e}")
+
+
 
 
 def schedule_tasks():
     schedule.every().hour.at(":05").do(bot_carry_data)
+    schedule.every().hour.at(":45").do(bot_nrfx_carry_data)
+
 
 
 if __name__ == "__main__":
-    schedule_tasks()  # 调用任务调度函数
-    while True:
-        schedule.run_pending()
-        time.sleep(1)  # 每秒钟检查一次
+    bot_nrfx_carry_data()
+    # schedule_tasks()  # 调用任务调度函数
+    # while True:
+    #     schedule.run_pending()
+    #     time.sleep(1)  # 每秒钟检查一次
     # bot_carry_data()
 

+ 4 - 4
carry_nrfx_data_handle.py

@@ -18,7 +18,7 @@ CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/
 
 def nrfx_video_task_start():
     logger.info(f"[+] {REDIS_NAME}任务开始redis获取")
-    data = get_carry_data("task:carry_redis_nrfx")
+    data = get_carry_data("task:carry_redis_by_nrfx")
     if not data:
         return
     try:
@@ -42,9 +42,9 @@ def nrfx_video_task_start():
         return
     except Exception as e:
         data = json.loads(data)
-        in_carry_video_data("task:carry_redis_nrfx", json.dumps(data, ensure_ascii=False, indent=4))
-        AliyunLogger.logging(data["name"], "内容分析", data["tag_transport_channel"], data["video_url"],
-                             f"视频处理失败,失败信息{e}", "3003", str(data))
+        in_carry_video_data("task:carry_redis_by_nrfx", json.dumps(data, ensure_ascii=False, indent=4))
+        AliyunLogger.logging(data["type"],"内容分析", data["channel"], data["videoid"], "视频处理失败,失败信息{e}", "3003", str(data))
+
         logger.error(f"[+] {data}处理失败,失败信息{e}")
         return
 

+ 191 - 216
carry_video/nrfx_carry_video.py

@@ -121,239 +121,214 @@ class NrfxCarryViode:
             return None, None,None
 
 
-    def insert_pq(self, REDIS_NAME, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, n_ids, type):
-        logger.info(f"[+] {REDIS_NAME}的{data}开始写入票圈")
-        if ',' in n_ids:
-            n_id_list = n_ids.split(',')
-        else:
-            n_id_list = [n_ids]
-        pq_list = []
-        for n_id in n_id_list:
-            code = PQ.insert_piaoquantv(oss_object_key, title, n_id)
-            if not code:
-                logger.error(f"[+] {REDIS_NAME}的{data}写入票圈后台失败")
-                AliyunLogger.logging(data["name"], type, tag_transport_channel, data["video_url"],
-                                     "改造失败,写入票圈后台失败", "3003", str(data))
-                continue
+    def insert_pq(self, data, oss_object_key, title, cover):
+        logger.info(f"[+] 开始写入票圈")
 
-            pq_list.append(code)
-            logger.info(f"[+] {REDIS_NAME}的{data}写入票圈成功,返回视频id{code}")
-            tag_status = Tag.video_tag(code, str(tags))
-            if tag_status == 0:
-                logger.info(f"[+] {REDIS_NAME}的{data}写入标签成功,后台视频ID为{code}")
-            try:
-                current_time = datetime.now()
-                formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-                sqlCollect.insert_machine_making_data(data["name"], type, tag_transport_channel,
-                                                      data["video_url"], data["video_url"], data["pq_ids"],
-                                                      data["title_category"],
-                                                      code,
-                                                      formatted_time, data["title_category"], oss_object_key)
-                pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail'  # 站内视频链接
-                values = [
-                    [
-                        str(code),
-                        str(n_id),
-                        formatted_time,
-                        channel_mark,
-                        data["name"],
-                        data["pq_ids"],
-                        data["pq_label"],
-                        data["activate_data"],
-                        data["video_url"],
-                        data["title_category"],
-                        tag_transport_channel,
-                        data["tag_transport_scene"],
-                        data["tag_transport_keyword"],
-                        data["tag"],
-                        data["transform_rule"],
-                        data["video_share"],
-                        data["trailer_share"],
-                        data["trailer_share_audio"],
-                        data["video_clipping"],
-                        data["video_clipping_time"],
-                        data["title_transform"],
-                        pq_url
-                    ]
+
+        code = PQ.insert_piaoquantv(oss_object_key, title, '50322062')
+        if not code:
+            logger.error(f"[+] 写入票圈后台失败")
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 写入票圈后台失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
+            return
+        logger.info(f"[+] 写入票圈成功,返回视频id{code}")
+        tag_status = Tag.video_tag(code, "lev-供给,rol-机器,#str-搬运改造内容理解引导语实验_60")
+        Tag.video_tag(data["videoid"], "lev-供给,rol-机器,#str-搬运改造内容理解引导语base_61")
+
+        if tag_status == 0:
+            logger.info(f"[+] 写入标签成功,后台视频ID为{code}")
+        try:
+            current_time = datetime.now()
+            formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+            sqlCollect.insert_machine_making_data(data["channel"], data["name"], data["name"],
+                                                  data["videoid"], data["videoid"], "50322062",
+                                                  title,
+                                                  code,
+                                                  formatted_time, data["title_category"], oss_object_key)
+            pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail'  # 站内视频链接
+            values = [
+                [
+                    data["videoid"],
+                    code,
+                    data["channel"],
+                    data["dt"],
+                    formatted_time,
+                    pq_url
                 ]
-                Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", 'Um1nWA', "ROWS", 1, 2)
-                time.sleep(0.5)
-                Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", 'Um1nWA', "A2:Z2", values)
-                logger.info(f"[+] {REDIS_NAME}的{data}写入飞书成功")
-            except Exception as e:
-                logger.error(f"[+] {REDIS_NAME}的{data}写入飞书失败{e}")
-                pass
-        AliyunLogger.logging(data["name"], "内容分析", tag_transport_channel, data["video_url"],
-                             "改造成功", "1000", str(data), str(pq_list))
-        return
+            ]
+            Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "A2:Z2", values)
+            logger.info(f"[+] 写入飞书成功")
+            return
+        except Exception as e:
+            logger.error(f"[+] 写入飞书失败{e}")
+            return
 
 
 
     def main(self, data, file_path, GEMINI_API_KEY):
-        REDIS_NAME = 'task:carry_redis_nrfx'
-        try:
-            if data["transform_rule"] == '否':
-                return
-            url = data['video_url']
-            if "&vid=" in url or "?modal_id=" in url:
-                host = urlparse(url).netloc
-            else:
-                msg = html.unescape(url).split('?')[0]
-                pattern = re.search(r'https?://[^\s<>"\'\u4e00-\u9fff]+', msg)
-                if not pattern:
-                    in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
-                    return
-                url = pattern.group()
-                host = urlparse(url).netloc
-            if host in ['v.douyin.com', 'www.douyin.com', 'www.iesdouyin.com']:
-                tag_transport_channel = "抖音"
-                logger.info(f"[+] {url}开始获取抖音视频链接")
-                url, original_title, video_id =  self.get_text_dy_video(url=url)
-            elif host in ['v.kuaishou.com', 'www.kuaishou.com', 'v.m.chenzhongtech.com', 'creater.eozatvmq.com']:
-                tag_transport_channel = "快手"
-                logger.info(f"[+] {url}开始获取快手视频链接")
-                url, original_title, video_id = self.get_text_ks_video(url=url)
-            else:
-                AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"], "扫描到一条视频",
-                                     "2001", str(data))
-                logger.error(f"[+] {url}该链接不是抖/快 不做处理")
-                AliyunLogger.logging(data["name"], "内容分析","", data["video_url"],
-                                     "不是抖/快不做处理", "1001", str(data))
-                return
-            if url == "作品不存在":
-                return
-        except Exception as e:
-            logger.info(f"[+] 获取视频链接异常{e}")
-            in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
+        REDIS_NAME = 'task:carry_redis_by_nrfx'
+        video_id = data["videoid"]
+        AliyunLogger.logging(data["type"],"内容分析", data["channel"], video_id, "扫描到一条视频", "2001", str(data))
+        AliyunLogger.logging(data["type"],"内容分析", data["channel"], video_id, "符合规则等待改造", "2004", str(data))
+        logger.info(f"[+] 获取{video_id}的视频链接")
+        video_path, cover_path, old_title = PQ.get_pq_oss_path(video_id)
+        if not video_path:
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "没有获取到视频链接", "3001",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 没有获取到视频链接\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
+
             return
-        AliyunLogger.logging(data["name"],"内容分析", tag_transport_channel, data["video_url"], "扫描到一条视频", "2001", str(data))
-        AliyunLogger.logging(data["name"], "内容分析",tag_transport_channel, data["video_url"],  "符合规则等待改造", "2004", str(data))
-        if not url:
+        video_url = f"http://rescdn.yishihui.com/{video_path}"
+        video_path = DownLoad.download_video(video_url, file_path, '', video_id)
+        if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
             in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
-            logger.info(f"[+] {url}没有获取到视频链接,等待重新处理")
-            AliyunLogger.logging(data["name"], "内容分析",tag_transport_channel, data["video_url"],
-                                 "没有获取到视频链接,等待重新处理", "1002", str(data))
+            logger.error(f"[+] {video_url}下载失败")
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "视频下载失败等待重新处理", "3002",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 视频下载失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
             return
-        if url == "note":
-            logger.info(f"[+] {url}是图文不做处理")
-            AliyunLogger.logging(data["name"], "内容分析", tag_transport_channel, data["video_url"],
-                                 "是图文不做处理", "1002", str(data))
+        logger.info(f"[+] {video_url}视频下载成功")
+        logger.info(f"[+] {video_url}开始处理标题")
+        logger.info(f"[+] 视频更改分辨率处理成功")
+        logger.info(f"[+] 内容分析-开始获取视频口播内容")
+        video_text = GoogleAI.run(GEMINI_API_KEY, video_path)
+        if not video_text:
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "内容分析,获取口播文案失败",
+                                 "3003",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 获取口播文案失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
             return
+        logger.info(f"[+] 内容分析-开始获取AI片尾")
+        pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
+        pw_url = TTS.get_pw_zm(pw_srt_text, 'zhifeng_emo')
+        if not pw_url:
+            logger.error(f"[+] 内容分析-片尾获取失败")
+            data["transform_rule"] = "仅改造"
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "内容分析,片尾获取失败",
+                                 "3003",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 片尾获取失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
 
-        logger.info(f"[+] {url}开始下载视频")
-        video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
+            return
+        logger.info(f"[+] 内容分析-片尾获取成功")
+        pw_srt = TTS.getSrt(pw_url)
+        if not pw_srt:
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "内容分析,片尾音频获取失败",
+                                 "3003",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 片尾音频获取失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
+            return
+        pw_mp3_path = TTS.download_mp3(pw_url, file_path)
+        if not pw_mp3_path:
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "内容分析,片尾音频下载失败",
+                                 "3003",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 片尾音频下载失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
+            return
+        logger.info(f"[+] 内容分析-片尾音频下载成功")
+        logger.info(f"[+] 内容分析-片尾获取最后一帧成功")
+        jpg_path = FFmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
+        pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
+        if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
+            logger.error(f"[+] 内容分析-片尾拼接失败")
+            AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"],
+                                 "内容分析,片尾拼接失败", "3003", str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 片尾拼接失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
+            return
+        logger.info(f"[+] 内容分析-合并开始拼接")
+        video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
         if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
             in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
-            logger.error(f"[+] {url}下载失败")
-            AliyunLogger.logging(data["name"],"内容分析", tag_transport_channel, data["video_url"],
-                                 "视频下载失败等待重新处理", "3002", str(data))
+            logger.error(f"[+] 内容分析-添加片尾失败")
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 添加片尾失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
             return
-        logger.info(f"[+] {url}开始视频下载成功")
-        logger.info(f"[+] {url}开始处理标题")
-        if data["title_category"] == "AI标题" or data["trailer_share"] == "AI标题":
-            title = GPT4oMini.get_ai_mini_title(
-                original_title if data["title_category"] == "AI标题" else data["title_category"])
-        else:
-            title = original_title if data["title_category"] == "原标题" else data["title_category"]
-        if tag_transport_channel == "抖音":
-            if "复制打开抖音" in data['video_url']:
-                channel_mark = "APP"
-            else:
-                channel_mark = "PC"
-        else:
-            if "https://www.kuaishou.com/f" in data['video_url']:
-                channel_mark = "PC"
-            else:
-                channel_mark = "APP"
-        if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是":
-            width, height = FFmpeg.get_w_h_size(video_path)
-            if width < height:  # 判断是否需要修改为竖屏
-                video_path = FFmpeg.update_video_h_w(video_path, file_path)
-            logger.info(f"[+] {REDIS_NAME}的{data}视频更改分辨率处理")
-            video_path = FFmpeg.video_640(video_path, file_path)
-            if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
-                in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
-                logger.error(f"[+] {REDIS_NAME}的{data}视频更改分辨率失败")
-                AliyunLogger.logging(data["name"], "内容分析", tag_transport_channel, data["video_url"],
-                                     "改造失败,片尾拼接失败", "3001", str(data))
-                return
-            logger.info(f"[+] {REDIS_NAME}的{data}视频更改分辨率处理成功")
-            if data["video_clipping"]:  # 判断是否需要裁剪
-                video_path = FFmpeg.video_crop(video_path, file_path)
-            if data["video_clipping_time"]:  # 判断是否需要指定视频时长
-                video_path = FFmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
-            logger.info(f"[+] 内容分析-开始获取视频口播内容")
-            video_text = GoogleAI.run(GEMINI_API_KEY, video_path)
-            if not video_text:
-                AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"],
-                                     "内容分析,获取口播文案失败", "3003", str(data))
-                return
-            logger.info(f"[+] 内容分析-开始获取AI片尾")
-            pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
-            voice = data['trailer_share_audio']
-            if voice:
-                if ',' in voice:
-                    voices = voice.split(',')
-                else:
-                    voices = [voice]
-                voice = random.choice(voices)
-            else:
-                voice = "zhifeng_emo"
-            pw_url = TTS.get_pw_zm(pw_srt_text, voice)
-            if not pw_url:
-                logger.error(f"[+] 内容分析-片尾获取失败")
-                data["transform_rule"] = "仅改造"
-                AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"],
-                                     "内容分析,片尾获取失败", "3003", str(data))
-
-                return
-            logger.info(f"[+] 内容分析-片尾获取成功")
-            pw_srt = TTS.getSrt(pw_url)
-            if not pw_srt:
-                AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"],
-                                     "内容分析,片尾音频下载失败", "3003", str(data))
-                return
-            pw_mp3_path = TTS.download_mp3(pw_url, file_path)
-            if not pw_mp3_path:
-                AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"],
-                                     "内容分析,片尾音频下载失败", "3003", str(data))
-                return
-            logger.info(f"[+] 内容分析-片尾音频下载成功")
-            logger.info(f"[+] 内容分析-片尾获取最后一帧成功")
-            jpg_path = FFmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
-            pw_path = FFmpeg.nrfx_pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
-            if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
-                logger.error(f"[+] 内容分析-片尾拼接失败")
-                AliyunLogger.logging(data["name"], "内容分析", "", data["video_url"],
-                                     "内容分析,片尾拼接失败", "3003", str(data))
-                return
-            logger.info(f"[+] 内容分析-合并开始拼接")
-            video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
-            single_video_path = FFmpeg.single_video(video_path, file_path, data["video_share"])
-            if not os.path.exists(single_video_path) or os.path.getsize(single_video_path) == 0:
-                data["transform_rule"] = "仅改造"
-                in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
-                logger.error(f"[+] 内容分析-添加片中字幕失败")
-                AliyunLogger.logging(data["name"], "内容分析", tag_transport_channel, data["video_url"],
-                                     "内容分析,添加片中字幕失败", "3003", str(data))
-                return
-            logger.info(f"[+] 内容分析-添加片中字幕成功")
-            logger.info(f"[+] 内容分析-开始发送oss")
-            oss_object_key = Oss.stitching_sync_upload_oss(single_video_path, str(uuid.uuid4()))  # 视频发送OSS
-            status = oss_object_key.get("status")
-            if status != 200:
-                logger.error(f"[+] 内容分析-发送oss失败")
-                AliyunLogger.logging(data["name"], "内容分析", tag_transport_channel, data["video_url"],
-                                     "内容分析,发送oss失败", "3003", str(data))
-                return
-            logger.info(f"[+] 内容分析-发送oss成功")
-            oss_object_key = oss_object_key.get("oss_object_key")
-            tags = 'lev-供给,rol-机器,#str-搬运改造内容理解引导语实验_60'
-            self.insert_pq(REDIS_NAME, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, "50322062", "内容分析")
+        logger.info(f"[+] 内容分析-开始发送oss")
+        oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4()))  # 视频发送OSS
+        status = oss_object_key.get("status")
+        if status != 200:
+            logger.error(f"[+] 内容分析-发送oss失败")
+            AliyunLogger.logging(data["type"], "内容分析", data["channel"], video_id, "内容分析,发送oss失败",
+                                 "3003",
+                                 str(data))
+            text = (
+                f"**渠道**: {data['channel']}\n"
+                f"**内容**: {data}\n"
+                f"**失败信息**: 发送oss失败\n"
+            )
+            Feishu.finish_bot(text,
+                              "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
+                              f"【 内容理解-{data['channel']}失败通知 】")
             return
+        logger.info(f"[+] 内容分析-发送oss成功")
+        oss_object_key = oss_object_key.get("oss_object_key")
+        self.insert_pq(data, oss_object_key, old_title, cover_path)
+        return
 
 
-        return
 
 
 

+ 46 - 0
common/odps_data.py

@@ -0,0 +1,46 @@
+import json
+import datetime
+import math
+import random
+
+from odps import ODPS
+
+# ODPS服务配置
+ODPS_CONFIG = {
+    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+    'ACCESSID': 'LTAIWYUujJAm7CbH',
+    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+    'PROJECT': 'loghubods'
+}
+class OdpsDataCount:
+    @classmethod
+    def get_data_count(cls, dt):
+        odps = ODPS(
+            access_id=ODPS_CONFIG['ACCESSID'],
+            secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+            project=ODPS_CONFIG['PROJECT'],
+            endpoint=ODPS_CONFIG['ENDPOINT']
+        )
+        data_values = []
+        try:
+            sql = f'SELECT videoid,time,type,channel FROM loghubods.transport_spider_recommend_video_hour WHERE dt = "{dt}" and  channel = "搬运工具"'
+            with odps.execute_sql(sql).open_reader() as reader:
+                for row in reader:
+                    data_values.append(json.dumps( {"videoid": row[0], "time": row[1], "type": row[2], "channel": row[3], "dt": str(dt)}, ensure_ascii=False ))
+        except Exception as e:
+            print(f"An error occurred: {e}")
+            return data_values
+        return data_values
+
+    @classmethod
+    def main(cls):
+        dt = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime('%Y%m%d%H')
+        data_count = cls.get_data_count(dt= dt)
+        sample_size = math.ceil(len(data_count) / 2)
+        random_selection = random.sample(data_count, sample_size)
+
+        print(len(random_selection))
+        return random_selection
+
+if __name__ == '__main__':
+    OdpsDataCount.main()

+ 5 - 1
common/redis.py

@@ -1,6 +1,7 @@
 import redis
 
 from common import Material
+from common.odps_data import OdpsDataCount
 
 
 class SyncRedisHelper:
@@ -35,7 +36,10 @@ class SyncRedisHelper:
             self._pool.disconnect(inuse_connections=True)
 
 def insert_carry_data(dt, REDIS_NAME,FS_SHEET, NAME):
-    data = Material.get_carry_data(dt, FS_SHEET,NAME)
+    if NAME == "内容分析":
+        data = OdpsDataCount.main()
+    else:
+        data = Material.get_carry_data(dt, FS_SHEET,NAME)
     if not data:
         return 0
     helper = SyncRedisHelper()

+ 27 - 0
data_channel/piaoquan.py

@@ -49,6 +49,33 @@ class PQ:
             return new_video_id
         return None
 
+    @classmethod
+    def get_pq_oss_path(cls, video_id):
+        try:
+            url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/getBaseInfo"
+
+            payload = json.dumps({
+                "videoId": int(video_id)
+            })
+            headers = {
+                'Content-Type': 'application/json',
+                'Cookie': 'JSESSIONID=658158EABFCF6AC9B9BB0D8B61897A88'
+            }
+            for i in range(3):
+                response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
+                response = response.json()
+                code = response['code']
+                if code == 0:
+                    data = response['data']
+                    video_path = data["videoPath"]
+                    cover_path = data["coverImgPath"]
+                    title = data["title"]
+
+                    return video_path, cover_path, title
+            return None, None, None
+        except Exception as e:
+            return None, None, None
+
     @classmethod
     def get_pq_oss(cls, video_id_list):
         url_list = []