luojunhui преди 7 месеца
родител
ревизия
6de7e73b9d
променени са 5 файла, в които са добавени 185 реда и са изтрити 68 реда
  1. 5 2
      applications/utils/__init__.py
  2. 68 0
      applications/utils/item.py
  3. 51 0
      applications/utils/save_to_db.py
  4. 1 1
      crawler_sph_video.py
  5. 60 65
      tasks/crawler_channel_account_videos.py

+ 5 - 2
applications/utils/__init__.py

@@ -1,8 +1,11 @@
 """
 utils
 """
-from .cold_start import *
+from .cold_start import whether_title_sensitive
+from .cold_start import get_inner_account_set
 from .common import *
 from .download_video import download_gzh_video
 from .download_video import download_sph_video
-from .upload import upload_to_oss
+from .item import Item
+from .save_to_db import insert_into_single_video_source_table
+from .upload import upload_to_oss

+ 68 - 0
applications/utils/item.py

@@ -0,0 +1,68 @@
+"""
+@author: luojunhui
+"""
+import time
+
+default_single_video_table_fields = {
+    "platform": 'gzh',
+    "article_title": None,
+    "content_trace_id": None,
+    "read_cnt": 0,
+    "article_index": None,
+    "out_account_name": None,
+    "article_url": None,
+    "url_unique_md5": None,
+    "category": None,
+    "publish_timestamp": None,
+    "out_account_id": None,
+    "cover_url": None,
+    "crawler_timestamp": int(time.time()),
+    "source_account": 1,
+    "article_publish_type": None,
+    "like_cnt": 0,
+    "bad_status": 0,
+    "tags": None,
+    "video_oss_path": None
+}
+
+
+class Item(object):
+    """
+    format save to article meta table or single video source table
+    """
+
+    def __init__(self):
+        self.item = {}
+
+    def add(self, key, value):
+        """
+        add key value to item
+        """
+        self.item[key] = value
+
+    def check_video_item(self):
+        """
+        check video item
+        """
+        fields = list(default_single_video_table_fields.keys())
+        for field in fields:
+            if self.item.get(field):
+                continue
+            else:
+                self.item[field] = default_single_video_table_fields[field]
+
+    def check_article_item(self):
+        """
+        check article item
+        """
+        return
+
+    def check(self, source):
+        """
+        check item
+        """
+        match source:
+            case "video":
+                self.check_video_item()
+            case "article":
+                self.check_article_item()

+ 51 - 0
applications/utils/save_to_db.py

@@ -0,0 +1,51 @@
+"""
+@author: luojunhui
+"""
+import traceback
+from applications.aliyunLogApi import log
+
+
+def insert_into_single_video_source_table(db_client, video_item):
+    """
+    insert video into single video source table
+    """
+    insert_sql = f"""
+        INSERT INTO publish_single_video_source
+        (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
+        values
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    """
+    try:
+        db_client.save(
+            query=insert_sql,
+            params=(
+                video_item['content_trace_id'],
+                video_item['article_title'],
+                video_item['out_account_id'],
+                video_item['out_account_name'],
+                video_item['read_cnt'],
+                video_item['like_cnt'],
+                video_item['article_url'],
+                video_item['cover_url'],
+                video_item['video_oss_path'],
+                video_item['publish_timestamp'],
+                video_item['crawler_timestamp'],
+                video_item['url_unique_md5'],
+                video_item['category'],
+                video_item['tags'],
+                video_item['platform'],
+                video_item['source_account'],
+            ),
+        )
+    except Exception as e:
+        log(
+            task="{}_video_crawler".format(video_item['platform']),
+            function="save_each_video",
+            message="save video failed",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+                "video_id": video_item['url_unique_md5'],
+                "oss_path": video_item['video_oss_path']
+            }
+        )

+ 1 - 1
crawler_sph_video.py

@@ -5,6 +5,6 @@ from tasks.crawler_channel_account_videos import CrawlerChannelAccountVideos
 
 if __name__ == '__main__':
     crawler_channel_account_videos = CrawlerChannelAccountVideos()
-    account_id_list = ['v2_060000231003b20faec8c4eb8d19c3ddcc05e436b077297f098648330f323fb59a08d3728da9@finder', 'v2_060000231003b20faec8c4e28c1fc6d6cf0cee3db07785fb5dfb3a9edb1e4292bac390059b08@finder', 'v2_060000231003b20faec8c7eb8e1bc6d4cf05ed31b0775730a60aeeff4a1a22c7f787eb67ab81@finder', 'v2_060000231003b20faec8c5eb8a1cc3d1c902e43cb0774ec288165f96c810e3553f5069c92d73@finder', 'v2_060000231003b20faec8c5ea891cc1d1cb01ee30b077cc40cdecdd2072d3e03e314da6b2fc03@finder', 'v2_060000231003b20faec8c6e68e1ac7dcce0ce83cb077a175e727cda90695998bcb35c241a695@finder', 'v2_060000231003b20faec8c4e58119c4dccf03e433b077b7d76d8c35bec9f372d711235fa0124a@finder', 'v2_060000231003b20faec8c7e18d1ec5d4c905ec36b077a09518fb5af988d48d4a17348901428a@finder']
+    account_id_list = ['v2_060000231003b20faec8c7e38d1fc0d3c60deb3cb077785c69d8019608ba83ca1448711fce38@finder']
     for account_id in account_id_list:
         crawler_channel_account_videos.crawler_each_account(channel_account_id=account_id, channel_account_name="")

+ 60 - 65
tasks/crawler_channel_account_videos.py

@@ -2,13 +2,17 @@
 @author: luojunhui
 @tool: pycharm && deepseek
 """
-import json
-import time
+
+import os
 import traceback
 
+from tqdm import tqdm
+
 from applications import log
 from applications.db import DatabaseConnector
 from applications.utils import download_sph_video
+from applications.utils import insert_into_single_video_source_table
+from applications.utils import Item
 from applications.utils import str_to_md5
 from applications.utils import upload_to_oss
 from config import long_articles_config
@@ -21,6 +25,7 @@ class CrawlerChannelAccountVideos:
     """
     crawler channel account videos
     """
+
     def __init__(self):
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
@@ -48,73 +53,63 @@ class CrawlerChannelAccountVideos:
         """
         return
 
+    def crawler_each_video(self, video: dict):
+        """
+        download each video
+        save video and decrypt video
+        upload video to oss
+        """
+        object_desc = video["objectDesc"]
+        title = object_desc["description"]
+        if self.whether_video_exists(title):
+            return
+
+        video_item = Item()
+        video_item.add("content_trace_id", "video{}".format(str_to_md5(video["id"])))
+        video_item.add("url_unique_md5", video["id"])
+        video_item.add("article_title", title)
+        video_item.add("out_account_id", video["username"])
+        video_item.add("out_account_name", video["nickname"])
+        video_item.add("publish_timestamp", video["createtime"])
+        media = object_desc["media"][0]
+        url = media["Url"]
+        decode_key = media["decodeKey"]
+        url_token = media["urlToken"]
+        download_url = url + url_token
+        try:
+            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
+            oss_path = upload_to_oss(decrypt_path)
+            video_item.add("video_oss_path", oss_path)
+            video_item.add("source_account", NO_SOURCE_ACCOUNT)
+            video_item.check(source="video")
+            insert_into_single_video_source_table(self.db_client, video_item.item)
+            os.remove(decrypt_path)
+        except Exception as e:
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_video",
+                message="download video failed",
+                data={
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                    "video_id": video["id"],
+                },
+            )
+
     def crawler_each_account(self, channel_account_id: str, channel_account_name: str):
         """
         get channel account videos
         """
         response = get_channel_account_videos(channel_account_id)
-        if response['ret'] == 200:
-            response_data = response['data']
-            last_buffer = response_data['lastBuffer']
-            continue_flag = response_data['continueFlag']
-            video_list = response_data['object']
-            for video in video_list:
-                video_id = str(video['id'])
-                account_name = video['nickname']
-                object_desc = video['objectDesc']
-                publish_timestamp = video['createtime']
-                title = object_desc['description']
-                if self.whether_video_exists(title):
-                    continue
-
-                media = object_desc['media'][0]
-                url = media['Url']
-                decode_key = media['decodeKey']
-                url_token = media['urlToken']
-                download_url = url + url_token
-                try:
-                    decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
-                    oss_path = upload_to_oss(decrypt_path)
-                    insert_sql = f"""
-                        insert into publish_single_video_source
-                        (content_trace_id, article_title, out_account_id, out_account_name, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, platform, source_account)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                    """
-
-                    try:
-                        self.db_client.save(
-                            query=insert_sql,
-                            params=(
-                                "video{}".format(str_to_md5(video_id)),
-                                title,
-                                channel_account_id,
-                                account_name,
-                                oss_path,
-                                publish_timestamp,
-                                int(time.time()),
-                                video_id,
-                                "sph",
-                                NO_SOURCE_ACCOUNT
-                            ),
-                        )
-                        self.success_crawler_video_count += 1
-                    except Exception as e:
-                        log(
-                            task="baidu_video_crawler",
-                            function="save_each_video",
-                            message="save video failed",
-                            data={
-                                "error": str(e),
-                                "traceback": traceback.format_exc(),
-                                "video_id": video_id,
-                                "oss_path": oss_path,
-                            },
-                        )
-
-                except Exception as e:
-                    print("download video error:", e)
-
+        if response["ret"] == 200:
+            response_data = response["data"]
+            last_buffer = response_data["lastBuffer"]
+            continue_flag = response_data["continueFlag"]
+            video_list = response_data["object"]
+            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
+            for video in crawl_video_list_bar:
+                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
+                self.crawler_each_video(video)
         else:
             print(f"crawler channel account {channel_account_name} videos failed")
-            return
+            return