Jelajahi Sumber

2024-09-25
发布新版本文章匹配小程序服务

罗俊辉 7 bulan lalu
induk
komit
71d8ba854b
6 file diubah dengan 40 penambahan dan 17 penghapusan
  1. 3 1
      requirements.txt
  2. 9 3
      tasks/etl_task.py
  3. 11 4
      tasks/history_task.py
  4. 6 4
      tasks/kimi_task.py
  5. 5 0
      tasks/publish_task.py
  6. 6 5
      tasks/spider_task.py

+ 3 - 1
requirements.txt

@@ -11,4 +11,6 @@ pyapollos~=0.1.5
 aiomysql~=0.2.0
 oss2~=2.19.0
 lxml~=5.3.0
-openai~=1.47.1
+openai~=1.47.1
+fake-useragent
+pycryptodome

+ 9 - 3
tasks/etl_task.py

@@ -7,12 +7,15 @@ import oss2
 import aiohttp
 import aiofiles
 import asyncio
+import requests
+
+from datetime import datetime
 from hashlib import md5
 from uuid import uuid4
 
-import requests
 from fake_useragent import FakeUserAgent
 from applications.config import Config
+from applications.log import logging
 
 
 async def downloadCover(file_path, platform, cover_url):
@@ -293,8 +296,11 @@ class AsyncETL(object):
         :return:
         """
         task_list = await self.getTasks()
+        logging(
+            code="5001",
+            info="ETL Task Got {} this time".format(len(task_list)),
+            function="ETL"
+        )
         if task_list:
             tasks = [self.processTask(params) for params in task_list]
             await asyncio.gather(*tasks)
-        else:
-            print("No spider tasks")

+ 11 - 4
tasks/history_task.py

@@ -157,6 +157,11 @@ class historyContentIdTask(object):
             sql=update_sql,
             params=(2, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id)
         )
+        logging(
+            code="9002",
+            info="已经从历史文章更新",
+            trace_id=trace_id
+        )
 
     async def processTask(self, params):
         """
@@ -204,11 +209,13 @@ class historyContentIdTask(object):
         :return:
         """
         task_list = await self.getTaskList()
+        logging(
+            code="5002",
+            info="History content_task Task Got {} this time".format(len(task_list)),
+            function="History Contents Task"
+        )
         if task_list:
             tasks = [self.processTask(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
-            logging(
-                code="9008",
-                info="没有要处理的请求"
-            )
+            print("暂时未获得历史已存在文章")

+ 6 - 4
tasks/kimi_task.py

@@ -88,11 +88,13 @@ class KimiTask(object):
         :return:
         """
         task_list = await self.getTasks()
+        logging(
+            code="5003",
+            info="KIMI Task Got {} this time".format(len(task_list)),
+            function="Kimi Task"
+        )
         if task_list:
             tasks = [self.processTask(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
-            logging(
-                code="9008",
-                info="没有要处理的 kimi 任务"
-            )
+            print("没有要处理的 kimi 文章")

+ 5 - 0
tasks/publish_task.py

@@ -176,6 +176,11 @@ class publishTask(object):
         :return:
         """
         task_list = await self.getTasks()
+        logging(
+            code="5004",
+            info="PublishTask Got {} this time".format(len(task_list)),
+            function="Publish Task"
+        )
         if task_list:
             tasks = [self.processTask(params) for params in task_list]
             await asyncio.gather(*tasks)

+ 6 - 5
tasks/spider_task.py

@@ -234,12 +234,13 @@ class spiderTask(object):
         :return:
         """
         task_list = await self.getTask()
-        print(task_list)
+        logging(
+            code="5005",
+            info="Spider Task Got {} this time".format(len(task_list)),
+            function="Spider Task"
+        )
         if task_list:
             tasks = [self.processTask(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
-            logging(
-                code="9008",
-                info="爬虫池没有要处理的请求"
-            )
+            print("没有新的爬虫请求")