Преглед изворни кода

增加小年糕账号线下抓取

zhangyong пре 10 месеци
родитељ
комит
ce941a81c1

+ 4 - 0
app/off_line_controler.py

@@ -70,6 +70,10 @@ class SpiderScheduler(object):
     def run_spss_id(cls, hour):
         cls.protect_spider_timeout(function=cls.SH.run_spss_id, hour=hour)
 
+    @classmethod
+    def run_xng_id(cls, hour):
+        """Run the Xiaoniangao account crawler through the timeout guard.
+
+        Hands ``cls.SH.run_xng_id`` and ``hour`` to
+        ``cls.protect_spider_timeout``; how ``hour`` is interpreted is decided
+        by that helper, which is not visible in this hunk.
+
+        :param hour: forwarded unchanged to ``protect_spider_timeout``
+        """
+        cls.protect_spider_timeout(function=cls.SH.run_xng_id, hour=hour)
+
 
 if __name__ == "__main__":
     SC = SpiderScheduler()

+ 6 - 0
scheduler/spider_scheduler.py

@@ -2,10 +2,12 @@ import os
 import sys
 import time
 
+
 sys.path.append(os.getcwd())
 
 from spider.crawler_offline import *
 from spider.crawler_offline.shipinshuashua_id import SPSSIdRecommend
+from spider.crawler_offline.xiaoniangao_zhanghao import XiaoNianGaoZH
 
 class SpiderHome(object):
     @classmethod
@@ -60,3 +62,7 @@ class SpiderHome(object):
         SPSSIdRecommend(
             env="prod"
         )
+
+    @classmethod
+    def run_xng_id(cls):
+        """Run the offline Xiaoniangao account crawler.
+
+        Instantiating ``XiaoNianGaoZH`` is the whole job: the instance is
+        discarded and nothing is returned.
+        """
+        XiaoNianGaoZH()

+ 408 - 0
spider/crawler_offline/xiaoniangao_zhanghao.py

@@ -0,0 +1,408 @@
+# -*- coding: utf-8 -*-
+import json
+import os
+import random
+import subprocess
+import sys
+import time
+import uuid
+from datetime import datetime
+import requests
+
+from appium import webdriver
+from appium.webdriver.extensions.android.nativekey import AndroidKey
+from appium.webdriver.common.touch_action import TouchAction
+from bs4 import BeautifulSoup
+from selenium.common.exceptions import NoSuchElementException
+from selenium.webdriver.common.by import By
+
+from application.common import MysqlHelper, Feishu
+
+sys.path.append(os.getcwd())
+
+
+
+class XiaoNianGaoZH(object):
+    """
+    小年糕+线下爬虫
+    """
+
+    def __init__(self):
+        mid = 1160417293
+        current_time = datetime.now()
+        formatted_time = current_time.strftime("%Y%m%d")
+        date_int = int(formatted_time)
+        # 获取时间标签
+        tag_id = self.get_tag_id(date_int)
+        print(tag_id)
+        # 新增账号
+        pq_uid = self.insert_number(mid, tag_id)
+        self.count = 0
+        self.swipe_count = 0
+        chromedriverExecutable = "/Users/tzld/Downloads/chromedriver-mac-x64/chromedriver"
+        print("启动微信")
+        # 微信的配置文件
+        caps = {
+            "platformName": "Android",
+            "devicesName": "Android",
+            "appPackage": "com.tencent.mm",
+            "appActivity": ".ui.LauncherUI",
+            "autoGrantPermissions": True,
+            "noReset": True,
+            "resetkeyboard": True,
+            "unicodekeyboard": True,
+            "showChromedriverLog": True,
+            "printPageSourceOnFailure": True,
+            "recreateChromeDriverSessions": True,
+            "enableWebviewDetailsCollection": True,
+            "setWebContentsDebuggingEnabled": True,
+            "newCommandTimeout": 6000,
+            "automationName": "UiAutomator2",
+            "chromedriverExecutable": chromedriverExecutable,
+            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
+        }
+        try:
+            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
+        except Exception as e:
+            print(e)
+            return
+        self.driver.implicitly_wait(30)
+
+        for i in range(10):
+            try:
+                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
+                    print("启动微信成功")
+                    break
+                elif self.driver.find_element(
+                        By.ID, "com.android.systemui:id/dismiss_view"
+                ):
+                    print("发现并关闭系统下拉菜单")
+                    size = self.driver.get_window_size()
+                    self.driver.swipe(
+                        int(size["width"] * 0.5),
+                        int(size["height"] * 0.8),
+                        int(size["width"] * 0.5),
+                        int(size["height"] * 0.2),
+                        200,
+                    )
+                else:
+                    pass
+            except Exception as e:
+                print(f"打开微信异常:{e}")
+                time.sleep(1)
+
+        size = self.driver.get_window_size()
+        self.driver.swipe(
+            int(size["width"] * 0.5),
+            int(size["height"] * 0.2),
+            int(size["width"] * 0.5),
+            int(size["height"] * 0.8),
+            200,
+        )
+        time.sleep(1)
+        command = 'adb shell service call statusbar 2'
+        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
+        process.communicate()
+        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
+        print("打开小程序小年糕+成功")
+        time.sleep(5)
+        self.get_videoList()
+        time.sleep(1)
+        self.driver.quit()
+
+    def search_elements(self, xpath):
+        time.sleep(1)
+        windowHandles = self.driver.window_handles
+        for handle in windowHandles:
+            self.driver.switch_to.window(handle)
+            time.sleep(1)
+            try:
+                elements = self.driver.find_elements(By.XPATH, xpath)
+                if elements:
+                    return elements
+            except NoSuchElementException:
+                pass
+
+    def check_to_applet(self, xpath):
+        time.sleep(1)
+        webViews = self.driver.contexts
+        self.driver.switch_to.context(webViews[-1])
+        windowHandles = self.driver.window_handles
+        for handle in windowHandles:
+            self.driver.switch_to.window(handle)
+            time.sleep(1)
+            try:
+                self.driver.find_element(By.XPATH, xpath)
+                print("切换到WebView成功\n")
+                return
+            except NoSuchElementException:
+                time.sleep(1)
+
+    def swipe_up(self):
+        self.search_elements('//*[@class="list-list--list"]')
+        size = self.driver.get_window_size()
+        action = TouchAction(self.driver)
+        action.press(x=int(size["width"] * 0.5), y=int(size["height"] * 0.85))
+        action.wait(ms=1300)  # 可以调整等待时间
+        action.move_to(x=int(size["width"] * 0.5), y=int(size["height"] * 0.2))
+        action.release()
+        action.perform()
+        self.swipe_count += 1
+
+    def get_video_url(self, video_title_element):
+        for i in range(3):
+            self.search_elements('//*[@class="list-list--list"]')
+            time.sleep(1)
+            self.driver.execute_script(
+                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
+                video_title_element[0],
+            )
+            time.sleep(3)
+            video_title_element[0].click()
+            self.check_to_applet(
+                xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
+            )
+            time.sleep(10)
+            video_url_elements = self.search_elements(
+                '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
+            )
+            return video_url_elements[0].get_attribute("src")
+
+    def parse_detail(self, index):
+        page_source = self.driver.page_source
+        soup = BeautifulSoup(page_source, "html.parser")
+        soup.prettify()
+        video_list = soup.findAll(
+            name="wx-view", attrs={"class": "expose--adapt-parent"}
+        )
+        index = index + 1
+        element_list = [i for i in video_list][index:]
+        return element_list[0]
+
+    def get_video_info_2(self, video_element):
+        self.count += 1
+        video_title = video_element.find("wx-view", class_="dynamic--title").text
+
+        # 头像 URL
+        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
+        # 用户名称
+        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
+        name_url = self.select_name_url(avatar_url, user_name)
+        if name_url:
+            video_title_element = self.search_elements(f'//*[contains(text(), "{video_title}")]')
+            if video_title_element is None:
+                return
+            self.get_video_url(video_title_element)
+            video_mid_elements = self.search_elements("//wx-view[@class='bar--navBar-content-capsule-wrap']")
+            mid = int(video_mid_elements[0].get_attribute("data-mid"))
+            self.driver.press_keycode(AndroidKey.BACK)
+            time.sleep(5)
+            uid = self.select_id(mid)
+            if uid:
+                self.update_name_url(mid, avatar_url, user_name)
+            else:
+                time.sleep(1)
+                link = self.select_id_status(mid)
+                if link:
+                    current_time = datetime.now()
+                    formatted_time = current_time.strftime("%Y%m%d")
+                    date_int = int(formatted_time)
+                    # 获取时间标签
+                    tag_id = self.get_tag_id(date_int)
+                    time.sleep(5)
+                    print(tag_id)
+                    # 新增账号
+                    pq_uid = self.insert_number(mid, tag_id)
+                    time.sleep(5)
+                    if pq_uid:
+                        self.insert_name_url(mid, avatar_url, user_name)
+                        # 获取当前时间
+                        current_time = datetime.now()
+                        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+                        values = [[
+                            str(mid),
+                            user_name,
+                            avatar_url,
+                            str(pq_uid),
+                            formatted_time,
+
+                        ]]
+                        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "8zlceR", "ROWS", 1, 2)
+                        time.sleep(0.5)
+                        Feishu.update_values('xiaoniangao', 'xiaoniangao', "8zlceR", "A2:Z2", values)
+                        print("写入飞书表格成功")
+
+
+    def get_video_info(self, video_element):
+        """Process one feed card, printing any failure instead of raising it.
+
+        :param video_element: card passed straight to ``get_video_info_2``
+        :return: None
+        """
+        try:
+            self.get_video_info_2(video_element)
+        except Exception as e:
+            # Best effort: one broken card must not abort the caller.
+            print(f"抓取单条视频异常:{e}\n")
+
+    def get_videoList(self):
+        """
+        获取视频列表
+        :return:
+        """
+        # while True:
+        self.driver.implicitly_wait(20)
+        # 切换到 web_view
+        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
+        print("切换到 webview 成功")
+        time.sleep(1)
+        if self.search_elements('//*[@class="list-list--list"]') is None:
+            print("窗口已销毁")
+            self.count = 0
+            self.download_cnt = 0
+            self.element_list = []
+            return
+
+        print("开始获取视频信息")
+        for i in range(50):
+            print("下滑{}次".format(i))
+            element = self.parse_detail(i)
+            self.get_video_info(element)
+            self.swipe_up()
+            time.sleep(random.randint(1, 5))
+
+
+    def insert_number(self, mid, tag_id):
+        for i in range(3):
+            url = "https://admin.piaoquantv.com/manager/crawler/v3/user/save"
+            payload = {
+                "source": "xiaoniangao",
+                "mode": "author",
+                "modeValue": "",
+                "modeBoard": "",
+                "recomStatus": -6,
+                "appRecomStatus": -6,
+                "autoAuditStatus": 0,
+                "tag": f"459,454,106,8240,{int(tag_id)}",
+                "contentCategory": 0,
+                "link": str(mid)
+            }
+
+            headers = {
+                'content-length': '0',
+                'cookie': 'SESSION=MWM4YzVlMTctNzdkNC00NjE3LWIxZTctOGQwYzgzYmVmN2Qw',
+                'origin': 'https://admin.piaoquantv.com',
+                'priority': 'u=1, i',
+                'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
+                'sec-ch-ua-mobile': '?0',
+                'sec-ch-ua-platform': '"macOS"'
+            }
+
+            response = requests.request("POST", url, headers=headers, json=payload)
+            response = response.json()
+            code = response["code"]
+            if code == 0:
+                print("添加账号成功")
+                time.sleep(1)
+                url = "https://admin.piaoquantv.com/manager/crawler/v3/user/list"
+                payload = {
+                    "pageNum": 1,
+                    "pageSize": 20
+                }
+                response = requests.request("POST", url, headers=headers, json=payload)
+                response = response.json()
+                list = response["content"]['list']
+                link = list[0]["link"]
+                if link == str(mid):
+                    print("获取站内账号ID成功")
+                    return list[0]["uid"]
+
+
+
+    """
+    查询用户名+头像是否存在
+    """
+    def select_name_url(self, avatar_url, user_name):
+        sql = f""" select uid from xng_uid where avatar_url = "{avatar_url}" and user_name="{user_name}"; """
+        db = MysqlHelper()
+        repeat_video = db.select(sql=sql)
+        if repeat_video:
+            return False
+        return True
+
+
+
+    def get_tag_id(self, date_int):
+        for i in range(3):
+            url = f"https://admin.piaoquantv.com/manager/user/up/searchUserTypeTag?keyword={date_int}&muid=7"
+
+            payload = {}
+            headers = {
+                'content-length': '0',
+                'cookie': 'SESSION=MWM4YzVlMTctNzdkNC00NjE3LWIxZTctOGQwYzgzYmVmN2Qw',
+                'origin': 'https://admin.piaoquantv.com',
+                'priority': 'u=1, i',
+                'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
+                'sec-ch-ua-mobile': '?0',
+                'sec-ch-ua-platform': '"macOS"'
+            }
+
+            response = requests.request("POST", url, headers=headers, data=payload)
+            response = response.json()
+            content = response["content"]
+            if content:
+                tagId = content[0]['tagId']
+                return tagId
+            else:
+                url = f"https://admin.piaoquantv.com/manager/user/up/createUserTypeTag?tagName={date_int}&muid=7"
+                response = requests.request("POST", url, headers=headers, data=payload)
+                response = response.json()
+                content = response["content"]
+                if content:
+                    tagId = content['tagId']
+                    return tagId
+
+    """
+    修改用户名+头像
+    """
+    def update_name_url(self, mid, avatar_url, user_name):
+        sql = f""" update xng_uid set avatar_url = "{avatar_url}", user_name="{user_name}" where uid = "{mid}"; """
+        db = MysqlHelper()
+        repeat_video = db.update(sql=sql)
+        if repeat_video:
+            return True
+        return False
+
+    """
+    插入 用户名 头像 用户id
+    """
+    def insert_name_url(self, uid, avatar_url, user_name):
+        current_time = datetime.now()
+        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
+        insert_sql = f"""INSERT INTO xng_uid (uid, avatar_url, user_name, data_time) values ('{uid}' ,'{avatar_url}','{user_name}', '{formatted_time}')"""
+        db = MysqlHelper()
+        repeat_video = db.update(sql=insert_sql)
+        if repeat_video:
+            return True
+        return False
+
+
+    """
+    查询用户id是否存在
+    """
+    def select_id(self, uid):
+        sql = f""" select uid from xng_uid where uid = "{uid}"; """
+        db = MysqlHelper()
+        repeat_video = db.select(sql=sql)
+        if repeat_video:
+            return True
+        return False
+
+    """
+    查询用户id是否之前已添加过
+    """
+    def select_id_status(self, uid):
+        sql = f""" select uid from crawler_user_v3 where link = "{uid}"; """
+        db = MysqlHelper()
+        repeat_video = db.select(sql=sql)
+        if repeat_video:
+            return False
+        return True
+
+
+if __name__ == "__main__":
+    # Constructing the class performs one complete crawl; the instance is
+    # not kept.
+    XiaoNianGaoZH()