소스 검색

update calI2I.py calI2I2.py clean.sh db_help.py extract_cur_share_log.py extract_user_action.py run.sh run_extract_tag.sh run_hour.sh

linfan 1 년 전
부모
커밋
71a0b2302e
9개의 변경된 파일246개의 추가작업 그리고 16개의 파일을 삭제
  1. 21 1
      calI2I.py
  2. 37 6
      calI2I2.py
  3. 9 1
      clean.sh
  4. 31 1
      db_help.py
  5. 54 0
      extract_cur_share_log.py
  6. 1 1
      extract_user_action.py
  7. 26 4
      run.sh
  8. 2 2
      run_extract_tag.sh
  9. 65 0
      run_hour.sh

+ 21 - 1
calI2I.py

@@ -27,6 +27,26 @@ if __name__=="__main__":
         else:
             item_dict[items[2]] = item_dict[items[2]]+1
     f.close()
+    nowhour=sys.argv[2]
+    f1 = open("./data/user_cur_day_item_share_"+nowhour)
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        key = (items[1],items[2])
+        #print(key)
+        if key not in user_item_dict:
+            user_item_dict[key] = 1
+        else:
+            user_item_dict[key] = user_item_dict[key]+1
+        if items[2] not in item_dict:
+            item_dict[items[2]] = 1
+        else:
+            item_dict[items[2]] = item_dict[items[2]]+1
+    f1.close()
     #((user,item), score)
     #print(user_item_dict)
     #2. (uid, [(vid, score)....])
@@ -116,7 +136,7 @@ if __name__=="__main__":
         final_rec_list.append((k, sorted_v))
     #print(final_rec_list[:1])
     #json_str = json.dumps(final_rec_list)
-    with open("./data/rec_result_"+nowdate+".json", "w") as f :
+    with open("./data/rec_result_"+nowhour+".json", "w") as f :
         json.dump(final_rec_list, f)
     
      

+ 37 - 6
calI2I2.py

@@ -6,7 +6,7 @@ import json
 if __name__=="__main__":
     #1.load data
     nowdate=sys.argv[1]
-    f = open("./data/user_item_share_"+nowdate)
+    f = open("./data/user_item_share_filter_"+nowdate)
     user_item_dict={}
     item_dict = {}  
     while True:
@@ -34,6 +34,35 @@ if __name__=="__main__":
         else:
             item_dict[vid] = item_dict[vid]+1
     f.close()
+    nowhour=sys.argv[2]
+    f1 = open("./data/user_cur_day_item_share_filter_"+nowhour)
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        vid = -1
+        try:
+            vid = int(items[2])
+        except:
+            continue
+        if vid == -1:
+            continue
+
+        key = (items[1],vid)
+        #print(key)
+        if key not in user_item_dict:
+            user_item_dict[key] = 1
+        else:
+            user_item_dict[key] = user_item_dict[key]+1
+        if vid not in item_dict:
+            item_dict[vid] = 1
+        else:
+            item_dict[vid] = item_dict[vid]+1
+    f1.close()
+
     #((user,item), score)
     #print(user_item_dict)
     #2. (uid, [(vid, score)....])
@@ -87,8 +116,9 @@ if __name__=="__main__":
         pair_score = v
         if item1 in item_dict:
             item_score1 = item_dict[item1]
-            if item_score1<10:
-                continue
+            #if item_score1<10:
+            #    continue
+            item_score1 = 1
             i2i_pro = float(pair_score)/(float(item_score1)+5)
             if i2i_pro<0.000001:
                 continue
@@ -102,8 +132,9 @@ if __name__=="__main__":
                 rec_item_dict[item2] = rec_list1
         if item2 in item_dict:
             item_score2 = item_dict[item2]
-            if item_score2<10:
-                continue
+            #if item_score2<10:
+            #    continue
+            item_score2 = 1.0
             i2i_pro = float(pair_score)/(float(item_score2)+5)
             if i2i_pro<0.000001:
                 continue
@@ -139,7 +170,7 @@ if __name__=="__main__":
         final_rec_list.append((k, sorted_v))
     #print(final_rec_list[:1])
     #json_str = json.dumps(final_rec_list)
-    with open("./data/rec_result_"+nowdate+".json", "w") as f :
+    with open("./data/rec_result3_"+nowhour+".json", "w") as f :
         json.dump(final_rec_list, f)
     
      

+ 9 - 1
clean.sh

@@ -3,18 +3,26 @@ source ~/.bash_profile
 source ~/.bashrc
 
 last3day=`date  +"%Y%m%d" -d -4days`
-
+rec_result_path=./data/rec_result3_${last3day}'*'
+rec_cur_day_item_path=./data/user_cur_day_item_share_filter_${last3day}'*'
 merge_path=./data/merge_score_${last3day}'*'
+user_item_share_filter_path=./data/user_item_share_filter_${last3day}'*'
 video_data_path=./data/video_data_${last3day}'*'
 sorted_path=./data/sorted_data_${last3day}'*'
 cls_path=./data/redis_cls_${last3day}'*'
 hour_video_path=./data/hour_video_data_${last3day}'*'
 sorted_hour_path=./data/sorted_hour_data_${last3day}'*'
 rec_path=./data/rec_result_'*'
+user_cur_day_path=./data/user_cur_day_item_share_${last3day}'*'
+#user_cur_d=./data/user_cur_day_item_share_${last3day}'*'
 echo ${merge_path}
 echo ${video_data_path}
 echo ${cls_path}
 
+rm -rf ${user_cur_day_path}
+rm -rf ${user_item_share_filter_path}
+rm -rf ${rec_cur_day_item_path}
+rm -rf ${rec_result_path}
 rm -rf ${merge_path}
 rm -rf ${video_data_path}
 rm -rf ${sorted_path}

+ 31 - 1
db_help.py

@@ -3,7 +3,7 @@ import redis
 from config import set_config
 config_  = set_config()
 conn_redis = None
-
+import pymysql
 
 class RedisHelper(object):
     def __init__(self):
@@ -291,6 +291,36 @@ class RedisHelper(object):
  
     #def disconnet(self):
     #    conn_redis.disconnect()        
+class MysqlHelper(object):
+    def __init__(self, mysql_info):
+        """
+        初始化mysql连接信息
+        """
+        self.mysql_info = mysql_info
+
+    def get_data(self, sql):
+        """
+        查询数据
+        :param sql: sql语句
+        :return: data
+        """
+        # 连接数据库
+        conn = pymysql.connect(**self.mysql_info)
+        # 创建游标
+        cursor = conn.cursor()
+        try:
+            # 执行SQL语句
+            cursor.execute(sql)
+            # 获取查询的所有记录
+            data = cursor.fetchall()
+        except Exception as e:
+            return None
+        # 关闭游标对象
+        cursor.close()
+        # 关闭数据库连接
+        conn.close()
+        return data
+
 
 if __name__ == '__main__':
     redis_helper = RedisHelper()

+ 54 - 0
extract_cur_share_log.py

@@ -0,0 +1,54 @@
+#coding utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):    
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # collection默认一个dict
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容;
+        data = pd.DataFrame.from_dict(d, orient='index').T  # 转换为数据框,并转置,不转置的话是横条数据
+    return data
+
+
+if __name__=="__main__":
+    project = 'loghubods'
+    last7day=sys.argv[1]
+    now_date=sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_share_log'
+    sql = "select machinecode, shareobjectid from loghubods.user_share_log_per5min where dt between '"+last7day+"' and '"+now_date+"' and topic='share';"
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_cur_day_item_share_"+now_date, sep='\t') 

+ 1 - 1
extract_user_action.py

@@ -48,7 +48,7 @@ if __name__=="__main__":
     now_date=sys.argv[2]
     print("now date:", now_date)
     table = 'user_action_log_base'
-    sql = "select  mid, videoid, businesstype, clienttimestamp, return from loghubods.user_action_log_base where dt between '"+last7day+"' and '"+now_date+"' and businesstype in ('videoShareFriend');"
+    sql = "select  mid, videoid, businesstype, clienttimestamp, return from loghubods.user_action_log_base_addrealplay where dt between '"+last7day+"' and '"+now_date+"' and businesstype in ('videoShareFriend');"
     print(sql)
     data = exe_sql(sql)
     data.to_csv("./data/user_action_"+now_date, sep='\t') 

+ 26 - 4
run.sh

@@ -7,13 +7,31 @@ cd /data/OffLineRec
 
 #1. download data
 nowday=`date  +"%Y%m%d" -d -0days`
-last7day=`date  +"%Y%m%d" -d -8days`
+last7day=`date  +"%Y%m%d" -d -15days`
 echo ${nowday} 
 echo ${last7day}
 mkdir -p ./data/
 mkdir -p ./logs/
 #conda activate py36
 
+nowhour=`date  +"%Y%m%d%H" -d -0days`
+nowstart=$nowday'000000'
+nowhour=${nowhour}'0000'
+#nowhour='20230601140000'
+echo $nowhour
+echo ${nowstart}
+echo ${last7day}
+
+#python extract_cur_share_log.py ${nowstart} ${nowhour}
+#if [ $? -ne 0 ];
+#then
+   # msg = "[ERROR] simrecall extract_share_log"
+   # sh sendmsg.sh  $nowday  $msg
+   # echo "[ERROR] echo 'extract_share_log"
+   # exit 255
+#fi
+
+
 python extract_share_log.py ${last7day} ${nowday}
 if [ $? -ne 0 ];
 then
@@ -23,19 +41,23 @@ then
     exit 255
 fi
 
+python filter_video.py ./data/user_item_share_${nowday} ./data/user_item_share_filter_${nowday}
+
+
+exit
 #nowday='20230505'
 #2. cal i2i result
-python calI2I2.py ${nowday}
+#python calI2I2.py ${nowday}  ${nowhour}
 if [ $? -ne 0 ];
 then
     msg = "[ERROR] simrecall calI2I.py"
-    sh sendmsg.sh $nowday $msg
+    #sh sendmsg.sh $nowday $msg
     echo $msg
     exit -1
 fi
 
 #3.import res
-python import_redist.py "./data/rec_result_"${nowday}".json"  "./data/redis_cls_"${nowday}".json"
+#python import_redist.py "./data/rec_result3_"${nowhour}".json"  "./data/redis_cls_"${nowhour}".json"
 if [ $? -ne 0 ];
 then
     msg = "[ERROR] simhot recall import_redist.py"

+ 2 - 2
run_extract_tag.sh

@@ -11,7 +11,7 @@ echo ${nowday}
 #3.import res
 mkdir -p ./data/
 
-#python extract_user_action.py  ${last7day} ${nowday}
+python extract_user_action.py  ${last7day} ${nowday}
 #if [ $? -ne 0 ];
 #then
 #    msg = "[ERROR] sorted extract_vid_log"
@@ -20,7 +20,7 @@ mkdir -p ./data/
 #    exit 255
 #fi
 
-python extract_video_info.py ${nowday}
+#python extract_video_info.py ${nowday}
 #if [ $? -ne 0 ];
 #then
 #    msg = "[ERROR] cal ctr "

+ 65 - 0
run_hour.sh

@@ -0,0 +1,65 @@
+#!/bin/bash
+source ~/.bash_profile
+source ~/.bashrc
+conda activate python36 
+
+cd /data/OffLineRec
+
+#1. download data
+nowday=`date  +"%Y%m%d" -d -0days`
+last7day=`date  +"%Y%m%d" -d -15days`
+echo ${nowday} 
+echo ${last7day}
+mkdir -p ./data/
+mkdir -p ./logs/
+#conda activate py36
+
+nowhour=`date  +"%Y%m%d%H" -d -0days`
+nowstart=$nowday'000000'
+nowhour=${nowhour}'0000'
+#nowhour='20230601140000'
+echo $nowhour
+echo ${nowstart}
+echo ${last7day}
+
+python extract_cur_share_log.py ${nowstart} ${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python filter_video.py ./data/user_cur_day_item_share_${nowhour} ./data/user_cur_day_item_share_filter_${nowhour}
+
+#python extract_share_log.py ${last7day} ${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+#nowday='20230505'
+#2. cal i2i result
+python calI2I2.py ${nowday}  ${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall calI2I.py"
+    #sh sendmsg.sh $nowday $msg
+    echo $msg
+    exit -1
+fi
+
+#3.import res
+python import_redist.py "./data/rec_result3_"${nowhour}".json"  "./data/redis_cls_"${nowhour}".json"
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simhot recall import_redist.py"
+    sh sendmsg.sh  $nowday  $msg
+    echo $msg
+    exit -1
+fi
+echo 'finish sorted'