| src/main/java/com/nq/service/impl/PriceServicesImpl.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/com/nq/service/impl/StockServiceImpl.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/com/nq/utils/redis/RedisKeyUtil.java | ●●●●● patch | view | raw | blame | history |
src/main/java/com/nq/service/impl/PriceServicesImpl.java
@@ -31,6 +31,8 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import com.fasterxml.jackson.databind.ObjectMapper; @Service @@ -110,39 +112,65 @@ } } private static String lastResult = null; private static long lastExecuteTime = 0; // 缓存条目类 private static class CacheEntry { String result; long lastExecuteTime; CacheEntry(String result, long lastExecuteTime) { this.result = result; this.lastExecuteTime = lastExecuteTime; } } // 按pid存储缓存条目 private static final Map<String, CacheEntry> cacheMap = new ConcurrentHashMap<>(); // 按pid存储锁对象,实现细粒度锁 private static final Map<String, Object> lockMap = new ConcurrentHashMap<>(); private static final long MIN_INTERVAL_MS = 3000; private static final Object cacheLock = new Object(); public static String doPost(String pid) { long currentTime = System.currentTimeMillis(); // 第一次快速检查(不加锁) if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) { return lastResult; // 1. 快速检查缓存(无锁) CacheEntry cached = cacheMap.get(pid); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 同步块内再次检查并执行 synchronized (cacheLock) { // 2. 获取该pid对应的锁对象 Object pidLock = lockMap.computeIfAbsent(pid, k -> new Object()); // 3. 同步块内再次检查 synchronized (pidLock) { currentTime = System.currentTimeMillis(); if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) { return lastResult; cached = cacheMap.get(pid); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 执行POST请求 // 4. 执行POST请求 String newResult = doActualPost(pid); // 更新缓存 lastResult = newResult; lastExecuteTime = System.currentTimeMillis(); // 5. 更新该pid的缓存 if (newResult != null) { cacheMap.put(pid, new CacheEntry(newResult, System.currentTimeMillis())); return newResult; } else if (cached != null) { // 请求失败,返回旧缓存 return cached.result; } return newResult; return "{\"error\":\"请求失败且无缓存数据\"}"; } } private static String doActualPost(String pid) { String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY"); String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY"); try { URL url = new URL(apiUrl); @@ -162,24 +190,22 @@ } // 读取响应 BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); String inputLine; StringBuilder response = new StringBuilder(); try (BufferedReader in = new BufferedReader( new InputStreamReader(connection.getInputStream()))) { while ((inputLine = in.readLine()) != null) { response.append(inputLine); StringBuilder response = new StringBuilder(); String inputLine; while ((inputLine = in.readLine()) != null) { response.append(inputLine); } return response.toString(); } in.close(); return response.toString(); } catch (Exception e) { e.printStackTrace(); // 返回缓存或错误信息 if (lastResult != null) { return lastResult; } return "{\"error\":\"请求失败:" + e.getMessage() + "\"}"; // 不再直接返回旧缓存,由上层处理 return null; } } src/main/java/com/nq/service/impl/StockServiceImpl.java
@@ -40,6 +40,7 @@ import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; @@ -434,57 +435,145 @@ // 只需要缓存一个最近返回的Object private static Object lastResult = null; private static long lastExecuteTime = 0; // 缓存键类,包含三个参数 private static class CacheKey { private final String pid; private final String interval; private final String stockType; public CacheKey(String pid, String interval, String stockType) { this.pid = pid; this.interval = interval; this.stockType = stockType; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; CacheKey cacheKey = (CacheKey) o; return pid.equals(cacheKey.pid) && interval.equals(cacheKey.interval) && stockType.equals(cacheKey.stockType); } @Override public int hashCode() { int result = pid.hashCode(); result = 31 * result + interval.hashCode(); result = 31 * result + stockType.hashCode(); return result; } @Override public String toString() { return "CacheKey{pid='" + pid + "', interval='" + interval + "', stockType='" + stockType + "'}"; } } // 缓存条目类 private static class CacheEntry { Object result; long lastExecuteTime; CacheEntry(Object result, long lastExecuteTime) { this.result = result; this.lastExecuteTime = lastExecuteTime; } } // 缓存存储:每个CacheKey对应一个CacheEntry private static final Map<CacheKey, CacheEntry> cacheMap = new ConcurrentHashMap<>(); // 锁对象存储:每个CacheKey有自己的锁 private static final Map<CacheKey, Object> lockMap = new ConcurrentHashMap<>(); private static final long MIN_INTERVAL_MS = 3000; private static final Object cacheLock = new Object(); @Override public Object getKData(String pid, String interval, String stockType) { CacheKey cacheKey = new CacheKey(pid, interval, stockType); long currentTime = System.currentTimeMillis(); // 第一次快速检查(不加锁) if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) { return lastResult; // 1. 快速检查缓存(无锁) CacheEntry cached = cacheMap.get(cacheKey); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 同步块内再次检查并更新 synchronized (cacheLock) { // 2. 获取该缓存键对应的锁 Object keyLock = lockMap.computeIfAbsent(cacheKey, k -> new Object()); // 3. 同步块内再次检查 synchronized (keyLock) { currentTime = System.currentTimeMillis(); if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) { return lastResult; cached = cacheMap.get(cacheKey); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 获取新数据 // 4. 获取新数据 Object newResult = doGetKData(pid, interval, stockType); // 更新缓存 lastResult = newResult; lastExecuteTime = System.currentTimeMillis(); // 5. 更新缓存 if (newResult != null) { cacheMap.put(cacheKey, new CacheEntry(newResult, System.currentTimeMillis())); return newResult; } else if (cached != null) { // 请求失败,返回旧缓存 return cached.result; } return newResult; return "{\"error\":\"获取K线数据失败\"}"; } } private Object doGetKData(String pid, String interval, String stockType) { EStockType eStockType = EStockType.getEStockTypeByCode(stockType); Object object = HttpUtil.get(eStockType.stockUrl + "kline?pid=" + pid + "&interval=" + interval + "&key=" + eStockType.stockKey); Gson gson = new Gson(); List<kData> dataList = gson.fromJson(object.toString(), new TypeToken<List<kData>>(){}.getType()); try { EStockType eStockType = EStockType.getEStockTypeByCode(stockType); String url = eStockType.stockUrl + "kline?pid=" + pid + "&interval=" + interval + "&key=" + eStockType.stockKey; Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>().eq(Stock::getStockCode, pid).eq(Stock::getStockType, "IN")); BigDecimal nowPrice = iPriceServices.getNowPrice(stock.getStockCode()); Map singleStock = getSingleStock(stock.getStockCode()); StockVO stockVO = (StockVO)singleStock.get("stock"); Object object = HttpUtil.get(url); Gson gson = new Gson(); List<kData> dataList = gson.fromJson(object.toString(), new TypeToken<List<kData>>(){}.getType()); kData lastData = dataList.get(dataList.size() - 1); lastData.setC(nowPrice.toString()); lastData.setO(stockVO.getOpen_px()); lastData.setH(stockVO.getToday_max()); lastData.setL(stockVO.getToday_min()); // 补充实时数据 if (dataList != null && !dataList.isEmpty()) { enrichWithRealTimeData(pid, dataList); } return gson.toJson(dataList); return gson.toJson(dataList); } catch (Exception e) { e.printStackTrace(); return null; // 返回null表示失败 } } private void enrichWithRealTimeData(String pid, List<kData> dataList) { try { Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>() .eq(Stock::getStockCode, pid) .eq(Stock::getStockType, "IN")); if (stock != null) { BigDecimal nowPrice = iPriceServices.getNowPrice(stock.getStockCode()); Map singleStock = getSingleStock(stock.getStockCode()); StockVO stockVO = (StockVO)singleStock.get("stock"); kData lastData = dataList.get(dataList.size() - 1); lastData.setC(nowPrice.toString()); lastData.setO(stockVO.getOpen_px()); lastData.setH(stockVO.getToday_max()); lastData.setL(stockVO.getToday_min()); } } catch (Exception e) { // 实时数据获取失败不影响K线数据返回 e.printStackTrace(); } } @Override src/main/java/com/nq/utils/redis/RedisKeyUtil.java
@@ -20,7 +20,9 @@ import java.math.BigDecimal; import java.net.HttpURLConnection; import java.net.URL; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -90,95 +92,108 @@ } } private static final ReentrantLock lock = new ReentrantLock(); private static String lastResult = null; private static long lastExecuteTime = 0; // 缓存条目类 private static class CacheEntry { String result; long lastExecuteTime; CacheEntry(String result, long lastExecuteTime) { this.result = result; this.lastExecuteTime = lastExecuteTime; } } // 按pid存储缓存 private static final Map<String, CacheEntry> cacheMap = new ConcurrentHashMap<>(); // 按pid存储锁对象,实现细粒度锁 private static final Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<>(); private static final long MIN_INTERVAL_MS = 3000; public static String doPost(String pid) { // 快速检查缓存 // 1. 快速检查缓存 long currentTime = System.currentTimeMillis(); if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) { return lastResult; CacheEntry cached = cacheMap.get(pid); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 需要获取锁执行新请求 return executeWithLock(pid); } // 2. 获取该pid对应的锁 ReentrantLock pidLock = lockMap.computeIfAbsent(pid, k -> new ReentrantLock()); private static String executeWithLock(String pid) { String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY"); lock.lock(); // 3. 加锁执行 pidLock.lock(); try { // 双检锁:再次检查缓存 long currentTime = System.currentTimeMillis(); if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) { return lastResult; // 4. 双重检查 currentTime = System.currentTimeMillis(); cached = cacheMap.get(pid); if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) { return cached.result; } // 执行POST请求 String result = null; try { URL url = new URL(apiUrl); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); // 5. 执行实际请求 String result = executePostRequest(pid); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); connection.setDoOutput(true); String postData = "pid=" + pid; try (OutputStream os = connection.getOutputStream()) { byte[] input = postData.getBytes("utf-8"); os.write(input, 0, input.length); } BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); String inputLine; StringBuilder response = new StringBuilder(); while ((inputLine = in.readLine()) != null) { response.append(inputLine); } in.close(); result = response.toString(); // 更新缓存 lastResult = result; lastExecuteTime = System.currentTimeMillis(); } catch (Exception e) { e.printStackTrace(); // 请求失败时返回缓存 if (lastResult != null) { return lastResult; } // 6. 更新缓存 if (result != null) { cacheMap.put(pid, new CacheEntry(result, System.currentTimeMillis())); } else if (cached != null) { // 请求失败,返回旧缓存 return cached.result; } return result; } finally { lock.unlock(); pidLock.unlock(); } } private static String executePostRequest(String pid) { String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY"); try { URL url = new URL(apiUrl); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); connection.setDoOutput(true); connection.setConnectTimeout(5000); connection.setReadTimeout(10000); String postData = "pid=" + pid; try (OutputStream os = connection.getOutputStream()) { byte[] input = postData.getBytes("utf-8"); os.write(input, 0, input.length); } // 读取响应 try (BufferedReader in = new BufferedReader( new InputStreamReader(connection.getInputStream()))) { StringBuilder response = new StringBuilder(); String inputLine; while ((inputLine = in.readLine()) != null) { response.append(inputLine); } return response.toString(); } } catch (Exception e) { e.printStackTrace(); return null; // 返回null表示请求失败 } } public static void setCacheCompanies(Stock stock,String companiesInfo){ RedisShardedPoolUtils.set(RedisKeyConstant.RK_COMPANY_INFO+":"+stock.getStockType()+":"+stock.getStockCode(), new Gson().toJson(companiesInfo)); } public static JSONObject getCacheCompanies(Stock stock){ String companiesInfo = RedisShardedPoolUtils.get(RedisKeyConstant.RK_COMPANY_INFO+":"+stock.getStockType()+":"+stock.getStockCode()); if(companiesInfo.isEmpty()){ return null; } return JSONObject.parseObject(companiesInfo); } }