package org.example.server.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.example.dao.CurrencyMapper; import org.example.pojo.Currency; import org.example.pojo.Market; import org.example.pojo.MarketDataOut; import org.example.pojo.bo.AsksBo; import org.example.pojo.bo.BidsBo; import org.example.pojo.bo.MarketBo; import org.example.server.CurrencySerivce; import org.example.util.RedisUtil; import org.example.websocket.server.WsServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.yaml.snakeyaml.error.Mark; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.ArrayList; import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.io.ByteArrayOutputStream; /** * @program: demo * @description: * @create: 2024-07-16 15:23 **/ @Service @Slf4j public class CurrencySerivceImpl extends ServiceImpl implements CurrencySerivce { private static final Gson gson = new Gson(); private ArrayList keys; List mexcList = new ArrayList<>(); List gateList = new ArrayList<>(); List bitgetList = new ArrayList<>(); List kucoinList = new ArrayList<>(); private final Lock syncCurrencyLock = new ReentrantLock(); @Autowired private WsServer wsServer; private void extracted() { ExecutorService executor = Executors.newFixedThreadPool(4); // 适当增加线程数以匹配处理的交易所数量 // 异步处理每个交易所的数据 CompletableFuture mexcFuture = CompletableFuture.runAsync(() -> processMexc(), executor); CompletableFuture gateFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("gate"), gateList, "gate"), executor); CompletableFuture bitgetFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("bitget"), bitgetList, "bitget"), executor); CompletableFuture kucoinFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("kucoin"), kucoinList, "kucoin"), executor); // 等待所有任务完成 CompletableFuture.allOf(mexcFuture, gateFuture, bitgetFuture, kucoinFuture).join(); executor.shutdown(); // 关闭线程池 } private void processMexc() { Set mexcSet = RedisUtil.keys("mexc"); for (String key : mexcSet) { MarketBo marketBo = new MarketBo(); String v = RedisUtil.get(key); Map redisValueMap = gson.fromJson(v, new TypeToken>() {}.getType()); Object asksObj = redisValueMap.get("asks"); Object bidsObj = redisValueMap.get("bids"); if (asksObj instanceof List && !((List) asksObj).isEmpty()) { List asksList = (List) asksObj; Map asksMap = (Map) asksList.get(0); AsksBo asksBo = new AsksBo(); asksBo.setP(new BigDecimal(asksMap.get("p").toString())); asksBo.setV(new BigDecimal(asksMap.get("v").toString())); marketBo.setAsks(asksBo); } if (bidsObj instanceof List && !((List) bidsObj).isEmpty()) { List bidsList = (List) bidsObj; Map bidsMap = (Map) bidsList.get(bidsList.size() - 1); BidsBo bidsBo = new BidsBo(); bidsBo.setP(new BigDecimal(bidsMap.get("p").toString())); bidsBo.setV(new BigDecimal(bidsMap.get("v").toString())); marketBo.setBids(bidsBo); } marketBo.setKey(key.replaceAll("mexc", "")); marketBo.setExchange("mexc"); mexcList.add(marketBo); } } private void processMarketData(Set set, List list, String exchangeName) { for (String key : set) { String v = RedisUtil.get(key); Map redisValueMap = gson.fromJson(v, new TypeToken>() {}.getType()); Object asksObj = redisValueMap.get("asks"); Object bidsObj = redisValueMap.get("bids"); MarketBo marketBo = new MarketBo(); if (asksObj instanceof List && !((List) asksObj).isEmpty()) { List asksList = (List) asksObj; String[][] dataArray = gson.fromJson(gson.toJson(asksList), String[][].class); String[] asksData = dataArray[0]; AsksBo asksBo = new AsksBo(); asksBo.setP(new BigDecimal(asksData[0])); asksBo.setV(new BigDecimal(asksData[1])); marketBo.setAsks(asksBo); } if (bidsObj instanceof List && !((List) bidsObj).isEmpty()) { List bidsList = (List) bidsObj; String[][] dataArray = gson.fromJson(gson.toJson(bidsList), String[][].class); String[] bidsData = dataArray[bidsList.size() - 1]; BidsBo bidsBo = new BidsBo(); bidsBo.setP(new BigDecimal(bidsData[0])); bidsBo.setV(new BigDecimal(bidsData[1])); marketBo.setBids(bidsBo); } marketBo.setKey(key.replaceAll(exchangeName, "")); marketBo.setExchange(exchangeName); list.add(marketBo); } } // 计算利润百分比 private static BigDecimal calculateProfitPercentage(BigDecimal buyPrice, BigDecimal sellPrice) { BigDecimal profit = sellPrice.subtract(buyPrice); if (buyPrice.compareTo(BigDecimal.ZERO) == 0) { return BigDecimal.ZERO; // 防止除以零 } BigDecimal profitPercentage = profit.divide(buyPrice, 12, RoundingMode.DOWN).multiply(new BigDecimal(100)); return profitPercentage; } private void findProfitablePairs(List... exchangeLists) { Map> marketMap = new HashMap<>(); List marketDataOuts = Collections.synchronizedList(new ArrayList<>()); // 使用线程安全的集合 // 按币种名称和交易所分组市场数据 for (List exchangeList : exchangeLists) { for (MarketBo market : exchangeList) { String coinName = market.getKey(); String exchangeName = market.getExchange(); marketMap.computeIfAbsent(coinName, k -> new HashMap<>()) .put(exchangeName, market); } } // 过滤出至少在两个交易所都有的币种 marketMap.values().removeIf(exchangeMap -> exchangeMap.size() == 1); // 并行计算利润百分比 ExecutorService executor = Executors.newFixedThreadPool(Math.min(8, marketMap.size())); // 根据需要调整线程池大小 List> futures = new ArrayList<>(); // 获取当前时间,用于后续所有 MarketDataOut 的时间戳 LocalDateTime currentTime = LocalDateTime.now(); String formattedDateTime = currentTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 格式化时间字符串 for (Map.Entry> entry : marketMap.entrySet()) { String coinName = entry.getKey(); Map exchangeMap = entry.getValue(); String[] exchanges = exchangeMap.keySet().toArray(new String[0]); for (int i = 0; i < exchanges.length; i++) { MarketBo markets1 = exchangeMap.get(exchanges[i]); if (markets1.getBids() == null) continue; for (int j = 0; j < exchanges.length; j++) { if (i == j) continue; MarketBo markets2 = exchangeMap.get(exchanges[j]); if (markets2.getAsks() == null) continue; CompletableFuture future = CompletableFuture.runAsync(() -> { try { BigDecimal buyPrice = markets1.getBids().getP(); BigDecimal sellPrice = markets2.getAsks().getP(); BigDecimal profitPercentage = calculateProfitPercentage(buyPrice, sellPrice); // 只有在利润百分比大于0时才组装市场数据 if (profitPercentage.compareTo(BigDecimal.ZERO) > 0) { assembleMarketDataOut(coinName, markets1, markets2, profitPercentage, buyPrice, sellPrice, marketDataOuts, formattedDateTime); } } catch (Exception e) { e.printStackTrace(); // 输出异常信息 } }, executor); futures.add(future); // 收集所有 Future } } } // 等待所有任务完成 CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); executor.shutdown(); // 关闭线程池 pushWs(marketDataOuts); // 推送结果 } private static void assembleMarketDataOut(String coinName, MarketBo markets1, MarketBo markets2, BigDecimal profitPercentage, BigDecimal buyPrice, BigDecimal sellPrice, List marketDataOuts, String formattedDateTime) { MarketDataOut marketDataOut = new MarketDataOut(); marketDataOut.setBaseAsset(coinName.replaceAll("USDT","").toLowerCase()); // 设置基础资产 marketDataOut.setBuyingPlatform(markets1.getExchange()); // 设置买入平台 marketDataOut.setSellPlatform(markets2.getExchange()); // 设置卖出平台 marketDataOut.setSpread(profitPercentage.toString()); // 设置利润百分比 marketDataOut.setBuyPrice(buyPrice.toString()); // 设置买入价格 marketDataOut.setSellPrice(sellPrice.toString()); // 设置卖出价格 marketDataOut.setBuyNumber(markets1.getBids().getV().toString()); // 设置买入数量 marketDataOut.setSellNumber(markets2.getAsks().getV().toString()); // 设置卖出数量 marketDataOut.setBuyTotalPrice(markets1.getBids().getP().multiply(markets1.getBids().getV()).toString()); // 设置买入总价 marketDataOut.setSellTotalPrice(markets2.getAsks().getP().multiply(markets2.getAsks().getV()).toString()); // 设置卖出总价 marketDataOut.setServceTime(formattedDateTime); // 设置服务时间 marketDataOuts.add(marketDataOut); // 添加到输出列表 } private void pushWs(List marketDataOuts) { // String key = "MARKET_Date"; // // 创建 Gson 对象 Gson gson = new GsonBuilder().setPrettyPrinting().create(); // 将列表转换为 JSON 字符串 String json = gson.toJson(marketDataOuts); // RedisUtil.set(key,json); try { // 全体发送 wsServer.sendMessageToAll("11111111111111111111"); } catch (Exception e) { e.printStackTrace(); } } public void quotationCalculation(){ extracted(); findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); } public void scheduler(){ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 获取当前时间 Calendar now = Calendar.getInstance(); int currentHour = now.get(Calendar.HOUR_OF_DAY); int currentMinute = now.get(Calendar.MINUTE); // 计算距离下一个1点的小时数和分钟数 int hourToSchedule = 1; int minuteToSchedule = 0; if (currentHour > hourToSchedule || (currentHour == hourToSchedule && currentMinute >= minuteToSchedule)) { now.add(Calendar.DAY_OF_MONTH, 1); // 如果当前时间已过1点,则将调度时间设为明天的1点 } now.set(Calendar.HOUR_OF_DAY, hourToSchedule); now.set(Calendar.MINUTE, minuteToSchedule); now.set(Calendar.SECOND, 0); now.set(Calendar.MILLISECOND, 0); Date scheduledTime = now.getTime(); long initialDelay = scheduledTime.getTime() - System.currentTimeMillis(); // 调度任务 scheduler.scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS); } Runnable task = new Runnable() { @Override public void run() { // 获取所有 Redis 键 Set setKeys = RedisUtil.keys("*"); keys = new ArrayList<>(setKeys); log.info("---------------------更新交易对完成---------------------"); } }; }