package org.example.server.impl; import org.apache.commons.lang3.StringUtils; import org.example.websocket.server.WsServer; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.example.dao.CurrencyMapper; import org.example.pojo.Currency; import org.example.pojo.Market; import org.example.pojo.MarketDataOut; import org.example.pojo.bo.AsksBo; import org.example.pojo.bo.BidsBo; import org.example.pojo.bo.MarketBo; import org.example.server.CurrencySerivce; import org.example.util.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.yaml.snakeyaml.error.Mark; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.ArrayList; import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.io.ByteArrayOutputStream; /** * @program: demo * @description: * @create: 2024-07-16 15:23 **/ @Service @Slf4j public class CurrencySerivceImpl extends ServiceImpl implements CurrencySerivce { private static final Gson gson = new Gson(); private ArrayList keys; List mexcList = new ArrayList<>(); List gateList = new ArrayList<>(); List bitgetList = new ArrayList<>(); List kucoinList = new ArrayList<>(); private final Lock syncCurrencyLock = new ReentrantLock(); @Autowired private WsServer wsServer; private void extracted() { ExecutorService executor = Executors.newFixedThreadPool(20); // 适当增加线程数以匹配处理的交易所数量 // 异步处理每个交易所的数据 CompletableFuture mexcFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("mexc"), mexcList, "mexc"), executor); CompletableFuture gateFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("gate"), gateList, "gate"), executor); CompletableFuture bitgetFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("bitget"), bitgetList, "bitget"), executor); CompletableFuture kucoinFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("kucoin"), kucoinList, "kucoin"), executor); // 等待所有任务完成 CompletableFuture.allOf(mexcFuture, gateFuture, bitgetFuture, kucoinFuture).join(); executor.shutdown(); // 关闭线程池 } private void processMexc() { Set mexcSet = RedisUtil.keys("mexc"); for (String key : mexcSet) { MarketBo marketBo = new MarketBo(); String v = RedisUtil.get(key); Map redisValueMap = gson.fromJson(v, new TypeToken>() {}.getType()); Object asksObj = redisValueMap.get("asks"); Object bidsObj = redisValueMap.get("bids"); if (asksObj instanceof List && !((List) asksObj).isEmpty()) { List asksList = (List) asksObj; Map asksMap = (Map) asksList.get(0); AsksBo asksBo = new AsksBo(); asksBo.setP(new BigDecimal(asksMap.get("p").toString())); asksBo.setV(new BigDecimal(asksMap.get("v").toString())); marketBo.setAsks(asksBo); } if (bidsObj instanceof List && !((List) bidsObj).isEmpty()) { List bidsList = (List) bidsObj; Map bidsMap = (Map) bidsList.get(0); BidsBo bidsBo = new BidsBo(); bidsBo.setP(new BigDecimal(bidsMap.get("p").toString())); bidsBo.setV(new BigDecimal(bidsMap.get("v").toString())); marketBo.setBids(bidsBo); } marketBo.setKey(key.replaceAll("mexc", "")); marketBo.setExchange("mexc"); mexcList.add(marketBo); } } private void processMarketData(Set set, List list, String exchangeName) { ArrayList kyes = new ArrayList<>(set); List mget = RedisUtil.mget(kyes); Map resultMap = new HashMap<>(); for (int i = 0; i < kyes.size(); i++) { resultMap.put(kyes.get(i), mget.get(i)); } for (Map.Entry entry : resultMap.entrySet()) { String v = entry.getValue(); String key = entry.getKey(); MarketBo marketBo = new MarketBo(); Map> redisValueMap = gson.fromJson(v, new TypeToken>>() {}.getType()); Map asksObj = redisValueMap.get("asks"); Map bidsObj = redisValueMap.get("bids"); try { if(StringUtils.isNotEmpty(asksObj.get("p")) && StringUtils.isNotEmpty(asksObj.get("v")) && StringUtils.isNotEmpty(bidsObj.get("p")) && StringUtils.isNotEmpty(bidsObj.get("v"))) { AsksBo asksBo = new AsksBo(); asksBo.setP(new BigDecimal(asksObj.get("p").toString())); asksBo.setV(new BigDecimal(asksObj.get("v").toString())); marketBo.setAsks(asksBo); BidsBo bidsBo = new BidsBo(); bidsBo.setP(new BigDecimal(bidsObj.get("p").toString())); bidsBo.setV(new BigDecimal(bidsObj.get("v").toString())); marketBo.setBids(bidsBo); marketBo.setKey(key.replaceAll(exchangeName, "")); marketBo.setExchange(exchangeName); list.add(marketBo); } }catch (Exception e){ e.printStackTrace(); } } } // 计算利润百分比 private static BigDecimal calculateProfitPercentage(BigDecimal buyPrice, BigDecimal sellPrice) { BigDecimal profit = sellPrice.subtract(buyPrice); if (buyPrice.compareTo(BigDecimal.ZERO) == 0) { return BigDecimal.ZERO; // 防止除以零 } BigDecimal profitPercentage = profit.divide(buyPrice, 12, RoundingMode.DOWN).multiply(new BigDecimal(100)); return profitPercentage; } private void findProfitablePairs(List... exchangeLists) { Map> marketMap = new HashMap<>(); List marketDataOuts = Collections.synchronizedList(new ArrayList<>()); // 使用线程安全的集合 // 按币种名称和交易所分组市场数据 for (List exchangeList : exchangeLists) { for (MarketBo market : exchangeList) { String coinName = market.getKey(); String exchangeName = market.getExchange(); marketMap.computeIfAbsent(coinName, k -> new HashMap<>()) .put(exchangeName, market); } } // 过滤出至少在两个交易所都有的币种 marketMap.values().removeIf(exchangeMap -> exchangeMap.size() == 1); // 并行计算利润百分比 ExecutorService executor = Executors.newFixedThreadPool(Math.min(8, marketMap.size())); // 根据需要调整线程池大小 List> futures = new ArrayList<>(); // 获取当前时间,用于后续所有 MarketDataOut 的时间戳 LocalDateTime currentTime = LocalDateTime.now(); String formattedDateTime = currentTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 格式化时间字符串 for (Map.Entry> entry : marketMap.entrySet()) { String coinName = entry.getKey(); Map exchangeMap = entry.getValue(); String[] exchanges = exchangeMap.keySet().toArray(new String[0]); for (int i = 0; i < exchanges.length; i++) { MarketBo markets1 = exchangeMap.get(exchanges[i]); if (markets1.getBids() == null) continue; for (int j = 0; j < exchanges.length; j++) { MarketBo markets2 = exchangeMap.get(exchanges[j]); if(markets1.getExchange().equals(markets2.getExchange())){ continue; } if (markets2.getAsks() == null) continue; CompletableFuture future = CompletableFuture.runAsync(() -> { try { BigDecimal buyPrice = markets1.getBids().getP(); BigDecimal sellPrice = markets2.getAsks().getP(); BigDecimal profitPercentage = calculateProfitPercentage(buyPrice, sellPrice); // 只有在利润百分比大于0时才组装市场数据 if (profitPercentage.compareTo(BigDecimal.ZERO) > 0) { assembleMarketDataOut(coinName, markets1, markets2, profitPercentage, buyPrice, sellPrice, marketDataOuts, formattedDateTime); } } catch (Exception e) { e.printStackTrace(); // 输出异常信息 } }, executor); futures.add(future); // 收集所有 Future } } } // 等待所有任务完成 CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); executor.shutdown(); // 关闭线程池 mexcList.clear(); gateList.clear(); bitgetList.clear(); kucoinList.clear(); pushWs(marketDataOuts); // 推送结果 } private static void assembleMarketDataOut(String coinName, MarketBo markets1, MarketBo markets2, BigDecimal profitPercentage, BigDecimal buyPrice, BigDecimal sellPrice, List marketDataOuts, String formattedDateTime) { MarketDataOut marketDataOut = new MarketDataOut(); marketDataOut.setBaseAsset(coinName.replaceAll("USDT","").toLowerCase().toUpperCase()); // 设置基础资产 marketDataOut.setBuyingPlatform(capitalizeFirstLetter(markets1.getExchange())); // 设置买入平台,首字母大写 marketDataOut.setSellPlatform(capitalizeFirstLetter(markets2.getExchange())); // 设置卖出平台,首字母大写 marketDataOut.setSpread(profitPercentage.setScale(4, RoundingMode.DOWN).toPlainString()); // 设置利润百分比 marketDataOut.setBuyPrice(buyPrice.toPlainString()); // 设置买入价格 marketDataOut.setSellPrice(sellPrice.toPlainString()); // 设置卖出价格 marketDataOut.setBuyNumber(markets1.getBids().getV().setScale(4, RoundingMode.HALF_UP).toPlainString()); // 设置买入数量 marketDataOut.setSellNumber(markets2.getAsks().getV().setScale(4, RoundingMode.HALF_UP).toPlainString()); // 设置卖出数量 marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0, RoundingMode.HALF_UP).toPlainString()); // 设置买入总价 marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.HALF_UP).toPlainString()); // 设置卖出总价 marketDataOut.setServceTime(formattedDateTime); // 设置服务时间 marketDataOut.setBuyAndSell(marketDataOut.getBaseAsset()+marketDataOut.getBuyingPlatform()+marketDataOut.getSellPlatform()); marketDataOuts.add(marketDataOut); // 添加到输出列表 } public static String capitalizeFirstLetter(String word) { if (word == null || word.isEmpty()) { return word; } return Character.toUpperCase(word.charAt(0)) + word.substring(1); } private void pushWs(List marketDataOuts) { // String key = "MARKET_Date"; // // 创建 Gson 对象 Gson gson = new GsonBuilder().setPrettyPrinting().create(); // 将列表转换为 JSON 字符串 String json = gson.toJson(marketDataOuts); // RedisUtil.set(key,json); try { // 全体发送 wsServer.sendMessageToAll(json); } catch (Exception e) { e.printStackTrace(); } } public void quotationCalculation(){ extracted(); findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); // 请确保这些变量有定义和赋值 } public void scheduler(){ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 获取当前时间 Calendar now = Calendar.getInstance(); int currentHour = now.get(Calendar.HOUR_OF_DAY); int currentMinute = now.get(Calendar.MINUTE); // 计算距离下一个1点的小时数和分钟数 int hourToSchedule = 1; int minuteToSchedule = 0; if (currentHour > hourToSchedule || (currentHour == hourToSchedule && currentMinute >= minuteToSchedule)) { now.add(Calendar.DAY_OF_MONTH, 1); // 如果当前时间已过1点,则将调度时间设为明天的1点 } now.set(Calendar.HOUR_OF_DAY, hourToSchedule); now.set(Calendar.MINUTE, minuteToSchedule); now.set(Calendar.SECOND, 0); now.set(Calendar.MILLISECOND, 0); Date scheduledTime = now.getTime(); long initialDelay = scheduledTime.getTime() - System.currentTimeMillis(); // 调度任务 scheduler.scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS); } Runnable task = new Runnable() { @Override public void run() { // 获取所有 Redis 键 Set setKeys = RedisUtil.keys("*"); keys = new ArrayList<>(setKeys); log.info("---------------------更新交易对完成---------------------"); } }; }