Ver código fonte

中青看点

zhangliang 1 semana atrás
pai
commit
3406e6daff

+ 3 - 0
application/config/topic_group_queue.py

@@ -43,6 +43,9 @@ class TopicGroup(object):
             ('cjpq', 'recommend', 'chaojipiaoquan'),
             ('xlzf', 'recommend', 'xuanlanzhufu'),
             ('zzhxzfy', 'recommend', 'zhaozhaohuanxizhufuyu'),
+            ('zqkd', 'recommend', 'zhongqingkandian'),
+            ('zqkd', 'related_recommend', 'zhongqingkandian')
         ]
 
     def produce(self):

+ 344 - 0
spider/crawler_online/zhongqingkandian.py

@@ -0,0 +1,344 @@
+import asyncio
+import os
+import random
+import sys
+import time
+import uuid
+import json
+from datetime import datetime
+
+import aiohttp
+import requests
+
+from application.common.feishu import FsData
+from application.common.feishu.feishu_utils import FeishuUtils
+from application.common.gpt import GPT4oMini
+from application.common.redis.redis_helper import SyncRedisHelper
+
+sys.path.append(os.getcwd())
+
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.messageQueue import MQ
+from application.common.log import AliyunLogger
+from application.common.mysql import MysqlHelper
+
+
+
+
+
class ZhongQingKanDian:
    """
    中青看点 (Zhong Qing Kan Dian) recommendation-feed crawler.

    Flow:
        /recommend -> list of content ids
        /related   -> related items per id (currently disabled downstream)
        /detail    -> full item; video items are pushed to the ETL queue
                      and de-duplicated through Redis.

    Topic: zqkd_recommend_prod
    """

    API_BASE_URL = "http://8.217.192.46:8889"
    COMMON_HEADERS = {
        "Content-Type": "application/json"
    }
    MAX_RETRIES = 3   # attempts per HTTP request
    TIMEOUT = 30      # per-request timeout, seconds
    max_recommend_count = 100          # daily cap: recommend feed
    max_related_recommend_count = 200  # daily cap: related recommend
    max_author_video = 300             # daily cap: videos per account

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier, e.g. "zhongqingkandian"
        :param mode: crawl strategy, e.g. "recommend"
        :param rule_dict: crawl rules, e.g. {"videos_cnt": {"min": N}}
        :param user_list: pool of our users; one is chosen per video
        :param env: deployment environment, defaults to "prod"
        """
        self.limit_flag = False   # flipped once the daily quota is reached
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0     # videos successfully sent to ETL
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # BUG FIX: was MysqlHelper(mode=self.mode, platform=self) — it passed
        # the crawler instance itself instead of the platform string.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)
        data_rule = FsData()
        self.title_rule = data_rule.get_title_rule()
+
+    async def send_request(self, path, data):
+        full_url = f"{self.API_BASE_URL}{path}"
+        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
+            for retry in range(self.MAX_RETRIES):
+                try:
+                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
+                        response.raise_for_status()
+                        return await response.json()
+                except aiohttp.ClientError as e:
+                    if retry < self.MAX_RETRIES - 1:
+                        await asyncio.sleep(2)
+                except json.JSONDecodeError as e:
+                    if retry < self.MAX_RETRIES - 1:
+                        await asyncio.sleep(2)
+        return None
+
+    def is_response_valid(self, resp):
+        if resp['code'] != 0:
+            self.aliyun_log.logging(
+                code="3000",
+                message="抓取单条视频失败,请求失败"
+            ),
+            return
+        return resp
+
+    async def req_recommend_list(self):
+        print("开始请求推荐")
+        '''
+        推荐请求
+        '''
+        url = '/crawler/zhong_qing_kan_dian/recommend'
+        body = json.dumps({"cursor": ""})
+        resp = await self.send_request(url, body)
+        return self.is_response_valid(resp)
+
+    async def req_related_recommend_list(self, content_id):
+        print("请求相关推荐")
+        '''
+         相关推荐请求
+        '''
+        url = '/crawler/zhong_qing_kan_dian/related'
+        body = json.dumps({
+            "content_id": str(content_id),
+            "cursor": ""
+        })
+        resp = await self.send_request(url, body)
+        return self.is_response_valid(resp)
+
+
+    async def req_detail(self, content_link, label,**kwargs):
+        print("请求详情")
+        '''
+        请求详情
+        '''
+        url = '/crawler/zhong_qing_kan_dian/detail'
+        body = json.dumps({
+            "content_link": content_link
+        })
+        resp = await self.send_request(url, body)
+        if not self.is_response_valid(resp):
+            return
+        data = resp.get("data", {}).get("data", {})
+        if data.get("content_type") != "video":
+            self.aliyun_log.logging(
+                code="3003",
+                message=f"跳过非视频内容(label={label})",
+                data={"content_link": content_link}
+            )
+            return
+        print("是视频")
+        # 将 kwargs 中的键值对更新到 data 字典中
+        data.update(kwargs)
+        self.process_video_obj(data, label)
+        await asyncio.sleep(10)
+
+    async def control_request(self):
+        print("开始处理")
+        """核心控制逻辑:顺序处理三个接口"""
+        recommend_resp = await self.req_recommend_list()
+        if not self.is_response_valid(recommend_resp):
+            return
+
+        recommend_list = recommend_resp.get("data", {}).get("data", [])
+
+        for video_obj in recommend_list:
+            content_link = video_obj.get("share_url")
+            content_id = video_obj.get("id")
+
+            if not (content_link and content_id):
+                continue
+            # 处理推荐视频详情
+            await self.req_detail(content_link, "recommend",**video_obj)
+
+            # # 处理相关推荐列表(间隔后执行)
+            # await asyncio.sleep(5)
+            # related_resp = await self.req_related_recommend_list(content_id)
+            # if not self.is_response_valid(related_resp):
+            #     continue
+            #
+            # related_list = related_resp.get("data", {}).get("data", [])
+            # for related_obj in related_list:
+            #     related_content_link = related_obj.get("share_url")
+            #     if related_content_link:
+            #         await self.req_detail(related_content_link, "related",**related_obj)
+    def process_video_obj(self, video_obj, label):
+        """
+        处理视频
+        :param video_obj:
+        """
+
+        if not self.save_video_id():
+
+        our_user = random.choice(self.user_list)
+        trace_id = self.platform + str(uuid.uuid1())
+        item = VideoItem()
+        try:
+            video_id = video_obj['channel_content_id']
+            account_id = video_obj["channel_account_id"]
+            account_name = video_obj["channel_account_name"]
+            account_avatar = video_obj["avatar"]
+            is_repeat_user = self.select_id(account_id)
+            # 判断用户是否重复
+            if is_repeat_user:
+                self.update_name_url(account_id, account_name, account_avatar)
+            else:
+                # 写表
+                self.insert_name_url(account_id, account_name, account_avatar)
+                # 写redis
+                self.write_redis_user_data(json.dumps({"uid": account_id}))
+                print("写入成功")
+        except Exception as e:
+            print(f"写入异常{e}")
+            pass
+        url = video_obj["video_url_list"][0]['video_url']
+        duration = video_obj["video_url_list"][0]['video_duration']
+        item.add_video_info("video_id", video_obj['channel_content_id'])
+        item.add_video_info("video_title", video_obj["title"])
+        item.add_video_info("play_cnt", int(video_obj["read_num"]))
+        item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"])/1000))
+        item.add_video_info("out_user_id", video_obj["channel_account_id"])
+        item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
+        item.add_video_info("like_cnt", 0)
+        item.add_video_info("collection_cnt", int(video_obj['collect_num']))
+        item.add_video_info("share_cnt", int(video_obj["share_num"]))
+        item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
+        item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
+        item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
+        item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        item.add_video_info("user_id", our_user["uid"])
+        item.add_video_info("user_name", our_user["nick_name"])
+
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            title_list = self.title_rule.split(",")
+            title = video_obj["title"]
+            contains_keyword = any(keyword in title for keyword in title_list)
+            if contains_keyword:
+                new_title = GPT4oMini.get_ai_mini_title(title)
+                if new_title:
+                    item.add_video_info("video_title", new_title)
+                    current_time = datetime.now()
+                    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+                    values = [
+                        [
+                            video_obj["video_url_list"][0]['video_url'],
+                            video_obj["image_url_list"][0]['image_url'],
+                            title,
+                            new_title,
+                            formatted_time,
+                        ]
+                    ]
+                    FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
+                    time.sleep(0.5)
+                    FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
+            self.download_cnt += 1
+            self.mq.send_msg(mq_obj)
+            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
+            if self.download_cnt >= int(
+                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
+            ):
+                self.limit_flag = True
+            if label == "recommend":
+                key = f"crawler:zqkd:{video_id}"
+                self.save_video_id(key)
+
+
+
+    """
+      查询用户id是否存在
+      """
+
+    def select_id(self, uid):
+        sql = f""" select uid from zqkd_uid where uid = "{uid}"; """
+        db = MysqlHelper()
+        repeat_user = db.select(sql=sql)
+        if repeat_user:
+            return True
+        return False
+    def update_name_url(self, uid,user_name,avatar_url):
+        sql = f""" update zqkd_uid set avatar_url = "{avatar_url}", user_name="{user_name}" where uid = "{uid}"; """
+        db = MysqlHelper()
+        repeat_video = db.update(sql=sql)
+        if repeat_video:
+            return True
+        return False
+
+    def insert_name_url(self, uid, user_name, avatar_url):
+        current_time = datetime.now()
+        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+        insert_sql = f"""INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) values ('{uid}' ,'{avatar_url}','{user_name}', '{formatted_time}')"""
+        db = MysqlHelper()
+        repeat_video = db.update(sql=insert_sql)
+        if repeat_video:
+            return True
+        return False
+
+    def get_redis_video_data(self):
+        """获取一条id"""
+        task = f"task:zqkd_video_id"
+        helper = SyncRedisHelper()
+        client = helper.get_client()
+
+        # 获取列表的长度
+        list_length = client.llen(task)
+        # 循环获取列表中的元素
+        for i in range(list_length):
+            # 使用 lrange 获取单个元素
+            element = client.lrange(task, i, i)
+            if element:
+                print(f"Element at index {i}: {element[0].decode('utf-8')}")
+                return element
+
+    def write_redis_user_data(self,key,ret):
+        """写入"""
+        task = f"task:zqkd_user_id"
+        helper = SyncRedisHelper()
+        client = helper.get_client()
+        client.rpush(task, ret)
+
+    async def run(self):
+        while True:
+            await self.control_request()
+    def save_video_id(self,key):
+        helper = SyncRedisHelper()
+        client = helper.get_client()
+        # 将视频ID存储到Redis中,并设置过期时间为7天
+        # 检查键是否存在
+
+        if client.exists(key):
+            return False
+        else:
+            expiration_time = int(timedelta(days=7).total_seconds())
+            client.setex(key, expiration_time, "1")
+
+
# NOTE(review): move this into the top-of-file import block; it only works
# here because module-level code runs before any method is called.
from datetime import datetime, timedelta

if __name__ == '__main__':
    # asyncio.run(ZhongQingKanDian(
    #     platform="zhongqingkandian",
    #     mode="recommend",
    #     rule_dict={},
    #     user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"},
    #                ]
    #
    # ).run())
    # BUG FIX: save_video_id is an instance method, so the original bare
    # `save_video_id("1234")` raised NameError. Exercise it through a bare
    # instance (the method does not read instance state).
    ZhongQingKanDian.__new__(ZhongQingKanDian).save_video_id("1234")
+
+