linfan 2 سال پیش
والد
کامیت
1e2f303b50
5فایلهای تغییر یافته به همراه166 افزوده شده و 5 حذف شده
  1. 78 0
      calCtr.py
  2. 53 0
      export_vid.py
  3. 3 3
      import_redist.py
  4. 12 2
      run.sh
  5. 20 0
      run_ctr.sh

+ 78 - 0
calCtr.py

@@ -0,0 +1,78 @@
+#coding utf-8
+import sys
+from operator import itemgetter
+import json
+import pandas as pd
+from db_help import RedisHelper
+
+if __name__=="__main__":
+    #1.load data
+    nowdate=sys.argv[1]
+    vlog='0'
+    love_live = 4
+    data_path = "./data/video_data_"+nowdate
+    f = open(data_path)
+    #data = pd.read_csv(data_path, encoding="utf-8", sep='\t')
+    #print(data)
+    index = 0
+    data_dict = {}
+    redis_helper = RedisHelper()
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        if index==0:
+            index += 1
+            continue
+        index +=1
+        items = line.strip().split("\t")
+        if len(items)<11:
+            continue
+        vid = items[1]
+        view_users = items[4] 
+        view_pv = items[5]
+        play_users = items[6]
+        play_pv = items[7]
+        share_users = items[8]
+        share_pv = items[9]
+        return_users = items[10]
+        #print(items)
+        if vid not in data_dict:
+            data_dict[vid] = (view_users, view_pv, play_users, play_pv, share_users, share_pv, return_users)
+        else:
+            item_info = data_dict[vid]
+            view_users = item_info[0]+view_users
+            view_pv = item_info[1]+view_pv
+            play_users = item_info[2]+play_pv
+            share_users = item_info[3]+share_users
+            share_pv = item_info[4]+share_pv
+            return_users = item_info[5]+return_users 
+            data_dict[vid] = (view_users, view_pv, play_users, play_pv, share_users, share_pv, return_users)
+    #print(data_dict.items())
+    info_dict = {}
+    data_path = "./data/sorted_data_"+nowdate
+    f = open(data_path, 'w')
+    for k, v in data_dict.items():
+        #print(v)
+        return_users = v[6]
+        print(return_users)
+        view_users = v[0]
+        view_pv = v[1]
+        share_pv = v[5]
+        share_users = [4]
+        play_users = v[2]
+        #print("return_users:", return_users) 
+        k_score = float(return_users)/(float(view_users)+10)
+        #print(k_score)
+        share_score = float(share_pv)/(float(view_pv)+5)
+        backrate = float(return_users)/(float(share_pv)+5)
+        #print(k, k_score, share_score*backrate, share_score, backrate) 
+        score_info = [k_score, share_score*backrate, share_score, backrate]
+        k = "k_p:"+k
+        score_info = json.dumps(score_info)
+        info_dict[k] = score_info
+        f.write(k+"\t"+score_info+"\n")
+    redis_helper.update_batch_setnx_key(info_dict, 60*60*24*15) 
+    f.close()
+
+     

+ 53 - 0
export_vid.py

@@ -0,0 +1,53 @@
+#coding utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):    
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # collection默认一个dict
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容;
+        data = pd.DataFrame.from_dict(d, orient='index').T  # 转换为数据框,并转置,不转置的话是横条数据
+    return data
+
+
+if __name__=="__main__":
+    project = 'loghubods'
+    now_date=sys.argv[1]
+    print("now date:", now_date)
+    table = 'video_data_each_hour_dataset_24h_total_apptype'
+    sql = "select apptype, videoid, preview人数, preview次数, view人数, view次数, play人数, play次数, share人数, share次数, 回流人数 from loghubods.video_data_each_hour_dataset_24h_total_apptype where dt="+now_date
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/video_data_"+now_date, sep='\t', index=None) 

+ 3 - 3
import_redist.py

@@ -15,11 +15,11 @@ if __name__=="__main__":
              for rec_item in rec_list:
                  rec_item_list.append((rec_item[0], round(rec_item[1],3)))
              print(line[0]+"\t"+json.dumps(rec_item_list))
-             key="sim_"+line[0]
+             key="sim_hot_"+line[0]
              import_data_dict[key] = json.dumps(rec_item_list)
          redis_helper = RedisHelper()
-         #redis_helper.update_batch_setnx_key(import_data_dict, 60*60*12)
+         redis_helper.update_batch_setnx_key(import_data_dict, 60*60*24*7)
          #con = redis_helper.connect()
-         res = redis_helper.get_data_from_redis("sim_14330133")
+         res = redis_helper.get_data_from_redis("sim_hot_14330133")
          print(res)
    

+ 12 - 2
run.sh

@@ -9,11 +9,21 @@ nowday=`date  +"%Y%m%d" -d -0days`
 last7day=`date  +"%Y%m%d" -d -8days`
 echo ${nowday} 
 echo ${last7day}
-#python extract_share_log.py ${last7day} ${nowday}
+mkdir -p ./data/
 
+# Stop the pipeline right away when the share-log extraction fails.
+if ! python extract_share_log.py ${last7day} ${nowday}
+then
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
 #nowday='20230505'
 #2. cal i2i result
-#python calI2I.py ${nowday} 
+python calI2I.py ${nowday}
+# Abort before the import step when the i2i computation failed. The "if"
+# line was missing, which left a dangling "then" and broke script parsing.
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] echo 'calI2I"
+    exit 255
+fi
 
 #3.import res
 python import_redist.py "./data/rec_result_"${nowday}".json"

+ 20 - 0
run_ctr.sh

@@ -0,0 +1,20 @@
+#!/bin/bash
+# CTR pipeline: export per-video counters for the current date+hour, then
+# compute the scores and push them to Redis. Stops at the first failed step.
+source ~/.bash_profile
+source ~/.bashrc
+
+conda activate python36 
+
+# Key passed to both steps, in YYYYMMDDHH form.
+nowday=`date  +"%Y%m%d%H" -d -0days`
+echo ${nowday} 
+mkdir -p ./data/
+
+#1. export video data
+python export_vid.py  ${nowday}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] echo 'export_vid"
+    exit 255
+fi
+
+#2. compute scores and import them into redis
+python calCtr.py ${nowday}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] echo 'calCtr"
+    exit 255
+fi