zhangliang 1 месяц назад
Родитель
Сommit
15b824f03b

+ 85 - 0
core/utils/feishu_data_async.py

@@ -105,6 +105,49 @@ class FeishuDataAsync:
 
             return result.get("data", {}).get("valueRanges", {})[0].get("values", [])
 
+    async def get_values_v3(
+            self,
+            spreadsheet_token: str,
+            sheet_id: str,
+            range_str: str = None,
+    ) -> List[List[Any]]:
+        """
+        异步获取电子表格数据(V3 版本接口)
+
+        :param spreadsheet_token: 电子表格token
+        :param sheet_id: 工作表ID
+        :param range_str: 数据范围(如"A1:C10"或"A:B"),如果不指定则获取整个工作表
+        :param value_render_option: 值渲染选项
+        :param date_time_render_option: 日期时间渲染选项
+        :return: 表格数据二维列表
+        """
+        access_token = await self._get_access_token()
+        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
+
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json; charset=utf-8"
+        }
+
+        full_range = f"{sheet_id}!{range_str}" if range_str else sheet_id
+        params = {
+            "ranges": full_range,
+            "valueRenderOption": "ToString",
+            "dateTimeRenderOption": "",
+            "user_id_type": "open_id"
+        }
+
+        async with self.session.get(url, headers=headers, params=params, ssl=False) as response:
+            if response.status != 200:
+                error_text = await response.text()
+                raise Exception(f"获取表格数据失败: {error_text}")
+
+            result = await response.json()
+            if result.get("code") != 0:
+                raise Exception(f"获取表格数据失败: {result.get('msg')}")
+
+            return result.get("data", {}).get("valueRanges", {})[0].get("values", [])
+
     async def insert_values(
             self,
             spreadsheet_token: str,
@@ -149,3 +192,45 @@ class FeishuDataAsync:
 
             return result.get("data", {})
 
+    async def append_values(
+            self,
+            spreadsheet_token: str,
+            sheet_id: str,
+            ranges: str,
+            values: List[Any],
+    ) -> Dict[str, Any]:
+        """
+        异步追加数据到电子表格末尾(正序写入)
+
+        :param spreadsheet_token: 电子表格token
+        :param sheet_id: 工作表ID
+        :param ranges: 数据范围(如"A2:H")
+        :param values: 要追加的数据列表,必须是二维数组 [['关键词', '日期', '总分', '公众号', '视频号', '搜一搜', '直播', '小程序']]
+        :return: 更新结果
+        """
+        access_token = await self._get_access_token()
+        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_append"
+
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json; charset=utf-8"
+        }
+
+        payload = {
+            "valueRange": {
+                "range": f"{sheet_id}!{ranges}",
+                "values": values
+            }
+        }
+
+        async with self.session.post(url, headers=headers, json=payload, ssl=False) as response:
+            if response.status != 200:
+                error_text = await response.text()
+                raise Exception(f"追加表格数据失败: {error_text}")
+
+            result = await response.json()
+            if result.get("code") != 0:
+                raise Exception(f"追加表格数据失败: {result.get('msg')}")
+
+            return result.get("data", {})
+

+ 101 - 0
scripts/office/get_baidu_recommend.py

@@ -0,0 +1,101 @@
+import asyncio
+import sys
+import time
+from pathlib import Path
+from readline import insert_text
+
+import requests
+import json
+from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
+
+from core.utils.feishu_data_async import FeishuDataAsync
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+class BaiduRecommend:
+    """
+    微信小程序域名信息获取类
+    一次性任务
+    """
+
+    def __init__(self):
+        pass
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError,Exception))
+    )
+    def get_top_search(self,cursor=0,last_timestamp_ms=""):
+        """获取热搜数据"""
+
+        url = f"http://crawapi.piaoquantv.com/crawler/bai_du/recommend"
+        body = {
+                "task_type": "recommend",
+                "cursor": cursor,
+                "last_timestamp_ms": last_timestamp_ms
+            }
+        print( body)
+        try:
+            response = requests.post(url,json= body)
+            response.raise_for_status()  # 检查HTTP错误
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {str(e)}")
+            raise  # 重新抛出异常以触发重试
+        except Exception as e:
+            print(f"解析响应失败: {str(e)}")
+            return None
+
+async def main():
+    global last_timestamp_ms
+    last_timestamp_ms = ""
+    for i in range(100):
+        resp = BaiduRecommend().get_top_search(cursor=i,last_timestamp_ms=last_timestamp_ms)
+        if resp and resp["code"] != 0:
+            print(f"API请求失败: {resp.get('msg')}")
+            await asyncio.sleep(60)  # 请求失败时等待60秒再重试
+            continue
+            
+        if not resp or not resp.get("data") or not resp["data"].get("data"):
+            print("未获取到数据,等待60秒后重试")
+            await asyncio.sleep(60)
+            continue
+        last_timestamp_ms = resp.get("data").get("next_cursor").get("last_timestamp_ms")
+        print(last_timestamp_ms)
+        obj = resp.get("data").get("data")
+        insert_datas = []
+        
+        for item in obj:
+
+            item_data = item.get("data")
+            if item_data.get("mode") != "text":
+                continue
+            title = item_data.get("title")
+            source = item_data.get("source")
+            view_count = item_data.get("comment_num")
+            publish_time = item_data.get("publish_time")
+
+
+            id = item.get("id")
+            url = f'https://mbd.baidu.com/newspage/data/landingsuper?pageType=1&_refluxos=i0&context={{"nid":"{id}","ssid":""}}'
+
+
+            insert_data = [title,url,view_count,source,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(publish_time))) if publish_time and str(publish_time).isdigit() else None]
+            insert_datas.append(insert_data)
+
+        if insert_datas:
+            try:
+                async with FeishuDataAsync() as feishu_data:
+                    await feishu_data.insert_values("NktPwBtcviP8mwkC027cQc4JnXq", "G7kfw0", "A2:E", insert_datas)
+                print(f"已插入 {len(insert_datas)} 条数据")
+
+
+            except Exception as e:
+                print(f"插入数据失败: {str(e)}")
+        else:
+            print("本次没有有效数据")
+
+
+
+if __name__ == '__main__':
+    asyncio.run(main())

+ 124 - 0
scripts/office/get_baidu_topsearch.py

@@ -0,0 +1,124 @@
+import asyncio
+import sys
+import time
+from pathlib import Path
+from readline import insert_text
+
+import requests
+import json
+from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
+
+from core.utils.feishu_data_async import FeishuDataAsync
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+class BaiduTopSearch:
+    """
+    微信小程序域名信息获取类
+    一次性任务
+    """
+
+    def __init__(self):
+        pass
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError))
+    )
+    def get_access_token(self, appid, secret):
+        """获取微信接口访问令牌"""
+        url = "https://api.weixin.qq.com/cgi-bin/token"
+        params = {
+            "grant_type": "client_credential",
+            "appid": appid,
+            "secret": secret
+        }
+
+        try:
+            response = requests.get(url, params=params)
+            response.raise_for_status()  # 检查HTTP错误
+            print(response)
+            return response.json().get("access_token")
+        except requests.exceptions.RequestException as e:
+            print(f"获取微信令牌失败: {str(e)}")
+            raise  # 重新抛出异常以触发重试
+        except Exception as e:
+            print(f"解析响应失败: {str(e)}")
+            return None
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError,Exception))
+    )
+    def get_top_search(self):
+        """获取热搜数据"""
+
+        url = f"http://crawapi.piaoquantv.com/crawler/bai_du/top_search"
+
+        try:
+            response = requests.post(url)
+            response.raise_for_status()  # 检查HTTP错误
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {str(e)}")
+            raise  # 重新抛出异常以触发重试
+        except Exception as e:
+            print(f"解析响应失败: {str(e)}")
+            return None
+
+async def main():
+    last_public_time = 0  # 记录最后一次处理的时间
+    
+    while True:
+        resp = BaiduTopSearch().get_top_search()
+        if resp and resp["code"] != 0:
+            print(f"API请求失败: {resp.get('msg')}")
+            await asyncio.sleep(60)  # 请求失败时等待60秒再重试
+            continue
+            
+        if not resp or not resp.get("data") or not resp["data"].get("data"):
+            print("未获取到数据,等待60秒后重试")
+            await asyncio.sleep(60)
+            continue
+            
+        obj = resp.get("data").get("data")
+        insert_datas = []
+        
+        for item in obj:
+            if item.get("layout") != "hot_board_item":
+                continue
+            item_data = item.get("data")
+            title = item_data.get("title")
+            publictime = item_data.get("publicTime")
+
+            last_public_time = publictime
+            score = item_data.get("grext").get("score")
+            position_type = item_data.get("position_type",0)
+            if position_type == 1:
+                position_name = "置顶内容"
+            else:
+                position_name = item_data.get("index")
+            hot_tag_name = item_data.get("grext").get("hot_tag_name")
+            insert_data = [title, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publictime)), score, position_name, hot_tag_name]
+            insert_datas.append(insert_data)
+
+        if insert_datas:
+            try:
+                async with FeishuDataAsync() as feishu_data:
+                    await feishu_data.insert_values("NktPwBtcviP8mwkC027cQc4JnXq", "07a356", "A2:E", insert_datas)
+                print(f"已插入 {len(insert_datas)} 条数据")
+
+
+            except Exception as e:
+                print(f"插入数据失败: {str(e)}")
+        else:
+            print("本次没有有效数据")
+
+        print("等待10分钟后继续执行")
+        time.sleep(600)
+
+
+
+if __name__ == '__main__':
+    asyncio.run(main())

+ 110 - 0
scripts/office/wx_index.py

@@ -0,0 +1,110 @@
+import asyncio
+import random
+import time
+from readline import insert_text
+
+import requests
+import json
+from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
+
+from core.utils.feishu_data_async import FeishuDataAsync
+
+
+class WechatDomainFetcher:
+    """
+    微信小程序域名信息获取类
+    一次性任务
+    """
+
+    def __init__(self):
+        pass
+
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError))
+    )
+    def get_domain_info(self, keyword, start_ymd):
+        headers = {
+            'Content-Type': 'application/json',
+        }
+
+        json_data = {
+            'keyword': keyword,
+            'start_ymd': '20260423',
+            'end_ymd': '20260423',
+        }
+        try:
+            response = requests.post('http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex',
+                                     headers=headers,
+                                     json=json_data)
+            response.raise_for_status()  # 检查HTTP错误
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"获取域名信息失败: {str(e)}")
+            raise  # 重新抛出异常以触发重试
+        except Exception as e:
+            print(f"解析响应失败: {str(e)}")
+            return None
+
+
+async def main():
+    while True:
+        try:
+            async with FeishuDataAsync() as feishu_data:
+                config = await feishu_data.get_values_v3("TWeZsuGW4hURHatWnaec12blnAe", "jvQdJL",range_str="A2:B")
+            # 初始化微信域名信息获取器
+            domain_fetcher = WechatDomainFetcher()
+
+            for row in config[1:]:
+                if len(row) < 2:
+                    continue
+
+                keyword = row[1]
+                start_ymd = row[0]
+                if not keyword or not start_ymd:
+                    continue
+
+                domain_info = domain_fetcher.get_domain_info(keyword, start_ymd)
+                if not domain_info:
+                    continue
+
+                wx_index_datas = domain_info.get('data', {}).get("data", [])
+                if not wx_index_datas:
+                    continue
+
+                for data in wx_index_datas:
+                    if data.get("ymd") == str(start_ymd):
+                        channel_score = data.get("channel_score", {})
+                        insert_row = [
+                            keyword,
+                            start_ymd,
+                            str(channel_score.get("total_score", 0)),
+                            str(channel_score.get("mpdoc_score", 0)),
+                            str(channel_score.get("finder_score", 0)),
+                            str(channel_score.get("query_score", 0)),
+                            str(channel_score.get("live_score", 0)),
+                            str(channel_score.get("miniapp_score", 0))
+                        ]
+
+                        print(f"准备追加数据: {insert_row}")
+
+                        async with FeishuDataAsync() as feishu_data:
+                            await feishu_data.append_values(
+                                "JvUOwyowjir2wSkueEBcIxaSn4f",
+                                "Nv3PJg",
+                                "A2:H",
+                                [insert_row]
+                            )
+                        print(f"成功追加: {keyword} - {start_ymd}")
+                        break
+                time.sleep(random.randint(60, 120))
+
+        except Exception as e:
+            print(f"执行过程中出现错误: {str(e)}")
+            import traceback
+            traceback.print_exc()
+
+if __name__ == '__main__':
+    asyncio.run(main())