zhangyong 1 gadu atpakaļ
vecāks
revīzija
91a11c4357

+ 384 - 0
shipinhao/shipinhao_author/shipinhao_author.py

@@ -0,0 +1,384 @@
+import os
+import json
+import random
+import sys
+import time
+import uuid
+import datetime
+
+import requests
+import cv2
+
+sys.path.append(os.getcwd())
+from datetime import datetime
+from common.feishu import Feishu
+from common import PiaoQuanPipeline, AliyunLogger
+from common.db import MysqlHelper
+from common.mq import MQ
+from common.public import clean_title
+
+
+def find_target_user(name, user_list):
+    """
+    在搜索到到账号列表中找目标列表
+    """
+    for obj in user_list:
+        if obj['contact']["nickname"] == name:
+            return obj
+        else:
+            continue
+    return False
+
+
+class ShiPinHaoAuthor(object):
+    """
+    视频号账号爬虫
+    """
+    def __init__(self, platform, mode, rule_dict, user_dict, env):
+        self.account_name = user_dict["link"]
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_dict = user_dict
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+
+    def get_history_id(self):
+        """
+        从数据库表中读取 id
+        """
+        select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
+        name_id = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_user_sql,
+            env=self.env,
+            machine="",
+        )
+        if name_id:
+            return name_id[0][0]
+        else:
+            return False
+
+    def get_account_id(self):
+        """
+        读历史数据,如果存在 id,则直接返回 id
+        """
+        history_id = self.get_history_id()
+        if history_id:
+            return history_id
+        else:
+            url = "http://61.48.133.26:30001/Find_Video_Content"
+            payload = json.dumps({
+                "content": self.account_name,
+                "type": "19"
+            })
+            headers = {
+                'Content-Type': 'application/json'
+            }
+            response = requests.request("POST", url, headers=headers, data=payload)
+            info_list = response.json()['info_list']
+            if len(info_list) == 0:
+                return False
+            target_user = find_target_user(name=self.account_name, user_list=info_list)
+            # 写入 MySql 数据库
+            if target_user:
+                update_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", "{target_user['contact']['username']}", "{self.platform}", 1 )"""
+                MysqlHelper.update_values(
+                    log_type=self.mode,
+                    crawler=self.platform,
+                    sql=update_sql,
+                    env=self.env,
+                    machine="",
+                )
+                return target_user['contact']["username"]
+            else:
+                return False
+
+    def get_account_videos(self):
+        account_id = self.get_account_id()
+        if account_id:
+            url = "http://61.48.133.26:30001/FinderGetUpMasterNextPage"
+            last_buffer = ""
+            for i in range(10):
+                if self.download_cnt >= int(
+                    self.rule_dict.get("videos_cnt", {}).get("min", 30)
+                ):
+                    return
+                headers = {
+                    'Content-Type': 'application/json'
+                }
+                payload = json.dumps({
+                    "username": account_id,
+                    "last_buffer": last_buffer
+                })
+
+                response = requests.request("POST", url, headers=headers, data=payload)
+                time.sleep(random.randint(1, 5))
+                if "objectId" not in response.text or response.status_code != 200:
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="没有更多视频了",
+                    )
+                    return
+                res_json = response.json()
+                if len(res_json["UpMasterHomePage"]) == 0:
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="没有更多视频了",
+                    )
+                    return
+
+                if not res_json["UpMasterHomePage"]:
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="没有更多视频了",
+                    )
+                    return
+                else:
+                    last_buffer = res_json.get('last_buffer')
+                    count = 0
+                    for obj in res_json["UpMasterHomePage"]:
+                        try:
+                            AliyunLogger.logging(
+                                code="1001",
+                                platform=self.platform,
+                                mode=self.mode,
+                                message="扫描到一条视频",
+                                env=self.env,
+                                data=obj,
+                            )
+                            repeat_flag = self.process_video_obj(obj, count)
+                            count += 1
+                            if not repeat_flag:
+                                return
+                        except Exception as e:
+                            AliyunLogger.logging(
+                                code="3000",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message=f"抓取单条视频异常:{e}\n",
+                            )
+        else:
+            AliyunLogger.logging(
+                code="3000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="{}\t获取 id 失败".format(self.account_name),
+            )
+
+    def process_video_obj(self, obj, count):
+        objectId = obj['objectId']
+        objectNonceId = obj['objectNonceId']
+
+        trace_id = self.platform + "new" + str(uuid.uuid1())
+        url = "http://61.48.133.26:30001/GetFinderDownloadAddress"
+        payload = json.dumps({
+            "objectId": objectId,
+            "objectNonceId": objectNonceId
+        })
+        headers = {
+            'Content-Type': 'text/plain'
+        }
+        response = requests.request("POST", url, headers=headers, data=payload)
+        time.sleep(random.randint(0, 1))
+        video_obj = response.json()
+        publish_time_str = obj['createtime']
+        datetime_obj = datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S')
+        # 将datetime对象转换为时间戳
+        publish_time_stamp = int(datetime_obj.timestamp())
+        video_url = video_obj.get('DownloadAddress')
+        duration = int(self.video_duration(video_url))
+        share_cnt = int(obj['forward_count'])
+        like_cnt = int(obj['like_count'])
+        # 获取当前时间
+        current_time = datetime.now()
+        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+        video_dict = {
+            "video_id": objectId,
+            "video_title": clean_title(video_obj.get('title').split("\n")[0].split("#")[0]),
+            "out_video_id": trace_id,
+            "publish_time_stamp": publish_time_stamp,
+            "publish_time_str": publish_time_str,
+            "play_cnt": 0,
+            "fav_count": int(obj['fav_count']),
+            "comment_cnt": int(obj['comment_count']),
+            "like_cnt": like_cnt,
+            "share_cnt": share_cnt,
+            "user_id": self.user_dict["uid"],
+            "cover_url": video_obj.get('thumb_url'),
+            "video_url": video_url,
+            "avatar_url": video_obj.get('thumb_url'),
+            "width": video_obj.get('width'),
+            "height": video_obj.get('height'),
+            "duration": duration,
+            "platform": self.platform,
+            "strategy": self.mode,
+            "crawler_rule": self.rule_dict,
+            "session": f"shipinhao-author-{int(time.time())}",
+        }
+        if share_cnt == 0:
+            divisor_cnt = 0
+        else:
+            divisor_cnt = int(share_cnt / like_cnt)
+        # 视频时长小于30秒 返回
+        if duration < 30:
+            values = [[
+                obj['nickname'],
+                publish_time_str,
+                formatted_time,
+                int(obj['fav_count']),
+                int(obj['comment_count']),
+                int(obj['like_count']),
+                int(obj['forward_count']),
+                divisor_cnt,
+                video_obj.get('title').split("\n")[0].split("#")[0],
+                duration,
+                '否',
+                '时长小于30秒',
+                video_obj.get('DownloadAddress')
+            ]]
+            Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
+            return True
+        # 分享小于1000 返回
+        if share_cnt < 1000:
+            values = [[
+                obj['nickname'],
+                publish_time_str,
+                formatted_time,
+                int(obj['fav_count']),
+                int(obj['comment_count']),
+                int(obj['like_count']),
+                int(obj['forward_count']),
+                divisor_cnt,
+                video_obj.get('title').split("\n")[0].split("#")[0],
+                duration,
+                '否',
+                '分享小于1000',
+                video_obj.get('DownloadAddress')
+            ]]
+            Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
+            return True
+        # 分享小于等于99999
+        if share_cnt <= 99999 and divisor_cnt < 2:
+            values = [[
+                obj['nickname'],
+                publish_time_str,
+                formatted_time,
+                int(obj['fav_count']),
+                int(obj['comment_count']),
+                int(obj['like_count']),
+                int(obj['forward_count']),
+                divisor_cnt,
+                video_obj.get('title').split("\n")[0].split("#")[0],
+                duration,
+                '否',
+                f'分享小于100000,分享/点赞:{divisor_cnt}',
+                video_obj.get('DownloadAddress')
+            ]]
+            Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
+            return True
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            item=video_dict,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            trace_id=trace_id,
+        )
+        if not pipeline.repeat_video():
+            values = [[
+                obj['nickname'],
+                publish_time_str,
+                formatted_time,
+                int(obj['fav_count']),
+                int(obj['comment_count']),
+                int(obj['like_count']),
+                int(obj['forward_count']),
+                divisor_cnt,
+                video_obj.get('title').split("\n")[0].split("#")[0],
+                duration,
+                '否',
+                '重复视频',
+                video_obj.get('DownloadAddress')
+            ]]
+            Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
+            if count > 3:
+                return False
+            else:
+                return True
+        else:
+            values = [[
+                obj['nickname'],
+                publish_time_str,
+                formatted_time,
+                int(obj['fav_count']),
+                int(obj['comment_count']),
+                int(obj['like_count']),
+                int(obj['forward_count']),
+                divisor_cnt,
+                video_obj.get('title').split("\n")[0].split("#")[0],
+                duration,
+                '是',
+                '',
+                video_obj.get('DownloadAddress')
+            ]]
+            Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            self.mq.send_msg(video_dict)
+            self.download_cnt += 1
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+            )
+            time.sleep(5)
+        return True
+
+    def video_duration(self, filename):
+        cap = cv2.VideoCapture(filename)
+        if cap.isOpened():
+            rate = cap.get(5)
+            frame_num = cap.get(7)
+            duration = frame_num / rate
+            return duration
+        return 0
+
+
+if __name__ == "__main__":
+    SP = ShiPinHaoAuthor(
+
+        platform="shipinhao",
+        mode="author",
+        user_dict={"uid": "123456", "link": "老碗哥说文解惑", "user_id": "1234565"},
+        rule_dict={},
+        env="prod",
+    )
+
+    SP.get_account_videos()

+ 4 - 3
shipinhao/shipinhao_main/run_sph_author.py

@@ -1,15 +1,16 @@
 import argparse
 import random
-import time
 from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
 from mq_http_sdk.mq_exception import MQExceptionBase
 
+
 sys.path.append(os.getcwd())
 from common.public import task_fun_mq, get_consumer, ack_message
 from common.scheduling_db import MysqlHelper
 from common import AliyunLogger
-from shipinhao.shipinhao_author import ShiPinHaoAccount
+from shipinhao.shipinhao_author.shipinhao_author import ShiPinHaoAuthor
+
 
 
 def main(log_type, crawler, topic_name, group_id, env):
@@ -103,7 +104,7 @@ def main(log_type, crawler, topic_name, group_id, env):
                             message="开始抓取视频号{}".format(user_dict["link"]),
                         )
                         # 初始化
-                        SPHA = ShiPinHaoAccount(
+                        SPHA = ShiPinHaoAuthor(
                             platform=crawler,
                             mode=log_type,
                             rule_dict=rule_dict,