Selaa lähdekoodia

fix 归集队列 bug

DevYK 3 vuotta sitten
vanhempi
commit
2b1644f5ed
7 muutettua tiedostoa jossa 134 lisäystä ja 78 poistoa
  1. 11 3
      config/dev_config.js
  2. 1 1
      config/prd_config.js
  3. 0 1
      config/test_config.js
  4. 88 52
      model/moralis_sdk.js
  5. 25 16
      model/redis_db.js
  6. 6 2
      test/db_test.js
  7. 3 3
      test/test.js

+ 11 - 3
config/dev_config.js

@@ -35,11 +35,19 @@ const db_config = {
         PORT: '3306', //连接的端口
         HOST: '127.0.0.1' //host
     },
+    
+    // redis: {
+    //     PORT: 6379, // Redis port
+    //     HOST: "r-bp1ps6my7lzg8rdhwxpi.redis.rds.aliyuncs.com", // Redis host
+    //     USERNAME: null,
+    //     PASSWORD: "Wqsd@2019"
+    // },
+
     redis: {
         PORT: 6379, // Redis port
-        HOST: "r-bp1ps6my7lzg8rdhwxpi.redis.rds.aliyuncs.com", // Redis host
-        USERNAME: null,
-        PASSWORD: "Wqsd@2019"
+        HOST: "denet-test.y2slbl.clustercfg.memorydb.us-east-1.amazonaws.com", // Redis host
+        USERNAME: null, // needs Redis >= 6
+        PASSWORD: null,
     },
 }
 

+ 1 - 1
config/prd_config.js

@@ -7,7 +7,7 @@ const account_config = {
     WELLET_PUBLIC_KEY: '0xAD48D13E77011cFE03fF19729B6A247847AfD28E',
     TOKEN_GAS_LIMIT: '90000',
     BNB_GAS_LIMIT: '21000',
-    BNB_GAS_PRICE: '10000000000',
+    BNB_GAS_PRICE: '6000000000',
     TRANSFER_GAS:true,
 }
 

+ 0 - 1
config/test_config.js

@@ -33,7 +33,6 @@ const db_config = {
         PORT: '3306', //连接的端口
         HOST: 'denet-test.csi2lctklqzg.us-east-1.rds.amazonaws.com' //host
     },
-
     redis: {
         PORT: 6379, // Redis port
         HOST: "denet-test.y2slbl.clustercfg.memorydb.us-east-1.amazonaws.com", // Redis host

+ 88 - 52
model/moralis_sdk.js

@@ -112,15 +112,21 @@ async function getAccountBalances(options) {
         options.chain = utils.getChainName(options.chain)
     }
     logger.log('getAccountBalances :', options)
-    if (options.type == 'native') {
-        var opt_ret = await Moralis.Web3API.account.getNativeBalance(options);
-        logger.log('getNativeBalance=', opt_ret);
-        return opt_ret
-    } else {
-        var aar = await Moralis.Web3API.account.getTokenBalances(options);
-        logger.log('getTokenBalances=', aar);
-        return aar;
+    try {
+        if (options.type == 'native') {
+            var opt_ret = await Moralis.Web3API.account.getNativeBalance(options);
+            logger.log('getNativeBalance=', opt_ret);
+            return opt_ret
+        } else {
+            var aar = await Moralis.Web3API.account.getTokenBalances(options);
+            logger.log('getTokenBalances=', aar);
+            return aar;
+        }
+    } catch (error) {
+        logger.log('getAccountBalances error:', error)
+        return null
     }
+
 }
 
 /**
@@ -171,11 +177,15 @@ async function computeTransferGasFree(my_account_all_coins, tokenPrices) {
         my_account_all_coins.other.forEach(element => {
             logger.debug('20 element=', element);
             var find_transfer_item = findTokenPriceItem(element.token_address, tokenPrices);
-            var total_all_usdprice = calculate_total_usdprice(element.balance, element.decimals, find_transfer_item.usdPrice);
-            logger.debug('findTokenPriceItem ret=', element.token_address, find_transfer_item, total_all_usdprice);
-            if (find_transfer_item && total_all_usdprice > 1.0) {
-                tokenCount += 1;
-                logger.debug('token > 1.0', tokenCount, element.token_address);
+            if (find_transfer_item) {
+                var total_all_usdprice = calculate_total_usdprice(element.balance, element.decimals, find_transfer_item.usdPrice);
+                logger.debug('findTokenPriceItem ret=', element.token_address, find_transfer_item, total_all_usdprice);
+                if (find_transfer_item && total_all_usdprice > 1.0) {
+                    tokenCount += 1;
+                    logger.debug('token > 1.0', tokenCount, element.token_address);
+                }
+            } else {
+                logger.log('findTokenPriceItem error=', element);
             }
         });
         logger.log('account_config.TOKEN_GAS_LIMIT=', account_config.TOKEN_GAS_LIMIT);
@@ -316,7 +326,7 @@ async function transfers(obj, my_account_all_coins) {
                 var find_transfer_item = findTokenPriceItem(transfer_item.token_address, tokenPrices);
                 //todo 计算 token 币价格 * token美元单价 
                 if (find_transfer_item && calculate_total_usdprice(transfer_item.balance, transfer_item.decimals, find_transfer_item.usdPrice) > 1.0) {
-                    var info =await queryCompanyInfoFromId(0);
+                    var info = await queryCompanyInfoFromId(0);
                     var obj_20 = {
                         chain: obj.chain,
                         contractAddress: transfer_item.token_address,
@@ -341,10 +351,12 @@ async function transfers(obj, my_account_all_coins) {
                         my_account_all_coins.native.balance = tempNativeValue.toString();
                         logger.log('start_collectCoins 20  udpateNativeValue=', tempNativeValue);
                     } else return "get native value error."
+                } else {
+                    logger.error('find_transfer_item error.', transfer_item);
                 }
             } else {
                 logger.error('token Must be greater than  a dollar.', transfer_item.balance, transfer_item.decimals);
-                return toJson(-1, null, 'token Must be greater than  a dollar.');
+                // return toJson(-1, null, 'token Must be greater than  a dollar.');
             }
         }
     }
@@ -359,38 +371,45 @@ async function transfers(obj, my_account_all_coins) {
         obj.chain = chain;
         logger.log('查询本地余额 after', my_account_all_coins.native)
         var find_native_item = findTokenPriceItem('0x0000000000000000000000000000000000000000', tokenPrices);
-        var nativeCoins = calculate_total_usdprice(my_account_all_coins.native.balance, '18', find_native_item.usdPrice);
-        logger.log('start_collectCoins nativeCoins:', nativeCoins, obj);
-        logger.log('start_collectCoins obj:', obj);
-        logger.log('start_collectCoins native.balance:', my_account_all_coins.native.balance);
         //todo 计算 token 币价格 * token美元单价
-        if (find_native_item && nativeCoins > 1.0) {
-            logger.log('native.balance', my_account_all_coins.native.balance)
-            logger.log('aGasPrice', obj.transFerGasFree.aGasPrice)
-            logger.log('gasLimint', account_config.BNB_GAS_LIMIT)
-            // var gasPrice = BigInt(obj.transFerGasFree.aGasPrice);
-
-            var gasPrice = BigInt(account_config.BNB_GAS_PRICE);
-            var gasLimit = BigInt(account_config.BNB_GAS_LIMIT);
-            var nativeBalance = BigInt(my_account_all_coins.native.balance);
-
-            logger.log('native.balance>>>', nativeBalance)
-            logger.log('aGasPrice>>>', gasPrice)
-            logger.log('gasLimint>>>', gasLimit)
-
-            var real_native_amount = nativeBalance - gasPrice * gasLimit;
-            logger.log('start_collectCoins native amount:', real_native_amount.toString());
-            var info =await queryCompanyInfoFromId(0);
-            obj = {
-                chain: chain,
-                amount: real_native_amount.toString(),
-                receiver: info.user_address,
-                type: 'native',
-                address: address,
+        if (find_native_item) {
+            var nativeCoins = calculate_total_usdprice(my_account_all_coins.native.balance, '18', find_native_item.usdPrice);
+            logger.log('start_collectCoins nativeCoins:', nativeCoins, obj);
+            logger.log('start_collectCoins obj:', obj);
+            logger.log('start_collectCoins native.balance:', my_account_all_coins.native.balance);
+
+
+            if (nativeCoins > 1.0) {
+                logger.log('native.balance', my_account_all_coins.native.balance)
+                logger.log('aGasPrice', obj.transFerGasFree.aGasPrice)
+                logger.log('gasLimint', account_config.BNB_GAS_LIMIT)
+                // var gasPrice = BigInt(obj.transFerGasFree.aGasPrice);
+
+                var gasPrice = BigInt(account_config.BNB_GAS_PRICE);
+                var gasLimit = BigInt(account_config.BNB_GAS_LIMIT);
+                var nativeBalance = BigInt(my_account_all_coins.native.balance);
+
+                logger.log('native.balance>>>', nativeBalance)
+                logger.log('aGasPrice>>>', gasPrice)
+                logger.log('gasLimint>>>', gasLimit)
+
+                var real_native_amount = nativeBalance - gasPrice * gasLimit;
+                logger.log('start_collectCoins native amount:', real_native_amount.toString());
+                var info = await queryCompanyInfoFromId(0);
+                obj = {
+                    chain: chain,
+                    amount: real_native_amount.toString(),
+                    receiver: info.user_address,
+                    type: 'native',
+                    address: address,
+                }
+                logger.log('start_collectCoins native:', obj);
+                logger.log('calculate_total_usdprice native', nativeCoins, find_native_item);
+                return await start_collectCoins(obj)
+            } else {
+                logger.error('native Must be greater than  a dollar.', obj);
+                return toJson(-1, null, 'native Must be greater than  a dollar.');
             }
-            logger.log('start_collectCoins native:', obj);
-            logger.log('calculate_total_usdprice native', nativeCoins, find_native_item);
-            return await start_collectCoins(obj)
         } else {
             logger.error('native Must be greater than  a dollar.', obj);
             return toJson(-1, null, 'native Must be greater than  a dollar.');
@@ -413,7 +432,7 @@ const transfer_handle = async (obj) => {
 
         //读取用户充币地址对应的私钥
         var info = await queryCompanyInfoFromId(id);
-        logger.log('transfer_handle queryCompanyInfoFromId=',info);
+        logger.log('transfer_handle queryCompanyInfoFromId=', info);
         //提币公司
         obj.privateKey = info.user_private_key;
         // if (process.env.NODE_ENV != 'dev') {
@@ -443,11 +462,11 @@ const transfer_handle = async (obj) => {
         //缓存当前交易的 gas 费用
         if (ret && obj.contractAddress) {
             var tr = getTransferGasFree('token', ret)
-            logger.debug('cache key token LAST_TOTAL_TOKEN_FREE getTransferGasFree', tr)
+            logger.debug('cache setkey token LAST_TOTAL_TOKEN_FREE getTransferGasFree', tr)
             redis.redis_set(reids_token_config.LAST_TOTAL_TOKEN_FREE, tr.totalGasFree);
         } else {
             var tr = getTransferGasFree('native', ret)
-            logger.debug('cache key LAST_TOTAL_BNB_FREE getTransferGasFree', tr)
+            logger.debug('cache setkey LAST_TOTAL_BNB_FREE getTransferGasFree', tr)
             redis.redis_set(reids_token_config.LAST_TOTAL_BNB_FREE, tr.totalGasFree);
         }
     }
@@ -486,7 +505,7 @@ const collectCoins = async (obj) => {
         //需要转移 gas 费 
         //每次都需要充值 gas 费
         if (account_config.TRANSFER_GAS || (parseInt(transFerGasFree.gasPrice) > 0 && transFerGasFree.get_service_charge == 1)) {
-            var info =await queryCompanyInfoFromId(0);
+            var info = await queryCompanyInfoFromId(0);
             var obj_wd = {
                 chain: chain,
                 amount: transFerGasFree.gasPrice,
@@ -526,19 +545,36 @@ const collectCoins = async (obj) => {
     }
     return obj.address + ':不满足归集条件';
 }
-
 var collectCoinsArrays = [];
+var lastCollectCoinsAddress;
+var isExecCollect = false;
 async function execCollectCoinsTask() {
+    if (isExecCollect) return
+    isExecCollect = true;
     while (collectCoinsArrays.length > 0) {
         var obj = collectCoinsArrays.pop();
+        lastCollectCoinsAddress = obj.address;
         //开始收集用户地址里面的币到归集地址
         var ret = await collectCoins(obj);
-        logger.log('execCollectCoinsTask=', obj, ret)
+        // await utils.sleep(3000)
+        logger.log('execCollectCoinsTask=', collectCoinsArrays.length, ret)
+        lastCollectCoinsAddress = ''
     }
+    isExecCollect = false;
 }
 
 
 function pushCollectConisObj(obj) {
+    logger.debug('collectCoinsArrays length=', collectCoinsArrays.length)
+    if (collectCoinsArrays.length > 0) {
+        var findItem = collectCoinsArrays.find(element => {
+            return (obj.address == element.address) || (!lastCollectCoinsAddress && lastCollectCoinsAddress == element.address)
+        })
+        if (findItem) {
+            logger.log('当前任务正在处理中...', obj.address)
+            return;
+        }
+    }
     collectCoinsArrays.push(obj)
     execCollectCoinsTask();
 }
@@ -556,7 +592,7 @@ async function readPriveteKeyFromMysql(address) {
 async function queryCompanyInfoFromId(id) {
     return new Promise(resolve => {
         mysql.queryCompanyInfoFromId(id).then(ret => {
-            logger.log('queryCompanyInfoFromId=', ret,ret.results,ret.results.user_address,ret.results.user_private_key);
+            logger.log('queryCompanyInfoFromId=', ret, ret.results, ret.results.user_address, ret.results.user_private_key);
             resolve(ret.results);
         })
     })

+ 25 - 16
model/redis_db.js

@@ -21,26 +21,25 @@ if (process.env.NODE_ENV == 'test') {
     host: host,
     connectTimeout: 10000,
   }
-  REDIS_INSTANCE=new Redis.Cluster([opts]);
+  REDIS_INSTANCE = new Redis.Cluster([opts]);
 } else if (process.env.NODE_ENV == 'dev') {
   opts = {
     port: port,
     host: host,
-    username: username,
-    password: password,
-    connectTimeout: 10000,
+    password: password
   }
-  REDIS_INSTANCE = new Redis(opts);
+  // var new_opts = "redis://"+password+"@"+host+":"+port+"/0"
+  // "redis://:authpassword@127.0.0.1:6380/4"
+  // REDIS_INSTANCE = new Redis(opts);
+  REDIS_INSTANCE = new Redis.Cluster([opts]);
   // REDIS_INSTANCE=new Redis.Cluster([opts]);
 } else if (process.env.NODE_ENV == 'prd') {
   opts = {
     port: port,
     host: host,
-    username: username,
-    password: password,
     connectTimeout: 10000,
   }
-  REDIS_INSTANCE=new Redis.Cluster([opts]);
+  REDIS_INSTANCE = new Redis.Cluster([opts]);
 }
 
 REDIS_INSTANCE.on('connect', () => {
@@ -52,8 +51,13 @@ REDIS_INSTANCE.on('error', function (err) {
 });
 
 function redis_set(key, value) {
-  logger.log("redis set=", key);
-  REDIS_INSTANCE.set(key, value);
+  logger.log("redis set=", key,value);
+  try {
+    REDIS_INSTANCE.set(key, value);
+  } catch (error) {
+    logger.error('redis_set error:', key,value, error)
+    return null;
+  }
 }
 
 async function redis_get(key) {
@@ -61,7 +65,8 @@ async function redis_get(key) {
   try {
     return await REDIS_INSTANCE.get(key);
   } catch (error) {
-    logger.error('redis_get error:',key,error)
+    logger.error('redis_get error:', key, error)
+    return null;
   }
 }
 
@@ -71,12 +76,16 @@ async function redis_get(key) {
  * @param {*} key 
  * @returns 
  */
-function readRedis(key) {
+async function readRedis(key) {
   return new Promise((resolve) => {
-    redis_get(key).then(result => {
-      console.log("redis_get=", result); // Prints "value"
-      resolve(result);
-    });
+    // REDIS_INSTANCE.get(key)
+    redis_get(key)
+      .then(result => {
+        console.log("redis_get=", result); // Prints "value"
+        resolve(result);
+      }).catch(e => {
+        console.log("readRedis key error=", key, e); // Prints "value"
+      });
   })
 }
 

+ 6 - 2
test/db_test.js

@@ -2,7 +2,7 @@ const router = require('koa-router')() //导入 koa-router
 const redis = require("../model/redis_db")  //导入 db.js
 const mysql = require("../model/mysql_db")  //导入 db.js
 // https://github.com/luin/ioredis#readme
-
+const console = require('../model/logger')
 
 router.prefix('/test');
 router.post('/set', async (ctx) => {
@@ -16,7 +16,11 @@ router.post('/get', async (ctx) => {
     const obj = ctx.request.body;
     console.log("get:",obj)
     await redis.redis_get(obj.key).then((result) => {
-        ctx.body = "key:" + obj.key + " \n" + "value:" + result
+        // ctx.body = "key:" + obj.key + " \n" + "value:" + result
+        console.error('>>>>>>>>>>>>redis_get'+"key:" + obj.key + "-" + "value:" + result)
+    });
+    await redis.readRedis(obj.key).then((result) => {
+        ctx.body = "readRedis key:" + obj.key + " \n" + "value:" + result
     });
 })
 

+ 3 - 3
test/test.js

@@ -79,9 +79,9 @@ async function getTransfers(ctx) {
     // ctx.body = await moralis.collectCoins(obj);
 
     //提交归集任务
-    // if (obj.address) {
-    //     moralis.pushCollectConisObj(obj)
-    // }
+    if (obj.address) {
+        moralis.pushCollectConisObj(obj)
+    }
 
 
     ctx.body = curGasPrice + '-' + curGasLimit;