package com.nq.utils.redis; import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import com.nq.enums.EStockType; import com.nq.pojo.DataStockBean; import com.nq.pojo.Stock; import com.nq.pojo.StockRealTimeBean; import com.nq.utils.PropertiesUtil; import com.nq.utils.stock.sina.StockApi; import org.apache.http.util.TextUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStream; import java.math.BigDecimal; import java.net.HttpURLConnection; import java.net.URL; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; public class RedisKeyUtil { private static final Logger log = LoggerFactory.getLogger(RedisKeyUtil.class); /** * 缓存股票数据源到redis * */ public static void setCaCheKeyBaseStock(EStockType eStockType, DataStockBean dataStockBean){ RedisShardedPoolUtils.set(RedisKeyConstant.RK_BASE_STOCK+":"+eStockType.getCode()+":"+dataStockBean.getId(), new Gson().toJson(dataStockBean)); } /** * 获取股票数据 * */ public static DataStockBean getCacheBaseStock(Stock stock){ String cacheBaseData = RedisShardedPoolUtils.get(RedisKeyConstant.RK_BASE_STOCK+":"+stock.getStockType()+":"+stock.getStockCode()); return new Gson().fromJson(cacheBaseData, DataStockBean.class); } /** * 保存实时数据到redis * */ public static void setCacheRealTimeStock(EStockType eStockType, StockRealTimeBean stockRealTimeBean){ RedisShardedPoolUtils.set(RedisKeyConstant.RK_REAL_TIME_STOCK+":"+eStockType.getCode()+":"+stockRealTimeBean.getPid(), new Gson().toJson(stockRealTimeBean)); } public static StockRealTimeBean getCacheRealTimeStock(Stock stock){ StockRealTimeBean stockRealTimeBean = null; String cacheBaseData = RedisShardedPoolUtils.get(RedisKeyConstant.RK_REAL_TIME_STOCK+":"+stock.getStockType()+":"+stock.getStockCode()); if(!TextUtils.isEmpty(cacheBaseData)){ stockRealTimeBean = new Gson().fromJson(cacheBaseData, StockRealTimeBean.class); } if(stockRealTimeBean == null){ String s = doPost(stock.getStockCode()); Map stringObjectMap = jsonToMap(s); stockRealTimeBean = new StockRealTimeBean(); stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()); stockRealTimeBean.setLast(stringObjectMap.get("Last").toString()); stockRealTimeBean.setHigh(stringObjectMap.get("High").toString()); stockRealTimeBean.setLow(stringObjectMap.get("Low").toString()); stockRealTimeBean.setBid(stringObjectMap.get("Id").toString()); stockRealTimeBean.setPc(stringObjectMap.get("PrevClose").toString()); stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString()); } stockRealTimeBean.setPcp(stockRealTimeBean.getPcp().replace("%","")); return stockRealTimeBean; } public static Map jsonToMap(String json) { ObjectMapper objectMapper = new ObjectMapper(); try { Object[] array = objectMapper.readValue(json, Object[].class); Gson gson = new Gson(); String s = gson.toJson(array[0]); Map map = objectMapper.readValue(s, Map.class); return map; } catch (JsonProcessingException e) { throw new RuntimeException(e); } } // 缓存条目类 private static class CacheEntry { String result; long lastExecuteTime; CacheEntry(String result, long lastExecuteTime) { this.result = result; this.lastExecuteTime = lastExecuteTime; } } // 按pid存储缓存 private static final Map cacheMap = new ConcurrentHashMap<>(); // 按pid存储锁对象,实现细粒度锁 private static final Map lockMap = new ConcurrentHashMap<>(); private static final long MIN_INTERVAL_MS = 3000; public static String doPost(String pid) { // 1. 快速检查缓存 long currentTime = System.currentTimeMillis(); CacheEntry cached = cacheMap.get(pid); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 2. 获取该pid对应的锁 ReentrantLock pidLock = lockMap.computeIfAbsent(pid, k -> new ReentrantLock()); // 3. 加锁执行 pidLock.lock(); try { // 4. 双重检查 currentTime = System.currentTimeMillis(); cached = cacheMap.get(pid); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 5. 执行实际请求 String result = executePostRequest(pid); // 6. 更新缓存 if (result != null) { cacheMap.put(pid, new CacheEntry(result, System.currentTimeMillis())); } else if (cached != null) { // 请求失败,返回旧缓存 return cached.result; } return result; } finally { pidLock.unlock(); } } private static String executePostRequest(String pid) { String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY"); try { URL url = new URL(apiUrl); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); connection.setDoOutput(true); connection.setConnectTimeout(5000); connection.setReadTimeout(10000); String postData = "pid=" + pid; try (OutputStream os = connection.getOutputStream()) { byte[] input = postData.getBytes("utf-8"); os.write(input, 0, input.length); } // 读取响应 try (BufferedReader in = new BufferedReader( new InputStreamReader(connection.getInputStream()))) { StringBuilder response = new StringBuilder(); String inputLine; while ((inputLine = in.readLine()) != null) { response.append(inputLine); } return response.toString(); } } catch (Exception e) { e.printStackTrace(); return null; // 返回null表示请求失败 } } public static void setCacheCompanies(Stock stock,String companiesInfo){ RedisShardedPoolUtils.set(RedisKeyConstant.RK_COMPANY_INFO+":"+stock.getStockType()+":"+stock.getStockCode(), new Gson().toJson(companiesInfo)); } }