package project.redis.interal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskExecutor;

import kernel.util.ThreadUtils;
import project.redis.RedisHandler;

/**
 * {@link RedisHandler} implementation that delegates synchronous operations
 * directly to the injected {@link Redis} client and buffers asynchronous
 * operations in {@link AsynItemQueue}.
 *
 * <p>A background thread (started in {@link #afterPropertiesSet()}) drains the
 * queue, groups the drained items into batches and hands each batch to the
 * injected {@link TaskExecutor} as a {@link HandleRunner}.
 */
public class RedisHandlerImpl implements RedisHandler, InitializingBean, Runnable {

    /** Maximum number of queued items submitted to the executor as one batch. */
    private static final int MAX_BATCH_SIZE = 100;

    /** Pause, in milliseconds, applied when the queue is empty or a pass failed. */
    private static final int IDLE_SLEEP_MILLIS = 50;

    private static final Logger logger = LoggerFactory.getLogger(RedisHandlerImpl.class);

    private Redis redis;

    private TaskExecutor taskExecutor;

    /**
     * Reads a single value.
     *
     * @param key the key to read
     * @return whatever {@code redis.get(key)} returns
     */
    public Object get(String key) {
        return redis.get(key);
    }

    /**
     * Batch read. Per the original author's note this differs from repeated
     * single reads in performance: one pooled connection is used and the values
     * are collected before returning (behavior lives in {@code Redis}, not here).
     *
     * @param keys the keys to read
     * @return whatever {@code redis.getList(keys)} returns
     */
    public Object[] getList(String[] keys) {
        return redis.getList(keys);
    }

    /**
     * Batch read returning a map.
     *
     * @param keys the keys to read
     * @return whatever {@code redis.getMap(keys)} returns
     */
    @Override
    public HashMap<String, Object> getMap(Set<String> keys) {
        return redis.getMap(keys);
    }

    /**
     * Synchronous set.
     *
     * @param key    the key to write
     * @param object the value to write
     */
    public void setSync(String key, Object object) {
        redis.setSync(key, object);
    }

    /**
     * Synchronous batch set.
     *
     * @param params the key/value pairs to write
     */
    public void setBatchSync(Map<String, Object> params) {
        redis.setBatchSync(params);
    }

    /**
     * Asynchronous set: the write is only enqueued here and is performed later
     * by a {@link HandleRunner}.
     *
     * @param key    the key to write
     * @param object the value to write
     */
    public void setAsyn(String key, Object object) {
        AsynItem item = new AsynItem(key, object, AsynItem.TYPE_MAP);
        AsynItemQueue.add(item);
    }

    /**
     * Removes a key by delegating to {@code redis.remove(key)}.
     *
     * @param key the key to remove
     */
    public void remove(String key) {
        redis.remove(key);
    }

    /**
     * Synchronous push.
     *
     * @param key    the queue key
     * @param object the value to push
     */
    public void pushSync(String key, Object object) {
        redis.pushSync(key, object);
    }

    /**
     * Asynchronous push: the push is only enqueued here and is performed later
     * by a {@link HandleRunner}. Batching is left to the business code.
     *
     * @param key    the queue key
     * @param object the value to push
     */
    public void pushAsyn(String key, Object object) {
        AsynItem item = new AsynItem(key, object, AsynItem.TYPE_QUEUE);
        AsynItemQueue.add(item);
    }

    /**
     * Batch push.
     *
     * <p>NOTE(review): despite the "Asyn" name this delegates straight to
     * {@code redis.pushBatchSync(params)} on the calling thread. Left as-is
     * because callers may rely on the synchronous behavior - confirm intent.
     *
     * @param params the key/value maps to push
     */
    public void pushBatchAsyn(List<Map<String, Object>> params) {
        redis.pushBatchSync(params);
    }

    /**
     * Takes one object from the queue stored under {@code key}. The original
     * comment states it is taken from the tail of the queue.
     *
     * @param key the queue key
     * @return whatever {@code redis.poll(key)} returns
     */
    public Object poll(String key) {
        return redis.poll(key);
    }

    /**
     * Worker loop; never returns.
     *
     * <ol>
     * <li>Polls {@link AsynItemQueue} and accumulates items into a batch.</li>
     * <li>Submits the batch to the task executor once it holds
     * {@value #MAX_BATCH_SIZE} items, or as soon as the queue runs empty while
     * the batch is non-empty.</li>
     * <li>Sleeps {@value #IDLE_SLEEP_MILLIS} ms when the queue is empty or when
     * the pass failed, so a persistent failure cannot turn into a busy loop.</li>
     * </ol>
     *
     * A batch that could not be submitted is kept and retried on a later pass.
     */
    @Override
    public void run() {
        List<AsynItem> batch = new ArrayList<AsynItem>();
        while (true) {
            boolean pause = false;
            try {
                AsynItem item = AsynItemQueue.poll();
                if (item != null) {
                    batch.add(item);
                } else {
                    pause = true;
                }

                boolean queueDrained = item == null && !batch.isEmpty();
                if (queueDrained || batch.size() >= MAX_BATCH_SIZE) {
                    taskExecutor.execute(new HandleRunner(batch));
                    // Ownership of the submitted list passes to the runner.
                    batch = new ArrayList<AsynItem>();
                }
            } catch (Throwable e) {
                logger.error("RedisHandlerImpl taskExecutor.execute() fail", e);
                // Back off before retrying instead of spinning on the failure.
                pause = true;
            }

            if (pause) {
                try {
                    ThreadUtils.sleep(IDLE_SLEEP_MILLIS);
                } catch (Throwable e) {
                    logger.error("RedisHandlerImpl idle sleep fail", e);
                }
            }
        }
    }

    /**
     * Writes one batch of queued items to Redis: items of type
     * {@code AsynItem.TYPE_MAP} are merged into a single
     * {@code redis.setBatchSync} call, items of type
     * {@code AsynItem.TYPE_QUEUE} into a single {@code redis.pushBatchSync}
     * call. Items of any other type are ignored.
     */
    public class HandleRunner implements Runnable {

        private final List<AsynItem> list;

        /**
         * @param list the batch to write; it is read, not copied
         */
        public HandleRunner(List<AsynItem> list) {
            this.list = list;
        }

        /**
         * Performs the batched writes. Any failure is logged and swallowed so
         * that it does not propagate into the executor.
         */
        @Override
        public void run() {
            try {
                Map<String, Object> params_map = new ConcurrentHashMap<String, Object>();
                List<Map<String, Object>> params_queue = new ArrayList<Map<String, Object>>();

                for (AsynItem item : list) {
                    if (AsynItem.TYPE_MAP.equals(item.getType())) {
                        // ConcurrentHashMap rejects null keys and values; skip the
                        // offending item rather than losing the whole batch to an NPE.
                        if (item.getKey() == null || item.getObject() == null) {
                            logger.warn("RedisHandlerImpl skip async set with null key or value, key={}",
                                    item.getKey());
                            continue;
                        }
                        // Later items overwrite earlier ones for the same key.
                        params_map.put(item.getKey(), item.getObject());
                    } else if (AsynItem.TYPE_QUEUE.equals(item.getType())) {
                        // One single-entry map per push, in list order.
                        Map<String, Object> map = new HashMap<String, Object>();
                        map.put(item.getKey(), item.getObject());
                        params_queue.add(map);
                    }
                }

                if (!params_map.isEmpty()) {
                    redis.setBatchSync(params_map);
                }
                if (!params_queue.isEmpty()) {
                    redis.pushBatchSync(params_queue);
                }
            } catch (Throwable t) {
                logger.error("RedisHandlerImpl taskExecutor.execute() fail", t);
            }
        }
    }

    /**
     * Starts the worker thread named {@code RedisHandlerImplServer} that runs
     * {@link #run()}.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(this, "RedisHandlerImplServer").start();
        logger.info("启动Redis(RedisHandlerImplServer)服务!");
    }

    /**
     * @param redis the Redis client every operation delegates to
     */
    public void setRedis(Redis redis) {
        this.redis = redis;
    }

    /**
     * @param taskExecutor the executor that runs {@link HandleRunner} batches
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}