| | |
| | | package org.example.server.impl; |
| | | |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.example.websocket.server.WsServer; |
| | | import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.databind.JsonNode; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.google.common.reflect.TypeToken; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.GsonBuilder; |
| | | import lombok.Data; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.example.dao.CurrencyMapper; |
| | | import org.example.pojo.Currency; |
| | | import org.example.pojo.Market; |
| | | import org.example.pojo.MarketDataOut; |
| | | import org.example.pojo.bo.AsksBo; |
| | | import org.example.pojo.bo.BidsBo; |
| | | import org.example.pojo.bo.MarketBo; |
| | | import org.example.server.CurrencySerivce; |
| | | import org.example.util.RedisUtil; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import org.yaml.snakeyaml.error.Mark; |
| | | import java.math.BigDecimal; |
| | | import java.math.RoundingMode; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.stream.Collectors; |
| | | import java.io.ByteArrayOutputStream; |
| | | |
| | | |
| | | /** |
| | | * @program: demo |
| | |
| | | * @create: 2024-07-16 15:23 |
| | | **/ |
| | | @Service |
| | | @Slf4j |
| | | public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> implements CurrencySerivce { |
| | | |
// NOTE(review): this span appears to interleave two revisions of the file: the body of an
// older start() method, and newer field declarations (keys, the four MarketBo lists,
// syncCurrencyLock, wsServer) together with the newer extracted() method.
// As written it does not compile: private/@Autowired declarations sit inside a method body,
// `keys` is declared twice, and start() is never closed. Restore the intended revision from
// version control before changing any logic here. Only comments were touched below.
| | | private HashMap hashMap = new HashMap(); |
| | | private static final Gson gson = new Gson(); |
| | | private Set<String> keys; |
| | | @Override |
| | | public void start() throws JsonProcessingException { |
| | | Set<String> mexcSet = RedisUtil.keys("mexc"); |
| | | Set<String> gateSet = RedisUtil.keys("gate"); |
| | | Set<String> bitgetSet = RedisUtil.keys("bitget"); |
| | | Set<String> kucoinSet = RedisUtil.keys("kucoin"); |
| | | private ArrayList<String> keys; |
| | | |
| | | // Set up a timer here that refreshes every 10 seconds |
| | | keys = RedisUtil.keys("*"); |
| | | List<MarketBo> mexcList = new ArrayList<>(); |
| | | List<MarketBo> gateList = new ArrayList<>(); |
| | | List<MarketBo> bitgetList = new ArrayList<>(); |
| | | List<MarketBo> kucoinList = new ArrayList<>(); |
| | | |
| | | HashMap<String,Map<String, Object>> mexcMap = new HashMap<>(); |
| | | for (String key : mexcSet) { |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | mexcMap.put(key.replaceAll("mexc",""),redisValueMap); |
| | | } |
| | | private final Lock syncCurrencyLock = new ReentrantLock(); |
| | | |
| | | HashMap<String,List<HashMap<String,String>>> asksHashMapList = new HashMap<>(); |
| | | HashMap<String,Map<String, Object>> gateMap = new HashMap<>(); |
| | | for (String key : gateSet) { |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | String asks = redisValueMap.get("asks").toString(); |
| | | String bids = redisValueMap.get("bids").toString(); |
| | | @Autowired |
| | | private WsServer wsServer; |
| | | |
| | | // Parse the JSON with Jackson |
| | | ObjectMapper objectMapper = new ObjectMapper(); |
| | | JsonNode asksNode = objectMapper.readTree(asks); |
| | | JsonNode bidsNode = objectMapper.readTree(bids); |
| | | private void extracted() { |
| | | ExecutorService executor = Executors.newFixedThreadPool(20); // Thread count raised to leave headroom for the number of exchanges processed |
| | | |
| | | // Convert the "asks" array into List<List<String>> |
| | | List<HashMap<String,String>> asksList = new ArrayList<>(); |
| | | for (JsonNode arrayNode : asksNode) { |
| | | HashMap<String,String> asksMap = new HashMap<>(); |
| | | asksMap.put("p",arrayNode.get(0).toString()); |
| | | asksMap.put("v",arrayNode.get(1).toString()); |
| | | asksList.add(asksMap); |
| | | } |
| | | for (JsonNode arrayNode : bidsNode) { |
| | | HashMap<String,String> asksMap = new HashMap<>(); |
| | | asksMap.put("p",arrayNode.get(0).toString()); |
| | | asksMap.put("v",arrayNode.get(1).toString()); |
| | | asksList.add(asksMap); |
| | | } |
| | | gateMap.put(key.replaceAll("gate",""),redisValueMap); |
| | | } |
| | | // Process each exchange's data asynchronously |
| | | CompletableFuture<Void> mexcFuture = checkAndProcess("mexc", mexcList, executor); |
| | | CompletableFuture<Void> gateFuture = checkAndProcess("gate", gateList, executor); |
| | | CompletableFuture<Void> bitgetFuture = checkAndProcess("bitget", bitgetList, executor); |
| | | CompletableFuture<Void> kucoinFuture = checkAndProcess("kucoin", kucoinList, executor); |
| | | |
| | | // HashMap<String,Map<String, Object>> bitgetMap = new HashMap<>(); |
| | | // for (String key : mexcSet) { |
| | | // String v = RedisUtil.get(key); |
| | | // Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | // mexcMap.put(key.replaceAll("bitget",""),redisValueMap); |
| | | // } |
| | | // |
| | | // HashMap<String,Map<String, Object>> kucoinMap = new HashMap<>(); |
| | | // for (String key : mexcSet) { |
| | | // String v = RedisUtil.get(key); |
| | | // Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | // mexcMap.put(key.replaceAll("kucoin",""),redisValueMap); |
| | | // } |
| | | // Wait for all tasks to finish |
| | | CompletableFuture.allOf(mexcFuture, gateFuture, bitgetFuture, kucoinFuture).join(); |
| | | |
| | | executor.shutdown(); // Shut down the thread pool |
| | | } |
| | | |
| | | private CompletableFuture<Void> checkAndProcess(String exchangeName, List<MarketBo> marketList, ExecutorService executor) { |
| | | return CompletableFuture.runAsync(() -> { |
| | | Set<String> keys = RedisUtil.keys(exchangeName); |
| | | if (keys != null && !keys.isEmpty()) { |
| | | processMarketData(keys, marketList, exchangeName); |
| | | } |
| | | }, executor); |
| | | } |
| | | |
| | | private void processMexc() { |
| | | Set<String> mexcSet = RedisUtil.keys("mexc"); |
| | | |
| | | for (String key : mexcSet) { |
| | | MarketBo marketBo = new MarketBo(); |
| | | |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | Object asksObj = redisValueMap.get("asks"); |
| | | Object bidsObj = redisValueMap.get("bids"); |
| | | |
| | | if (asksObj instanceof List && !((List<?>) asksObj).isEmpty()) { |
| | | List<?> asksList = (List<?>) asksObj; |
| | | Map<?, ?> asksMap = (Map<?, ?>) asksList.get(0); |
| | | AsksBo asksBo = new AsksBo(); |
| | | asksBo.setP(new BigDecimal(asksMap.get("p").toString())); |
| | | asksBo.setV(new BigDecimal(asksMap.get("v").toString())); |
| | | marketBo.setAsks(asksBo); |
| | | } |
| | | |
| | | if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) { |
| | | List<?> bidsList = (List<?>) bidsObj; |
| | | Map<?, ?> bidsMap = (Map<?, ?>) bidsList.get(0); |
| | | BidsBo bidsBo = new BidsBo(); |
| | | bidsBo.setP(new BigDecimal(bidsMap.get("p").toString())); |
| | | bidsBo.setV(new BigDecimal(bidsMap.get("v").toString())); |
| | | marketBo.setBids(bidsBo); |
| | | } |
| | | |
| | | marketBo.setKey(key.replaceAll("mexc", "")); |
| | | marketBo.setExchange("mexc"); |
| | | mexcList.add(marketBo); |
| | | } |
| | | } |
| | | |
| | | private void processMarketData(Set<String> set, List<MarketBo> list, String exchangeName) { |
| | | ArrayList<String> kyes = new ArrayList<>(set); |
| | | List<String> mget = RedisUtil.mget(kyes); |
| | | |
| | | Map<String, String> resultMap = new HashMap<>(); |
| | | for (int i = 0; i < kyes.size(); i++) { |
| | | resultMap.put(kyes.get(i), mget.get(i)); |
| | | } |
| | | |
| | | for (Map.Entry<String, String> entry : resultMap.entrySet()) { |
| | | String v = entry.getValue(); |
| | | String key = entry.getKey(); |
| | | |
| | | MarketBo marketBo = new MarketBo(); |
| | | |
| | | Map<String, Map<String,String>> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Map<String,String>>>() {}.getType()); |
| | | Map<String,String> asksObj = redisValueMap.get("asks"); |
| | | Map<String,String> bidsObj = redisValueMap.get("bids"); |
| | | |
| | | try { |
| | | if(StringUtils.isNotEmpty(asksObj.get("p")) && StringUtils.isNotEmpty(asksObj.get("v")) && |
| | | StringUtils.isNotEmpty(bidsObj.get("p")) && StringUtils.isNotEmpty(bidsObj.get("v"))) { |
| | | AsksBo asksBo = new AsksBo(); |
| | | asksBo.setP(new BigDecimal(asksObj.get("p").toString())); |
| | | asksBo.setV(new BigDecimal(asksObj.get("v").toString())); |
| | | marketBo.setAsks(asksBo); |
| | | |
| | | BidsBo bidsBo = new BidsBo(); |
| | | bidsBo.setP(new BigDecimal(bidsObj.get("p").toString())); |
| | | bidsBo.setV(new BigDecimal(bidsObj.get("v").toString())); |
| | | marketBo.setBids(bidsBo); |
| | | |
| | | |
| | | marketBo.setKey(key.replaceAll(exchangeName, "")); |
| | | marketBo.setExchange(exchangeName); |
| | | list.add(marketBo); |
| | | } |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | // 计算利润百分比 |
| | | private static BigDecimal calculateProfitPercentage(BigDecimal buyPrice, BigDecimal sellPrice) { |
| | | BigDecimal profit = sellPrice.subtract(buyPrice); |
| | | if (buyPrice.compareTo(BigDecimal.ZERO) == 0) { |
| | | return BigDecimal.ZERO; // 防止除以零 |
| | | } |
| | | BigDecimal profitPercentage = profit.divide(buyPrice, 12, RoundingMode.DOWN).multiply(new BigDecimal(100)); |
| | | return profitPercentage; |
| | | } |
| | | |
| | | private void findProfitablePairs(List<MarketBo>... exchangeLists) { |
| | | Map<String, Map<String, MarketBo>> marketMap = new HashMap<>(); |
| | | List<MarketDataOut> marketDataOuts = Collections.synchronizedList(new ArrayList<>()); // 使用线程安全的集合 |
| | | |
| | | // 按币种名称和交易所分组市场数据 |
| | | for (List<MarketBo> exchangeList : exchangeLists) { |
| | | for (MarketBo market : exchangeList) { |
| | | String coinName = market.getKey(); |
| | | String exchangeName = market.getExchange(); |
| | | marketMap.computeIfAbsent(coinName, k -> new HashMap<>()) |
| | | .put(exchangeName, market); |
| | | } |
| | | } |
| | | |
| | | // 过滤出至少在两个交易所都有的币种 |
| | | marketMap.values().removeIf(exchangeMap -> exchangeMap.size() == 1); |
| | | |
| | | // 并行计算利润百分比 |
| | | ExecutorService executor = Executors.newFixedThreadPool(Math.min(8, marketMap.size())); // 根据需要调整线程池大小 |
| | | |
| | | List<CompletableFuture<Void>> futures = new ArrayList<>(); |
| | | |
| | | // 获取当前时间,用于后续所有 MarketDataOut 的时间戳 |
| | | LocalDateTime currentTime = LocalDateTime.now(); |
| | | String formattedDateTime = currentTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 格式化时间字符串 |
| | | |
| | | for (Map.Entry<String, Map<String, MarketBo>> entry : marketMap.entrySet()) { |
| | | String coinName = entry.getKey(); |
| | | Map<String, MarketBo> exchangeMap = entry.getValue(); |
| | | String[] exchanges = exchangeMap.keySet().toArray(new String[0]); |
| | | |
| | | for (int i = 0; i < exchanges.length; i++) { |
| | | MarketBo markets1 = exchangeMap.get(exchanges[i]); |
| | | if (markets1.getBids() == null) continue; |
| | | |
| | | for (int j = 0; j < exchanges.length; j++) { |
| | | |
| | | |
| | | MarketBo markets2 = exchangeMap.get(exchanges[j]); |
| | | |
| | | if(markets1.getExchange().equals(markets2.getExchange())){ |
| | | continue; |
| | | } |
| | | |
| | | if (markets2.getAsks() == null) continue; |
| | | |
| | | CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { |
| | | try { |
| | | BigDecimal buyPrice = markets1.getBids().getP(); |
| | | BigDecimal sellPrice = markets2.getAsks().getP(); |
| | | BigDecimal profitPercentage = calculateProfitPercentage(buyPrice, sellPrice); |
| | | |
| | | // 只有在利润百分比大于0时才组装市场数据 |
| | | if (profitPercentage.compareTo(BigDecimal.ZERO) > 0) { |
| | | assembleMarketDataOut(coinName, markets1, markets2, profitPercentage, buyPrice, sellPrice, marketDataOuts, formattedDateTime); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); // 输出异常信息 |
| | | } |
| | | }, executor); |
| | | |
| | | futures.add(future); // 收集所有 Future |
| | | } |
| | | } |
| | | } |
| | | |
| | | // 等待所有任务完成 |
| | | CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); |
| | | allFutures.join(); |
| | | |
| | | executor.shutdown(); // 关闭线程池 |
| | | mexcList.clear(); |
| | | gateList.clear(); |
| | | bitgetList.clear(); |
| | | kucoinList.clear(); |
| | | pushWs(marketDataOuts); // 推送结果 |
| | | } |
| | | |
| | | private static void assembleMarketDataOut(String coinName, MarketBo markets1, MarketBo markets2, BigDecimal profitPercentage, BigDecimal buyPrice, BigDecimal sellPrice, List<MarketDataOut> marketDataOuts, String formattedDateTime) { |
| | | if(coinName.contains(String.valueOf("USDT"))){ |
| | | MarketDataOut marketDataOut = new MarketDataOut(); |
| | | marketDataOut.setBaseAsset(coinName.replaceAll("USDT","").toLowerCase().toUpperCase()); // 设置基础资产 |
| | | marketDataOut.setBuyingPlatform(capitalizeFirstLetter(markets1.getExchange())); // 设置买入平台,首字母大写 |
| | | marketDataOut.setSellPlatform(capitalizeFirstLetter(markets2.getExchange())); // 设置卖出平台,首字母大写 |
| | | marketDataOut.setSpread(profitPercentage.setScale(4, RoundingMode.DOWN).toPlainString()); // 设置利润百分比 |
| | | marketDataOut.setBuyPrice(buyPrice.toPlainString()); // 设置买入价格 |
| | | marketDataOut.setSellPrice(sellPrice.toPlainString()); // 设置卖出价格 |
| | | marketDataOut.setBuyNumber(markets1.getBids().getV().setScale(4, RoundingMode.HALF_UP).toPlainString()); // 设置买入数量 |
| | | marketDataOut.setSellNumber(markets2.getAsks().getV().setScale(4, RoundingMode.HALF_UP).toPlainString()); // 设置卖出数量 |
| | | marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0, RoundingMode.HALF_UP).toPlainString()); // 设置买入总价 |
| | | marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.HALF_UP).toPlainString()); // 设置卖出总价 |
| | | marketDataOut.setServceTime(formattedDateTime); // 设置服务时间 |
| | | marketDataOut.setBuyAndSell(marketDataOut.getBaseAsset()+marketDataOut.getBuyingPlatform()+marketDataOut.getSellPlatform()); |
| | | marketDataOuts.add(marketDataOut); // 添加到输出列表 |
| | | }else{ |
| | | System.out.println(coinName); |
| | | } |
| | | } |
| | | |
| | | public static String capitalizeFirstLetter(String word) { |
| | | if (word == null || word.isEmpty()) { |
| | | return word; |
| | | } |
| | | return Character.toUpperCase(word.charAt(0)) + word.substring(1); |
| | | } |
| | | |
| | | |
| | | private void pushWs(List<MarketDataOut> marketDataOuts) { |
| | | // String key = "MARKET_Date"; |
| | | // // 创建 Gson 对象 |
| | | Gson gson = new GsonBuilder().setPrettyPrinting().create(); |
| | | // 将列表转换为 JSON 字符串 |
| | | String json = gson.toJson(marketDataOuts); |
| | | // RedisUtil.set(key,json); |
| | | try { |
| | | // 全体发送 |
| | | wsServer.sendMessageToAll(json); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | public void quotationCalculation(){ |
| | | extracted(); |
| | | // 检查列表是否为空并调用 findProfitablePairs 方法 |
| | | if (!mexcList.isEmpty() || !gateList.isEmpty() || !bitgetList.isEmpty() || !kucoinList.isEmpty()) { |
| | | findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); |
| | | } |
| | | } |
| | | |
| | | public void scheduler(){ |
| | | ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); |
| | | |
| | | // 获取当前时间 |
| | | Calendar now = Calendar.getInstance(); |
| | | int currentHour = now.get(Calendar.HOUR_OF_DAY); |
| | | int currentMinute = now.get(Calendar.MINUTE); |
| | | |
| | | // 计算距离下一个1点的小时数和分钟数 |
| | | int hourToSchedule = 1; |
| | | int minuteToSchedule = 0; |
| | | if (currentHour > hourToSchedule || (currentHour == hourToSchedule && currentMinute >= minuteToSchedule)) { |
| | | now.add(Calendar.DAY_OF_MONTH, 1); // 如果当前时间已过1点,则将调度时间设为明天的1点 |
| | | } |
| | | now.set(Calendar.HOUR_OF_DAY, hourToSchedule); |
| | | now.set(Calendar.MINUTE, minuteToSchedule); |
| | | now.set(Calendar.SECOND, 0); |
| | | now.set(Calendar.MILLISECOND, 0); |
| | | |
| | | Date scheduledTime = now.getTime(); |
| | | long initialDelay = scheduledTime.getTime() - System.currentTimeMillis(); |
| | | |
| | | // 调度任务 |
| | | scheduler.scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS); |
| | | } |
| | | |
| | | Runnable task = new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | |
| | | // 获取所有 Redis 键 |
| | | Set<String> setKeys = RedisUtil.keys("*"); |
| | | keys = new ArrayList<>(setKeys); |
| | | log.info("---------------------更新交易对完成---------------------"); |
| | | } |
| | | }; |
| | | } |