improve code format

luojunhui committed 4 months ago
parent · commit 334e948282
4 changed files with 85 additions and 51 deletions
  1. applications/const/__init__.py (+6 -0)
  2. applications/db/__init__.py (+12 -8)
  3. coldStartTasks/crawler/baidu/video_crawler.py (+67 -42)
  4. run_baidu_video_crawler.py (+0 -1)

+ 6 - 0
applications/const/__init__.py

@@ -220,6 +220,12 @@ class BaiduVideoCrawlerConst:
     # no source account
     NO_SOURCE_ACCOUNT_STATUS = 0
 
+    # timestamp-to-cursor conversion factor
+    TIMESTAMP_TO_CURSOR = 10000
+
+    # local path dir
+    LOCAL_PATH_DIR = "static"
+
 
 
 
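The two new constants replace magic values used later in this commit: TIMESTAMP_TO_CURSOR converts a seconds-precision publish timestamp into Baidu's pagination cursor, and LOCAL_PATH_DIR names the local download directory. A minimal sketch of how they are consumed, mirroring the call sites changed below:

    # cursor update in update_cursor(): publish timestamp -> Baidu cursor
    max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
    # download target in save_each_video(): "static/<video_id>.mp4"
    video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))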

+ 12 - 8
applications/db/__init__.py

@@ -30,12 +30,12 @@ class DatabaseConnector:
         """
         try:
             self.connection = pymysql.connect(
-                host=self.db_config.get('host', 'localhost'),
-                user=self.db_config['user'],
-                password=self.db_config['password'],
-                db=self.db_config['db'],
-                port=self.db_config.get('port', 3306),
-                charset=self.db_config.get('charset', 'utf8mb4')
+                host=self.db_config.get("host", "localhost"),
+                user=self.db_config["user"],
+                password=self.db_config["password"],
+                db=self.db_config["db"],
+                port=self.db_config.get("port", 3306),
+                charset=self.db_config.get("charset", "utf8mb4"),
             )
         except pymysql.MySQLError as e:
             raise ConnectionError(f"无法连接到数据库: {e}")
@@ -48,9 +48,10 @@ class DatabaseConnector:
             self.connection.close()
             self.connection = None
 
-    def fetch(self, query, cursor_type=None):
+    def fetch(self, query, cursor_type=None, params=None):
         """
         执行单条查询语句,并返回结果。
+        :param params: 查询传参
         :param cursor_type: 输出的返回格式
         :param query: 查询语句
         :return: 查询结果列表
@@ -61,7 +62,10 @@ class DatabaseConnector:
 
         try:
             with self.connection.cursor(cursor_type) as cursor:
-                cursor.execute(query)
+                if params:
+                    cursor.execute(query, params)
+                else:
+                    cursor.execute(query)
                 result = cursor.fetchall()
                 return result
         except pymysql.MySQLError as e:
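With the new optional params argument, callers pass values separately from the SQL text and pymysql handles the escaping, instead of interpolating user data into f-strings. A minimal usage sketch; the constructor signature and connect() method name are assumptions, since only these hunks are shown:

    connector = DatabaseConnector(db_config)  # assumed: config dict with host/user/password/db
    connector.connect()                       # assumed method name; opens the pymysql connection
    rows = connector.fetch(
        query="select id from publish_single_video_source where article_title = %s;",
        params=(title,),  # pymysql escapes the bound value, closing the injection hole
    )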

+ 67 - 42
coldStartTasks/crawler/baidu/video_crawler.py

@@ -2,6 +2,8 @@
 @author: luojunhui
 @description: video crawler
 """
+
+import os
 import json
 import time
 import traceback
@@ -32,6 +34,7 @@ class BaiduVideoCrawler(object):
     def __init__(self):
         self.db = None
         self.success_crawler_video_count = 0
+        self.connect_db()
 
     def connect_db(self) -> None:
         """
@@ -57,11 +60,11 @@ class BaiduVideoCrawler(object):
         whether video exists, use video_id && title
         """
         # check title
-        sql_2 = f"""
+        sql = f"""
             select id from publish_single_video_source
-            where article_title = '{title}';
+            where article_title = %s;
         """
-        duplicate_id = self.db.fetch(query=sql_2)
+        duplicate_id = self.db.fetch(query=sql, params=(title,))
         if duplicate_id:
             print(title + " video exists")
             return True
@@ -73,32 +76,32 @@ class BaiduVideoCrawler(object):
         download and save each video
         """
         # print(json.dumps(video, ensure_ascii=False, indent=4))
-        video_id = video['id']
-        title = video['title']
+        video_id = video["id"]
+        title = video["title"]
 
         # judge whether video exists
         if self.whether_video_exists(title):
             return
 
-        read_cnt = video.get('playcnt', 0)
-        like_cnt = video.get('like_num', 0)
-        publish_timestamp = video['publish_time']
+        read_cnt = video.get("playcnt", 0)
+        like_cnt = video.get("like_num", 0)
+        publish_timestamp = video["publish_time"]
         # duration = video['duration']
-        cover_url = video['poster']
-        video_url = video['playurl']
+        cover_url = video["poster"]
+        video_url = video["playurl"]
         # sensitive_flag = video.get('sensitive_flag')
-        video_more_info = video.get('contentcms_intervene_data')
+        video_more_info = video.get("contentcms_intervene_data")
         if video_more_info:
-            video_category_list = video_more_info.get('category_v2')
+            video_category_list = video_more_info.get("category_v2")
             if video_category_list:
                 video_category = video_category_list[0]
             else:
                 video_category = None
         else:
             video_category = None
-        manual_tags = video.get('manual_tags')
+        manual_tags = video.get("manual_tags")
 
-        video_path = 'static/{}.mp4'.format(video_id)
+        video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))
         download_path = functions.download_baidu_videos(video_url, video_path)
         if download_path:
             oss_path = functions.upload_to_oss(local_video_path=download_path)
@@ -125,28 +128,49 @@ class BaiduVideoCrawler(object):
                         int(time.time()),
                         video_id,
                         video_category,
-                        json.dumps(manual_tags, ensure_ascii=False) if manual_tags else None,
+                        (
+                            json.dumps(manual_tags, ensure_ascii=False)
+                            if manual_tags
+                            else None
+                        ),
                         "baidu",
-                        const.NO_SOURCE_ACCOUNT_STATUS
-                    )
+                        const.NO_SOURCE_ACCOUNT_STATUS,
+                    ),
                 )
                 self.success_crawler_video_count += 1
             except Exception as e:
-                print(e)
+                log(
+                    task="baidu_video_crawler",
+                    function="save_each_video",
+                    message="save video failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "video_id": video_id,
+                        "oss_path": oss_path,
+                    },
+                )
         else:
             print(f"download video failed, video_id: {video_id}")
 
-    def save_video_list(self, account_id: str, account_name: str, video_list: List[Dict]) -> None:
+    def save_video_list(
+        self, account_id: str, account_name: str, video_list: List[Dict]
+    ) -> None:
         """
         save video list
         """
-        # print(json.dumps(video_list, ensure_ascii=False, indent=4))
-        for video_obj in tqdm(video_list, desc="save video list"):
-            if video_obj['type'] == 'video':
-                video_id = video_obj['content']['vid']
+        progress_bar = tqdm(video_list, desc="crawler account: {}".format(account_name))
+        for video_obj in progress_bar:
+            if video_obj["type"] == "video":
+                video_id = video_obj["content"]["vid"]
                 try:
                     video_detail = baidu_single_video_crawler(video_id)
-                    self.save_each_video(video_detail, account_id=account_id, account_name=account_name)
+                    self.save_each_video(
+                        video=video_detail,
+                        account_id=account_id,
+                        account_name=account_name,
+                    )
+                    progress_bar.set_postfix({"videoId": video_id})
                 except SpiderError as e:
                     print("save single video fail", e)
                     continue
@@ -158,11 +182,11 @@ class BaiduVideoCrawler(object):
         crawler each account
         response_strategy
         """
-        account_id = account['account_id']
-        max_cursor = account['max_cursor']
+        account_id = account["account_id"]
+        max_cursor = account["max_cursor"]
         if not max_cursor:
             max_cursor = const.DEFAULT_CURSOR
-        account_name = account['account_name']
+        account_name = account["account_name"]
         try:
             response_json = baidu_account_video_crawler(account_id, cursor=cursor)
 
@@ -171,7 +195,7 @@ class BaiduVideoCrawler(object):
                 self.save_video_list(
                     account_id=account_id,
                     account_name=account_name,
-                    video_list=video_list
+                    video_list=video_list,
                 )
             # check next page
             has_next_page = response_json.get("has_more", False)
@@ -196,16 +220,13 @@ class BaiduVideoCrawler(object):
         response_mysql = self.db.fetch(query=select_sql)
         max_publish_timestamp = response_mysql[0][0]
         if max_publish_timestamp:
-            max_cursor = max_publish_timestamp * 10000
+            max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
             update_sql = f"""
                 update baidu_account_for_videos
                 set max_cursor = %s
                 where account_id = %s;
             """
-            self.db.save(
-                query=update_sql,
-                params=(max_cursor, account_id)
-            )
+            self.db.save(query=update_sql, params=(max_cursor, account_id))
 
     def deal(self) -> None:
         """
@@ -214,10 +235,14 @@ class BaiduVideoCrawler(object):
         account_list = self.get_account_list()
         success_cnt = 0
         fail_cnt = 0
-        for account in account_list:
+        account_list_process_bar = tqdm(account_list, desc="process account list")
+        for account in account_list_process_bar:
             try:
+                account_list_process_bar.set_postfix(
+                    {"account_name": account["account_name"]}
+                )
                 self.crawler_each_account(account)
-                self.update_cursor(account['account_id'])
+                self.update_cursor(account["account_id"])
                 success_cnt += 1
             except Exception as e:
                 fail_cnt += 1
@@ -226,11 +251,11 @@ class BaiduVideoCrawler(object):
                     function="deal",
                     message="crawler each account failed",
                     data={
-                        "account_id": account['account_id'],
-                        "account_name": account['account_name'],
+                        "account_id": account["account_id"],
+                        "account_name": account["account_name"],
                         "error": str(e),
-                        "trace_back": traceback.format_exc()
-                    }
+                        "trace_back": traceback.format_exc(),
+                    },
                 )
         bot(
             title="baidu video crawler task finished",
@@ -238,7 +263,7 @@ class BaiduVideoCrawler(object):
                 "success_crawl_account_num": success_cnt,
                 "fail_crawl_account_num": fail_cnt,
                 "success_crawl_video_num": self.success_crawler_video_count,
-                "success_crawl_account_rate": success_cnt / (success_cnt + fail_cnt)
+                "success_crawl_account_rate": success_cnt / (success_cnt + fail_cnt),
             },
-            metion=False
-        )
+            metion=False,
+        )
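Besides the Black-style (or similar autoformatter) quote and trailing-comma normalization, this file gains structured failure logging and live tqdm postfix updates. A minimal sketch of the progress-bar pattern; set_postfix is tqdm's real API, and the field names are the ones this commit uses:

    from tqdm import tqdm

    progress_bar = tqdm(video_list, desc="crawler account: demo")
    for video_obj in progress_bar:
        # ... crawl and save the video ...
        progress_bar.set_postfix({"videoId": video_obj["content"]["vid"]})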

+ 0 - 1
run_baidu_video_crawler.py

@@ -5,5 +5,4 @@ from coldStartTasks.crawler.baidu import BaiduVideoCrawler
 
 if __name__ == '__main__':
     task = BaiduVideoCrawler()
-    task.connect_db()
     task.deal()
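Since connect_db() now runs inside BaiduVideoCrawler.__init__ (see the video_crawler.py hunk above), the runner no longer calls it explicitly; a side effect is that an unreachable database now raises ConnectionError at construction time rather than at the first query. The entry point reduces to:

    task = BaiduVideoCrawler()  # __init__ opens the DB connection
    task.deal()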