浏览代码

developing

luojunhui 2 月之前
父节点
当前提交
cd46e02d0f

+ 18 - 0
account_crawler_task.py

@@ -8,10 +8,12 @@ import datetime
 from applications.api.feishu_api import FeishuSheetApi
 from applications.api.feishu_api import FeishuSheetApi
 from tasks.crawler_accounts_by_association import ChannelsAccountCrawler
 from tasks.crawler_accounts_by_association import ChannelsAccountCrawler
 from tasks.crawler_accounts_by_association import ToutiaoAccountCrawler
 from tasks.crawler_accounts_by_association import ToutiaoAccountCrawler
+from tasks.crawler_accounts_by_association import HaoKanAccountCrawler
 
 
 document_token = "BGQCsOXwHhVRq5tswjgcI8NInqd"
 document_token = "BGQCsOXwHhVRq5tswjgcI8NInqd"
 toutiao_sheet_id = "pIJSt7"
 toutiao_sheet_id = "pIJSt7"
 channels_sheet_id = "ee0163"
 channels_sheet_id = "ee0163"
+haokan_sheet_id = 'tfftfD'
 
 
 
 
 def insert_data_into_feishu_sheet(platform: str, data_list: list[list[str]]) -> None:
 def insert_data_into_feishu_sheet(platform: str, data_list: list[list[str]]) -> None:
@@ -30,6 +32,8 @@ def insert_data_into_feishu_sheet(platform: str, data_list: list[list[str]]) ->
             sheet_id = toutiao_sheet_id
             sheet_id = toutiao_sheet_id
         case "channels":
         case "channels":
             sheet_id = channels_sheet_id
             sheet_id = channels_sheet_id
+        case 'hksp':
+            sheet_id = haokan_sheet_id
         case _:
         case _:
             raise RuntimeError("platform error")
             raise RuntimeError("platform error")
 
 
@@ -71,3 +75,17 @@ if __name__ == "__main__":
     toutiao_account_crawler.update_video_status(
     toutiao_account_crawler.update_video_status(
         video_id_tuple=tuple(video_id_list), ori_status=0, new_status=1
         video_id_tuple=tuple(video_id_list), ori_status=0, new_status=1
     )
     )
+
+    # crawler haokanshipin
+    haokan_account_crawler = HaoKanAccountCrawler()
+    haokan_account_crawler.deal()
+    video_list = haokan_account_crawler.get_video_list_with_score(platform="hksp")
+    insert_data_into_feishu_sheet(platform="hksp", data_list=video_list)
+    video_id_list = [i[0] for i in video_list]
+    haokan_account_crawler.update_video_status(
+        video_id_tuple=tuple(video_id_list), ori_status=0, new_status=1
+    )
+
+
+
+

+ 3 - 1
coldStartTasks/crawler/baidu/__init__.py

@@ -1 +1,3 @@
-from .video_crawler import BaiduVideoCrawler
+from .video_crawler import BaiduVideoCrawler
+from .spider import haokan_search_videos
+from .spider import haokan_fetch_video_detail

+ 0 - 4
coldStartTasks/crawler/baidu/account_crawler.py

@@ -1,4 +0,0 @@
-"""
-@author: luojunhui
-"""
-

+ 106 - 0
coldStartTasks/crawler/baidu/spider.py

@@ -0,0 +1,106 @@
+from __future__ import annotations
+
+import json
+import base64
+import hashlib
+import requests
+import urllib.parse
+from datetime import datetime
+from tenacity import retry
+from uuid import uuid4
+from fake_useragent import FakeUserAgent
+
+from applications import log
+from applications.utils import proxy, request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def haokan_search_videos(search_key: str) -> dict | None:
+    """
+    get haokan search videos
+    :param search_key: search key
+    :return: haokan search videos
+    """
+    timestamp_with_ms = datetime.now().timestamp()
+    timestamp_ms = int(timestamp_with_ms * 1000)
+    query_string = urllib.parse.quote(search_key)
+    strings = "{}_{}_{}_{}_{}".format(1, query_string, 10, timestamp_ms, 1)
+    sign = hashlib.md5(strings.encode()).hexdigest()
+    url = f"https://haokan.baidu.com/haokan/ui-search/pc/search/video?pn=1&rn=10&type=video&query={query_string}&sign={sign}&version=1&timestamp={timestamp_ms}"
+    base_64_string = base64.b64encode(str(uuid4()).encode()).decode()
+    headers = {
+        "Accept": "*/*",
+        "Accept-Language": "zh",
+        "Connection": "keep-alive",
+        "Referer": "https://haokan.baidu.com/web/search/page?query={}".format(
+            query_string
+        ),
+        "User-Agent": FakeUserAgent().chrome,
+        "Cookie": "BAIDUID={}".format(base_64_string),
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy(), timeout=120)
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_search_videos",
+            message=f"API请求失败: {e}",
+            data={"search_key": search_key},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_search_videos",
+            message=f"响应解析失败: {e}",
+            data={"search_key": search_key},
+        )
+    return None
+
+@retry(**retry_desc)
+def haokan_fetch_video_detail(video_id: str) -> dict | None:
+    """
+    get haokan video detail
+    :param video_id: video id
+    :return: haokan video detail
+    """
+    url = "https://haokan.baidu.com/v"
+    params = {
+        'vid': video_id,
+        '_format': 'json'
+    }
+    base_64_string = base64.b64encode(str(uuid4()).encode()).decode()
+    headers = {
+        'Accept': '*/*',
+        'cookie': "BIDUPSID={}".format(base_64_string),
+        'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Referer': 'https://haokan.baidu.com',
+        'User-Agent': FakeUserAgent().chrome,
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy(), params=params, timeout=120)
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_get_detail",
+            message=f"API请求失败: {e}",
+            data={"video_id": video_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_get_detail",
+            message=f"响应解析失败: {e}",
+            data={"video_id": video_id},
+        )
+    return None
+
+

+ 155 - 39
tasks/crawler_accounts_by_association.py

@@ -4,8 +4,6 @@
 
 
 from __future__ import annotations
 from __future__ import annotations
 
 
-import json
-import time
 import datetime
 import datetime
 import traceback
 import traceback
 import numpy as np
 import numpy as np
@@ -19,6 +17,7 @@ from applications.db import DatabaseConnector
 from applications.pipeline import scrape_account_entities_process
 from applications.pipeline import scrape_account_entities_process
 from applications.utils import Item
 from applications.utils import Item
 from applications.utils import insert_into_associated_recommendation_table
 from applications.utils import insert_into_associated_recommendation_table
+from coldStartTasks.crawler.baidu import haokan_search_videos
 from coldStartTasks.crawler.toutiao import get_associated_recommendation
 from coldStartTasks.crawler.toutiao import get_associated_recommendation
 from coldStartTasks.crawler.channels import search_in_wechat_channel
 from coldStartTasks.crawler.channels import search_in_wechat_channel
 from coldStartTasks.crawler.channels import get_channel_account_videos
 from coldStartTasks.crawler.channels import get_channel_account_videos
@@ -104,10 +103,10 @@ class CrawlerAccounts:
         fetch_query = f"""
         fetch_query = f"""
             select id, account_name, recommend_video_id, title, read_cnt, duration, seed_account, seed_title
             select id, account_name, recommend_video_id, title, read_cnt, duration, seed_account, seed_title
             from video_association 
             from video_association 
-            where score > 0.5 and platform = '{platform}' and status = 0
+            where score > %s and platform = %s and status = %s
             order by account_name;
             order by account_name;
         """
         """
-        fetch_response = self.db_client.fetch(query=fetch_query)
+        fetch_response = self.db_client.fetch(query=fetch_query, params=(0.5, platform, 0))
         return fetch_response
         return fetch_response
 
 
     def update_video_status(self, video_id_tuple: tuple, ori_status: int, new_status: int) -> int:
     def update_video_status(self, video_id_tuple: tuple, ori_status: int, new_status: int) -> int:
@@ -129,42 +128,54 @@ class ChannelsAccountCrawler(CrawlerAccounts):
         2. use search api to get accounts
         2. use search api to get accounts
     """
     """
 
 
-    def process_each_video(self, video: dict, seed_title: str):
+    def process_channels_video(self, video: dict, seed_title: str, account_name: str, account_id: str):
         """
         """
         process video item and save to database
         process video item and save to database
         """
         """
+
+        video_item = Item()
+        video_item.add("account_name", account_name)
+        video_item.add("account_id", account_id)
+        video_item.add("recommend_video_id", video["id"])
+        video_item.add("title", video["objectDesc"]["description"])
+        video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"])
+        video_item.add("seed_account", "SearchWithOutAccount")
+        video_item.add("seed_title", seed_title)
+        video_item.add(
+            "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")
+        )
+        video_item.add("platform", "sph")
+        # check item
+        video_item.check(source="association")
+
+        # save to db
+        self.insert_video_into_recommend_table(video_item.item)
+
+    def process_search_response(self, video: dict, seed_title: str):
+        """
+        通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频
+        """
         account_name = video["items"][0]["source"]["title"]
         account_name = video["items"][0]["source"]["title"]
+        # search account detail
         search_account_response = search_in_wechat_channel(
         search_account_response = search_in_wechat_channel(
             search_key=account_name, search_type=2
             search_key=account_name, search_type=2
         )
         )
         account_detail = search_account_response["data"]["data"][0]["items"][0]
         account_detail = search_account_response["data"]["data"][0]["items"][0]
         account_id = account_detail["jumpInfo"]["userName"]
         account_id = account_detail["jumpInfo"]["userName"]
+
+        # fetch account video list
         search_video_response = get_channel_account_videos(account_id)
         search_video_response = get_channel_account_videos(account_id)
         video_list = search_video_response["data"]["object"]
         video_list = search_video_response["data"]["object"]
-        for video in video_list[:5]:
-            try:
-                video_item = Item()
-                video_item.add("account_name", account_name)
-                video_item.add("account_id", account_id)
-                video_item.add("recommend_video_id", video["id"])
-                video_item.add("title", video["objectDesc"]["description"])
-                video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"])
-                video_item.add("seed_account", "SearchWithOutAccount")
-                video_item.add("seed_title", seed_title)
-                video_item.add(
-                    "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")
-                )
-                video_item.add("platform", "sph")
-                # check item
-                video_item.check(source="association")
 
 
-                # save to db
-                self.insert_video_into_recommend_table(video_item.item)
+        # process and insert each video
+        for video in video_list:
+            try:
+                self.process_channels_video(video, seed_title, account_name, account_id)
             except Exception as e:
             except Exception as e:
                 log(
                 log(
-                    task="channel account crawler",
-                    function="process_each_video",
-                    message="create item and save to db failed",
+                    task="crawler_channels_account_videos",
+                    function="process_channels_video",
+                    message="process video failed",
                     data={
                     data={
                         "video": video,
                         "video": video,
                         "error": str(e),
                         "error": str(e),
@@ -172,29 +183,43 @@ class ChannelsAccountCrawler(CrawlerAccounts):
                     }
                     }
                 )
                 )
 
 
-    def search_by_title_from_database(self, title: str) -> None:
+    def search_video_in_channels(self, title: str) -> None:
         """
         """
         search
         search
         """
         """
         search_response = search_in_wechat_channel(search_key=title, search_type=1)
         search_response = search_in_wechat_channel(search_key=title, search_type=1)
-        # print(search_response)
         video_list = search_response["data"]["data"][0]["subBoxes"]
         video_list = search_response["data"]["data"][0]["subBoxes"]
         for video in tqdm(video_list, desc="crawler each video"):
         for video in tqdm(video_list, desc="crawler each video"):
             try:
             try:
-                self.process_each_video(video, seed_title=title)
+                self.process_search_response(video, seed_title=title)
             except Exception as e:
             except Exception as e:
-                print(e)
-
-    def search_by_title_from_hotpoint(self, title: str) -> None:
-        return
+                log(
+                    task="channels account crawler",
+                    function="process_search_response",
+                    message="search by title failed",
+                    data={
+                        "video": video,
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+                )
 
 
     def deal(self):
     def deal(self):
         seed_title_list = self.get_seed_keys()
         seed_title_list = self.get_seed_keys()
         for item in tqdm(seed_title_list, desc="crawler each title"):
         for item in tqdm(seed_title_list, desc="crawler each title"):
             try:
             try:
-                self.search_by_title_from_database(title=item["title"])
+                self.search_video_in_channels(title=item["title"])
             except Exception as e:
             except Exception as e:
-                print(e)
+                log(
+                    task="channels account crawler",
+                    function="search_video_in_channels",
+                    message="search video in channels failed",
+                    data={
+                        "title": item["title"],
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+                )
 
 
         # cal similarity score
         # cal similarity score
         video_list = self.get_video_list_without_score()
         video_list = self.get_video_list_without_score()
@@ -216,7 +241,7 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
         )
         )
         return seed_video_list
         return seed_video_list
 
 
-    def process_each_video(self, video, seed_account_name, seed_title):
+    def process_toutiao_video(self, video, seed_account_name, seed_title):
 
 
         # process video item and save to database
         # process video item and save to database
         video_item = Item()
         video_item = Item()
@@ -247,9 +272,25 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
         recommend_video_list = recommend_response["data"]
         recommend_video_list = recommend_response["data"]
         for video in tqdm(recommend_video_list):
         for video in tqdm(recommend_video_list):
             try:
             try:
-                self.process_each_video(video, seed_account_name, seed_title)
+                self.process_toutiao_video(video, seed_account_name, seed_title)
+
             except Exception as e:
             except Exception as e:
-                print(e)
+                log(
+                    task="toutiao account crawler",
+                    function="process_toutiao_video",
+                    message="get recommend video failed",
+                    data={
+                        "video": video,
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+                )
+
+    def get_category_recommend_list(self):
+        """
+        品类推荐流几乎无视频,暂时不做
+        """
+        return NotImplementedError()
 
 
     def deal(self):
     def deal(self):
 
 
@@ -277,5 +318,80 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
 
 
 
 
 class HaoKanAccountCrawler(CrawlerAccounts):
 class HaoKanAccountCrawler(CrawlerAccounts):
+
+    def process_haokan_video(self, video: dict, seed_title: str) -> None:
+        """
+        process_haokan_video
+        """
+
+        video_item = Item()
+        video_item.add("account_name", video['author'])
+        video_item.add("account_id", video['author_id'])
+        video_item.add("platform", "hksp")
+        video_item.add("recommend_video_id", video['vid'])
+        video_item.add("title", video['title'])
+        read_num_string = video['read_num'].replace("次播放", "")
+        if "万" in read_num_string:
+            read_num_string = read_num_string.replace("万", "")
+            read_num = int(float(read_num_string) * 10000)
+        else:
+            read_num = int(read_num_string)
+        video_item.add("read_cnt", int(read_num))
+        duration_string = video['duration']
+        duration_list = duration_string.split(":")
+        if len(duration_list) > 2:
+            # video too long
+            return
+        duration = int(duration_list[0]) * 60 + int(duration_list[1])
+        video_item.add("duration", duration)
+        video_item.add("seed_account", "SearchWithOutAccount")
+        video_item.add("seed_title", seed_title)
+        video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+        # check item
+        video_item.check(source="association")
+
+        # insert into database
+        self.insert_video_into_recommend_table(video_item.item)
+
+    def search_videos_in_haokan_video(self, title: str) -> None:
+        """
+        search_
+        """
+        search_response = haokan_search_videos(title)
+        video_list = search_response["data"]["list"]
+        for video in tqdm(video_list, desc="search videos"):
+            try:
+                self.process_haokan_video(video, seed_title=title)
+            except Exception as e:
+                log(
+                    task="haokan_search_crawler",
+                    function="process_haokan_video",
+                    message="process haokan video failed",
+                    data={
+                        "video": video,
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+
+                )
+
     def deal(self):
     def deal(self):
-        raise NotImplementedError()
+        seed_title_list = self.get_seed_keys()
+        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
+            try:
+                self.search_videos_in_haokan_video(seed_title["title"])
+            except Exception as e:
+                log(
+                    task="haokan_search_crawler",
+                    function="search_videos_in_haokan_video",
+                    message="search videos in haokan video failed",
+                    data={
+                        "title": seed_title["title"],
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+                )
+
+        video_list = self.get_video_list_without_score()
+        affected_rows = self.save_similarity_score_to_table(video_list)
+        print(affected_rows)