Prechádzať zdrojové kódy

账号抓取v1 提交

luojunhui 7 mesiacov pred
rodič
commit
14bd73fcf0

+ 30 - 1
applications/const.py

@@ -72,4 +72,33 @@ class updateAccountReadAvgTaskConst:
 
     # 发文模式
     ARTICLES_DAILY = 1
-    TOULIU = 2
+    TOULIU = 2
+
+
+class WeixinVideoCrawlerConst:
+    """
+    微信视频抓取常量配置
+    """
+    # 账号抓取状态
+    ACCOUNT_CRAWL_STATUS = 1
+    ACCOUNT_DO_NOT_CRAWL_STATUS = 0
+
+    # 默认最早抓取时间戳(2024-01-01)
+    DEFAULT_TIMESTAMP = 1704038400
+
+    # 搜索爬虫最大页数
+    MAX_SEARCH_PAGE_NUM = 10
+
+    # 抓取每一页的等待时间
+    SLEEP_SECONDS = 5
+
+    # 种子标题最低阅读均值倍数
+    READ_AVG_MULTIPLE = 1.3
+
+    # 种子标题最低阅读量
+    MIN_READ_COUNT = 2000
+
+    # 获取种子标题的统计周期
+    STAT_PERIOD = 7 * 24 * 60 * 60
+
+

+ 103 - 1
applications/functions.py

@@ -1,11 +1,21 @@
 """
 @author: luojunhui
 """
+import os
 import threading
 import hashlib
 
 from datetime import datetime, timezone
 
+import re
+import html
+
+import oss2
+from uuid import uuid4
+import requests
+
+from fake_useragent import FakeUserAgent
+
 
 class Functions(object):
     """
@@ -137,4 +147,96 @@ class Functions(object):
         """
         dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
         date_string = dt_object.strftime(string_format)
-        return date_string
+        return date_string
+
+    @classmethod
+    def proxy(cls):
+        """
+        快代理
+        """
+        # 隧道域名:端口号
+        tunnel = "l901.kdltps.com:15818"
+
+        # 用户名密码方式
+        username = "t11983523373311"
+        password = "mtuhdr2z"
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
+        }
+        return proxies
+
+    @classmethod
+    def get_video_url(cls, article_url):
+        """
+        :param article_url:
+        :return:
+        """
+        response = requests.get(
+            url=article_url,
+            headers={'User-Agent': FakeUserAgent().random},
+
+        )
+        html_text = response.text
+        w = re.search(
+            r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
+        ).group(1)
+        url = html.unescape(
+            re.sub(
+                r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
+            )
+        )
+        return url
+
+    @classmethod
+    def download_gzh_video(cls, article_url):
+        """
+        下载公众号视频
+        :param article_url:
+        :return:
+        """
+        try:
+            video_url = cls.get_video_url(article_url)
+        except Exception as e:
+            return
+        save_path = "static/{}.mp4".format(cls.str_to_md5(video_url))
+        headers = {
+            'Accept': '*/*',
+            'Accept-Language': 'zh,zh-CN;q=0.9',
+            'Connection': 'keep-alive',
+            'Origin': 'https://mp.weixin.qq.com',
+            'Referer': 'https://mp.weixin.qq.com/',
+            'Sec-Fetch-Dest': 'video',
+            'Sec-Fetch-Mode': 'cors',
+            'Sec-Fetch-Site': 'cross-site',
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
+            'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
+            'sec-ch-ua-mobile': '?0',
+            'sec-ch-ua-platform': '"macOS"'
+        }
+        res = requests.get(video_url, headers=headers)
+        with open(save_path, "wb") as f:
+            f.write(res.content)
+
+        TEN_KB = 1024 * 10
+        if os.path.getsize(save_path) > TEN_KB:
+            return save_path
+        else:
+            return None
+
+    @classmethod
+    def upload_to_oss(cls, local_video_path):
+        """
+        把视频上传到 oss
+        :return:
+        """
+        oss_video_key = "long_articles/video/" + str(uuid4())
+        access_key_id = "LTAIP6x1l3DXfSxm"
+        access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+        endpoint = "oss-cn-hangzhou.aliyuncs.com"
+        bucket_name = "art-pubbucket"
+        bucket = oss2.Bucket(
+            oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
+        )
+        bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
+        return oss_video_key

+ 18 - 0
applications/longArticlesMysql.py

@@ -59,3 +59,21 @@ class longArticlesMySQL(object):
             cls.connection.rollback()
             raise e
 
+    @classmethod
+    def select_json(cls, sql):
+        """
+        查询
+        :param sql:
+        :return:
+        """
+        cursor = cls.connection.cursor()
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        json_data = [
+            dict(
+                zip([column[0] for column in cursor.description], row)
+            )
+            for row in result
+        ]
+        return json_data
+

+ 7 - 4
applications/wxSpiderApi.py

@@ -21,7 +21,7 @@ class WeixinSpider(object):
 
     @classmethod
     @retryOnNone()
-    def search_articles(cls, title) -> dict:
+    def search_articles(cls, title, page="1") -> dict:
         """
         search articles in wx
         :return:
@@ -29,15 +29,17 @@ class WeixinSpider(object):
         url = "{}/keyword".format(cls.base_url)
         payload = json.dumps({
             "keyword": title,
-            "cursor": "1"
+            "cursor": page
         })
         response = requests.request("POST", url, headers=cls.headers, data=payload, timeout=120)
         return response.json()
 
     @classmethod
-    def get_article_text(cls, content_link, is_count=False) -> dict:
+    @retryOnNone()
+    def get_article_text(cls, content_link, is_count=False, is_cache=True) -> dict:
         """
         获取文章
+        :param is_cache:
         :param is_count:
         :param content_link:
         :return:
@@ -46,7 +48,8 @@ class WeixinSpider(object):
         payload = json.dumps({
             "content_link": content_link,
             "is_count": is_count,
-            "is_ad": False
+            "is_ad": False,
+            "is_cache": is_cache
         })
         response = requests.request("POST", url, headers=cls.headers, data=payload, timeout=120)
         return response.json()

+ 176 - 0
coldStartTasks/crawler/weixin_account_crawler.py

@@ -0,0 +1,176 @@
+"""
+@author: luojunhui
+"""
+import time
+import traceback
+from typing import List, Set, Dict
+
+from tqdm import tqdm
+
+from applications import WeixinSpider, longArticlesMySQL, log, bot
+from applications.const import WeixinVideoCrawlerConst
+from applications.functions import Functions
+
+const = WeixinVideoCrawlerConst()
+function = Functions()
+
+
+class WeixinAccountCrawler(object):
+    """
+    账号抓取
+    """
+
+    def __init__(self):
+        self.db_client = longArticlesMySQL()
+        self.spider = WeixinSpider()
+        self.account_count = 0
+
+    def get_inner_account_name(self) -> Set[str]:
+        """
+        获取内部账号名称
+        :return:
+        """
+        sql = "select distinct account_name from datastat_sort_strategy;"
+        account_name_list = self.db_client.select_json(sql)
+        account_name_set = set()
+        for account_name_obj in account_name_list:
+            account_name_set.add(account_name_obj['account_name'])
+        return account_name_set
+
+    def get_seed_titles(self) -> List[str]:
+        """
+        :return:
+        """
+        publish_timestamp_threshold = int(time.time()) - const.STAT_PERIOD
+        sql = f"""
+            SELECT distinct title
+            FROM datastat_sort_strategy
+            WHERE read_rate > {const.READ_AVG_MULTIPLE} and view_count > {const.MIN_READ_COUNT} and publish_timestamp > {publish_timestamp_threshold};
+            ORDER BY read_rate DESC;
+        """
+        title_list = self.db_client.select_json(sql)
+        title_list = [i['title'] for i in title_list]
+        return title_list
+
+    def is_original(self, article_url: str) -> bool:
+        """
+        判断视频是否是原创
+        :return:
+        """
+        response = self.spider.get_article_text(article_url)
+        data = response['data']['data']
+        return data['is_original']
+
+    def insert_account(self, gh_id: str, account_name: str) -> int:
+        """
+        插入账号
+        :param account_name:
+        :param gh_id:
+        :return:
+        """
+        init_date = time.strftime("%Y-%m-%d", time.localtime())
+        sql = """
+            INSERT IGNORE INTO weixin_account_for_videos
+            (gh_id, account_name, account_init_date)
+            VALUES 
+            (%s, %s, %s);
+        """
+        insert_rows = self.db_client.update(sql, (gh_id, account_name, init_date))
+        return insert_rows
+
+    def process_search_result(self, response: Dict, inner_account_set: Set[str]):
+        """
+        处理搜索结果
+        :param response:
+        :param inner_account_set:
+        :return:
+        """
+        if response['code'] != 0:
+            return
+
+        article_list = response['data']['data']
+        if article_list:
+            for article in article_list:
+                try:
+                    # 先判断账号是否内部账号
+                    article_url = article['url']
+                    account_detail = self.spider.get_account_by_url(article_url)
+                    account_detail = account_detail['data']['data']
+                    account_name = account_detail['account_name']
+                    gh_id = account_detail['wx_gh']
+                    if account_name in inner_account_set:
+                        continue
+                    # 判断搜索结果是否原创
+                    if self.is_original(article_url):
+                        continue
+
+                    # 判断是否有视频链接
+                    try:
+                        video_url = function.get_video_url(article_url)
+                    except Exception as e:
+                        continue
+
+                    if not video_url:
+                        continue
+
+                    # 将账号抓取进来
+                    insert_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
+                    if insert_rows:
+                        log(
+                            task="account_crawler_v1",
+                            function="process_search_result",
+                            message="insert account success",
+                            data={
+                                "gh_id": gh_id,
+                                "account_name": account_name
+                            }
+                        )
+                        self.account_count += 1
+                except Exception as e:
+                    log(
+                        task="account_crawler_v1",
+                        function="process_search_result",
+                        message="insert account error",
+                        data={
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                            "data": article
+                        }
+                    )
+
+    def search_title_in_weixin(self, title: str, inner_account_set: Set[str]) -> None:
+        """
+        调用搜索接口,在微信搜索
+        :param inner_account_set:
+        :param title:
+        :return:
+        """
+        for page_index in tqdm(range(1, const.MAX_SEARCH_PAGE_NUM + 1), desc='searching: {}'.format(title)):
+            response = self.spider.search_articles(title, page=str(page_index))
+            self.process_search_result(response, inner_account_set)
+            time.sleep(const.SLEEP_SECONDS)
+
+    def run(self) -> None:
+        """
+        入口函数
+        :return:
+        """
+        # get seed titles
+        title_list = self.get_seed_titles()
+        # get inner accounts set
+        inner_account_set = self.get_inner_account_name()
+
+        start_time = time.time()
+        for title in tqdm(title_list, desc="search each title"):
+            self.search_title_in_weixin(title, inner_account_set)
+
+        # 通知
+        bot(
+            title="微信账号抓取完成",
+            detail={
+                "总更新账号数量": self.account_count,
+                "总耗时": time.time() - start_time,
+                "种子标题数量": len(title_list)
+            },
+            mention=False
+        )