zhangliang 2 days ago
parent
commit
fd03f01990

+ 5 - 0
.env

@@ -24,3 +24,8 @@ ROCKETMQ_BATCH=1
 # 阿里云日志上报
 ALIYUN_ACCESS_KEY_ID="LTAIWYUujJAm7CbH"
 ALIYUN_ACCESS_KEY_SECRET="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+
+
+# 飞书配置
+FEISHU_APPID="cli_a13ad2afa438d00b"
+FEISHU_APPSECRET="4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"

+ 0 - 19
.env.test

@@ -1,19 +0,0 @@
-# 环境配置
-ENV=test
-LOG_LEVEL=INFO
-ENABLE_ALIYUN_LOG=true
-
-
-# 数据库配置
-DB_HOST="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com"
-DB_PORT=3306
-DB_USER="crawler"
-DB_PASSWORD="crawler123456@"
-DB_NAME="piaoquan-crawler"
-DB_CHARSET="utf8mb4"
-
-# 消息队列配置(RabbitMQ/其他)
-MQ_HOST=localhost
-MQ_PORT=5672
-MQ_USER=guest
-MQ_PASSWORD=guest

+ 4 - 0
config/base.py

@@ -37,6 +37,10 @@ class Settings(BaseSettings):
     ROCKETMQ_WAIT_SECONDS: int = 10
     ROCKETMQ_BATCH: int = 1
 
+    # 飞书配置
+    FEISHU_APPID: str = Field(..., env="FEISHU_APPID")
+    FEISHU_APPSECRET: str = Field(..., env="FEISHU_APPSECRET")
+
     # 连接池配置
     CONNECTION_TIMEOUT: int = 10
     REQUEST_TIMEOUT: int = 30
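The two new settings are filled from the FEISHU_APPID / FEISHU_APPSECRET lines added to .env above. A minimal sketch of how downstream code would read them, assuming the `settings` instance exported from `config` (the same import used by core/utils/feishu_data_async.py later in this commit):

    from config import settings

    def feishu_auth_payload() -> dict:
        # Both values are loaded from .env by the Settings(BaseSettings) class above.
        return {"app_id": settings.FEISHU_APPID, "app_secret": settings.FEISHU_APPSECRET}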

+ 5 - 2
config/spiders_config.yaml

@@ -10,9 +10,10 @@ benshanzhufurecommend:
   path: /crawler/ben_shan_zhu_fu/recommend
   method: post
   request_body:
-    cursor: "1"
-  loop_times: 2
+    cursor: "{{next_cursor}}"
+  loop_times: 200
   loop_interval: 5
+  feishu_sheetid: "aTSJH4"
   response_parse:
     next_cursor: "$.data.next_cursor"
     data_path: "$.data.data"
@@ -27,6 +28,8 @@ benshanzhufurecommend:
       video_url: "$.video_url"
       out_video_id: "$.nid"
 
+
+
 xngtjl_recommend_prod:
   platform: xiaoniangaotuijianliu
   mode: recommend
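Changing `cursor` from a fixed "1" to "{{next_cursor}}" pairs with `response_parse.next_cursor: "$.data.next_cursor"`: each loop the request body is presumably re-rendered with the cursor extracted from the previous response. A hedged sketch of that substitution; the render_request_body helper below is illustrative, not the project's actual code:

    import json

    def render_request_body(template: dict, variables: dict) -> dict:
        # Replace "{{name}}" placeholders in the serialized template with runtime values.
        rendered = json.dumps(template)
        for key, value in variables.items():
            rendered = rendered.replace("{{" + key + "}}", str(value))
        return json.loads(rendered)

    body = render_request_body({"cursor": "{{next_cursor}}"}, {"next_cursor": "20240101"})
    # body == {"cursor": "20240101"}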

+ 16 - 3
core/base/async_request_client.py

@@ -8,8 +8,9 @@ class AsyncRequestClient:
     """
     可独立复用的异步请求客户端,支持重试、日志结构化和限流预留
     """
-    def __init__(self, logger=None, max_retries=3, timeout=30):
+    def __init__(self, logger=None, aliyun_log=None, max_retries=3, timeout=30):
         self.logger = logger
+        self.aliyun_log = aliyun_log
         self.max_retries = max_retries
         self.timeout = timeout
 
@@ -22,13 +23,25 @@ class AsyncRequestClient:
                     self.logger.info(f"请求 {method} {url}, 尝试 {retries+1}/{self.max_retries}")
                 async with session.request(method, url, **kwargs) as response:
                     response.raise_for_status()
-                    return await response.json()
+                    resp = await response.json()
+                    if resp.get('code') != 0:
+                        # non-zero business code: count as a retry and loop again instead of returning the bad payload
+                        retries += 1
+                        if self.logger:
+                            self.logger.warning(f"请求失败 {resp}, 重试 {retries}/{self.max_retries}")
+                        if retries < self.max_retries:
+                            await asyncio.sleep(1)
+                            continue
+                    return resp
             except Exception as e:
                 retries += 1
                 if retries >= self.max_retries:
                     if self.logger:
                         self.logger.error(f"请求失败达到最大重试次数: {e}")
-                    raise
+                    if self.aliyun_log:
+                        self.aliyun_log.logging(
+                            code="9006",
+                            message=f"请求异常达到最大重试次数",
+                            data=f"{url}"
+                        )
+                    return
                 if self.logger:
                     self.logger.warning(f"请求失败 {e}, 重试 {retries}/{self.max_retries}")
+
                 await asyncio.sleep(1)

+ 1 - 0
core/models/spiders_config_models.py

@@ -19,4 +19,5 @@ class PlatformConfig(BaseConfig):
     max_pages: int = 0
     parse: dict = {}
     retry_times: int = 0
+    feishu_sheetid: str
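Because `feishu_sheetid` is declared without a default, every platform entry in spiders_config.yaml must now carry the key (benshanzhufurecommend gets it above); a config that omits it fails validation. Illustrative sketch only, since the base model's other required fields are not shown in this hunk:

    from pydantic import ValidationError
    from core.models.spiders_config_models import PlatformConfig

    try:
        PlatformConfig()  # omits feishu_sheetid (and any other required base fields)
    except ValidationError as err:
        print(err)  # the error list includes the missing feishu_sheetid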
 

+ 2 - 2
core/models/video_item.py

@@ -4,7 +4,7 @@ from typing import Optional
 
 from pydantic import BaseModel, Field
 
-from services import clean_title
+from services.clean_title import clean_title
 
 
 class VideoItem(BaseModel):
@@ -71,7 +71,7 @@ class VideoItem(BaseModel):
 
         must_fields = [
             "video_id", "user_id", "user_name", "out_video_id", "session",
-            "video_url", "cover_url", "classname", "strategy"
+            "video_url", "cover_url", "platform", "strategy"
         ]
         for f in must_fields:
             if not getattr(self, f, None):
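The must-field check now looks at `platform` instead of `classname`. A small standalone sketch of the same check, using a plain dict in place of a VideoItem since the model's full field list is not shown in this hunk:

    must_fields = ["video_id", "user_id", "user_name", "out_video_id", "session",
                   "video_url", "cover_url", "platform", "strategy"]
    item = {"video_id": "v123", "platform": "", "strategy": "recommend"}
    missing = [f for f in must_fields if not item.get(f)]
    # "platform" is empty, so it shows up in `missing` and the item would be rejected
    print(missing)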

+ 3 - 0
core/utils/__init__.py

@@ -0,0 +1,3 @@
+from .log.log_codes import CODES
+
+__all__ = ['CODES']

+ 5 - 1
core/utils/extractors.py

@@ -33,7 +33,7 @@ def extract_multiple(json_obj, fields: dict) -> dict:
     return {key: safe_extract(json_obj, path) for key, path in fields.items()}
 
 
-def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None) -> Dict:
+def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None, aliyun_log=None) -> Dict:
     result = {}
     for field, path in field_map.items():
         if not isinstance(path, str) or not path.startswith("$"):
@@ -42,5 +42,9 @@ def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None) ->
         value = safe_extract(video, path)
         if value is None and logger:
             logger.warning(f"{trace_id} 字段提取失败: {field} 路径: {path}")
+            # aliyun_log.logging(
+            #     code="",
+            #     trace_id=trace_id,
+            # )
         result[field] = value
     return result
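A hedged usage sketch of extract_fields with a JSONPath field map, mirroring the `fields` block in spiders_config.yaml; safe_extract is assumed to resolve "$." paths as the surrounding code implies:

    video = {"title": "示例视频", "video_url": "https://example.com/v.mp4", "nid": 10086}
    field_map = {"title": "$.title", "video_url": "$.video_url", "out_video_id": "$.nid"}
    item = extract_fields(video, field_map, logger=None, trace_id="trace-001")
    # item -> {"title": "示例视频", "video_url": "https://example.com/v.mp4", "out_video_id": 10086}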

+ 0 - 3
core/utils/feishu/__init__.py

@@ -1,3 +0,0 @@
-from .feishu import Feishu
-from .feishu_data import FsData
-from .feishu_insert import FeishuInsert

+ 0 - 727
core/utils/feishu/feishu.py

@@ -1,727 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/1/31
-"""
-飞书表配置: token 鉴权 / 增删改查 / 机器人报警
-"""
-import json
-import os
-import sys
-
-import requests
-import urllib3
-
-sys.path.append(os.getcwd())
-
-from core.utils.log import Local
-
-proxies = {"http": None, "https": None}
-
-
-class Feishu:
-    """
-    编辑飞书云文档
-    """
-    # 看一看爬虫数据表
-    kanyikan_url = "https://w42nne6hzg.feishu.cn/sheets/shtcngRPoDYAi24x52j2nDuHMih?"
-    # 快手爬虫数据表
-    kuaishou_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnICEfaw9llDNQkKgdymM1xf?"
-    # 微视爬虫数据表
-    weishi_url = "https://w42nne6hzg.feishu.cn/sheets/shtcn5YSWg91JfVGzj0SFZIRRPh?"
-    # 小年糕爬虫数据表
-    xiaoniangao_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnYxiyQ1wLklo1W5Kdqc9cGh?"
-    # 音乐相册
-    music_album = "https://w42nne6hzg.feishu.cn/sheets/shtcnT6zvmfsYe1g0iv4pt7855g?"
-    # 本山祝福数据表
-    crawler_benshanzhufu = "https://w42nne6hzg.feishu.cn/sheets/shtcnGh2rrsPYM4iVNEBO7OqWrb?"
-    # 公众号爬虫表
-    gzh_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnexNXnpDLHhARw0QdiwbYuA?"
-    # 数据监控表
-    crawler_monitor = "https://w42nne6hzg.feishu.cn/sheets/shtcnlZWYazInhf7Z60jkbLRJyd?"
-    # 微群视频爬虫表
-    crawler_weiqun_video = "https://w42nne6hzg.feishu.cn/sheets/shtcnoKThNquYRweaylMFVyo9Hc?"
-    # 视频号爬虫表
-    crawler_shipinhao = 'https://w42nne6hzg.feishu.cn/sheets/shtcn9rOdZRAGFbRkWpn7hqEHGc?'
-    # 西瓜视频
-    crawler_xigua = 'https://w42nne6hzg.feishu.cn/sheets/shtcnvOpx2P8vBXiV91Ot1MKIw8?'
-    # 知乎 PC 端
-    crawler_zhihu = 'https://w42nne6hzg.feishu.cn/sheets/shtcnkGPBmGsjaqapgzouuj8MXe?'
-    # 吉祥幸福
-    crawler_jixiangxingfu = 'https://w42nne6hzg.feishu.cn/sheets/shtcnSx4nafMbLTq7xl7RHBwHBf?'
-    # 福小顺
-    crawler_fuxiaoshun = 'https://w42nne6hzg.feishu.cn/sheets/CoXEsl6MDhMaKKt6GUBcvLwsnWb?'
-    # 众妙音信
-    crawler_zmyx = 'https://w42nne6hzg.feishu.cn/sheets/shtcnbZIxstPeM0xshW07b26sve?'
-    # 岁岁年年迎福气
-    crawler_ssnnyfq = 'https://w42nne6hzg.feishu.cn/sheets/shtcnyJmJSJynHDLLbLTkySfvZe?'
-    # 祝福猫视频
-    crawler_zhufumao = 'https://w42nne6hzg.feishu.cn/sheets/shtcnXfIJthvkjhI5zlEJq84i6g?'
-    # 宗教公众号
-    crawler_zongjiao = 'https://w42nne6hzg.feishu.cn/sheets/shtcn73NW0CyoOeF21HWO15KBsb?'
-    # 好看视频
-    crawler_haokan = 'https://w42nne6hzg.feishu.cn/sheets/shtcnaYz8Nhv8q6DbWtlL6rMEBd'
-    # 看到就是福气
-    crawler_kandaojiushifuqi = 'https://w42nne6hzg.feishu.cn/sheets/shtcnEokBkIjOUPAk8vbbPKnXgb'
-    # 胜胜影音
-    crawler_shengshengyingyin = 'https://w42nne6hzg.feishu.cn/sheets/shtcnz1ymxHL1u8WHblfqfys7qe'
-    # 刚刚都传
-    crawler_ganggangdouchuan = 'https://w42nne6hzg.feishu.cn/sheets/shtcnTuJgeZU2bc7VaesAqk3QJx'
-    # 知青天天看
-    crawler_zhiqingtiantiankan = 'https://w42nne6hzg.feishu.cn/sheets/shtcnjmhKdJOKdqnEzJcZb5xaHc?'
-    # 公众号_信欣
-    crawler_gongzhonghao = 'https://w42nne6hzg.feishu.cn/sheets/shtcna98M2mX7TbivTj9Sb7WKBN?'
-    # YouTube
-    crawler_youtube = 'https://w42nne6hzg.feishu.cn/sheets/shtcnrLyr1zbYbhhZyqpN7Xrd5f?'
-    # 微信指数
-    weixinzhishu = 'https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?'
-    # 微信指数_搜索词
-    weixinzhishu_search_word = 'https://w42nne6hzg.feishu.cn/sheets/shtcnHxCj6dZBYMuK1Q3tIJVlqg?'
-    # 海豚祝福
-    crawler_haitunzhufu = 'https://w42nne6hzg.feishu.cn/sheets/VbyAsUGq3h9TQ7tG3GpczGjhn1M?'
-
-    # 飞书路径token
-    @classmethod
-    def spreadsheettoken(cls, crawler):
-        """
-        :param crawler: 哪个爬虫
-        """
-        if crawler == "kanyikan":
-            return "shtcngRPoDYAi24x52j2nDuHMih"
-        elif crawler == "kuaishou":
-            return "shtcnICEfaw9llDNQkKgdymM1xf"
-        elif crawler == "weishi":
-            return "shtcn5YSWg91JfVGzj0SFZIRRPh"
-        elif crawler == "xiaoniangao":
-            return "shtcnYxiyQ1wLklo1W5Kdqc9cGh"
-        elif crawler == "control":
-            return "shtcnlZWYazInhf7Z60jkbLRJyd"
-        elif crawler == "music_album":
-            return "shtcnT6zvmfsYe1g0iv4pt7855g"
-        elif crawler == "benshanzhufu":
-            return "shtcnGh2rrsPYM4iVNEBO7OqWrb"
-        elif crawler == "gzh":
-            return "shtcnexNXnpDLHhARw0QdiwbYuA"
-        elif crawler == "weiqun":
-            return "shtcnoKThNquYRweaylMFVyo9Hc"
-        elif crawler == 'shipinhao':
-            return 'shtcn9rOdZRAGFbRkWpn7hqEHGc'
-        elif crawler == 'xigua':
-            return 'shtcnvOpx2P8vBXiV91Ot1MKIw8'
-        elif crawler == 'zhihu':
-            return 'shtcnkGPBmGsjaqapgzouuj8MXe'
-        elif crawler == 'jixiangxingfu':
-            return 'shtcnSx4nafMbLTq7xl7RHBwHBf'
-        elif crawler == 'fuxiaoshun':
-            return 'CoXEsl6MDhMaKKt6GUBcvLwsnWb'
-        elif crawler == 'zhongmiaoyinxin':
-            return 'shtcnbZIxstPeM0xshW07b26sve'
-        elif crawler == 'suisuiniannianyingfuqi':
-            return 'shtcnyJmJSJynHDLLbLTkySfvZe'
-        elif crawler == 'zhufumao':
-            return 'shtcnXfIJthvkjhI5zlEJq84i6g'
-        elif crawler == 'zongjiao':
-            return 'shtcn73NW0CyoOeF21HWO15KBsb'
-        elif crawler == 'haokan':
-            return 'shtcnaYz8Nhv8q6DbWtlL6rMEBd'
-        elif crawler == 'kandaojiushifuqi':
-            return 'shtcnEokBkIjOUPAk8vbbPKnXgb'
-        elif crawler == 'shengshengyingyin':
-            return 'shtcnz1ymxHL1u8WHblfqfys7qe'
-        elif crawler == 'ganggangdouchuan':
-            return 'shtcnTuJgeZU2bc7VaesAqk3QJx'
-        elif crawler == 'youtube':
-            return 'shtcnrLyr1zbYbhhZyqpN7Xrd5f'
-        elif crawler == 'weixinzhishu':
-            return 'shtcnqhMRUGunIfGnGXMOBYiy4K'
-        elif crawler == 'weixinzhishu_search_word':
-            return 'shtcnHxCj6dZBYMuK1Q3tIJVlqg'
-        elif crawler == 'gongzhonghao':
-            return 'shtcna98M2mX7TbivTj9Sb7WKBN'
-        elif crawler == 'douyin':
-            return 'shtcnhq63MoXOpqbkuLuoapYIAh'
-        elif crawler == 'zhiqingtiantiankan':
-            return 'shtcnjmhKdJOKdqnEzJcZb5xaHc'
-        elif crawler == 'haitunzhufu':
-            return 'VbyAsUGq3h9TQ7tG3GpczGjhn1M'
-
-    # 获取飞书api token
-    @classmethod
-    def get_token(cls, log_type, crawler):
-        """
-        获取飞书api token
-        :return:
-        """
-        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
-        post_data = {"app_id": "cli_a13ad2afa438d00b",  # 这里账号密码是发布应用的后台账号及密码
-                     "app_secret": "4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"}
-
-        try:
-            urllib3.disable_warnings()
-            response = requests.post(url=url, data=post_data, proxies=proxies, verify=False)
-            tenant_access_token = response.json()["tenant_access_token"]
-            return tenant_access_token
-        except Exception as e:
-            Local.logger(log_type, crawler).error("获取飞书 api token 异常:{}", e)
-
-    # 获取表格元数据
-    @classmethod
-    def get_metainfo(cls, log_type, crawler):
-        """
-        获取表格元数据
-        :return:
-        """
-        try:
-            get_metainfo_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                               + cls.spreadsheettoken(crawler) + "/metainfo"
-
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            params = {
-                "extFields": "protectedRange",  # 额外返回的字段,extFields=protectedRange时返回保护行列信息
-                "user_id_type": "open_id"  # 返回的用户id类型,可选open_id,union_id
-            }
-            urllib3.disable_warnings()
-            r = requests.get(url=get_metainfo_url, headers=headers, params=params, proxies=proxies, verify=False)
-            response = json.loads(r.content.decode("utf8"))
-            return response
-        except Exception as e:
-            Local.logger(log_type, crawler).error("获取表格元数据异常:{}", e)
-
-    # 读取工作表中所有数据
-    @classmethod
-    def get_values_batch(cls, log_type, crawler, sheetid):
-        """
-        读取工作表中所有数据
-        :param log_type: 启用哪个 log
-        :param crawler: 哪个爬虫
-        :param sheetid: 哪张表
-        :return: 所有数据
-        """
-        try:
-            get_values_batch_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                   + cls.spreadsheettoken(crawler) + "/values_batch_get"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            params = {
-                # 多个查询范围 如 url?ranges=range1,range2 ,其中 range 包含 sheetId 与单元格范围两部分
-                "ranges": sheetid,
-
-                # valueRenderOption=ToString 可返回纯文本的值(数值类型除外);
-                # valueRenderOption=FormattedValue 计算并格式化单元格;
-                # valueRenderOption=Formula单元格中含有公式时返回公式本身;
-                # valueRenderOption=UnformattedValue计算但不对单元格进行格式化
-                "valueRenderOption": "ToString",
-
-                # dateTimeRenderOption=FormattedString 计算并将时间日期按照其格式进行格式化,但不会对数字进行格式化,返回格式化后的字符串。
-                "dateTimeRenderOption": "",
-
-                # 返回的用户id类型,可选open_id,union_id
-                "user_id_type": "open_id"
-            }
-            urllib3.disable_warnings()
-            r = requests.get(url=get_values_batch_url, headers=headers, params=params, proxies=proxies, verify=False)
-            # print(r.text)
-            response = json.loads(r.content.decode("utf8"))
-            values = response["data"]["valueRanges"][0]["values"]
-            return values
-        except Exception as e:
-            Local.logger(log_type, crawler).error("读取工作表所有数据异常:{}", e)
-
-    # 工作表,插入行或列
-    @classmethod
-    def insert_columns(cls, log_type, crawler, sheetid, majordimension, startindex, endindex):
-        """
-        工作表插入行或列
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫的云文档
-        :param sheetid:哪张工作表
-        :param majordimension:行或者列, ROWS、COLUMNS
-        :param startindex:开始位置
-        :param endindex:结束位置
-        """
-        try:
-            insert_columns_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                 + cls.spreadsheettoken(crawler) + "/insert_dimension_range"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            body = {
-                "dimension": {
-                    "sheetId": sheetid,
-                    "majorDimension": majordimension,  # 默认 ROWS ,可选 ROWS、COLUMNS
-                    "startIndex": startindex,  # 开始的位置
-                    "endIndex": endindex  # 结束的位置
-                },
-                "inheritStyle": "AFTER"  # BEFORE 或 AFTER,不填为不继承 style
-            }
-
-            urllib3.disable_warnings()
-            r = requests.post(url=insert_columns_url, headers=headers, json=body, proxies=proxies, verify=False)
-            Local.logger(log_type, crawler).info("插入行或列:{}", r.json()["msg"])
-        except Exception as e:
-            Local.logger(log_type, crawler).error("插入行或列异常:{}", e)
-
-    # 写入数据
-    @classmethod
-    def update_values(cls, log_type, crawler, sheetid, ranges, values):
-        """
-        写入数据
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫的云文档
-        :param sheetid:哪张工作表
-        :param ranges:单元格范围
-        :param values:写入的具体数据,list
-        """
-        try:
-            update_values_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                + cls.spreadsheettoken(crawler) + "/values_batch_update"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            body = {
-                "valueRanges": [
-                    {
-                        "range": sheetid + "!" + ranges,
-                        "values": values
-                    },
-                ],
-            }
-            urllib3.disable_warnings()
-            r = requests.post(url=update_values_url, headers=headers, json=body, proxies=proxies, verify=False)
-            Local.logger(log_type, crawler).info("写入数据:{}", r.json()["msg"])
-        except Exception as e:
-            Local.logger(log_type, crawler).error("写入数据异常:{}", e)
-
-    # 合并单元格
-    @classmethod
-    def merge_cells(cls, log_type, crawler, sheetid, ranges):
-        """
-        合并单元格
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫
-        :param sheetid:哪张工作表
-        :param ranges:需要合并的单元格范围
-        """
-        try:
-            merge_cells_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                              + cls.spreadsheettoken(crawler) + "/merge_cells"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-
-            body = {
-                "range": sheetid + "!" + ranges,
-                "mergeType": "MERGE_ROWS"
-            }
-            urllib3.disable_warnings()
-            r = requests.post(url=merge_cells_url, headers=headers, json=body, proxies=proxies, verify=False)
-            Local.logger(log_type, crawler).info("合并单元格:{}", r.json()["msg"])
-        except Exception as e:
-            Local.logger(log_type, crawler).error("合并单元格异常:{}", e)
-
-    # 读取单元格数据
-    @classmethod
-    def get_range_value(cls, log_type, crawler, sheetid, cell):
-        """
-        读取单元格内容
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫
-        :param sheetid: 哪张工作表
-        :param cell: 哪个单元格
-        :return: 单元格内容
-        """
-        try:
-            get_range_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                  + cls.spreadsheettoken(crawler) + "/values/" + sheetid + "!" + cell
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            params = {
-                # valueRenderOption=ToString 可返回纯文本的值(数值类型除外);
-                # valueRenderOption=FormattedValue 计算并格式化单元格;
-                # valueRenderOption=Formula 单元格中含有公式时返回公式本身;
-                # valueRenderOption=UnformattedValue 计算但不对单元格进行格式化。
-                "valueRenderOption": "FormattedValue",
-
-                # dateTimeRenderOption=FormattedString 计算并对时间日期按照其格式进行格式化,但不会对数字进行格式化,返回格式化后的字符串。
-                "dateTimeRenderOption": "",
-
-                # 返回的用户id类型,可选open_id,union_id
-                "user_id_type": "open_id"
-            }
-            urllib3.disable_warnings()
-            r = requests.get(url=get_range_value_url, headers=headers, params=params, proxies=proxies, verify=False)
-            # print(r.text)
-            return r.json()["data"]["valueRange"]["values"][0]
-        except Exception as e:
-            Local.logger(log_type, crawler).error("读取单元格数据异常:{}", e)
-
-    # 获取表内容
-    @classmethod
-    def get_sheet_content(cls, log_type, crawler, sheet_id):
-        try:
-            sheet = Feishu.get_values_batch(log_type, crawler, sheet_id)
-            content_list = []
-            for x in sheet:
-                for y in x:
-                    if y is None:
-                        pass
-                    else:
-                        content_list.append(y)
-            return content_list
-        except Exception as e:
-            Local.logger(log_type, crawler).error(f'get_sheet_content:{e}\n')
-
-    # 删除行或列,可选 ROWS、COLUMNS
-    @classmethod
-    def dimension_range(cls, log_type, crawler, sheetid, major_dimension, startindex, endindex):
-        """
-        删除行或列
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫
-        :param sheetid:工作表
-        :param major_dimension:默认 ROWS ,可选 ROWS、COLUMNS
-        :param startindex:开始的位置
-        :param endindex:结束的位置
-        :return:
-        """
-        try:
-            dimension_range_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                  + cls.spreadsheettoken(crawler) + "/dimension_range"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            body = {
-                "dimension": {
-                    "sheetId": sheetid,
-                    "majorDimension": major_dimension,
-                    "startIndex": startindex,
-                    "endIndex": endindex
-                }
-            }
-            urllib3.disable_warnings()
-            r = requests.delete(url=dimension_range_url, headers=headers, json=body, proxies=proxies, verify=False)
-            Local.logger(log_type, crawler).info("删除视频数据:{}", r.json()["msg"])
-        except Exception as e:
-            Local.logger(log_type, crawler).error("删除视频数据异常:{}", e)
-
-    # 获取用户 ID
-    @classmethod
-    def get_userid(cls, log_type, crawler, username):
-        try:
-            url = "https://open.feishu.cn/open-apis/user/v1/batch_get_id?"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(log_type, crawler),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            name_phone_dict = {
-                "xinxin": "15546206651",
-                "muxinyi": "13699208058",
-                "wangxueke": "13513479926",
-                "yuzhuoyi": "18624010360",
-                "luojunhui": "18801281360",
-                "fanjun": "15200827642",
-                "zhangyong": "17600025055"
-            }
-
-            # if username == "wangkun":
-            #     username = "13426262515"
-            # # elif username == "gaonannan":
-            # #     username = "18501180073"
-            # elif username == "xinxin":
-            #     username = "15546206651"
-            # # elif username == "huxinxue":
-            # #     username = "18832292015"
-            # # elif username == "wuchaoyue":
-            # #     username = "15712941385"
-            # elif username == "muxinyi":
-            #     username = '13699208058'
-            # elif username == "wangxueke":
-            #     username = '13513479926'
-            # elif username == "yuzhuoyi":
-            #     username = '18624010360'
-            # elif username == "luojunhui":
-            #     username = '18801281360'
-            username = name_phone_dict.get(username)
-
-            data = {"mobiles": [username]}
-            urllib3.disable_warnings()
-            r = requests.get(url=url, headers=headers, params=data, verify=False, proxies=proxies)
-            open_id = r.json()["data"]["mobile_users"][username][0]["open_id"]
-            # Common.logger(log_type, crawler).info(f"{username}:{open_id}")
-            # print(f"{username}:{open_id}")
-            return open_id
-        except Exception as e:
-            Local.logger(log_type, crawler).error(f"get_userid异常:{e}\n")
-
-    # 飞书机器人
-    @classmethod
-    def bot(cls, log_type, crawler, text):
-        try:
-            url = "https://open.feishu.cn/open-apis/bot/v2/hook/96989577-50e7-4653-9ec2-308fe3f2c5fe"
-            headers = {'Content-Type': 'application/json'}
-            if crawler == "kanyikan":
-                content = "看一看爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcngRPoDYAi24x52j2nDuHMih"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "gaonannan")) + "></at>\n"
-            elif crawler == "jixiangxingfu":
-                content = text
-                sheet_url = ""
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangxueke")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-
-            # elif crawler == "weixinzhishu_out":
-            #     content = "微信指数_站外指数"
-            #     sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=YVuVgQ"
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-            #         cls.get_userid(log_type, crawler, "muxinyi")) + "></at>\n"
-            # elif crawler == "weixinzhishu_inner_sort":
-            #     content = "微信指数_站内短期指数"
-            #     sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=DrZHpa"
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-            #         cls.get_userid(log_type, crawler, "muxinyi")) + "></at>\n"
-            # elif crawler == "weixinzhishu_inner_long":
-            #     content = "微信指数_站内长期指数"
-            #     sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=JpgyAv"
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-            #         cls.get_userid(log_type, crawler, "muxinyi")) + "></at>\n"
-            #
-            # elif crawler == "weixinzhishu" and text == "今日微信指数抓取完毕":
-            #     content = "微信指数"
-            #     sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k"
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "yuzhuoyi")) + "></at>\n"
-            # elif crawler == "weixinzhishu":
-            #     content = "微信指数"
-            #     sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k"
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-            #         cls.get_userid(log_type, crawler, "muxinyi")) + "></at>\n"
-
-            elif crawler == "xiaoniangao_hour":
-                content = "小年糕_小时级_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnYxiyQ1wLklo1W5Kdqc9cGh?sheet=yatRv2"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-            elif crawler == "xiaoniangao_person":
-                content = "小年糕_用户主页_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnYxiyQ1wLklo1W5Kdqc9cGh?sheet=Wu0CeL"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-            elif crawler == "xiaoniangao_play":
-                content = "小年糕_播放量_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnYxiyQ1wLklo1W5Kdqc9cGh?sheet=c85k1C"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-
-            elif crawler == 'xigua' and log_type == "recommend":
-                content = '西瓜视频_推荐_已下载表'
-                sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnvOpx2P8vBXiV91Ot1MKIw8?sheet=ZzsClu'
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wangxueke")) + "></at>\n"
-            # elif crawler == 'xigua':
-            #     content = '西瓜视频_用户主页_已下载表'
-            #     sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnvOpx2P8vBXiV91Ot1MKIw8?sheet=e075e9'
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-            #         cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-            # elif crawler == 'xigua_little_video':
-            #     content = '西瓜视频_小视频_已下载表'
-            #     sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnvOpx2P8vBXiV91Ot1MKIw8?sheet=hDSDnv'
-            #     users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-            #         cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-
-            elif crawler == 'zhihu_hot':
-                content = '知乎_热门_已下载表'
-                sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnkGPBmGsjaqapgzouuj8MXe?sheet=8871e3'
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "gaonannan")) + "></at>\n"
-            elif crawler == 'zhihu_follow':
-                content = '知乎_定向_已下载表'
-                sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnkGPBmGsjaqapgzouuj8MXe?sheet=4MGuux'
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "gaonannan")) + "></at>\n"
-
-            elif crawler == 'haokan_hot':
-                content = '好看_热榜_已下载表'
-                sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnaYz8Nhv8q6DbWtlL6rMEBd?sheet=5pWipX'
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-            elif crawler == 'haokan_channel':
-                content = '好看_频道_已下载表'
-                sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnaYz8Nhv8q6DbWtlL6rMEBd?sheet=7f05d8'
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-            elif crawler == 'haokan_follow':
-                content = '好看_定向_已下载表'
-                sheet_url = 'https://w42nne6hzg.feishu.cn/sheets/shtcnaYz8Nhv8q6DbWtlL6rMEBd?sheet=kVaSjf'
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-
-            elif crawler == "music_album":
-                content = "音乐相册爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnT6zvmfsYe1g0iv4pt7855g"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "gaonannan")) + "></at>\n"
-
-            elif crawler == "ssyy":
-                content = "胜胜影音爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnz1ymxHL1u8WHblfqfys7qe"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "gaonannan")) + "></at>\n"
-
-            elif crawler == "ggdc":
-                content = "刚刚都传爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnTuJgeZU2bc7VaesAqk3QJx"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-
-            elif crawler == "bszf":
-                content = "本山祝福爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnGh2rrsPYM4iVNEBO7OqWrb"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-
-            elif crawler == "jxxf":
-                content = "吉祥幸福爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnSx4nafMbLTq7xl7RHBwHBf"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-
-            elif crawler == "zmyx":
-                content = "众妙音信爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnbZIxstPeM0xshW07b26sve"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "zhangyong")) + "></at>\n"
-
-            elif crawler == "zhufumao":
-                content = "祝福猫视频爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnXfIJthvkjhI5zlEJq84i6g"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "gaonannan")) + "></at>\n"
-
-            elif crawler == "kuaishou_follow":
-                content = "快手_用户主页_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnICEfaw9llDNQkKgdymM1xf?sheet=fYdA8F"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "xinxin")) + "></at>\n"
-            elif crawler == "kuaishou_recommend":
-                content = "快手_推荐榜_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnICEfaw9llDNQkKgdymM1xf?sheet=3cd128"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "xinxin")) + "></at>\n"
-
-            elif crawler == "ssnnyfq":
-                content = "岁岁年年迎福气_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnyJmJSJynHDLLbLTkySfvZe?sheet=290bae"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "luojunhui")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "xinxin")) + "></at>\n"
-
-            elif crawler == "kdjsfq":
-                content = "看到就是福气_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnEokBkIjOUPAk8vbbPKnXgb?sheet=ad3b6d"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "xinxin")) + "></at>\n"
-
-            elif crawler == "gzh":
-                content = "公众号爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnexNXnpDLHhARw0QdiwbYuA"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "huxinxue")) + "></at>\n"
-
-            elif crawler == "gongzhonghao":
-                content = "公众号_信欣_爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcna98M2mX7TbivTj9Sb7WKBN?"
-                users = f"\n<at id={str(cls.get_userid(log_type, crawler, 'fanjun'))}></at> <at id={str(cls.get_userid(log_type, crawler, 'wangxueke'))}></at> <at id={str(cls.get_userid(log_type, crawler, 'luojunhui'))}></at>\n"
-
-            elif crawler == "weiqun":
-                content = "微群爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnoKThNquYRweaylMFVyo9Hc"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "xinxin")) + "></at>\n"
-
-            elif crawler == "weishi":
-                content = "微视爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcn5YSWg91JfVGzj0SFZIRRPh"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "xinxin")) + "></at>\n"
-
-            elif crawler == "shipinhao_recommend":
-                content = "视频号_推荐_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcn9rOdZRAGFbRkWpn7hqEHGc?sheet=c77cf9"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-            elif crawler == "shipinhao_follow":
-                content = "视频号_定向_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcn9rOdZRAGFbRkWpn7hqEHGc?sheet=KsVtLe"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-            elif crawler == "youtube":
-                content = "youtube_定向_已下载表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnrLyr1zbYbhhZyqpN7Xrd5f?sheet=GVxlYk"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "wuchaoyue")) + "></at>\n"
-
-            elif crawler == "zongjiao":
-                content = "宗教公众号爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcn73NW0CyoOeF21HWO15KBsb"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at> <at id=" + str(
-                    cls.get_userid(log_type, crawler, "huxinxue")) + "></at>\n"
-
-            else:
-                content = "小年糕爬虫表"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/shtcnYxiyQ1wLklo1W5Kdqc9cGh"
-                users = "\n<at id=" + str(cls.get_userid(log_type, crawler, "wangkun")) + "></at>\n"
-
-            data = json.dumps({
-                "msg_type": "interactive",
-                "card": {
-                    "config": {
-                        "wide_screen_mode": True,
-                        "enable_forward": True
-                    },
-                    "elements": [{
-                        "tag": "div",
-                        "text": {
-                            "content": users + text,
-                            "tag": "lark_md"
-                        }
-                    }, {
-                        "actions": [{
-                            "tag": "button",
-                            "text": {
-                                "content": content,
-                                "tag": "lark_md"
-                            },
-                            "url": sheet_url,
-                            "type": "default",
-                            "value": {}
-                        }],
-                        "tag": "action"
-                    }],
-                    "header": {
-                        "title": {
-                            "content": "📣您有新的信息,请注意查收",
-                            "tag": "plain_text"
-                        }
-                    }
-                }
-            })
-            urllib3.disable_warnings()
-            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
-            Local.logger(log_type, crawler).info(f'触发机器人消息:{r.status_code}, {text}')
-        except Exception as e:
-            Local.logger(log_type, crawler).error(f"bot异常:{e}\n")
-
-
-if __name__ == "__main__":
-    Feishu.bot('recommend', 'xigua', '测试: 西瓜推荐,登录失效')
-    # print(Feishu.get_userid('bot', 'weixinzhishu', 'wangkun'))
-    # print(Feishu.get_userid('bot', 'weixinzhishu', 'yuzhuoyi'))

+ 0 - 20
core/utils/feishu/feishu_data.py

@@ -1,20 +0,0 @@
-from core.utils.feishu.feishu_utils import FeishuUtils
-
-
-class FsData:
-
-    def get_title_rule(self):
-        summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "BS9uyu")
-        for row in summary[1:]:
-            title_rule = row[0]
-            if title_rule:
-                return title_rule
-            else:
-                return None
-        return None
-
-
-if __name__ == '__main__':
-    data_rule = FsData()
-    title_rule = data_rule.get_title_rule()
-    print(title_rule)

+ 0 - 53
core/utils/feishu/feishu_insert.py

@@ -1,53 +0,0 @@
-"""
-feishu python方法
-"""
-
-import requests
-
-
-def get_app_token():
-    """
-    获取飞书api token
-    :return:
-    """
-    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
-    post_data = {
-        "app_id": "cli_a51114cf8bf8d00c",  # 这里账号密码是发布应用的后台账号及密码
-        "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
-    }
-    response = requests.request("POST", url=url, data=post_data)
-    tenant_access_token = response.json()["tenant_access_token"]
-    return tenant_access_token
-
-
-class FeishuInsert(object):
-    """
-    feishu Python Object
-    """
-
-    def __init__(self, document_token):
-        self.headers = {"Content-Type": "application/json"}
-        self.document_token = document_token
-
-    def insert_value(self, sheet_id, ranges, values):
-        """
-        在表的某一个sheet的ranges中插入数据,若该地方存在数据,会自动把已有的数据往下移动,再写如数据
-        :param sheet_id: 飞书表的唯一ID
-        :param ranges: 单元格位置的range, 从左上角到右下角, 两边都是闭区间
-        :param values: 二维数组, 用于填充ranges的空格数组
-        """
-        insert_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{}/values_prepend".format(
-            self.document_token)
-        # print(get_app_token())
-        headers = {
-            "Authorization": "Bearer " + get_app_token(),
-            'contentType': 'application/json; charset=utf-8'
-        }
-        body = {
-            "valueRange": {
-                "range": "{}!{}".format(sheet_id, ranges),
-                "values": values
-            }
-        }
-        response = requests.request("POST", url=insert_value_url, headers=headers, json=body)
-        print(response.json())

+ 0 - 397
core/utils/feishu/feishu_utils.py

@@ -1,397 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time: 2023/12/26
-"""
-飞书表配置: token 鉴权 / 增删改查 / 机器人报警
-"""
-import json
-import os
-import sys
-
-import requests
-import urllib3
-from loguru import logger
-
-sys.path.append(os.getcwd())
-
-proxies = {"http": None, "https": None}
-
-
-class FeishuUtils:
-    """
-    编辑飞书云文档
-    """
-    succinct_url = "https://w42nne6hzg.feishu.cn/sheets/"
-
-    # 飞书路径token
-    @classmethod
-    def spreadsheettoken(cls, crawler):
-        if crawler == "summary":
-            return "KsoMsyP2ghleM9tzBfmcEEXBnXg"
-        else:
-            return crawler
-
-    # 获取飞书api token
-    @classmethod
-    def get_token(cls):
-        """
-        获取飞书api token
-        :return:
-        """
-        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
-        post_data = {"app_id": "cli_a13ad2afa438d00b",  # 这里账号密码是发布应用的后台账号及密码
-                     "app_secret": "4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"}
-        urllib3.disable_warnings()
-        response = requests.post(url=url, data=post_data, proxies=proxies, verify=False)
-        tenant_access_token = response.json()["tenant_access_token"]
-        return tenant_access_token
-
-    # 获取表格元数据
-    @classmethod
-    def get_metainfo(cls, crawler):
-        """
-        获取表格元数据
-        :return:
-        """
-        try:
-            get_metainfo_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                               + cls.spreadsheettoken(crawler) + "/metainfo"
-
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            params = {
-                "extFields": "protectedRange",  # 额外返回的字段,extFields=protectedRange时返回保护行列信息
-                "user_id_type": "open_id"  # 返回的用户id类型,可选open_id,union_id
-            }
-            urllib3.disable_warnings()
-            r = requests.get(url=get_metainfo_url, headers=headers, params=params, proxies=proxies, verify=False)
-            response = json.loads(r.content.decode("utf8"))
-            return response
-        except Exception as e:
-            logger.error("获取表格元数据异常:{}", e)
-
-    # 读取工作表中所有数据
-    @classmethod
-    def get_values_batch(cls, crawler, sheetid):
-        """
-        读取工作表中所有数据
-        :param crawler: 哪个爬虫
-        :param sheetid: 哪张表
-        :return: 所有数据
-        """
-
-        get_values_batch_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                               + cls.spreadsheettoken(crawler) + "/values_batch_get"
-        headers = {
-            "Authorization": "Bearer " + cls.get_token(),
-            "Content-Type": "application/json; charset=utf-8"
-        }
-        params = {
-            "ranges": sheetid,
-            "valueRenderOption": "ToString",
-            "dateTimeRenderOption": "",
-            "user_id_type": "open_id"
-        }
-        urllib3.disable_warnings()
-        r = requests.get(url=get_values_batch_url, headers=headers, params=params, proxies=proxies, verify=False)
-        response = json.loads(r.content.decode("utf8"))
-        values = response["data"]["valueRanges"][0]["values"]
-        return values
-
-    # 工作表,插入行或列
-    @classmethod
-    def insert_columns(cls, crawler, sheetid, majordimension, startindex, endindex):
-        """
-        工作表插入行或列
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫的云文档
-        :param sheetid:哪张工作表
-        :param majordimension:行或者列, ROWS、COLUMNS
-        :param startindex:开始位置
-        :param endindex:结束位置
-        """
-        try:
-            insert_columns_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                 + cls.spreadsheettoken(crawler) + "/insert_dimension_range"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            body = {
-                "dimension": {
-                    "sheetId": sheetid,
-                    "majorDimension": majordimension,  # 默认 ROWS ,可选 ROWS、COLUMNS
-                    "startIndex": startindex,  # 开始的位置
-                    "endIndex": endindex  # 结束的位置
-                },
-                "inheritStyle": "AFTER"  # BEFORE 或 AFTER,不填为不继承 style
-            }
-
-            urllib3.disable_warnings()
-            r = requests.post(url=insert_columns_url, headers=headers, json=body, proxies=proxies, verify=False)
-        except Exception as e:
-            logger.error("插入行或列异常:{}", e)
-
-    # 写入数据
-    @classmethod
-    def update_values(cls, crawler, sheetid, ranges, values):
-        """
-        写入数据
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫的云文档
-        :param sheetid:哪张工作表
-        :param ranges:单元格范围
-        :param values:写入的具体数据,list
-        """
-        try:
-            update_values_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                + cls.spreadsheettoken(crawler) + "/values_batch_update"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            body = {
-                "valueRanges": [
-                    {
-                        "range": sheetid + "!" + ranges,
-                        "values": values
-                    },
-                ],
-            }
-            urllib3.disable_warnings()
-            r = requests.post(url=update_values_url, headers=headers, json=body, proxies=proxies, verify=False)
-        except Exception as e:
-            logger.error("写入数据异常:{}", e)
-
-    # 合并单元格
-    @classmethod
-    def merge_cells(cls, crawler, sheetid, ranges):
-        """
-        合并单元格
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫
-        :param sheetid:哪张工作表
-        :param ranges:需要合并的单元格范围
-        """
-        try:
-            merge_cells_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                              + cls.spreadsheettoken(crawler) + "/merge_cells"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-
-            body = {
-                "range": sheetid + "!" + ranges,
-                "mergeType": "MERGE_ROWS"
-            }
-            urllib3.disable_warnings()
-            r = requests.post(url=merge_cells_url, headers=headers, json=body, proxies=proxies, verify=False)
-        except Exception as e:
-            logger.error("合并单元格异常:{}", e)
-
-    # 读取单元格数据
-    @classmethod
-    def get_range_value(cls, crawler, sheetid, cell):
-        """
-        读取单元格内容
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫
-        :param sheetid: 哪张工作表
-        :param cell: 哪个单元格
-        :return: 单元格内容
-        """
-        try:
-            get_range_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                  + cls.spreadsheettoken(crawler) + "/values/" + sheetid + "!" + cell
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            params = {
-                "valueRenderOption": "FormattedValue",
-
-                # dateTimeRenderOption=FormattedString 计算并对时间日期按照其格式进行格式化,但不会对数字进行格式化,返回格式化后的字符串。
-                "dateTimeRenderOption": "",
-
-                # 返回的用户id类型,可选open_id,union_id
-                "user_id_type": "open_id"
-            }
-            urllib3.disable_warnings()
-            r = requests.get(url=get_range_value_url, headers=headers, params=params, proxies=proxies, verify=False)
-            # logger.error(r.text)
-            return r.json()["data"]["valueRange"]["values"][0]
-        except Exception as e:
-            logger.error("读取单元格数据异常:{}", e)
-
-    # 获取表内容
-    @classmethod
-    def get_sheet_content(cls, crawler, sheet_id):
-        try:
-            sheet = Feishu.get_values_batch(crawler, sheet_id)
-            content_list = []
-            for x in sheet:
-                for y in x:
-                    if y is None:
-                        pass
-                    else:
-                        content_list.append(y)
-            return content_list
-        except Exception as e:
-            logger.error(f'get_sheet_content:{e}\n')
-
-    # 删除行或列,可选 ROWS、COLUMNS
-    @classmethod
-    def dimension_range(cls, log_type, crawler, sheetid, major_dimension, startindex, endindex):
-        """
-        删除行或列
-        :param log_type: 日志路径
-        :param crawler: 哪个爬虫
-        :param sheetid:工作表
-        :param major_dimension:默认 ROWS ,可选 ROWS、COLUMNS
-        :param startindex:开始的位置
-        :param endindex:结束的位置
-        :return:
-        """
-        try:
-            dimension_range_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
-                                  + cls.spreadsheettoken(crawler) + "/dimension_range"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            body = {
-                "dimension": {
-                    "sheetId": sheetid,
-                    "majorDimension": major_dimension,
-                    "startIndex": startindex,
-                    "endIndex": endindex
-                }
-            }
-            urllib3.disable_warnings()
-            r = requests.delete(url=dimension_range_url, headers=headers, json=body, proxies=proxies, verify=False)
-        except Exception as e:
-            logger.error("删除视频数据异常:{}", e)
-
-    # 获取用户 ID
-    @classmethod
-    def get_userid(cls, username):
-        try:
-            url = "https://open.feishu.cn/open-apis/user/v1/batch_get_id?"
-            headers = {
-                "Authorization": "Bearer " + cls.get_token(),
-                "Content-Type": "application/json; charset=utf-8"
-            }
-            name_phone_dict = {
-                "xinxin": "15546206651",
-                "muxinyi": "13699208058",
-                "wangxueke": "13513479926",
-                "yuzhuoyi": "18624010360",
-                "luojunhui": "18801281360",
-                "fanjun": "15200827642",
-                "zhangyong": "17600025055",
-                'liukunyu': "18810931977"
-            }
-            username = name_phone_dict.get(username)
-
-            data = {"mobiles": [username]}
-            urllib3.disable_warnings()
-            r = requests.get(url=url, headers=headers, params=data, verify=False, proxies=proxies)
-            open_id = r.json()["data"]["mobile_users"][username][0]["open_id"]
-
-            return open_id
-        except Exception as e:
-            pass
-            # logger.error(f"get_userid异常:{e}\n")
-
-    # 飞书机器人
-    @classmethod
-    def bot(cls, log_type, crawler, text, mark_name):
-        try:
-
-            headers = {'Content-Type': 'application/json'}
-            if crawler == "机器自动改造消息通知":
-                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
-                users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
-            elif crawler == "快手关键词搜索":
-                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=U1gySe"
-                users = "".join([f'<at id="{cls.get_userid(type)}">{name}</at>' for type, name in
-                                 zip(log_type, mark_name)])
-                # users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
-            else:
-                url = "https://open.feishu.cn/open-apis/bot/v2/hook/7928f182-08c1-4c4d-b2f7-82e10c93ca80"
-                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
-                users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
-            data = json.dumps({
-                "msg_type": "interactive",
-                "card": {
-                    "config": {
-                        "wide_screen_mode": True,
-                        "enable_forward": True
-                    },
-                    "elements": [{
-                        "tag": "div",
-                        "text": {
-                            "content": users + text,
-                            "tag": "lark_md"
-                        }
-                    }, {
-                        "actions": [{
-                            "tag": "button",
-                            "text": {
-                                "content": "详情,点击~~~~~",
-                                "tag": "lark_md"
-                            },
-                            "url": sheet_url,
-                            "type": "default",
-                            "value": {}
-                        }],
-                        "tag": "action"
-                    }],
-                    "header": {
-                        "title": {
-                            "content": "📣消息提醒",
-                            "tag": "plain_text"
-                        }
-                    }
-                }
-            })
-            urllib3.disable_warnings()
-            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
-        except Exception as e:
-            logger.error(f"bot异常:{e}\n")
-
-    # 飞书机器人-改造计划完成通知
-    @classmethod
-    def finish_bot(cls, text, url, content):
-        try:
-            headers = {'Content-Type': 'application/json'}
-            data = json.dumps({
-                "msg_type": "interactive",
-                "card": {
-                    "config": {
-                        "wide_screen_mode": True,
-                        "enable_forward": True
-                    },
-                    "elements": [{
-                        "tag": "div",
-                        "text": {
-                            "content": text,
-                            "tag": "lark_md"
-                        }
-                    }],
-                    "header": {
-                        "title": {
-                            "content": content,
-                            "tag": "plain_text"
-                        }
-                    }
-                }
-            })
-            urllib3.disable_warnings()
-            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
-        except Exception as e:
-            logger.error(f"bot异常:{e}\n")

+ 156 - 0
core/utils/feishu_data_async.py

@@ -0,0 +1,156 @@
+import asyncio
+import time
+
+import aiohttp
+import json
+from typing import List, Dict, Any, Optional, Union
+from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
+from config import settings
+
+class FeishuDataAsync:
+    """飞书电子表格  异步操作工具类"""
+
+    def __init__(self, base_url: str = "https://open.feishu.cn"):
+        """
+        初始化飞书电子表格 V3 客户端
+
+        :param app_id: 飞书应用ID
+        :param app_secret: 飞书应用Secret
+        :param base_url: 飞书开放平台基础URL
+        """
+        self.app_id = settings.FEISHU_APPID
+        self.app_secret = settings.FEISHU_APPSECRET
+        self.base_url = base_url
+        self.access_token = ""
+        self.token_expire_time = 0
+        self.session = None
+
+    async def __aenter__(self):
+        """异步上下文管理器入口"""
+        self.session = aiohttp.ClientSession()
+        await self._get_access_token()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """异步上下文管理器出口"""
+        if self.session:
+            await self.session.close()
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_fixed(2),
+        retry=retry_if_exception_type(Exception)
+    )
+    async def _get_access_token(self) -> str:
+        """异步获取访问令牌"""
+        now = int(time.time())
+        if self.access_token and now < self.token_expire_time - 60:
+            return self.access_token
+
+        url = f"{self.base_url}/open-apis/auth/v3/tenant_access_token/internal"
+        payload = {"app_id": self.app_id, "app_secret": self.app_secret}
+
+        async with self.session.post(url, json=payload, ssl=False) as response:
+            if response.status != 200:
+                error_text = await response.text()
+                raise Exception(f"获取访问令牌失败: {error_text}")
+
+            result = await response.json()
+            if result.get("code") != 0:
+                raise Exception(f"获取访问令牌失败: {result.get('msg')}")
+
+            self.access_token = result.get("tenant_access_token", "")
+            self.token_expire_time = now + result.get("expire", 7200)
+            return self.access_token
+
+    async def get_values(
+            self,
+            spreadsheet_token: str,
+            sheet_id: str,
+    ) -> List[List[Any]]:
+        """
+        异步获取电子表格数据(V3 版本接口)
+
+        :param spreadsheet_token: 电子表格token
+        :param sheet_id: 工作表ID
+        :param range_str: 数据范围(如"A1:C10")
+        :param value_render_option: 值渲染选项
+        :param date_time_render_option: 日期时间渲染选项
+        :return: 表格数据二维列表
+        """
+        access_token = await self._get_access_token()
+        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
+
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json; charset=utf-8"
+        }
+
+        params = {
+            "ranges": sheet_id,
+            "valueRenderOption": "ToString",
+            "dateTimeRenderOption": "",
+            "user_id_type": "open_id"
+        }
+
+        async with self.session.get(url, headers=headers, params=params, ssl=False) as response:
+            if response.status != 200:
+                error_text = await response.text()
+                raise Exception(f"获取表格数据失败: {error_text}")
+
+            result = await response.json()
+            if result.get("code") != 0:
+                raise Exception(f"获取表格数据失败: {result.get('msg')}")
+
+            return result.get("data", {}).get("valueRanges", [{}])[0].get("values", [])
+
+    async def insert_values(
+            self,
+            spreadsheet_token: str,
+            sheet_id: str,
+            ranges: str,
+            values: List[Any],
+    ) -> Dict[str, Any]:
+        """
+        异步更新电子表格数据
+
+        :param spreadsheet_token: 电子表格token
+        :param sheet_id: 工作表ID
+        :param ranges: 数据范围(如"A1:C10")
+        :param values: 要更新的数据列表
+        :return: 更新结果
+        """
+        access_token = await self._get_access_token()
+        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_prepend"
+
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json; charset=utf-8"
+        }
+
+
+        payload = {
+            "valueRange": {
+                "range": f"{sheet_id}!{ranges}",
+                "values": [values]
+            }
+        }
+
+        async with self.session.post(url, headers=headers, json=payload, ssl=False) as response:
+            if response.status != 200:
+                error_text = await response.text()
+                raise Exception(f"更新表格数据失败: {error_text}")
+
+            result = await response.json()
+            if result.get("code") != 0:
+                raise Exception(f"更新表格数据失败: {result.get('msg')}")
+
+            return result.get("data", {})
+
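
FeishuDataAsync only opens its aiohttp session and fetches the tenant access token inside __aenter__, so it has to be used as an async context manager; calling get_values or insert_values on the class directly will fail. A minimal usage sketch under that assumption (the spreadsheet token and sheet id below are placeholders, not real documents):

import asyncio
from core.utils.feishu_data_async import FeishuDataAsync

async def demo():
    async with FeishuDataAsync() as feishu:
        # read all rows of one sheet, then prepend a new row at A2:Z2
        rows = await feishu.get_values(spreadsheet_token="<spreadsheet_token>", sheet_id="<sheet_id>")
        await feishu.insert_values(
            spreadsheet_token="<spreadsheet_token>",
            sheet_id="<sheet_id>",
            ranges="A2:Z2",
            values=["col_a", "col_b"],
        )
        return rows

if __name__ == "__main__":
    print(asyncio.run(demo()))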

+ 0 - 1
core/utils/gpt/__init__.py

@@ -1 +0,0 @@
-from .gpt4o_mini_help import GPT4oMini

+ 20 - 15
core/utils/gpt/gpt4o_mini_help.py → core/utils/gpt4o_mini_help.py

@@ -1,12 +1,11 @@
 import json
-
-import requests
+import asyncio
+import aiohttp
 
 
 class GPT4oMini:
-
     @classmethod
-    def get_ai_mini_title(cls, title):
+    async def get_ai_mini_title(cls, title):
         url = "http://aigc-api.cybertogether.net//aigc/dev/test/gpt"
         payload = json.dumps({
             "imageList": [],
@@ -24,7 +23,7 @@ class GPT4oMini:
 
                 "视频的原标题:“哇!好美的一个视频,发给您也看看!”、“晚上好,这也太美啦,发给大家一起欣赏欣赏。”、“____这段话说得真好,一起听听!每句话都很有道快分享给群友看看吧!”、“👈这段话说的真好,值得一听”、“🔴世界顶尖雪雕❗ 太真实了,太美了!忍不住发给你看看!”、“💖《等》说得真好,看看吧...”、“🔴这样的萌娃你们喜欢吗,都看看吧!”、“🔴2025金蛇纳福,这首歌送给全体群友,祝大家财运亨通永不断!”、“🔴元旦青蛇遇双春,这三件事千万别做,都看看吧!”、“💕呵呵太搞笑了!老师和家长的对话!值得一看!绝了!”、“❤️《中国知识大全》太珍贵了!值得我们每个中国人都看看!”、“六岁小女孩一首《爸》全场泪奔”、“🔴酒店招牌菜,菠菜炒鸡蛋的家常做法,快来学学!”、“这个视频,分享给我的老友,祝愿您能幸福安康”"
 
-                "请务必严格遵守上述生成规则,为原标题生成对应的新标题。"
+                f"请务必严格遵守上述生成规则,为原标题生成对应的新标题。"
                 f"请分析该标题,标题为:{title},返回新的标题。"
             ),
             "responseFormat": {
@@ -47,17 +46,23 @@ class GPT4oMini:
             }
         })
         headers = {'Content-Type': 'application/json'}
-        try:
-            response = requests.post(url, headers=headers, data=payload)
-            response_data = response.json()
 
-            data = json.loads(response_data.get('data', '{}'))
-            new_title = data["新标题"]
-            return new_title
-        except Exception as e:
-            return None
+        async with aiohttp.ClientSession() as session:
+            try:
+                async with session.post(url, headers=headers, data=payload) as response:
+                    response_data = await response.json()
+                    data = json.loads(response_data.get('data', '{}'))
+                    new_title = data["新标题"]
+                    return new_title
+            except Exception as e:
+                return None
 
 
 if __name__ == '__main__':
-    title = GPT4oMini.get_ai_mini_title("🔴这位美女说的太好了!这就是我们的大中国")
-    print(title)
+    # 异步调用示例
+    async def test():
+        title = await GPT4oMini.get_ai_mini_title("🔴这位美女说的太好了!这就是我们的大中国")
+        print(title)
+
+
+    asyncio.run(test())

+ 50 - 0
core/utils/helpers.py

@@ -0,0 +1,50 @@
+import asyncio
+
+from typing import List, Dict
+
+from datetime import datetime
+from core.utils.feishu_data_async import FeishuDataAsync
+from core.utils.gpt4o_mini_help import GPT4oMini
+
+
+async def get_title_filter_word() -> List[str]:
+    """
+    获取飞书表配置的标题过滤词
+    https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=BS9uyu
+    :return:
+    """
+    spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
+    sheet_id = "BS9uyu"
+    async with FeishuDataAsync() as feishu:
+        feishu_data = await feishu.get_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id)
+        return feishu_data[1]
+
+async def generate_titles(sheet_id: str, video_obj: Dict):
+    title_list = await get_title_filter_word()
+    title = video_obj.get("title")
+    if not title:
+        return
+    contains_keyword = any(keyword in title for keyword in title_list)
+    if contains_keyword:
+        new_title = await GPT4oMini.get_ai_mini_title(title)
+        current_time = datetime.now()
+        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+        values = [
+                video_obj["v_url"],
+                video_obj["url"],
+                title,
+                new_title,
+                formatted_time,
+        ]
+        await insert_feishu_data(sheet_id, values)
+
+async def insert_feishu_data(sheet_id: str, values: List):
+    spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
+    async with FeishuDataAsync() as feishu:
+        await feishu.insert_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id, ranges="A2:Z2", values=values)
+
+
+
+if __name__ == '__main__':
+    filter_word = asyncio.run(get_title_filter_word())
+    print(filter_word)
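
A hedged sketch of how generate_titles is meant to be driven: the video dict is assumed to carry "title", "v_url" and "url" keys (the fields written back to the sheet), and the sheet id is whatever feishu_sheetid is configured for the platform; all values below are placeholders:

import asyncio
from core.utils.helpers import generate_titles

async def demo():
    video_obj = {
        "title": "some original title",           # only rewritten if it contains a filter word
        "v_url": "https://example.com/video.mp4",
        "url": "https://example.com/detail",
    }
    await generate_titles(sheet_id="<feishu_sheetid>", video_obj=video_obj)

asyncio.run(demo())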

+ 1 - 1
core/utils/log/aliyun_log.py

@@ -7,7 +7,7 @@ import time
 from aliyun.log import LogClient, PutLogsRequest, LogItem
 
 from config import settings
-from core.utils.log.log_codes import LOG_CODES
+from core.utils.log.log_codes import CODES
 
 proxies = {"http": None, "https": None}
 

+ 9 - 1
core/utils/log/log_codes.py

@@ -24,6 +24,7 @@ CODES = {
     "1008": "规则匹配成功",
     "1009": "成功发送至ETL",
     "1010": "任务执行完成",
+    "1011": "视频数量达到当日最大值",
 
     # 调度监控 (15xx)
     "1500": "主进程启动成功",
@@ -33,6 +34,13 @@ CODES = {
     "1504": "进程监听循环启动",
     "1505": "子进程分配完成",
 
+    # 系统监控(16XX)
+    "1600": "系统收到停止信号",
+    "1601": "系统正在取消所有消费任务",
+    "1602": "所有任务已退出",
+    "1603": "任务退出异常",
+
+
     # 可恢复配置错误 (4xxx)
     "4000": "爬虫配置缺失",
     "4001": "ETL配置缺失",
@@ -45,7 +53,7 @@ CODES = {
     "9003": "视频处理失败",
     "9004": "推送ETL失败",
     "9005": "爬虫致命错误退出",
-    "9006": "请求异常重试",
+    "9006": "请求异常达到最大重试次数",
     "9007": "字段缺失校验失败",
     "9008": "标题不符合规则",
     "9009": "发布时间不符合规则",

+ 10 - 4
core/utils/log/logger_manager.py

@@ -1,3 +1,5 @@
+from typing import Any
+
 from core.utils.log.aliyun_log import AliyunLogger
 from core.utils.log.local_log import Local
 from loguru._logger import Logger as LoguruLogger
@@ -12,8 +14,8 @@ class LoggerManager:
 
     @staticmethod
     def get_logger(
-        platform: str = "system",
-        mode: str = "crawler",
+        platform: str,
+        mode: str,
         log_to_console: bool = False
     ) -> LoguruLogger:
         key = f"{platform}_{mode}"
@@ -27,10 +29,14 @@ class LoggerManager:
 
     @staticmethod
     def get_aliyun_logger(
-        platform: str = "system",
-        mode: str = "crawler",
+        platform: str,
+        mode: str,
         env: str = "prod"
     ) -> AliyunLogger:
+        """
+
+        :rtype: AliyunLogger
+        """
         key = f"{platform}_{mode}"
         if key not in LoggerManager._aliyun_loggers:
             LoggerManager._aliyun_loggers[key] = AliyunLogger(
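
With the "system"/"crawler" defaults removed, both get_logger and get_aliyun_logger now require an explicit platform and mode, so call sites that still rely on the old defaults need to pass them. A minimal sketch:

from core.utils.log.logger_manager import LoggerManager

logger = LoggerManager.get_logger(platform="system", mode="crawler", log_to_console=True)
aliyun_log = LoggerManager.get_aliyun_logger(platform="system", mode="crawler")
logger.info("loggers are cached per platform_mode key and reused on later calls")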

+ 1 - 1
main.py

@@ -5,12 +5,12 @@ from typing import Dict
 from core.utils.log.logger_manager import LoggerManager
 from scheduler.process_manager import split_topics, start_worker_process
 from spiders.spider_registry import SPIDER_CLASS_MAP
+from core.utils import CODES
 
 
 
 def main():
     logger = LoggerManager.get_logger()
-
     aliyun_log = LoggerManager.get_aliyun_logger()
     """
     主调度入口:

+ 41 - 15
scheduler/async_consumer.py

@@ -9,6 +9,8 @@ from core.utils.trace_utils import generate_trace_id
 from services.async_mysql_service import AsyncMysqlService
 from spiders.spider_registry import get_spider_class
 
+logger = LoggerManager.get_logger(platform="system", mode="crawler")
+aliyun_logger = LoggerManager.get_aliyun_logger(platform="system", mode="crawler")
 
 async def async_handle_topic(topic: str,stop_event: asyncio.Event):
     """
@@ -18,11 +20,10 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
     - 使用异步数据库服务查询配置;
     - 记录日志、确认消息。
     """
-    logger = LoggerManager.get_logger()
-    aliyun_logger = LoggerManager.get_aliyun_logger()
+
 
     # 每个 topic 创建独立的 consumer 实例
-    from services.rocketmq_consumer import AsyncRocketMQConsumer
+    from services.async_mq_consumer import AsyncRocketMQConsumer
 
     consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
 
@@ -33,7 +34,14 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
             task_id = payload["id"]
 
             logger.info(f"{trace_id} - 接收到任务消息: {task_id}")
-            async with AsyncMysqlService("system", "crawler") as mysql:
+            aliyun_logger.logging(
+                code="1000",
+                message="任务接收成功",
+                data=payload,
+                trace_id=trace_id,
+                account=topic
+            )
+            async with AsyncMysqlService() as mysql:
                 user_list = await mysql.get_user_list(task_id)
                 rule_dict = await mysql.get_rule_dict(task_id)
 
@@ -50,26 +58,29 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
 
             logger.info(f"{trace_id} - 任务 {task_id} 执行成功并已 Ack")
             aliyun_logger.logging(
-                code="2000",
+                code="1010",
                 message="任务执行成功",
                 trace_id=trace_id,
                 data={
                     "task_id": task_id,
                     "topic": topic
-                }
+                },
+                account=topic
+
             )
 
         except Exception as e:
-            logger.error(f"{trace_id} - 任务处理失败: {e} /n traceback.format_exc()")
+            logger.error(f"{trace_id} - 任务处理失败: {e} /n {traceback.format_exc()}")
             aliyun_logger.logging(
                 code="9001",
-                message=f"处理消息失败: {str(e)}",
+                message=f"处理消息失败: {str(e)}  /n {traceback.format_exc()}",
                 trace_id=trace_id,
                 data={
                     "error_type": type(e).__name__,
                     "stack_trace": traceback.format_exc(),
                     "message_body": message.message_body
-                }
+                },
+                account=topic
             )
         # 自动重启消费循环
         while not stop_event.is_set():
@@ -82,7 +93,8 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
                     data={
                         "error_type": type(e).__name__,
                         "stack_trace": traceback.format_exc(),
-                    }
+                    },
+                    account=topic
                 )
                 logger.warning(f"[{topic}] 消费循环异常: {e},5秒后重启")
                 await asyncio.sleep(5)
@@ -93,7 +105,11 @@ async def run_all_topics(topics: List[str]):
     loop = asyncio.get_running_loop()
 
     def shutdown():
-        print("[系统] 收到停止信号,准备优雅退出...")
+        logger.warning("[系统] 收到停止信号,准备优雅退出...")
+        aliyun_logger.logging(
+            code="1600",
+            message="[系统] 收到停止信号,准备优雅退出...",
+        )
         stop_event.set()
 
     for sig in [signal.SIGINT, signal.SIGTERM]:
@@ -103,7 +119,12 @@ async def run_all_topics(topics: List[str]):
 
     await stop_event.wait()  # 等待停止信号
 
-    print("[系统] 正在取消所有消费任务...")
+    logger.warning(f"[系统] 正在取消所有消费任务...{tasks}")
+    aliyun_logger.logging(
+        code="1601",
+        message="[系统] 收到停止信号,准备优雅退出...",
+        data=f"任务列表{tasks}"
+    )
     for task in tasks:
         task.cancel()
 
@@ -111,9 +132,14 @@ async def run_all_topics(topics: List[str]):
 
     for idx, result in enumerate(results):
         if isinstance(result, Exception):
-            print(f"[系统] 任务 {topics[idx]} 异常退出: {result}")
-
-    print("[系统] 所有任务已退出,进程关闭")
+            logger.error(f"[系统] 任务 {topics[idx]} 异常退出: {result}")
+
+    logger.warning(f"[系统] 所有任务已退出,进程已关闭...")
+    aliyun_logger.logging(
+        code="1602",
+        message="[系统] 所有任务已退出,进程已关闭...",
+        data=f"任务列表{tasks}"
+    )
 
 
 def handle_topic_group(topics: List[str]):

+ 5 - 0
scheduler/process_manager.py

@@ -24,3 +24,8 @@ def start_worker_process(group_id: int, topic_group: List[str], process_map: Dic
     p.start()
     process_map[group_id] = p
     logger.info(f"[主进程] 启动进程 PID={p.pid} 处理 topics={topic_group}")
+    aliyun_log.logging(
+        code="1500",
+        message=f"[主进程] 启动进程 PID={p.pid}",
+        data=f"处理 topics={topic_group}"
+    )

+ 0 - 0
services/rocketmq_consumer.py → services/async_mq_consumer.py


+ 65 - 0
services/async_mq_producer.py

@@ -0,0 +1,65 @@
+import json
+import asyncio
+import traceback
+from mq_http_sdk.mq_exception import MQExceptionBase
+from mq_http_sdk.mq_producer import TopicMessage
+from mq_http_sdk.mq_client import MQClient
+from config import settings
+from core.utils.log.logger_manager import LoggerManager
+
+class AsyncMQProducer:
+    """
+    异步 MQ 推送封装类
+    """
+    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
+
+    def __init__(self, topic_name: str, platform: str, mode: str) -> None:
+        self.mq_client = MQClient(
+            host=settings.ROCKETMQ_ENDPOINT,
+            access_id=settings.ROCKETMQ_ACCESS_KEY_ID,
+            access_key=settings.ROCKETMQ_ACCESS_KEY_SECRET,
+        )
+        self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
+        self.platform = platform
+        self.mode = mode
+        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
+        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
+
+
+    async def send_msg(self, video_dict: dict, max_retries: int = 3):
+        """
+        异步发送 MQ 消息,自动重试并记录日志
+        """
+        for retry in range(max_retries):
+            try:
+                # 放入线程池中执行同步 publish_message,避免阻塞事件循环
+                msg = TopicMessage(json.dumps(video_dict))
+                message_key = "{}-{}-{}".format(self.platform, self.mode, video_dict['out_video_id'])
+                msg.set_message_key(message_key)
+
+                loop = asyncio.get_running_loop()
+                re_msg = await loop.run_in_executor(
+                    None,
+                    lambda: self.producer.publish_message(msg)
+                )
+
+                self.logger.info(
+                    f"Publish Message Succeed. MessageID:{re_msg.message_id}, BodyMD5:{re_msg.message_body_md5}"
+                )
+                return True
+
+            except MQExceptionBase as e:
+                tb = traceback.format_exc()
+                if retry == max_retries - 1:
+                    self.logger.error(
+                        f"Publish Message Fail after {max_retries} attempts. Exception: {e}\n{tb}"
+                    )
+                    self.aliyun_log.logging(
+                        code="5005",
+                        message=f"Publish Message Fail after {max_retries} attempts. Exception: {e}",
+                        data=tb
+                    )
+                else:
+                    await asyncio.sleep(2 ** retry)  # 退避重试
+
+        return False
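
send_msg wraps the blocking publish_message call in run_in_executor, so it must be awaited from inside a running event loop, and the payload is assumed to contain out_video_id because it is used to build the message key. A usage sketch with placeholder values:

import asyncio
from services.async_mq_producer import AsyncMQProducer

async def demo():
    producer = AsyncMQProducer(
        topic_name="topic_crawler_etl_prod_v2",
        platform="<platform>",
        mode="<mode>",
    )
    ok = await producer.send_msg({"out_video_id": "demo-123", "title": "demo title"})
    print("published:", ok)

asyncio.run(demo())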

+ 3 - 2
services/clean_title.py

@@ -1,6 +1,7 @@
-async def clean_title(strings):
+import asyncio
+async def clean_title(title):
     return (
-        strings.strip()
+        title.strip()
         .replace("\n", "")
         .replace("/", "")
         .replace("\r", "")

+ 2 - 2
services/pipeline.py

@@ -4,7 +4,7 @@ import sys
 import time
 from datetime import datetime
 
-from core.utils.feishu.feishu_utils import FeishuUtils
+from core.utils.feishu_data_async import FeishuDataAsync
 from core.utils.log.logger_manager import LoggerManager
 from services.async_mysql_service import AsyncMysqlService
 
@@ -31,7 +31,7 @@ class PiaoQuanPipeline:
 
     async def feishu_time_list(self):
         """从飞书读取天数配置"""
-        summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")
+        async with FeishuDataAsync() as feishu:
+            summary = await feishu.get_values(spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg", sheet_id="RuLK77")
         for row in summary[1:]:
             if row[0] == self.platform:
                 return row[1]

+ 24 - 11
spiders/base_spider.py

@@ -9,12 +9,14 @@ from typing import List, Dict, Optional
 import aiohttp
 
 from core.models.video_item import VideoItem
+from core.utils.helpers import generate_titles
 from core.utils.spider_config import SpiderConfig
 from core.utils.extractors import safe_extract, extract_fields
 from core.utils.log.logger_manager import LoggerManager
 from services.async_mysql_service import AsyncMysqlService
 from services.pipeline import PiaoQuanPipeline
 from core.base.async_request_client import AsyncRequestClient
+from services.async_mq_producer import AsyncMQProducer
 
 class BaseSpider(ABC):
     """
@@ -40,10 +42,9 @@ class BaseSpider(ABC):
         self.platform = self.platform_config.platform
         self.mode = self.platform_config.mode
         self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
-
         self.logger.info(f"{trace_id}--初始化爬虫类: {self.class_name}")
         self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
-        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
+        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
 
         # 请求配置
         self.method = self.platform_config.method.upper()
@@ -62,12 +63,13 @@ class BaseSpider(ABC):
         self.loop_interval = self.platform_config.loop_interval  # 循环间隔(秒)
 
         self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
-        self.request_client = AsyncRequestClient(logger=self.logger)
+        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)
 
         self.logger.info(
             f"{self.trace_id}--配置: 循环{self.loop_times}次,间隔{self.loop_interval}秒")
 
         self.session = None
+        self.feishu_sheetid = self.platform_config.feishu_sheetid
 
 
     async def crawl_data(self,session) -> Optional[List[Dict]]:
@@ -104,8 +106,14 @@ class BaseSpider(ABC):
         async with AsyncMysqlService(self.platform, self.mode) as mysql:
             video_count = await mysql.get_today_videos()
         if video_count >= rule_videos_cnt.get("min", 200):
-            self.logger.info(f"{self.trace_id}--当日视频已达到最大量{video_count}")
+            self.logger.info(f"{self.trace_id}--今日视频已达到最大量{video_count}")
+            self.aliyun_logr.logging(
+                code="1011",
+                message=f"视频数量达到当日最大值",
+                data=f"<今日视频数量>{video_count}"
+            )
             return False
+        self.logger.info(f"{self.trace_id}--今日视频已入库{video_count}")
         return True
 
     async def process_video(self, video: Dict) -> Optional[Dict]:
@@ -116,7 +124,7 @@ class BaseSpider(ABC):
         """
         self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}")
         publish_user = random.choice(self.user_list)
-        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id)
+        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
         item_kwargs["user_id"] = publish_user["uid"]
         item_kwargs["user_name"] = publish_user["nick_name"]
         item_kwargs["platform"] = self.platform
@@ -137,7 +145,7 @@ class BaseSpider(ABC):
         """推送数据到ETL(同步)"""
         self.logger.info(f"{self.trace_id}--开始推送数据到ETL: {item.get('video_id', item.get('title', '无标题'))}")
         try:
-            self.mq.send_msg(item)
+            await self.mq_producer.send_msg(item)
             self.aliyun_logr.logging(
                 code="1009",
                 message="成功发送至ETL",
@@ -158,12 +166,12 @@ class BaseSpider(ABC):
         video_count = await self.db_service.get_today_videos()
         return video_count
 
-    async def integrated_video_handling(self):
+    async def integrated_video_handling(self, video: Dict) -> None:
         """
         视频处理
         :return:
         """
-        pass
+        await generate_titles(self.feishu_sheetid, video)
 
     async def run(self):
         """
@@ -179,7 +187,11 @@ class BaseSpider(ABC):
             loop_start_time = time.time()
             async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session:
                 for loop_index in range(1, self.loop_times + 1):
+                    # 判断当日视频数量已达到最大量
+                    if not await self.is_video_count_sufficient():
+                        return
                     self.logger.info(f"{self.trace_id}--步骤1: 开始第 {loop_index}/{self.loop_times} 次循环请求")
+                    # 请求视频列表
                     video_list = await self.crawl_data(session)
                     if not video_list:
                         self.logger.warning(f"{self.trace_id}--未获取到视频数据,跳过当前循环")
@@ -189,17 +201,18 @@ class BaseSpider(ABC):
                     fail_count = 0
 
                     for video in video_list:
+                        # 提取视频字段映射关系
                         video_obj = await self.process_video(video)
                         if not video_obj:
                             self.logger.warning(f"{self.trace_id}--视频处理失败,已跳过")
                             fail_count += 1
                             continue
-
+                        # 视频过滤规则
                         if not await self.filter_data(video_obj):
                             self.logger.debug(f"{self.trace_id}--视频不符合规则,已跳过")
                             continue
-
-                        await self.integrated_video_handling()
+                        # 视频处理
+                        await self.integrated_video_handling(video_obj)
 
                         if await self.push_to_etl(video_obj):
                             success_count += 1

+ 21 - 0
tests/test_feishu.py

@@ -0,0 +1,21 @@
+import asyncio
+import random
+import string
+from datetime import datetime
+
+# 导入封装的异步工具类
+from core.utils.feishu_data_async import FeishuDataAsync  # 假设已保存为该文件名
+
+async def test_feishu():
+    async with FeishuDataAsync() as feishu:
+        spreadsheet_token = "ISFnspGcjhyxO6tj0fycbQARnUg"
+        sheet_id = "a44975"
+        values = await feishu.get_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id)
+
+        print(values)
+        await feishu.insert_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id, ranges="A2:Z2", values=["2", "23333", "3", "4"])
+
+
+
+if __name__ == '__main__':
+    asyncio.run(test_feishu())