zhangyong 3 months ago
parent
commit
dfe65bb9f7
8 changed files with 134 additions and 18 deletions
  1. Dockerfile (+1 -1)
  2. common/odps_data.py (+1 -1)
  3. common/redis.py (+15 -0)
  4. docker-compose.yml (+24 -0)
  5. job_data.py (+8 -7)
  6. job_day_redis.py (+77 -0)
  7. job_hour_data_redis.py (+5 -5)
  8. top_automatic/top_data_processing.py (+3 -4)

+ 1 - 1
Dockerfile

@@ -16,4 +16,4 @@ RUN apt update && apt --no-install-recommends install -y wget xz-utils nscd libg
     && ln -s /usr/local/ffmpeg-7.0.2-amd64-static/ffmpeg /usr/local/bin/ffmpeg \
     && mkdir -p /app/cache
 
-ENTRYPOINT ["python", "/app/job_data_redis.py"]
+ENTRYPOINT ["python", "/app/job_data.py"]

+ 1 - 1
common/odps_data.py

@@ -20,7 +20,7 @@ class OdpsDataCount:
         )
         data_values = []
         try:
-            sql = f'SELECT uid,videoid,return_uv,type,type_owner,channel,channel_owner,title FROM loghubods.all_apptype_top100_return WHERE dt = "{dt}" '
+            sql = f'SELECT uid,videoid,return_uv,type,type_owner,channel,channel_owner,title FROM loghubods.all_apptype_top1000_return WHERE dt = "{dt}" AND rank <= 100'
             with odps.execute_sql(sql).open_reader() as reader:
                 for row in reader:
                     data_values.append(json.dumps( {"uid": row[0], "videoid": row[1], "return_uv": row[2], "type": row[3], "type_owner": row[4], "channel": row[5], "channel_owner": row[6], "title": row[7], "dt": str(dt)}, ensure_ascii=False ))
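
The dt value is interpolated straight into the SQL string above. A defensive variant could validate it before formatting. A minimal sketch, assuming the partition value is a yyyymmdd string (build_top100_sql is an illustrative helper, not part of this commit):

    from datetime import datetime

    def build_top100_sql(dt):
        # Raises ValueError unless dt looks like a yyyymmdd partition value
        datetime.strptime(str(dt), "%Y%m%d")
        return (
            'SELECT uid,videoid,return_uv,type,type_owner,channel,channel_owner,title '
            'FROM loghubods.all_apptype_top1000_return '
            f'WHERE dt = "{dt}" AND rank <= 100'
        )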

+ 15 - 0
common/redis.py

@@ -49,6 +49,21 @@ def get_top_data(REDIS_NAME):
     ret = client.lpop(REDIS_NAME)
     return ret
 
+
+def get_llen_list(REDIS_NAME):
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    length = client.llen(REDIS_NAME)
+    return length
+
+
+def get_lrange_list(REDIS_NAME, count):
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    # LRANGE's end index is inclusive, so 0..count-1 reads exactly count elements;
+    # LTRIM then drops those elements from the head of the list
+    elements = client.lrange(REDIS_NAME, 0, count - 1)
+    client.ltrim(REDIS_NAME, count, -1)
+    return elements
+
+
 def in_job_video_data(REDIS_NAME, ret):
     helper = SyncRedisHelper()
     client = helper.get_client()
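
LRANGE followed by LTRIM is two round trips, so a second consumer could read the same elements in between. Where that matters, a MULTI/EXEC pipeline makes the pair atomic. A sketch against redis-py, reusing the module's SyncRedisHelper (drain_list_atomic is an illustrative name, not part of this commit):

    def drain_list_atomic(REDIS_NAME, count):
        helper = SyncRedisHelper()
        client = helper.get_client()
        # transaction=True wraps the queued commands in MULTI/EXEC on the server
        pipe = client.pipeline(transaction=True)
        pipe.lrange(REDIS_NAME, 0, count - 1)
        pipe.ltrim(REDIS_NAME, count, -1)
        elements, _ = pipe.execute()
        return elements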

+ 24 - 0
docker-compose.yml

@@ -10,6 +10,30 @@ services:
       - ENV=prod
     networks:
       - carry_net
+  worker2:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    image: top_data
+    container_name: top_worker2
+    restart: unless-stopped
+    environment:
+      - ENV=prod
+    networks:
+      - carry_net
+    entrypoint: "python /app/job_data_redis.py.py"
+  worker3:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    image: top_data
+    container_name: top_worker3
+    restart: unless-stopped
+    environment:
+      - ENV=prod
+    networks:
+      - carry_net
+    entrypoint: "python /app/job_hour_data_redis.py.py"
 networks:
   carry_net:
     name: carry_net

+ 8 - 7
job_data.py

@@ -18,6 +18,7 @@ def get_data_task():
     for i in range(100):
         top_task = get_top_data("task:top_all_data")
         if top_task:
+            time.sleep(1)
             data = json.loads(top_task)
             channel_id = data['channel']
             if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具"]:
@@ -45,7 +46,7 @@ def get_data_task():
                 logger.info(f"[+] 成功写入飞书表格")
                 logger.info(f"[+] 改内容为:{channel_id},不做处理")
                 if channel_id:
-                    AliyunLogger.logging(channel_id, data, "不处理","不处理")
+                    AliyunLogger.logging(channel_id, data, "不处理","fail")
                 continue
             top_tasks.add(top_task)
         else:
@@ -68,7 +69,7 @@ def video_task_start():
         except Exception as e:
             data = json.loads(data)
             in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
-            AliyunLogger.logging(data['channel'], data, "处理失败重新处理","处理失败重新处理")
+            AliyunLogger.logging(data['channel'], data, "处理失败重新处理","fail")
             logger.error(f"[+] {data}处理失败,失败信息{e}")
             continue
 
@@ -76,8 +77,8 @@ def schedule_tasks():
     schedule.every(10).minutes.do(video_task_start)
 
 if __name__ == '__main__':
-    # schedule_tasks()  # invoke the task scheduler
-    # while True:
-    #     schedule.run_pending()
-    #     time.sleep(1)  # check once per second
-    video_task_start()
+    schedule_tasks()  # invoke the task scheduler
+    while True:
+        schedule.run_pending()
+        time.sleep(1)  # check once per second
+    # video_task_start()
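
With the scheduler now the only entry point, the first pass runs up to 10 minutes after startup. If an immediate first run is wanted (an assumption, not something this commit requires), schedule_tasks could fire once before registering the job:

    def schedule_tasks():
        video_task_start()  # one pass right away at startup
        schedule.every(10).minutes.do(video_task_start)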

+ 77 - 0
job_day_redis.py

@@ -0,0 +1,77 @@
+import json
+import time
+
+import schedule
+from loguru import logger
+
+from common.feishu_utils import Feishu
+from common.redis import get_llen_list, get_lrange_list
+
+
+def jab_day_recommend():
+    """Fetch each day's hourly top-100 data and write the traced accounts to Feishu."""
+    try:
+        logger.info(f"Start fetching the traced data")
+        list_task = [{'task:top_data_ks_gjc': '77618314'}, {'task:top_data_dy_gjc':'77618313'}, {'task:top_data_ks_gz':'77618315'}, {'task:top_data_dy_gz':'77618316'}]
+        for task in list_task:
+            for key, value in task.items():
+
+                count = get_llen_list(key)
+                if count > 0:
+                    result = []
+                    channel = None
+                    tag_transport_channel = None
+                    top_task = get_lrange_list(key, count)
+                    for top in top_task:
+                        data = json.loads(top)
+                        channel_account_id = data.get("channel_account_id", "")
+                        tag_transport_channel = data.get("tag_transport_channel", "")
+                        channel = data.get("channel", "")
+                        result.append(channel_account_id)
+                    result_string = ",".join(result)
+                    values = [
+                        [
+                            "",
+                            tag_transport_channel,
+                            result_string,
+                            "lev-供给,rol-机器,#str-搬运搜索top视频溯源账号_42,genMod-账号",
+                            value,
+                            "2",
+                            count*3,
+                            "通用-分享到群",
+                            "AI片尾引导",
+                            "zhifeng_emo,sijia",
+                            "",
+                            "",
+                            "AI标题",
+                            "1",
+                            "",
+                            "王雪珂",
+                            channel
+                        ]
+                    ]
+                    Feishu.insert_columns("YqiSsMvvMhr5tCttL7tcRbHTnrd", "d79d48", "ROWS", 1, 2)
+                    time.sleep(0.5)
+                    Feishu.update_values("YqiSsMvvMhr5tCttL7tcRbHTnrd", "d79d48", "A2:Z2", values)
+                    logger.info(f"[+] Wrote to the transformation Feishu sheet successfully")
+                else:
+                    continue
+        logger.info(f"写入top前100数据成功")
+        return
+    except Exception as e:
+        logger.error(f"写入飞书异常,异常信息{e}")
+        return
+def schedule_tasks():
+    schedule.every().day.at("23:40").do(jab_day_recommend)
+
+
+if __name__ == "__main__":
+    # jab_day_recommend()
+    schedule_tasks()  # invoke the task scheduler
+    while True:
+        schedule.run_pending()
+        time.sleep(1)  # check once per second
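
To exercise the drain path in isolation, a hypothetical smoke test (the key is one of the real queues above; drain is an illustrative helper):

    from common.redis import get_llen_list, get_lrange_list

    def drain(key):
        # Read and trim everything currently queued; [] when the list is empty
        count = get_llen_list(key)
        return get_lrange_list(key, count) if count > 0 else []

    print(drain("task:top_data_ks_gjc"))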

+ 5 - 5
job_hour_data_redis.py

@@ -21,8 +21,8 @@ def schedule_tasks():
     schedule.every().hour.at(":35").do(jab_top_recommend)
 
 if __name__ == "__main__":
-    jab_top_recommend()
-    # schedule_tasks()  # invoke the task scheduler
-    # while True:
-    #     schedule.run_pending()
-    #     time.sleep(1)  # check once per second
+    # jab_top_recommend()
+    schedule_tasks()  # invoke the task scheduler
+    while True:
+        schedule.run_pending()
+        time.sleep(1)  # check once per second

+ 3 - 4
top_automatic/top_data_processing.py

@@ -118,12 +118,12 @@ class Top:
         channel_account_id = None
         tag_transport_channel = None
         data = json.loads(data)
-        AliyunLogger.logging(data['channel'], data, "开始获取","开始获取")
+        AliyunLogger.logging(data['channel'], data, "开始获取","fail")
         channel_id = data['channel']
         url_id, data_channel = sqlCollect.get_channle_id(data['videoid'])
         if not url_id:
             logger.info(f"[+] 任务{data},没有该视频信息")
-            AliyunLogger.logging(data['channel'], data, "没有该视频信息","没有该视频信息")
+            AliyunLogger.logging(data['channel'], data, "没有该视频信息","fail")
             return
         if "&vid=" in url_id or "?modal_id=" in url_id:
             host = urlparse(url_id).netloc
@@ -144,8 +144,8 @@ class Top:
             logger.info(f"[+] {url_id}开始获取快手视频链接")
             channel_account_id= self.get_text_ks_video(url=url_id)
         if not channel_account_id:
-            AliyunLogger.logging(data['channel'], data, "没有获取到视频用户ID,等待重新获取","没有获取到视频用户ID,等待重新获取")
             in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
+            AliyunLogger.logging(data['channel'], data, "没有获取到视频用户ID,等待重新获取","fail")
             return
         data["channel_account_id"] = channel_account_id
         if channel_id in ["抖音关键词抓取", "快手关键词抓取"]:
@@ -179,7 +179,6 @@ class Top:
         time.sleep(0.5)
         Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
         logger.info(f"[+] 成功写入飞书表格")
-
         return
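
For reference, the "&vid="/"?modal_id=" branch above pulls the video id out of a share URL's query string. A sketch of that extraction with the standard library (extract_video_id, and the assumption that these two keys carry the id, are illustrative):

    from urllib.parse import urlparse, parse_qs

    def extract_video_id(url_id):
        # Return the first vid or modal_id query parameter, if present
        qs = parse_qs(urlparse(url_id).query)
        for key in ("vid", "modal_id"):
            if key in qs:
                return qs[key][0]
        return None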