|
@@ -8,8 +8,8 @@ import com.google.common.collect.Iterables;
|
|
|
import com.tzld.piaoquan.recommend.server.framework.candidiate.Queue;
|
|
|
import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
|
|
|
import com.tzld.piaoquan.recommend.server.framework.utils.FixedThreadPoolHelper;
|
|
|
-import com.tzld.piaoquan.recommend.server.framework.utils.IndexUtils;
|
|
|
import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
|
|
|
+import com.tzld.piaoquan.recommend.server.model.Video;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
@@ -17,16 +17,12 @@ import org.apache.commons.lang3.tuple.Pair;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
|
|
|
-public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(RedisBackedQueueWithoutMeta.class);
|
|
|
-
|
|
|
+public class RedisBackedQueue implements QueueProvider<Video> {
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(RedisBackedQueue.class);
|
|
|
private static final int DEFAULT_TTL = 7 * 24 * 60 * 60;
|
|
|
private static final long CACHE_MAXIMUMSIZE = 50 * 10000; // default 50w
|
|
|
private static final long CACHE_TIMEOUT_MS = 5 * 60 * 1000L;
|
|
@@ -39,14 +35,15 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
// 不能为static
|
|
|
private final BlockingQueue<QueueName> asyncRefreshQueue = new LinkedBlockingQueue<QueueName>(50000);
|
|
|
private final RedisSmartClient client;
|
|
|
- private final LoadingCache<QueueName, Pair<Long, Queue<String>>> cache;
|
|
|
+ private final LoadingCache<QueueName, Pair<Long, Queue<Video>>> cache;
|
|
|
|
|
|
/**
|
|
|
* 实例化 RedisBackedQueue, 并提供基于Guava Cache的本地缓存管理工作
|
|
|
+ *
|
|
|
* @param client redis-cluster 连接池
|
|
|
* @param cacheTimeOutMs 缓存有效期, 开启缓存的情况下,默认缓存5min,以写时间为基准
|
|
|
*/
|
|
|
- public RedisBackedQueueWithoutMeta(RedisSmartClient client, final long cacheTimeOutMs) {
|
|
|
+ public RedisBackedQueue(RedisSmartClient client, final long cacheTimeOutMs) {
|
|
|
this.client = client;
|
|
|
|
|
|
cache = CacheBuilder.newBuilder()
|
|
@@ -60,7 +57,7 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public RedisBackedQueueWithoutMeta(RedisSmartClient client) {
|
|
|
+ public RedisBackedQueue(RedisSmartClient client) {
|
|
|
this(client, CACHE_TIMEOUT_MS);
|
|
|
}
|
|
|
|
|
@@ -68,50 +65,30 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
return client;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ // redis读取队列内内容
|
|
|
public Index get(QueueName name) throws Exception {
|
|
|
- byte[] bytes = client.get(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
|
|
|
- Index index = new Index(bytes);
|
|
|
+ String valueJson = client.get(name.toString());
|
|
|
+ Index index = new Index(valueJson);
|
|
|
return index;
|
|
|
}
|
|
|
|
|
|
- public Index getDirect(QueueName name) throws Exception {
|
|
|
- return get(name);
|
|
|
- }
|
|
|
-
|
|
|
- public long dbSize() {
|
|
|
- return this.cache.size();
|
|
|
- }
|
|
|
-
|
|
|
- public long updateIndex(QueueName name, Index index) throws Exception {
|
|
|
- return updateIndex(name.toString(), index);
|
|
|
- }
|
|
|
|
|
|
- public long updateIndex(String name, Index index) throws Exception {
|
|
|
- Preconditions.checkArgument(StringUtils.equals(name, index.getIndexName()), "queue name not equal indexName, invalid Param");
|
|
|
- long start = System.nanoTime();
|
|
|
-
|
|
|
- // TODO fix redis key value details
|
|
|
- String result = client.setex(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1), this.DEFAULT_TTL, index.toString().getBytes());
|
|
|
- logger.debug("updateIndexResult= {}, queue= {}, indexSize= {}", new Object[]{result, name, index.getIndexEntryList().size()});
|
|
|
-
|
|
|
- return CollectionUtils.isNotEmpty(index.getIndexEntryList()) ? index.getIndexEntryList().size() : 0l;
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* 加载指定索引, 并指定索引本地缓存的TTL
|
|
|
- *
|
|
|
* @param name QueueName
|
|
|
* @return
|
|
|
* @throws ExecutionException
|
|
|
*/
|
|
|
- public Queue<String> load(QueueName name) throws ExecutionException {
|
|
|
+ public Queue<Video> load(QueueName name) throws ExecutionException {
|
|
|
|
|
|
- Pair<Long, Queue<String>> cachedQueue = cache.get(name);
|
|
|
+ Pair<Long, Queue<Video>> cachedQueue = cache.get(name);
|
|
|
if (cachedQueue == null) {
|
|
|
// 清理本地缓存中当前key的数据
|
|
|
cache.invalidate(name);
|
|
|
logger.debug("invalidate queue [{}]", name);
|
|
|
- return new Queue<String>(name.toString());
|
|
|
+ return new Queue<Video>(name.toString());
|
|
|
}
|
|
|
// update key refresh time
|
|
|
if (cachedQueue.getKey() == Long.MIN_VALUE) {
|
|
@@ -140,21 +117,20 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
return cachedQueue.getValue();
|
|
|
}
|
|
|
|
|
|
- public Map<QueueName, Queue<String>> loads(List<QueueName> names) throws ExecutionException {
|
|
|
+ public Map<QueueName, Queue<Video>> loads(List<QueueName> names) throws ExecutionException {
|
|
|
return this.loads(names, 200, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
- public Map<QueueName, Queue<String>> loads(List<QueueName> names, long timeout, TimeUnit timeUnit) throws ExecutionException {
|
|
|
+ public Map<QueueName, Queue<Video>> loads(List<QueueName> names, long timeout, TimeUnit timeUnit) throws ExecutionException {
|
|
|
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
final Iterable<List<QueueName>> namesIterable = Iterables.partition(names, 1);
|
|
|
- final List<Callable<Map<QueueName, Queue<String>>>> callables = new ArrayList<Callable<Map<QueueName, Queue<String>>>>();
|
|
|
+ final List<Callable<Map<QueueName, Queue<Video>>>> callables =
|
|
|
+ new ArrayList<Callable<Map<QueueName, Queue<Video>>>>();
|
|
|
for (final List<QueueName> queueNames : namesIterable) {
|
|
|
- callables.add(new Callable<Map<QueueName, Queue<String>>>() {
|
|
|
+ callables.add(new Callable<Map<QueueName, Queue<Video>>>() {
|
|
|
@Override
|
|
|
- public Map<QueueName, Queue<String>> call() {
|
|
|
-
|
|
|
- Map<QueueName, Queue<String>> result = new HashMap<QueueName, Queue<String>>();
|
|
|
+ public Map<QueueName, Queue<Video>> call() {
|
|
|
+ Map<QueueName, Queue<Video>> result = new HashMap<QueueName, Queue<Video>>();
|
|
|
for (final QueueName name : queueNames) {
|
|
|
try {
|
|
|
Queue queue = load(name);
|
|
@@ -168,13 +144,13 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Map<QueueName, Queue<String>> loadQueue = new ConcurrentHashMap<QueueName, Queue<String>>();
|
|
|
+ Map<QueueName, Queue<Video>> loadQueue = new ConcurrentHashMap<QueueName, Queue<Video>>();
|
|
|
try {
|
|
|
- List<Future<Map<QueueName, Queue<String>>>> futures = executorService.invokeAll(callables);
|
|
|
- for (Future<Map<QueueName, Queue<String>>> future : futures) {
|
|
|
+ List<Future<Map<QueueName, Queue<Video>>>> futures = executorService.invokeAll(callables);
|
|
|
+ for (Future<Map<QueueName, Queue<Video>>> future : futures) {
|
|
|
if (future.isDone() && !future.isCancelled()) {
|
|
|
try {
|
|
|
- Map<QueueName, Queue<String>> ret = future.get();
|
|
|
+ Map<QueueName, Queue<Video>> ret = future.get();
|
|
|
loadQueue.putAll(ret);
|
|
|
} catch (ExecutionException ee) {
|
|
|
logger.error("Failed to execute load a queue partition for {}", ExceptionUtils.getFullStackTrace(ee));
|
|
@@ -184,22 +160,9 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
} catch (InterruptedException ie) {
|
|
|
logger.error("Interrupted when waiting for parallel queue load finish for {}", ExceptionUtils.getFullStackTrace(ie));
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
return loadQueue;
|
|
|
}
|
|
|
|
|
|
- public long len(QueueName name) throws Exception {
|
|
|
- byte[] bytes = client.get(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
|
|
|
- // 检索结果
|
|
|
- Index index = new Index(bytes);
|
|
|
-
|
|
|
- return CollectionUtils.isNotEmpty(index.getIndexEntryList()) ? index.getIndexEntryList().size() : 0L;
|
|
|
- }
|
|
|
-
|
|
|
- public void delete(QueueName name) throws Exception {
|
|
|
- client.del(IndexUtils.convertKey(name, IndexUtils.queuePrefixV1));
|
|
|
- }
|
|
|
|
|
|
|
|
|
// 使用Guava Cache 的异步刷新接口, 异步刷新到期的key
|
|
@@ -216,7 +179,7 @@ public class RedisBackedQueueWithoutMeta implements QueueProvider<String> {
|
|
|
logger.debug("async refresh thread get queuename [{}]", queueName); // get itemid
|
|
|
}
|
|
|
|
|
|
- Pair<Long, Queue<String>> entry = cache.get(queueName);
|
|
|
+ Pair<Long, Queue<Video>> entry = cache.get(queueName);
|
|
|
// check refresh
|
|
|
if (startTime > entry.getKey()) {
|
|
|
cache.refresh(queueName);
|