
Run 小年糕+ on the board machine

罗俊辉 1 year ago
parent commit 954be0fad4

+ 2 - 0
application/common/log/__init__.py

@@ -0,0 +1,2 @@
+from .local_log import Local
+from .aliyun_log import AliyunLogger

+ 80 - 0
application/common/log/aliyun_log.py

@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+# @Author: 罗俊辉
+# @Time: 2023/12/18
+"""
+公共方法,包含:生成log
+"""
+import json
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+from datetime import date, timedelta
+from datetime import datetime
+import time
+
+proxies = {"http": None, "https": None}
+
+
+class AliyunLogger(object):
+    def __init__(self, platform, mode, env):
+        self.platform = platform
+        self.mode = mode
+        self.env = env
+
+    # Write a log entry to Aliyun Log Service
+    def logging(self, code, message, data=None, trace_id=None, account=None):
+        """
+        写入阿里云日志
+        测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
+        正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
+        """
+        # Access configuration for Aliyun Log Service
+        if data is None:
+            data = {}
+        accessKeyId = "LTAIWYUujJAm7CbH"
+        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+        if self.env == "dev":
+            project = "crawler-log-dev"
+            logstore = "crawler-log-dev"
+            endpoint = "cn-hangzhou.log.aliyuncs.com"
+        else:
+            project = "crawler-log-prod"
+            logstore = "crawler-fetch"
+            endpoint = "cn-hangzhou.log.aliyuncs.com"
+
+        # Create the LogClient instance
+        client = LogClient(endpoint, accessKeyId, accessKey)
+        log_group = []
+        log_item = LogItem()
+
+        """
+        生成日志消息体格式,例如
+        crawler:xigua
+        message:不满足抓取规则 
+        mode:search
+        timestamp:1686656143
+        """
+        message = message.replace("\r", " ").replace("\n", " ")
+        contents = [
+            ("TraceId", str(trace_id)),
+            ("code", str(code)),
+            ("platform", str(self.platform)),
+            ("mode", str(self.mode)),
+            ("message", str(message)),
+            ("data", json.dumps(data, ensure_ascii=False) if data else ""),
+            ("account", str(account)),
+            ("timestamp", str(int(time.time()))),
+        ]
+
+        log_item.set_contents(contents)
+        log_group.append(log_item)
+        # Ship the log group to SLS
+        request = PutLogsRequest(
+            project=project,
+            logstore=logstore,
+            topic="",
+            source="",
+            logitems=log_group,
+            compress=False,
+        )
+        client.put_logs(request)
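
A minimal usage sketch of the class above (the code value, message, and trace_id are illustrative, not taken from this commit):

    from application.common.log import AliyunLogger

    logger = AliyunLogger(platform="xiaoniangaoplus", mode="recommend", env="dev")
    logger.logging(
        code=1000,
        message="scanned one video",
        data={"video_title": "demo"},
        trace_id="xiaoniangaoplus-0000",
    )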

+ 51 - 0
application/common/log/local_log.py

@@ -0,0 +1,51 @@
+import os
+from datetime import date, datetime, timedelta
+
+from loguru import logger
+
+
+class Local:
+    # Current time, e.g. <class 'datetime.datetime'> 2022-04-14 20:13:51.244472
+    now = datetime.now()
+    # Yesterday as <class 'str'>, e.g. 2022-04-13
+    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
+    # Today as <class 'datetime.date'>, e.g. 2022-04-14
+    today = date.today()
+    # Tomorrow as <class 'str'>, e.g. 2022-04-15
+    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")
+
+    # Build a loguru logger that writes per-crawler daily log files
+    @staticmethod
+    def logger(log_type, crawler):
+        """
+        Build a loguru logger that writes to ./{crawler}/logs/.
+        """
+        # Log directory
+        log_dir = f"./{crawler}/logs/"
+        log_path = os.path.join(os.getcwd(), log_dir)
+        if not os.path.isdir(log_path):
+            os.makedirs(log_path)
+
+        # Log file name, e.g. xiaoniangaoplus-recommend-2023-12-18.log
+        log_name = f"{crawler}-{log_type}-{datetime.now().strftime('%Y-%m-%d')}.log"
+
+        # Do not also log to the console
+        logger.remove(handler_id=None)
+
+        # rotation="500 MB",实现每 500MB 存储一个文件
+        # rotation="12:00",实现每天 12:00 创建一个文件
+        # rotation="1 week",每周创建一个文件
+        # retention="10 days",每隔10天之后就会清理旧的日志
+        # 初始化日志
+        # logger.add(f"{log_dir}{log_name}", level="INFO", rotation="00:00", retention="10 days", enqueue=True)
+        logger.add(os.path.join(log_dir, log_name), level="INFO", rotation="00:00", retention="10 days", enqueue=True)
+
+        return logger
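
A quick usage sketch (argument values are illustrative):

    from application.common.log import Local

    log = Local.logger(log_type="recommend", crawler="xiaoniangaoplus")
    log.info("spider started")
    # writes to ./xiaoniangaoplus/logs/xiaoniangaoplus-recommend-<YYYY-MM-DD>.log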

+ 30 - 0
application/common/messageQueue/__init__.py

@@ -0,0 +1,30 @@
+import json
+from mq_http_sdk.mq_exception import MQExceptionBase
+from mq_http_sdk.mq_producer import TopicMessage
+from mq_http_sdk.mq_client import MQClient
+
+from application.common.log import Local
+
+
+class MQ:
+    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
+
+    def __init__(self, topic_name) -> None:
+        self.mq_client = MQClient("http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
+                                  "LTAI4G7puhXtLyHzHQpD6H7A",
+                                  "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")
+        self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
+
+    def send_msg(self, video_dict):
+        strategy = video_dict["strategy"]
+        platform = video_dict["platform"]
+        try:
+            msg = TopicMessage(json.dumps(video_dict))
+            message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
+            msg.set_message_key(message_key)
+            re_msg = self.producer.publish_message(msg)
+            Local.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
+                                                  (re_msg.message_id, re_msg.message_body_md5))
+        except MQExceptionBase as e:
+            Local.logger(strategy, platform).error("Publish Message Fail. Exception:%s\n" % e)
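
A hedged usage sketch of the producer wrapper (the topic name and payload values are illustrative; send_msg only requires strategy, platform, and out_video_id to be present in the dict):

    from application.common.messageQueue import MQ

    mq = MQ(topic_name="topic_crawler_etl_dev")
    mq.send_msg({
        "strategy": "recommend",
        "platform": "xiaoniangaoplus",
        "out_video_id": "abc123",
        "video_url": "https://example.com/video.mp4",
    })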

+ 8 - 0
application/config/mobile_config.py

@@ -5,3 +5,11 @@ value: {
     "machine_ip":""
 }
 """
+platform_config = {
+    "xiaoniangao_plus": {
+        "adb_ip": "192.168.100.19:5555",
+        "machine_id": "150",
+        "local_port": "4750"
+    },
+    "zhufuquanzi": {},
+}
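
Each entry pairs a device's adb address with the local Appium port the spider should use; a sketch of how the config could be consumed (this wiring is an assumption, though the hard-coded port 4750 in xiaoniangao_plus.py matches local_port here):

    from application.config.mobile_config import platform_config

    conf = platform_config["xiaoniangao_plus"]
    # e.g. "adb connect 192.168.100.19:5555" to attach the board machine,
    # then point the spider at this device's Appium server:
    appium_hub = f"http://localhost:{conf['local_port']}/wd/hub"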

+ 1 - 0
application/functions/__init__.py

@@ -0,0 +1 @@
+from .get_redirect_url import get_redirect_url

+ 9 - 0
application/functions/get_redirect_url.py

@@ -0,0 +1,9 @@
+import requests
+
+
+def get_redirect_url(url):
+    """Follow a single HTTP redirect and return its target, or the original URL unchanged."""
+    res = requests.get(url, allow_redirects=False, timeout=10)
+    if res.status_code in (301, 302) and "Location" in res.headers:
+        return res.headers["Location"]
+    return url
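
Example behavior (the URL is illustrative):

    final_url = get_redirect_url("https://example.com/short-link")
    # 301/302 -> returns the Location header; any other status -> returns the input URL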

+ 1 - 0
application/pipeline/__init__.py

@@ -0,0 +1 @@
+from .pipeline_dev import PiaoQuanPipelineTest

+ 111 - 0
application/pipeline/pipeline_dev.py

@@ -0,0 +1,111 @@
+import re
+import time
+
+
+class PiaoQuanPipelineTest:
+    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
+        self.platform = platform
+        self.mode = mode
+        self.item = item
+        self.rule_dict = rule_dict
+        self.env = env
+        self.trace_id = trace_id
+
+    # Publish-time limit on the video; this is a rule filter
+    def publish_time_flag(self):
+        # Check the publish time against the "period" rule
+        publish_time_stamp = self.item["publish_time_stamp"]
+        update_time_stamp = self.item["update_time_stamp"]
+        if self.platform == "gongzhonghao":
+            if (
+                int(time.time()) - publish_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            ) and (
+                int(time.time()) - update_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            ):
+                message = "发布时间超过{}天".format(
+                    int(self.rule_dict.get("period", {}).get("max", 1000))
+                )
+                print(message)
+                return False
+        else:
+            if (
+                int(time.time()) - publish_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            ):
+                message = "发布时间超过{}天".format(
+                    int(self.rule_dict.get("period", {}).get("max", 1000))
+                )
+                print(message)
+                return False
+        return True
+
+    # Does the video title pass the checks
+    def title_flag(self):
+        title = self.item["video_title"]
+        cleaned_title = re.sub(r"[^\w]", " ", title)
+        # Sensitive-word list (empty stub in this dev pipeline)
+        sensitive_words = []
+        if any(word in cleaned_title for word in sensitive_words):
+            message = "标题中包含敏感词"
+            print(message)
+            return False
+        return True
+
+    # Basic download rules for the video
+    def download_rule_flag(self):
+        for key in self.item:
+            if key == "period":  # "period" is the crawl window in days; it is handled in publish_time_flag
+                continue
+            if self.rule_dict.get(key):
+                max_value = (
+                    int(self.rule_dict[key]["max"])
+                    if int(self.rule_dict[key]["max"]) > 0
+                    else 999999999999999
+                )
+                flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
+                if not flag:
+                    message = "{}: {} <= {} <= {}, {}".format(
+                        key,
+                        self.rule_dict[key]["min"],
+                        self.item[key],
+                        max_value,
+                        flag,
+                    )
+                    print(message)
+                    return flag
+        return True
+
+    # Per-platform deduplication (stubbed out in this dev pipeline; the MySQL lookup is kept for reference)
+    def repeat_video(self):
+        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
+        # out_id = self.item["out_video_id"]
+        # sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
+        # repeat_video = MysqlHelper.get_values(
+        #     log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
+        # )
+        # if repeat_video:
+        #     message = "重复的视频"
+        #     print(message)
+        #     return False
+        return True
+
+    def process_item(self):
+        if not self.publish_time_flag():
+            # log the relevant details here
+            return False
+        if not self.title_flag():
+            # log the relevant details here
+            return False
+        if not self.repeat_video():
+            # log the relevant details here
+            return False
+        if not self.download_rule_flag():
+            # log the relevant details here
+            return False
+        return True
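
A sketch of how the dev pipeline is driven (the item fields mirror what xiaoniangao_plus.py builds; all values are illustrative):

    import time
    from application.pipeline import PiaoQuanPipelineTest

    item = {
        "video_title": "demo",
        "publish_time_stamp": int(time.time()),
        "update_time_stamp": int(time.time()),
        "play_cnt": 1000,
        "duration": 60,
    }
    rule_dict = {"period": {"min": 365, "max": 365}, "duration": {"min": 30, "max": 1800}}
    pipeline = PiaoQuanPipelineTest(
        platform="xiaoniangaoplus",
        mode="recommend",
        rule_dict=rule_dict,
        env="dev",
        item=item,
        trace_id="xiaoniangaoplus-demo",
    )
    print(pipeline.process_item())  # True when every rule passes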

+ 1 - 0
application/spider/crawler/__init__.py

@@ -0,0 +1 @@
+from .xiaoniangao_plus import XiaoNianGaoPlusRecommend

+ 313 - 0
application/spider/crawler/xiaoniangao_plus.py

@@ -0,0 +1,313 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/12/18
+import json
+import os
+import sys
+import time
+import uuid
+from hashlib import md5
+
+from appium import webdriver
+from appium.webdriver.extensions.android.nativekey import AndroidKey
+from bs4 import BeautifulSoup
+from selenium.common.exceptions import NoSuchElementException
+from selenium.webdriver.common.by import By
+
+sys.path.append(os.getcwd())
+
+from application.functions import get_redirect_url
+from application.pipeline import PiaoQuanPipelineTest
+
+
+class XiaoNianGaoPlusRecommend(object):
+    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
+        self.mq = None
+        self.platform = "xiaoniangaoplus"
+        self.download_cnt = 0
+        self.element_list = []
+        self.count = 0
+        self.swipe_count = 0
+        self.log_type = log_type
+        self.crawler = crawler
+        self.env = env
+        self.rule_dict = rule_dict
+        self.our_uid = our_uid
+        chromedriverExecutable = "/usr/bin/chromedriver"
+        print("启动微信")
+        # Appium capabilities for driving WeChat
+        caps = {
+            "platformName": "Android",
+            "deviceName": "Android",
+            "appPackage": "com.tencent.mm",
+            "appActivity": ".ui.LauncherUI",
+            "autoGrantPermissions": "true",
+            "noReset": True,
+            "resetKeyboard": True,
+            "unicodeKeyboard": True,
+            "showChromedriverLog": True,
+            "printPageSourceOnFailure": True,
+            "recreateChromeDriverSessions": True,
+            "enableWebviewDetailsCollection": True,
+            "setWebContentsDebuggingEnabled": True,
+            "newCommandTimeout": 6000,
+            "automationName": "UiAutomator2",
+            "chromedriverExecutable": chromedriverExecutable,
+            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
+        }
+        try:
+            self.driver = webdriver.Remote("http://localhost:4750/wd/hub", caps)
+        except Exception as e:
+            print(e)
+            return
+        self.driver.implicitly_wait(30)
+
+        for i in range(10):
+            try:
+                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
+                    print("启动微信成功")
+                    break
+                elif self.driver.find_element(
+                    By.ID, "com.android.systemui:id/dismiss_view"
+                ):
+                    print("发现并关闭系统下拉菜单")
+                    size = self.driver.get_window_size()
+                    self.driver.swipe(
+                        int(size["width"] * 0.5),
+                        int(size["height"] * 0.8),
+                        int(size["width"] * 0.5),
+                        int(size["height"] * 0.2),
+                        200,
+                    )
+                else:
+                    pass
+            except Exception as e:
+                print(f"打开微信异常:{e}")
+                time.sleep(1)
+
+        size = self.driver.get_window_size()
+        self.driver.swipe(
+            int(size["width"] * 0.5),
+            int(size["height"] * 0.2),
+            int(size["width"] * 0.5),
+            int(size["height"] * 0.8),
+            200,
+        )
+        time.sleep(1)
+        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
+        print("打开小程序小年糕+成功")
+        time.sleep(5)
+        self.get_videoList()
+        time.sleep(1)
+        self.driver.quit()
+
+    def search_elements(self, xpath):
+        time.sleep(1)
+        windowHandles = self.driver.window_handles
+        for handle in windowHandles:
+            self.driver.switch_to.window(handle)
+            time.sleep(1)
+            try:
+                elements = self.driver.find_elements(By.XPATH, xpath)
+                if elements:
+                    return elements
+            except NoSuchElementException:
+                pass
+
+    def check_to_applet(self, xpath):
+        time.sleep(1)
+        webViews = self.driver.contexts
+        self.driver.switch_to.context(webViews[-1])
+        windowHandles = self.driver.window_handles
+        for handle in windowHandles:
+            self.driver.switch_to.window(handle)
+            time.sleep(1)
+            try:
+                self.driver.find_element(By.XPATH, xpath)
+                print("切换到WebView成功\n")
+                return
+            except NoSuchElementException:
+                time.sleep(1)
+
+    def swipe_up(self):
+        self.search_elements('//*[@class="list-list--list"]')
+        size = self.driver.get_window_size()
+        self.driver.swipe(
+            int(size["width"] * 0.5),
+            int(size["height"] * 0.8),
+            int(size["width"] * 0.5),
+            int(size["height"] * 0.442),
+            200,
+        )
+        self.swipe_count += 1
+
+    def get_video_url(self, video_title_element):
+        # Retry up to three times; the wx-video src only appears once the WebView has loaded
+        for i in range(3):
+            self.search_elements('//*[@class="list-list--list"]')
+            time.sleep(1)
+            self.driver.execute_script(
+                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
+                video_title_element[0],
+            )
+            time.sleep(3)
+            video_title_element[0].click()
+            self.check_to_applet(
+                xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
+            )
+            time.sleep(10)
+            video_url_elements = self.search_elements(
+                '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
+            )
+            if video_url_elements:
+                return video_url_elements[0].get_attribute("src")
+
+    def parse_detail(self, index):
+        page_source = self.driver.page_source
+        soup = BeautifulSoup(page_source, "html.parser")
+        video_list = soup.findAll(
+            name="wx-view", attrs={"class": "expose--adapt-parent"}
+        )
+        # Return the first video card after the already-processed index
+        return video_list[index + 1:][0]
+
+    def get_video_info_2(self, video_element):
+        if self.download_cnt >= int(
+            self.rule_dict.get("videos_cnt", {}).get("min", 10)
+        ):
+            self.count = 0
+            self.download_cnt = 0
+            self.element_list = []
+            return
+        self.count += 1
+        # Generate a trace_id used as the unique index for this video's lifecycle
+        trace_id = self.crawler + str(uuid.uuid1())
+        print("扫描到一条视频")
+        # Title
+        video_title = video_element.find("wx-view", class_="dynamic--title").text
+        # Play-count string, e.g. "1000+次播放"
+        play_str = video_element.find("wx-view", class_="dynamic--views").text
+        info_list = video_element.findAll(
+            "wx-view", class_="dynamic--commerce-btn-text"
+        )
+        # Like count
+        like_str = info_list[1].text
+        # Comment count
+        comment_str = info_list[2].text
+        # Video duration
+        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
+        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
+        # Avatar URL
+        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
+        # Cover URL
+        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
+        play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
+        duration = int(duration_str.split(":")[0].strip()) * 60 + int(
+            duration_str.split(":")[-1].strip()
+        )
+        # like_str is "点赞" when there are no likes yet, "N万" for tens of thousands, else a plain number
+        if "点赞" in like_str:
+            like_cnt = 0
+        elif "万" in like_str:
+            like_cnt = int(float(like_str.split("万")[0]) * 10000)
+        else:
+            like_cnt = int(like_str)
+        if "评论" in comment_str:
+            comment_cnt = 0
+        elif "万" in comment_str:
+            comment_cnt = int(float(comment_str.split("万")[0]) * 10000)
+        else:
+            comment_cnt = int(comment_str)
+        out_video_id = md5(video_title.encode("utf8")).hexdigest()
+        out_user_id = md5(user_name.encode("utf8")).hexdigest()
+
+        video_dict = {
+            "video_title": video_title,
+            "video_id": out_video_id,
+            "out_video_id": out_video_id,
+            "duration_str": duration_str,
+            "duration": duration,
+            "play_str": play_str,
+            "play_cnt": play_cnt,
+            "like_str": like_str,
+            "like_cnt": like_cnt,
+            "comment_cnt": comment_cnt,
+            "share_cnt": 0,
+            "user_name": user_name,
+            "user_id": out_user_id,
+            "publish_time_stamp": int(time.time()),
+            "publish_time_str": time.strftime(
+                "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
+            ),
+            "update_time_stamp": int(time.time()),
+            "avatar_url": avatar_url,
+            "cover_url": cover_url,
+            "session": f"xiaoniangao-{int(time.time())}",
+        }
+        pipeline = PiaoQuanPipelineTest(
+            platform=self.crawler,
+            mode=self.log_type,
+            item=video_dict,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            trace_id=trace_id,
+        )
+        flag = pipeline.process_item()
+        if flag:
+            video_title_element = self.search_elements(
+                f'//*[contains(text(), "{video_title}")]'
+            )
+            if video_title_element is None:
+                return
+            print("点击标题,进入视频详情页")
+            video_url = self.get_video_url(video_title_element)
+            print(video_url)
+            video_url = get_redirect_url(video_url)
+            print(video_url)
+            if video_url is None:
+                self.driver.press_keycode(AndroidKey.BACK)
+                time.sleep(5)
+                return
+            video_dict["video_url"] = video_url
+            video_dict["platform"] = self.crawler
+            video_dict["strategy"] = self.log_type
+            video_dict["out_video_id"] = video_dict["video_id"]
+            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+            video_dict["user_id"] = self.our_uid
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+            self.download_cnt += 1
+            self.driver.press_keycode(AndroidKey.BACK)
+            time.sleep(5)
+
+    def get_video_info(self, video_element):
+        try:
+            self.get_video_info_2(video_element)
+        except Exception as e:
+            self.driver.press_keycode(AndroidKey.BACK)
+            print(f"抓取单条视频异常:{e}\n")
+
+    def get_videoList(self):
+        self.driver.implicitly_wait(20)
+        # Switch to the applet WebView context
+        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
+        print("切换到 webview 成功")
+        time.sleep(1)
+        page = 0
+        if self.search_elements('//*[@class="list-list--list"]') is None:
+            print("窗口已销毁")
+            self.count = 0
+            self.download_cnt = 0
+            self.element_list = []
+            return
+
+        print("开始获取视频信息")
+        for i in range(50):
+            print("下滑{}次".format(i))
+            element = self.parse_detail(i)
+            self.get_video_info(element)
+            self.swipe_up()
+            time.sleep(1)
+            if self.swipe_count > 100:
+                return
+        print("已抓取完一组,休眠 5 秒\n")
+        time.sleep(5)
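
Note that self.mq is initialized to None and never used in this file; presumably qualifying videos are meant to be published through the MQ wrapper added in this commit. A hedged sketch of that wiring (the topic name and placement are assumptions):

    from application.common.messageQueue import MQ

    # in __init__:
    #     self.mq = MQ(topic_name="topic_crawler_etl_prod")
    # in get_video_info_2, after pipeline.process_item() returns True:
    #     self.mq.send_msg(video_dict)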

+ 0 - 0
application/spider/crawler/xng_plus.py


+ 47 - 0
main.py

@@ -0,0 +1,47 @@
+import os
+import sys
+import time
+import schedule
+import multiprocessing
+
+sys.path.append(os.getcwd())
+
+from scheduler.spider_scheduler import SpiderHome
+
+
+class SpiderScheduler(object):
+    SH = SpiderHome()
+
+    @classmethod
+    def protect_spider_timeout(cls, function, hour):
+        run_time_limit = hour * 3600
+        start_time = time.time()
+        process = multiprocessing.Process(target=function)
+        process.start()
+        while True:
+            if time.time() - start_time >= run_time_limit:
+                process.terminate()
+                break
+            if not process.is_alive():
+                print("正在重启")
+                process.terminate()
+                time.sleep(60)
+                os.system("adb forward --remove-all")
+                process = multiprocessing.Process(target=function)
+                process.start()
+            time.sleep(60)
+
+    @classmethod
+    def run_xng_plus(cls, hour):
+        cls.protect_spider_timeout(function=cls.SH.run_xng_plus, hour=hour)
+
+    @classmethod
+    def run_zhuFuQuanZi(cls):
+        print("hello")
+
+
+if __name__ == "__main__":
+    SC = SpiderScheduler()
+    schedule.every().day.at("14:30").do(SC.run_xng_plus, hour=1)
+    while True:
+        schedule.run_pending()
+        time.sleep(1)

+ 25 - 0
scheduler/spider_scheduler.py

@@ -0,0 +1,25 @@
+import os
+import sys
+
+sys.path.append(os.getcwd())
+
+from application.spider.crawler import *
+
+
+class SpiderHome(object):
+    @classmethod
+    def run_xng_plus(cls):
+        rule_dict1 = {
+            "period": {"min": 365, "max": 365},
+            "duration": {"min": 30, "max": 1800},
+            "favorite_cnt": {"min": 0, "max": 0},
+            "videos_cnt": {"min": 5000, "max": 0},
+            "share_cnt": {"min": 0, "max": 0},
+        }
+        XiaoNianGaoPlusRecommend(
+            "recommend",
+            "xiaoniangaoplus",
+            "prod",
+            rule_dict1,
+            [64120158, 64120157, 63676778],  # our_uid
+        )