zhangbo · 3 months ago
Parent
commit
23df9c5917

+ 54 - 0
returnv2/01_extract_share_log.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # defaultdict of lists, keyed by column name
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each field in a record is a (column, value) pair; collect values under the column key
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose; otherwise it comes out as one wide row
+    return data
+
+
+if __name__ == "__main__":
+    project = 'loghubods'
+    last7day = sys.argv[1]
+    now_date = sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_share_log'
+    sql = "select machinecode, shareid, shareobjectid from loghubods.user_share_log where dt between '"+last7day+"' and '"+now_date+"' and topic='share' "
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_item_share_"+now_date, sep='\t')
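Note on the dump format shared by all the extract scripts: exe_sql writes the DataFrame with to_csv(sep='\t') and the default index, so every row starts with a numeric index column and the real fields only begin at items[1]. A minimal parsing sketch (the date suffix is a placeholder) mirroring how 03_groupItem.py reads this file:

    # sketch only: how the downstream scripts consume the dump
    with open("./data/user_item_share_20230627") as f:
        for line in f:
            items = line.strip().split("\t")
            if len(items) < 4:  # skips the header row and malformed lines
                continue
            uid, shareid, vid = items[1], items[2], items[3]  # column 0 is the index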

+ 54 - 0
returnv2/02_extract_clik_log.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # defaultdict of lists, keyed by column name
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each field in a record is a (column, value) pair; collect values under the column key
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose; otherwise it comes out as one wide row
+    return data
+
+
+if __name__ == "__main__":
+    project = 'loghubods'
+    last7day = sys.argv[1]
+    now_date = sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_share_log'
+    sql = "select machinecode, shareid, clickobjectid from loghubods.user_share_log where dt between '"+last7day+"' and '"+now_date+"' and topic='click' "
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_item_click_"+now_date, sep='\t')

+ 60 - 0
returnv2/03_groupItem.py

@@ -0,0 +1,60 @@
+# coding: utf-8
+import sys
+from operator import itemgetter
+import json
+
+if __name__ == "__main__":
+    # 1. load data
+    nowdate = sys.argv[1]
+    f1 = open("./data/user_item_share_"+nowdate)
+    user_share_item_dict={}
+    item_dict = {}
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<4:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        vid = items[3]
+        user_share_item_dict[shareid] = (uid,vid)
+    f1.close()
+    print(len(user_share_item_dict))
+    f2 = open("./data/user_item_click_"+nowdate)
+    #user_group_dict={}
+    item_group_dict = {}
+    item_dict = {}
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        #vid = items[3]
+        sim_user_set = set()
+        if shareid in user_share_item_dict:  # a return from a share made within the 2-day window
+            kuid, kvid = user_share_item_dict[shareid]
+            key_info = kvid
+            if key_info in item_group_dict:
+                sim_user_set = item_group_dict[key_info]
+                sim_user_set.add(uid)
+                item_group_dict[key_info] = sim_user_set
+            else:
+                sim_user_set.add(uid)
+                item_group_dict[key_info] = sim_user_set
+    print(len(item_group_dict))
+    f2.close()
+    f3 = open("./data/return_item_"+nowdate, 'w')
+    for k, v in item_group_dict.items():
+        f3.write(k+"\t"+json.dumps(list(v))+"\n")
+    f3.close()
+    #((user,item), score)
+    #print(user_item_dict)
+    #2. (uid, [(vid, score)....])
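03_groupItem.py joins clicks back to shares on shareid: for each shared video it accumulates the set of users who returned through any share of it. Each line of ./data/return_item_<date> is the video id plus a JSON list of returning user ids, e.g. (values illustrative):

    14737974	["mid_a", "mid_b", "mid_c"]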

+ 54 - 0
returnv2/04_extract_user_exp_video.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # defaultdict of lists, keyed by column name
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each field in a record is a (column, value) pair; collect values under the column key
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose; otherwise it comes out as one wide row
+    return data
+
+
+if __name__ == "__main__":
+    project = 'loghubods'
+    last7day = sys.argv[1]
+    now_date = sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_action_log_base'
+    sql = "select mid, videoid  from loghubods.user_action_log_base_addrealplay where dt between '"+last7day+"' and '"+now_date+"' and businesstype='videoView' "
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_video_exp_"+now_date, sep='\t')

+ 49 - 0
returnv2/05_getI2I.py

@@ -0,0 +1,49 @@
+# coding: utf-8
+import sys
+import json
+
+if __name__ == "__main__":
+    f = open(sys.argv[1])  # exposure log
+    user_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        items = line.split("\t")
+        if len(items)<3:
+            continue
+        uid = items[1]
+        vid = items[2]
+        vid_set = set()
+        if uid in user_dict:
+            vid_set = user_dict[uid]
+        vid_set.add(vid)
+        user_dict[uid] = vid_set
+    f.close()
+    f1 = open(sys.argv[2])  # return log
+    f2 = open(sys.argv[3], 'w')
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.split("\t")
+        if len(items)<2:
+            continue
+        vid = items[0]
+        uid_list = json.loads(items[1])
+        if len(uid_list)<10:  # videos with fewer than 10 returning users are filtered out
+            continue
+        item_dict = {}
+        for uid in uid_list:
+            if uid in user_dict:
+                item_list = user_dict[uid]
+                for item in item_list:
+                    item = item.strip()
+                    if item in item_dict:
+                        item_dict[item] = item_dict[item] + 1
+                    else:
+                        item_dict[item] = 1
+        item_list = sorted(item_dict.items(), key=lambda s: s[1], reverse=True)
+        f2.write(vid+"\t"+json.dumps(item_list[:100])+"\t"+items[1]+"\n")
+    f1.close()
+    f2.close()
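05_getI2I.py builds the i2i candidates by co-occurrence over each video's returning-user group: for every video with at least 10 returning users, it counts how often each exposed video shows up across those users' exposure sets and keeps the top 100. A tiny worked example of the same logic (ids hypothetical):

    # video V brought back users u1 and u2
    # u1 was exposed to {A, B}; u2 was exposed to {A}
    # counts for V: A -> 2, B -> 1, giving the output line
    # V	[["A", 2], ["B", 1]]	["u1", "u2"]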

+ 61 - 0
returnv2/06_calReturn.py

@@ -0,0 +1,61 @@
+# coding: utf-8
+import sys
+from operator import itemgetter
+import json
+
+if __name__ == "__main__":
+    # 1. load data
+    nowdate = sys.argv[1]
+    f1 = open("./data/user_item_share_"+nowdate)
+    user_share_item_dict={}
+    user_shareid_dict = {}
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<4:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        vid = items[3]
+        k_info = uid+"\t"+vid
+        user_share_item_dict[k_info] = shareid
+        user_shareid_dict[shareid] = (uid,vid)
+    f1.close()
+    print(len(user_share_item_dict))
+    f2 = open("./data/user_item_click_"+nowdate)
+    #user_group_dict={}
+    share_return_dict = {}
+    #item_dict = {}
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        #vid = items[3]
+        if shareid not in share_return_dict:
+            share_return_dict[shareid] = 1
+        else:
+            share_return_dict[shareid] += 1
+    f2.close()
+    user_item_return = {}
+    for k, v in user_share_item_dict.items():
+        if v in share_return_dict:
+            if k not in user_item_return:
+                user_item_return[k] = share_return_dict[v]
+            else:
+                user_item_return[k] += share_return_dict[v]
+    f3 = open("./data/user_item_return_count_"+nowdate, 'w')
+    for k, v in user_item_return.items():
+        f3.write(k+"\t"+str(v)+"\n")
+    f3.close()
+    #((user,item), score)
+    #print(user_item_dict)
+    #2. (uid, [(vid, score)....])
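06_calReturn.py attributes returns to the share that produced them: each click on a shareid increments that share's return count, which is then summed onto the uid+"\t"+vid key that generated the share. Because the key itself contains a tab, every line of ./data/user_item_return_count_<date> splits into the three columns

    <uid>	<vid>	<return_count>

which 07_getI2ICTRGroup.py reads back as items[0], items[1] and items[2].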

+ 102 - 0
returnv2/07_getI2ICTRGroup.py

@@ -0,0 +1,102 @@
+# coding: utf-8
+import sys
+import json
+import math
+from operator import attrgetter
+
+if __name__ == "__main__":
+    f = open(sys.argv[1])
+    exp_item_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mid = items[1]
+        vid = items[2]
+        key_info = mid+"\t"+vid
+        if key_info in exp_item_dict:
+            exp_item_dict[key_info]+=1
+        else:
+            exp_item_dict[key_info] = 1
+    f.close()
+    return_item_dict = {}
+    f1 = open(sys.argv[2])
+    #f2 = open(sys.argv[3], 'w')
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        uid = items[0]
+        vid = items[1]
+        return_count = items[2]
+        key_info = uid+"\t"+vid
+        if key_info not in return_item_dict:
+            return_item_dict[key_info] = 1
+        else:
+            return_item_dict[key_info]+=1
+    f1.close()
+    f2 = open(sys.argv[3])
+    f3 = open(sys.argv[4], 'w')
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mvid = items[0]
+        vid_list = json.loads(items[1])
+        user_list = json.loads(items[2])
+        rec_item_dict = {}
+        #rec_item_list = []
+        for vid_info in vid_list:
+            rec_vid = vid_info[0]
+            if rec_vid == mvid:
+                continue
+            score1 = float(vid_info[1])
+            rec_k_exp = 0
+            rec_k_return = 0
+            for uid in user_list:
+                rec_k = uid+"\t"+rec_vid
+                if rec_k in exp_item_dict:
+                    rec_k_exp += exp_item_dict[rec_k]
+                if rec_k in return_item_dict:
+                    rec_k_return += return_item_dict[rec_k]
+            score2 = 0.0000000000001
+            if rec_k_exp>0:
+                score2 = float(rec_k_return)/float(rec_k_exp)
+            if score1 == 0:
+                score1 = 0.000000000000001
+            else:
+                score1 = math.log(score1)
+            #if rec_vid in item_dict:
+            # 	score2 = float(item_dict[rec_vid][0])
+            return_score = 0.000000000000001
+            #print("rec_k_return:", rec_k_return)
+            if rec_k_return>0:
+                return_score=math.log(rec_k_return+1)
+            score = score1*score2
+            #score = return_score*score2
+            if score<=0.0:
+                continue
+            #rec_item_list = []
+            rec_vid = int(rec_vid)
+            if rec_vid not in rec_item_dict:
+                #rec_item_list.append((rec_vid,score))
+                rec_item_dict[rec_vid] = score
+            else:
+                rec_item_dict[rec_vid] += score
+        rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1], reverse=True)
+        #print(rec_item_list2)
+        #rec_list = list(rec_item_dict.values())
+        #for k, v in rec_item_dict.items():
+        #rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1][1], reverse=True)
+        f3.write(str(mvid)+"\t"+json.dumps(rec_item_list2[:100])+"\n")
+    #f1.close()
+    f3.close()
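The per-candidate score in 07_getI2ICTRGroup.py multiplies a log-dampened co-occurrence prior by a return rate conditioned on the seed video's returning users, with tiny epsilons guarding log(0) and division by zero. A condensed sketch of the same arithmetic (a sketch, not a drop-in replacement):

    import math

    def ctr_group_score(cooccur, exposures, returns):
        # score1: log-dampened co-occurrence count taken from the i2i file
        score1 = math.log(cooccur) if cooccur > 0 else 1e-15
        # score2: returns / exposures within the seed video's returning-user group
        score2 = returns / exposures if exposures > 0 else 1e-13
        return score1 * score2  # scores <= 0 are skipped in the main loop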

+ 35 - 0
returnv2/08_import_redist.py

@@ -0,0 +1,35 @@
+# coding: utf-8
+import sys
+import json
+from db_help import RedisHelper
+
+if __name__ == "__main__":
+     f2 = open(sys.argv[2], 'w')
+     import_data_dict = {}
+     with open(sys.argv[1]) as f:
+         while True:
+             line = f.readline()
+             if not line:
+                 break
+             items = line.strip().split("\t")
+             if len(items)<2:
+                 continue
+             vid = items[0]
+             key = "rv:"+vid
+             rec_list = json.loads(items[1])
+             if len(rec_list)==0:
+                 continue
+             rec_item_list = []
+             for rec_item in rec_list:
+                 rec_item_list.append((rec_item[0], round(rec_item[1],3)))
+             res_info = json.dumps(rec_item_list[:10])
+             f2.write(key+"\t"+res_info+"\n")
+             #key="rv:"+str(line[0])
+             import_data_dict[key] = res_info
+     redis_helper = RedisHelper()
+     redis_helper.update_batch_setnx_key(import_data_dict, 60*60*24*7)  # set-if-absent batch with a 7-day TTL
+     res = redis_helper.get_data_from_redis("rv:14737974")  # spot-check one imported key
+     print(res)
+     f2.close()
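RedisHelper comes from the project's db_help module, which is not part of this commit. Assuming update_batch_setnx_key does a set-if-absent batch write with a TTL, as its name and the 60*60*24*7 argument suggest, a rough redis-py equivalent would be:

    import redis

    def setnx_batch(client, data, ttl_seconds):
        # write each key only if it does not already exist, with an expiry
        pipe = client.pipeline()
        for key, value in data.items():
            pipe.set(key, value, ex=ttl_seconds, nx=True)
        pipe.execute()

    # setnx_batch(redis.Redis(), import_data_dict, 60 * 60 * 24 * 7)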

+ 102 - 0
returnv2/09_getI2ICTRGroupV2.py

@@ -0,0 +1,102 @@
+# coding: utf-8
+import sys
+import json
+import math
+from operator import attrgetter
+
+if __name__ == "__main__":
+    f = open(sys.argv[1])
+    exp_item_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mid = items[1]
+        vid = items[2]
+        key_info = mid+"\t"+vid
+        if key_info in exp_item_dict:
+            exp_item_dict[key_info]+=1
+        else:
+            exp_item_dict[key_info] = 1
+    f.close()
+    return_item_dict = {}
+    f1 = open(sys.argv[2])
+    #f2 = open(sys.argv[3], 'w')
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        uid = items[0]
+        vid = items[1]
+        return_count = items[2]
+        key_info = uid+"\t"+vid
+        if key_info not in return_item_dict:
+            return_item_dict[key_info] = 1
+        else:
+            return_item_dict[key_info]+=1
+    f1.close()
+    f2 = open(sys.argv[3])
+    f3 = open(sys.argv[4], 'w')
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mvid = items[0]
+        vid_list = json.loads(items[1])
+        user_list = json.loads(items[2])
+        rec_item_dict = {}
+        #rec_item_list = []
+        for vid_info in vid_list:
+            rec_vid = vid_info[0]
+            if rec_vid == mvid:
+                continue
+            score1 = float(vid_info[1])
+            rec_k_exp = 0
+            rec_k_return = 0
+            for uid in user_list:
+                rec_k = uid+"\t"+rec_vid
+                if rec_k in exp_item_dict:
+                    rec_k_exp += exp_item_dict[rec_k]
+                if rec_k in return_item_dict:
+                    rec_k_return += return_item_dict[rec_k]
+            score2 = 0.0000000000001
+            if rec_k_exp>0:
+                score2 = float(rec_k_return)/float(rec_k_exp)
+            if score1 == 0:
+                score1 = 0.000000000000001
+            else:
+                score1 = math.log(score1)
+            #if rec_vid in item_dict:
+            # 	score2 = float(item_dict[rec_vid][0])
+            return_score = 0.000000000000001
+            #print("rec_k_return:", rec_k_return)
+            if rec_k_return>0:
+                return_score=math.log(rec_k_return+1)
+            #score = score1*score2
+            score = return_score*score2
+            if score<=0.0:
+                continue
+            #rec_item_list = []
+            rec_vid = int(rec_vid)
+            if rec_vid not in rec_item_dict:
+                #rec_item_list.append((rec_vid,score))
+                rec_item_dict[rec_vid] = score
+            else:
+                rec_item_dict[rec_vid] += score
+        rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1], reverse=True)
+        #print(rec_item_list2)
+        #rec_list = list(rec_item_dict.values())
+        #for k, v in rec_item_dict.items():
+        #rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1][1], reverse=True)
+        f3.write(str(mvid)+"\t"+json.dumps(rec_item_list2[:100])+"\n")
+    #f1.close()
+    f3.close()
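09_getI2ICTRGroupV2.py is identical to 07 except for the final combination: V1 weights the return rate by the co-occurrence prior, while V2 weights it by absolute return volume, and its output is imported under rv2: keys instead of rv::

    # V1 (07): score = math.log(cooccur) * return_rate
    # V2 (09): score = math.log(returns + 1) * return_rate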

+ 35 - 0
returnv2/10_import_redist_v2.py

@@ -0,0 +1,35 @@
+# coding: utf-8
+import sys
+import json
+from db_help import RedisHelper
+
+if __name__ == "__main__":
+     f2 = open(sys.argv[2], 'w')
+     import_data_dict = {}
+     with open(sys.argv[1]) as f:
+         while True:
+             line = f.readline()
+             if not line:
+                 break
+             items = line.strip().split("\t")
+             if len(items)<2:
+                 continue
+             vid = items[0]
+             key = "rv2:"+vid
+             rec_list = json.loads(items[1])
+             if len(rec_list)==0:
+                 continue
+             rec_item_list = []
+             for rec_item in rec_list:
+                 rec_item_list.append((rec_item[0], round(rec_item[1],3)))
+             res_info = json.dumps(rec_item_list[:10])
+             f2.write(key+"\t"+res_info+"\n")
+             #key="rv:"+str(line[0])
+             import_data_dict[key] = res_info
+     redis_helper = RedisHelper()
+     redis_helper.update_batch_setnx_key(import_data_dict, 60*60*24*7)  # set-if-absent batch with a 7-day TTL
+     res = redis_helper.get_data_from_redis("rv2:14737974")  # spot-check one imported key
+     print(res)
+     f2.close()

+ 54 - 0
returnv2/11_extract_cur_share_log.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # defaultdict of lists, keyed by column name
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each field in a record is a (column, value) pair; collect values under the column key
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose; otherwise it comes out as one wide row
+    return data
+
+
+if __name__ == "__main__":
+    project = 'loghubods'
+    last7day = sys.argv[1]
+    now_date = sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_share_log'
+    sql = "select machinecode, shareid, shareobjectid  from loghubods.user_share_log_per5min where dt between '"+last7day+"0000' and '"+now_date+"0000' and topic='share';"
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_item_share_hour_"+now_date, sep='\t')

+ 54 - 0
returnv2/12_extract_cur_clik_log.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # defaultdict of lists, keyed by column name
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each field in a record is a (column, value) pair; collect values under the column key
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose; otherwise it comes out as one wide row
+    return data
+
+
+if __name__ == "__main__":
+    project = 'loghubods'
+    last7day = sys.argv[1]
+    now_date = sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_share_log'
+    sql = "select machinecode, shareid, clickobjectid from loghubods.user_share_log_per5min  where dt between '"+last7day+"0000' and '"+now_date+"0000' and topic='click' "
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_item_click_hour_"+now_date, sep='\t')

+ 60 - 0
returnv2/13_groupItemHour.py

@@ -0,0 +1,60 @@
+# coding: utf-8
+import sys
+from operator import itemgetter
+import json
+
+if __name__ == "__main__":
+    # 1. load data
+    nowdate = sys.argv[1]
+    f1 = open("./data/user_item_share_hour_"+nowdate)
+    user_share_item_dict={}
+    item_dict = {}
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<4:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        vid = items[3]
+        user_share_item_dict[shareid] = (uid,vid)
+    f1.close()
+    print(len(user_share_item_dict))
+    f2 = open("./data/user_item_click_hour_"+nowdate)
+    #user_group_dict={}
+    item_group_dict = {}
+    item_dict = {}
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        #vid = items[3]
+        sim_user_set = set()
+        if shareid in user_share_item_dict:
+            kuid, kvid = user_share_item_dict[shareid]
+            key_info = kvid
+            if key_info in item_group_dict:
+                sim_user_set = item_group_dict[key_info]
+                sim_user_set.add(uid)
+                item_group_dict[key_info] = sim_user_set
+            else:
+                sim_user_set.add(uid)
+                item_group_dict[key_info] = sim_user_set
+    print(len(item_group_dict))
+    f2.close()
+    f3 = open("./data/return_item_hour_"+nowdate, 'w')
+    for k, v in item_group_dict.items():
+        f3.write(k+"\t"+json.dumps(list(v))+"\n")
+    f3.close()
+    #((user,item), score)
+    #print(user_item_dict)
+    #2. (uid, [(vid, score)....])

+ 66 - 0
returnv2/14_extract_user_exp_hour_video.py

@@ -0,0 +1,66 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+import time
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # defaultdict of lists, keyed by column name
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each field in a record is a (column, value) pair; collect values under the column key
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose; otherwise it comes out as one wide row
+    return data
+
+def timestamp(time_str):
+    """Convert a '%Y%m%d%H%M%S' string to a Unix timestamp (local time)."""
+    s_t = time.strptime(time_str, "%Y%m%d%H%M%S")
+    print(s_t)
+    mkt = int(time.mktime(s_t))
+    return mkt
+
+if __name__ == "__main__":
+    project = 'loghubods'
+    #last7day=sys.argv[1]
+    now_date = sys.argv[1]
+    print("now date:", now_date)
+    table = 'user_action_log_base'
+    start_time = sys.argv[2]
+    end_time = sys.argv[3]
+    start_timestamp = timestamp(start_time+"00")
+    end_timestamp = timestamp(end_time+"00")
+    #print(start_timestamp, end_timestamp)
+    #return 1
+    sql = "select mid, videoid from loghubods.video_action_log_applet where dt ='"+now_date+"' and log_time>='"+str(start_timestamp)+"' and log_time<="+str(end_timestamp)+"  and businesstype='videoView' "
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_video_exp_hour_"+end_time, sep='\t')
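The timestamp helper converts a '%Y%m%d%H%M%S' string to a Unix epoch via time.mktime, so the result depends on the server's local timezone (it also prints the parsed struct_time). For example, assuming a UTC+8 host:

    >>> timestamp("20230626160000")
    1687766400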

+ 49 - 0
returnv2/15_getI2IHour.py

@@ -0,0 +1,49 @@
+# coding: utf-8
+import sys
+import json
+
+if __name__ == "__main__":
+    f = open(sys.argv[1])
+    user_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        items = line.split("\t")
+        if len(items)<3:
+            continue
+        uid = items[1]
+        vid = items[2]
+        vid_set = set()
+        if uid in user_dict:
+            vid_set = user_dict[uid]
+        vid_set.add(vid)
+        user_dict[uid] = vid_set
+    f.close()
+    f1 = open(sys.argv[2])
+    f2 = open(sys.argv[3], 'w')
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.split("\t")
+        if len(items)<2:
+            continue
+        vid = items[0]
+        uid_list = json.loads(items[1])
+        if len(uid_list)<5:  # videos with fewer than 5 returning mids are filtered out
+            continue
+        item_dict = {}
+        for uid in uid_list:
+            if uid in user_dict:
+                item_list = user_dict[uid]  # videos this user was exposed to
+                for item in item_list:
+                    item = item.strip()
+                    if item in item_dict:
+                        item_dict[item] = item_dict[item] + 1
+                    else:
+                        item_dict[item] = 1
+        item_list = sorted(item_dict.items(), key=lambda s: s[1], reverse=True)
+        f2.write(vid+"\t"+json.dumps(item_list[:100])+"\t"+items[1]+"\n")
+    f1.close()
+    f2.close()

+ 61 - 0
returnv2/16_calReturnHour.py

@@ -0,0 +1,61 @@
+# coding: utf-8
+import sys
+from operator import itemgetter
+import json
+
+if __name__ == "__main__":
+    # 1. load data
+    nowdate = sys.argv[1]
+    f1 = open("./data/user_item_share_hour_"+nowdate)
+    user_share_item_dict={}
+    user_shareid_dict = {}
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<4:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        vid = items[3]
+        k_info = uid+"\t"+vid
+        user_share_item_dict[k_info] = shareid
+        user_shareid_dict[shareid] = (uid,vid)
+    f1.close()
+    print(len(user_share_item_dict))
+    f2 = open("./data/user_item_click_hour_"+nowdate)
+    #user_group_dict={}
+    share_return_dict = {}
+    #item_dict = {}
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        #(user, share)
+        uid = items[1]
+        shareid = items[2]
+        #vid = items[3]
+        if shareid not in share_return_dict:
+            share_return_dict[shareid] = 1
+        else:
+            share_return_dict[shareid] += 1
+    f2.close()
+    user_item_return = {}
+    for k, v in user_share_item_dict.items():  # iterate shareids keyed by (mid, vid)
+        if v in share_return_dict:  # this shareid brought back returns
+            if k not in user_item_return:
+                user_item_return[k] = share_return_dict[v]  # how many returns this (uid, vid) share brought back
+            else:
+                user_item_return[k] += share_return_dict[v]
+    f3 = open("./data/user_item_return_hour_count_"+nowdate, 'w')
+    for k, v in user_item_return.items():
+        f3.write(k+"\t"+str(v)+"\n")
+    f3.close()
+    #((user,item), score)
+    #print(user_item_dict)
+    #2. (uid, [(vid, score)....])

+ 102 - 0
returnv2/17_getI2ICTRGroup.py

@@ -0,0 +1,102 @@
+# coding: utf-8
+import sys
+import json
+import math
+from operator import attrgetter
+
+if __name__ == "__main__":
+    f = open(sys.argv[1])
+    exp_item_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mid = items[1]
+        vid = items[2]
+        key_info = mid+"\t"+vid
+        if key_info in exp_item_dict:
+            exp_item_dict[key_info]+=1
+        else:
+            exp_item_dict[key_info] = 1
+    f.close()
+    return_item_dict = {}
+    f1 = open(sys.argv[2])
+    #f2 = open(sys.argv[3], 'w')
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        uid = items[0]
+        vid = items[1]
+        return_count = items[2]
+        key_info = uid+"\t"+vid
+        if key_info not in return_item_dict:
+            return_item_dict[key_info] = 1
+        else:
+            return_item_dict[key_info]+=1
+    f1.close()
+    f2 = open(sys.argv[3])
+    f3 = open(sys.argv[4], 'w')
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mvid = items[0]
+        vid_list = json.loads(items[1])
+        user_list = json.loads(items[2])
+        rec_item_dict = {}
+        #rec_item_list = []
+        for vid_info in vid_list:
+            rec_vid = vid_info[0]
+            if rec_vid == mvid:
+                continue
+            score1 = float(vid_info[1])
+            rec_k_exp = 0
+            rec_k_return = 0
+            for uid in user_list:
+                rec_k = uid+"\t"+rec_vid
+                if rec_k in exp_item_dict:
+                    rec_k_exp += exp_item_dict[rec_k]
+                if rec_k in return_item_dict:
+                    rec_k_return += return_item_dict[rec_k]
+            score2 = 0.0000000000001
+            if rec_k_exp>0:
+                score2 = float(rec_k_return)/float(rec_k_exp)
+            if score1 == 0:
+                score1 = 0.000000000000001
+            else:
+                score1 = math.log(score1)
+            #if rec_vid in item_dict:
+            # 	score2 = float(item_dict[rec_vid][0])
+            return_score = 0.000000000000001
+            #print("rec_k_return:", rec_k_return)
+            if rec_k_return>0:
+                return_score=math.log(rec_k_return+1)
+            score = score1*score2
+            #score = return_score*score2
+            if score<=0.0:
+                continue
+            #rec_item_list = []
+            rec_vid = int(rec_vid)
+            if rec_vid not in rec_item_dict:
+                #rec_item_list.append((rec_vid,score))
+                rec_item_dict[rec_vid] = score
+            else:
+                rec_item_dict[rec_vid] += score
+        rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1], reverse=True)
+        #print(rec_item_list2)
+        #rec_list = list(rec_item_dict.values())
+        #for k, v in rec_item_dict.items():
+        #rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1][1], reverse=True)
+        f3.write(str(mvid)+"\t"+json.dumps(rec_item_list2[:100])+"\n")
+    #f1.close()
+    f3.close()

+ 35 - 0
returnv2/18_import_redist.py

@@ -0,0 +1,35 @@
+# coding: utf-8
+import sys
+import json
+from db_help import RedisHelper
+
+if __name__ == "__main__":
+     f2 = open(sys.argv[2], 'w')
+     import_data_dict = {}
+     with open(sys.argv[1]) as f:
+         while True:
+             line = f.readline()
+             if not line:
+                 break
+             items = line.strip().split("\t")
+             if len(items)<2:
+                 continue
+             vid = items[0]
+             key = "rv:"+vid
+             rec_list = json.loads(items[1])
+             if len(rec_list)==0:
+                 continue
+             rec_item_list = []
+             for rec_item in rec_list:
+                 rec_item_list.append((rec_item[0], round(rec_item[1],3)))
+             res_info = json.dumps(rec_item_list[:10])
+             f2.write(key+"\t"+res_info+"\n")
+             #key="rv:"+str(line[0])
+             import_data_dict[key] = res_info
+     redis_helper = RedisHelper()
+     redis_helper.update_batch_setnx_key(import_data_dict, 60*60*24*7)  # set-if-absent batch with a 7-day TTL
+     res = redis_helper.get_data_from_redis("rv:14737974")  # spot-check one imported key
+     print(res)
+     f2.close()

+ 102 - 0
returnv2/19_getI2ICTRGroupV2.py

@@ -0,0 +1,102 @@
+# coding: utf-8
+import sys
+import json
+import math
+from operator import attrgetter
+
+if __name__ == "__main__":
+    f = open(sys.argv[1])
+    exp_item_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mid = items[1]
+        vid = items[2]
+        key_info = mid+"\t"+vid
+        if key_info in exp_item_dict:
+            exp_item_dict[key_info] += 1  # one exposure per row; sum exposures per (mid, vid)
+        else:
+            exp_item_dict[key_info] = 1
+    f.close()
+    return_item_dict = {}
+    f1 = open(sys.argv[2])
+    #f2 = open(sys.argv[3], 'w')
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        uid = items[0]
+        vid = items[1]
+        return_count = items[2]
+        key_info = uid+"\t"+vid
+        if key_info not in return_item_dict:
+            return_item_dict[key_info] = 1  # count rows only, not return volume: how many shares brought back at least one return
+        else:
+            return_item_dict[key_info]+=1
+    f1.close()
+    f2 = open(sys.argv[3])
+    f3 = open(sys.argv[4], 'w')
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<3:
+            continue
+        mvid = items[0]
+        vid_list = json.loads(items[1])
+        user_list = json.loads(items[2])
+        rec_item_dict = {}
+        #rec_item_list = []
+        for vid_info in vid_list:
+            rec_vid = vid_info[0]
+            if rec_vid == mvid:
+                continue
+            score1 = float(vid_info[1])
+            rec_k_exp = 0
+            rec_k_return = 0
+            for uid in user_list:
+                rec_k = uid+"\t"+rec_vid
+                if rec_k in exp_item_dict:
+                    rec_k_exp += exp_item_dict[rec_k]
+                if rec_k in return_item_dict:
+                    rec_k_return += return_item_dict[rec_k]
+            score2 = 0.0000000000001
+            if rec_k_exp>0:
+                score2 = float(rec_k_return)/float(rec_k_exp)
+            if score1 == 0:
+                score1 = 0.000000000000001
+            else:
+                score1 = math.log(score1)
+            #if rec_vid in item_dict:
+            # 	score2 = float(item_dict[rec_vid][0])
+            return_score = 0.000000000000001
+            #print("rec_k_return:", rec_k_return)
+            if rec_k_return>0:
+                return_score=math.log(rec_k_return+1)
+            #score = score1*score2
+            score = return_score*score2
+            if score<=0.0:
+                continue
+            #rec_item_list = []
+            rec_vid = int(rec_vid)
+            if rec_vid not in rec_item_dict:
+                #rec_item_list.append((rec_vid,score))
+                rec_item_dict[rec_vid] = score
+            else:
+                rec_item_dict[rec_vid] += score
+        rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1], reverse=True)
+        #print(rec_item_list2)
+        #rec_list = list(rec_item_dict.values())
+        #for k, v in rec_item_dict.items():
+        #rec_item_list2= sorted(rec_item_dict.items(), key=lambda s:s[1][1], reverse=True)
+        f3.write(str(mvid)+"\t"+json.dumps(rec_item_list2[:100])+"\n")
+    #f1.close()
+    f3.close()

+ 35 - 0
returnv2/20_import_redist_v2.py

@@ -0,0 +1,35 @@
+# coding: utf-8
+import sys
+import json
+from db_help import RedisHelper
+
+if __name__ == "__main__":
+     f2 = open(sys.argv[2], 'w')
+     import_data_dict = {}
+     with open(sys.argv[1]) as f:
+         while True:
+             line = f.readline()
+             if not line:
+                 break
+             items = line.strip().split("\t")
+             if len(items)<2:
+                 continue
+             vid = items[0]
+             key = "rv2:"+vid
+             rec_list = json.loads(items[1])
+             if len(rec_list)==0:
+                 continue
+             rec_item_list = []
+             for rec_item in rec_list:
+                 rec_item_list.append((rec_item[0], round(rec_item[1],3)))
+             res_info = json.dumps(rec_item_list[:10])
+             f2.write(key+"\t"+res_info+"\n")
+             #key="rv:"+str(line[0])
+             import_data_dict[key] = res_info
+     redis_helper = RedisHelper()
+     redis_helper.update_batch_setnx_key(import_data_dict, 60*60*24*7)  # set-if-absent batch with a 7-day TTL
+     res = redis_helper.get_data_from_redis("rv2:14737974")  # spot-check one imported key
+     print(res)
+     f2.close()

+ 108 - 0
returnv2/run.sh

@@ -0,0 +1,108 @@
+#!/bin/bash
+source ~/.bash_profile
+source ~/.bashrc
+conda activate tf2
+
+cd /data/UserReturn
+
+#1. download data
+nowday=`date  +"%Y%m%d" -d -0days`
+last7day=`date  +"%Y%m%d" -d -1days`
+nowhour=`date  +"%Y%m%d%H" -d -1hours`
+echo ${nowday}
+echo ${last7day}
+mkdir -p ./data/
+mkdir -p ./logs/
+#conda activate py36
+
+python extract_share_log.py ${last7day} ${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python extract_clik_log.py ${last7day} ${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python groupItem.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python extract_user_exp_video.py ${last7day} ${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+
+fi
+
+python getI2I.py ./data/user_video_exp_${nowday}  ./data/return_item_${nowday} ./data/i2i_${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python calReturn.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+
+
+python getI2ICTRGroup.py ./data/user_video_exp_${nowday} ./data/user_item_return_count_${nowday} ./data/i2i_${nowday}  ./data/i2i_group_ctr_${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+
+fi
+
+python import_redist.py  ./data/i2i_group_ctr_${nowday} ./data/return_video_rec_${nowday}
+
+
+python  getI2ICTRGroupV2.py ./data/user_video_exp_${nowday} ./data/user_item_return_count_${nowday} ./data/i2i_${nowday}  ./data/i2i_group_ctr_v2_${nowday}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+
+fi
+
+python import_redist_v2.py  ./data/i2i_group_ctr_v2_${nowday} ./data/return_video_rec_v2_${nowday}
+
+
+day="$(date -d '40 days ago' +%Y-%m-%d)"
+pattern="/data/UserReturn/data/*${day}*"
+if compgen -G "$pattern" > /dev/null; then
+    find "$base_path" -name "${day}*" -exec rm -rf {} \;
+    echo "delete done"
+else
+    echo "no data"
+fi

+ 112 - 0
returnv2/run_hour.sh

@@ -0,0 +1,112 @@
+#!/bin/bash
+source ~/.bash_profile
+source ~/.bashrc
+conda activate tf2
+
+cd /data/UserReturn
+
+#1. download data
+nowday=`date  +"%Y%m%d" -d -0days`
+#last7day=`date  +"%Y%m%d" -d -1days`
+nowstart=`date  +"%Y%m%d%H" -d -1hours`
+nowhour=`date  +"%Y%m%d%H" -d -0hours`
+#nowhour=`date  +"%Y%m%d%H" -d -0days
+#nowstart=$nowstart'0000'
+#nowhour=${nowhour}'0000'
+
+echo ${nowstart}
+echo ${nowhour}
+
+
+echo ${nowday}
+mkdir -p ./data/
+mkdir -p ./logs/
+#conda activate py36
+#nowhour=20230626160000
+
+python extract_cur_share_log.py  ${nowstart} ${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+echo ${nowstart}
+echo ${nowhour}
+python extract_cur_clik_log.py  ${nowstart} ${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python groupItemHour.py ${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+python extract_user_exp_hour_video.py ${nowday} ${nowstart} ${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+
+fi
+
+python getI2IHour.py ./data/user_video_exp_hour_${nowhour}  ./data/return_item_hour_${nowhour} ./data/i2i_hour_${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+fi
+
+
+python calReturnHour.py ${nowhour}
+#python getI2ICTR.py ../CtrProject/data/merge_4_days_score_${nowhour} ./data/i2i_${nowday} ./data/i2i_ctr_${nowday}
+
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] simrecall extract_share_log"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+
+fi
+python getI2ICTRGroup.py ./data/user_video_exp_hour_${nowhour} ./data/user_item_return_hour_count_${nowhour} ./data/i2i_hour_${nowhour}  ./data/i2i_hour_group_ctr_${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] getI2ICTRGroup"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'extract_share_log"
+    exit 255
+
+fi
+
+python import_redist.py  ./data/i2i_hour_group_ctr_${nowhour} ./data/return_video_rec_${nowhour}
+
+
+
+python getI2ICTRGroupV2.py ./data/user_video_exp_hour_${nowhour} ./data/user_item_return_hour_count_${nowhour} ./data/i2i_hour_${nowhour}  ./data/i2i_hour_group_ctr_v2_${nowhour}
+if [ $? -ne 0 ];
+then
+    msg = "[ERROR] getI2ICTRGroupV2"
+    #sh sendmsg.sh  $nowday  $msg
+    echo "[ERROR] echo 'getI2ICTRGroupV2' "
+    exit 255
+
+fi
+
+python import_redist_v2.py  ./data/i2i_hour_group_ctr_v2_${nowhour} ./data/return_video_rec_v2_${nowhour}
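Neither script schedules itself. Hypothetical crontab entries (paths taken from the cd targets above; log file names assumed) that would run the daily pipeline nightly and the hourly one every hour:

    # illustrative only
    10 1 * * * /bin/bash /data/UserReturn/run.sh >> /data/UserReturn/logs/run.log 2>&1
    5 * * * * /bin/bash /data/UserReturn/run_hour.sh >> /data/UserReturn/logs/run_hour.log 2>&1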