1
dd
2025-12-26 63769df44b9baf8339f6e0e151f4b6908386087c
1
3 files modified
382 ■■■■■ changed files
src/main/java/com/nq/service/impl/PriceServicesImpl.java 84 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/service/impl/StockServiceImpl.java 149 ●●●● patch | view | raw | blame | history
src/main/java/com/nq/utils/redis/RedisKeyUtil.java 149 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/service/impl/PriceServicesImpl.java
@@ -31,6 +31,8 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
@@ -110,39 +112,65 @@
        }
    }
    private static String lastResult = null;
    private static long lastExecuteTime = 0;
    // 缓存条目类
    private static class CacheEntry {
        String result;
        long lastExecuteTime;
        CacheEntry(String result, long lastExecuteTime) {
            this.result = result;
            this.lastExecuteTime = lastExecuteTime;
        }
    }
    // 按pid存储缓存条目
    private static final Map<String, CacheEntry> cacheMap = new ConcurrentHashMap<>();
    // 按pid存储锁对象,实现细粒度锁
    private static final Map<String, Object> lockMap = new ConcurrentHashMap<>();
    private static final long MIN_INTERVAL_MS = 3000;
    private static final Object cacheLock = new Object();
    public static String doPost(String pid) {
        long currentTime = System.currentTimeMillis();
        // 第一次快速检查(不加锁)
        if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) {
            return lastResult;
        // 1. 快速检查缓存(无锁)
        CacheEntry cached = cacheMap.get(pid);
        if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) {
            return cached.result;
        }
        // 同步块内再次检查并执行
        synchronized (cacheLock) {
        // 2. 获取该pid对应的锁对象
        Object pidLock = lockMap.computeIfAbsent(pid, k -> new Object());
        // 3. 同步块内再次检查
        synchronized (pidLock) {
            currentTime = System.currentTimeMillis();
            if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) {
                return lastResult;
            cached = cacheMap.get(pid);
            if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) {
                return cached.result;
            }
            // 执行POST请求
            // 4. 执行POST请求
            String newResult = doActualPost(pid);
            // 更新缓存
            lastResult = newResult;
            lastExecuteTime = System.currentTimeMillis();
            // 5. 更新该pid的缓存
            if (newResult != null) {
                cacheMap.put(pid, new CacheEntry(newResult, System.currentTimeMillis()));
                return newResult;
            } else if (cached != null) {
                // 请求失败,返回旧缓存
                return cached.result;
            }
            return newResult;
            return "{\"error\":\"请求失败且无缓存数据\"}";
        }
    }
    private static String doActualPost(String pid) {
        String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY");
        String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") +
                "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY");
        try {
            URL url = new URL(apiUrl);
@@ -162,24 +190,22 @@
            }
            // 读取响应
            BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            String inputLine;
            StringBuilder response = new StringBuilder();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(connection.getInputStream()))) {
            while ((inputLine = in.readLine()) != null) {
                response.append(inputLine);
                StringBuilder response = new StringBuilder();
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                }
                return response.toString();
            }
            in.close();
            return response.toString();
        } catch (Exception e) {
            e.printStackTrace();
            // 返回缓存或错误信息
            if (lastResult != null) {
                return lastResult;
            }
            return "{\"error\":\"请求失败:" + e.getMessage() + "\"}";
            // 不再直接返回旧缓存,由上层处理
            return null;
        }
    }
src/main/java/com/nq/service/impl/StockServiceImpl.java
@@ -40,6 +40,7 @@
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
@@ -434,57 +435,145 @@
    // 只需要缓存一个最近返回的Object
    private static Object lastResult = null;
    private static long lastExecuteTime = 0;
    // 缓存键类,包含三个参数
    private static class CacheKey {
        private final String pid;
        private final String interval;
        private final String stockType;
        public CacheKey(String pid, String interval, String stockType) {
            this.pid = pid;
            this.interval = interval;
            this.stockType = stockType;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            CacheKey cacheKey = (CacheKey) o;
            return pid.equals(cacheKey.pid) &&
                    interval.equals(cacheKey.interval) &&
                    stockType.equals(cacheKey.stockType);
        }
        @Override
        public int hashCode() {
            int result = pid.hashCode();
            result = 31 * result + interval.hashCode();
            result = 31 * result + stockType.hashCode();
            return result;
        }
        @Override
        public String toString() {
            return "CacheKey{pid='" + pid + "', interval='" + interval + "', stockType='" + stockType + "'}";
        }
    }
    // 缓存条目类
    private static class CacheEntry {
        Object result;
        long lastExecuteTime;
        CacheEntry(Object result, long lastExecuteTime) {
            this.result = result;
            this.lastExecuteTime = lastExecuteTime;
        }
    }
    // 缓存存储:每个CacheKey对应一个CacheEntry
    private static final Map<CacheKey, CacheEntry> cacheMap = new ConcurrentHashMap<>();
    // 锁对象存储:每个CacheKey有自己的锁
    private static final Map<CacheKey, Object> lockMap = new ConcurrentHashMap<>();
    private static final long MIN_INTERVAL_MS = 3000;
    private static final Object cacheLock = new Object();
    @Override
    public Object getKData(String pid, String interval, String stockType) {
        CacheKey cacheKey = new CacheKey(pid, interval, stockType);
        long currentTime = System.currentTimeMillis();
        // 第一次快速检查(不加锁)
        if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) {
            return lastResult;
        // 1. 快速检查缓存(无锁)
        CacheEntry cached = cacheMap.get(cacheKey);
        if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) {
            return cached.result;
        }
        // 同步块内再次检查并更新
        synchronized (cacheLock) {
        // 2. 获取该缓存键对应的锁
        Object keyLock = lockMap.computeIfAbsent(cacheKey, k -> new Object());
        // 3. 同步块内再次检查
        synchronized (keyLock) {
            currentTime = System.currentTimeMillis();
            if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) {
                return lastResult;
            cached = cacheMap.get(cacheKey);
            if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) {
                return cached.result;
            }
            // 获取新数据
            // 4. 获取新数据
            Object newResult = doGetKData(pid, interval, stockType);
            // 更新缓存
            lastResult = newResult;
            lastExecuteTime = System.currentTimeMillis();
            // 5. 更新缓存
            if (newResult != null) {
                cacheMap.put(cacheKey, new CacheEntry(newResult, System.currentTimeMillis()));
                return newResult;
            } else if (cached != null) {
                // 请求失败,返回旧缓存
                return cached.result;
            }
            return newResult;
            return "{\"error\":\"获取K线数据失败\"}";
        }
    }
    private Object doGetKData(String pid, String interval, String stockType) {
        EStockType eStockType = EStockType.getEStockTypeByCode(stockType);
        Object object = HttpUtil.get(eStockType.stockUrl + "kline?pid=" + pid + "&interval=" + interval + "&key=" + eStockType.stockKey);
        Gson gson = new Gson();
        List<kData> dataList = gson.fromJson(object.toString(), new TypeToken<List<kData>>(){}.getType());
        try {
            EStockType eStockType = EStockType.getEStockTypeByCode(stockType);
            String url = eStockType.stockUrl + "kline?pid=" + pid +
                    "&interval=" + interval + "&key=" + eStockType.stockKey;
        Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>().eq(Stock::getStockCode, pid).eq(Stock::getStockType, "IN"));
        BigDecimal nowPrice = iPriceServices.getNowPrice(stock.getStockCode());
        Map singleStock = getSingleStock(stock.getStockCode());
        StockVO stockVO = (StockVO)singleStock.get("stock");
            Object object = HttpUtil.get(url);
            Gson gson = new Gson();
            List<kData> dataList = gson.fromJson(object.toString(),
                    new TypeToken<List<kData>>(){}.getType());
        kData lastData = dataList.get(dataList.size() - 1);
        lastData.setC(nowPrice.toString());
        lastData.setO(stockVO.getOpen_px());
        lastData.setH(stockVO.getToday_max());
        lastData.setL(stockVO.getToday_min());
            // 补充实时数据
            if (dataList != null && !dataList.isEmpty()) {
                enrichWithRealTimeData(pid, dataList);
            }
        return gson.toJson(dataList);
            return gson.toJson(dataList);
        } catch (Exception e) {
            e.printStackTrace();
            return null; // 返回null表示失败
        }
    }
    private void enrichWithRealTimeData(String pid, List<kData> dataList) {
        try {
            Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>()
                    .eq(Stock::getStockCode, pid)
                    .eq(Stock::getStockType, "IN"));
            if (stock != null) {
                BigDecimal nowPrice = iPriceServices.getNowPrice(stock.getStockCode());
                Map singleStock = getSingleStock(stock.getStockCode());
                StockVO stockVO = (StockVO)singleStock.get("stock");
                kData lastData = dataList.get(dataList.size() - 1);
                lastData.setC(nowPrice.toString());
                lastData.setO(stockVO.getOpen_px());
                lastData.setH(stockVO.getToday_max());
                lastData.setL(stockVO.getToday_min());
            }
        } catch (Exception e) {
            // 实时数据获取失败不影响K线数据返回
            e.printStackTrace();
        }
    }
    @Override
src/main/java/com/nq/utils/redis/RedisKeyUtil.java
@@ -20,7 +20,9 @@
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -90,95 +92,108 @@
        }
    }
    private static final ReentrantLock lock = new ReentrantLock();
    private static String lastResult = null;
    private static long lastExecuteTime = 0;
    // 缓存条目类
    private static class CacheEntry {
        String result;
        long lastExecuteTime;
        CacheEntry(String result, long lastExecuteTime) {
            this.result = result;
            this.lastExecuteTime = lastExecuteTime;
        }
    }
    // 按pid存储缓存
    private static final Map<String, CacheEntry> cacheMap = new ConcurrentHashMap<>();
    // 按pid存储锁对象,实现细粒度锁
    private static final Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
    private static final long MIN_INTERVAL_MS = 3000;
    public static String doPost(String pid) {
        // 快速检查缓存
        // 1. 快速检查缓存
        long currentTime = System.currentTimeMillis();
        if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) {
            return lastResult;
        CacheEntry cached = cacheMap.get(pid);
        if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) {
            return cached.result;
        }
        // 需要获取锁执行新请求
        return executeWithLock(pid);
    }
        // 2. 获取该pid对应的锁
        ReentrantLock pidLock = lockMap.computeIfAbsent(pid, k -> new ReentrantLock());
    private static String executeWithLock(String pid) {
        String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY");
        lock.lock();
        // 3. 加锁执行
        pidLock.lock();
        try {
            // 双检锁:再次检查缓存
            long currentTime = System.currentTimeMillis();
            if (lastResult != null && (currentTime - lastExecuteTime) < MIN_INTERVAL_MS) {
                return lastResult;
            // 4. 双重检查
            currentTime = System.currentTimeMillis();
            cached = cacheMap.get(pid);
            if (cached != null && (currentTime - cached.lastExecuteTime) < MIN_INTERVAL_MS) {
                return cached.result;
            }
            // 执行POST请求
            String result = null;
            try {
                URL url = new URL(apiUrl);
                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            // 5. 执行实际请求
            String result = executePostRequest(pid);
                connection.setRequestMethod("POST");
                connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
                connection.setDoOutput(true);
                String postData = "pid=" + pid;
                try (OutputStream os = connection.getOutputStream()) {
                    byte[] input = postData.getBytes("utf-8");
                    os.write(input, 0, input.length);
                }
                BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                String inputLine;
                StringBuilder response = new StringBuilder();
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                }
                in.close();
                result = response.toString();
                // 更新缓存
                lastResult = result;
                lastExecuteTime = System.currentTimeMillis();
            } catch (Exception e) {
                e.printStackTrace();
                // 请求失败时返回缓存
                if (lastResult != null) {
                    return lastResult;
                }
            // 6. 更新缓存
            if (result != null) {
                cacheMap.put(pid, new CacheEntry(result, System.currentTimeMillis()));
            } else if (cached != null) {
                // 请求失败,返回旧缓存
                return cached.result;
            }
            return result;
        } finally {
            lock.unlock();
            pidLock.unlock();
        }
    }
    private static String executePostRequest(String pid) {
        String apiUrl = PropertiesUtil.getProperty("JS_IN_HTTP_URL") +
                "stock?key=" + PropertiesUtil.getProperty("JS_IN_KEY");
        try {
            URL url = new URL(apiUrl);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
            connection.setDoOutput(true);
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(10000);
            String postData = "pid=" + pid;
            try (OutputStream os = connection.getOutputStream()) {
                byte[] input = postData.getBytes("utf-8");
                os.write(input, 0, input.length);
            }
            // 读取响应
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(connection.getInputStream()))) {
                StringBuilder response = new StringBuilder();
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                }
                return response.toString();
            }
        } catch (Exception e) {
            e.printStackTrace();
            return null; // 返回null表示请求失败
        }
    }
    public  static  void setCacheCompanies(Stock stock,String companiesInfo){
        RedisShardedPoolUtils.set(RedisKeyConstant.RK_COMPANY_INFO+":"+stock.getStockType()+":"+stock.getStockCode(),
                new Gson().toJson(companiesInfo));
    }
    public static JSONObject getCacheCompanies(Stock stock){
       String  companiesInfo =  RedisShardedPoolUtils.get(RedisKeyConstant.RK_COMPANY_INFO+":"+stock.getStockType()+":"+stock.getStockCode());
       if(companiesInfo.isEmpty()){
           return  null;
       }
       return  JSONObject.parseObject(companiesInfo);
    }
}