| | |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.google.common.reflect.TypeToken; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.GsonBuilder; |
| | | import lombok.Data; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang.StringUtils; |
| | | import org.example.dao.CurrencyMapper; |
| | | import org.example.pojo.Currency; |
| | | import org.example.pojo.Market; |
| | | import org.example.pojo.MarketDataOut; |
| | | import org.example.pojo.bo.AsksBo; |
| | | import org.example.pojo.bo.BidsBo; |
| | | import org.example.pojo.bo.MarketBo; |
| | | import org.example.server.CurrencySerivce; |
| | | import org.example.util.RedisUtil; |
| | | import org.example.websocket.server.WsServer; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | import org.yaml.snakeyaml.error.Mark; |
| | | import java.math.BigDecimal; |
| | | import java.math.RoundingMode; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.stream.Collectors; |
| | | import java.io.ByteArrayOutputStream; |
| | | |
| | | |
| | | /** |
| | | * @program: demo |
| | |
| | | * @create: 2024-07-16 15:23 |
| | | **/ |
| | | @Service |
| | | @Slf4j |
| | | public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> implements CurrencySerivce { |
| | | |
| | | private HashMap hashMap = new HashMap(); |
| | | private static final Gson gson = new Gson(); |
| | | private ArrayList<String> keys; |
| | | |
| | |
| | | List<MarketBo> bitgetList = new ArrayList<>(); |
| | | List<MarketBo> kucoinList = new ArrayList<>(); |
| | | |
| | | @Override |
| | | public void start() throws JsonProcessingException { |
| | | private final Lock syncCurrencyLock = new ReentrantLock(); |
| | | |
| | | @Autowired |
| | | private WsServer wsServer; |
| | | |
| | | private void extracted() { |
| | | ExecutorService executor = Executors.newFixedThreadPool(4); // 适当增加线程数以匹配处理的交易所数量 |
| | | |
| | | // 异步处理每个交易所的数据 |
| | | CompletableFuture<Void> mexcFuture = CompletableFuture.runAsync(() -> processMexc(), executor); |
| | | CompletableFuture<Void> gateFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("gate"), gateList, "gate"), executor); |
| | | CompletableFuture<Void> bitgetFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("bitget"), bitgetList, "bitget"), executor); |
| | | CompletableFuture<Void> kucoinFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("kucoin"), kucoinList, "kucoin"), executor); |
| | | |
| | | // 等待所有任务完成 |
| | | CompletableFuture.allOf(mexcFuture, gateFuture, bitgetFuture, kucoinFuture).join(); |
| | | |
| | | executor.shutdown(); // 关闭线程池 |
| | | } |
| | | |
| | | private void processMexc() { |
| | | Set<String> mexcSet = RedisUtil.keys("mexc"); |
| | | Set<String> gateSet = RedisUtil.keys("gate"); |
| | | Set<String> bitgetSet = RedisUtil.keys("bitget"); |
| | | Set<String> kucoinSet = RedisUtil.keys("kucoin"); |
| | | |
| | | |
| | | //这里做一个定时器,每10分钟更新一次 |
| | | Set<String> setKeys = RedisUtil.keys("*"); |
| | | keys = new ArrayList<>(setKeys); |
| | | for (String key : mexcSet) { |
| | | MarketBo marketBo = new MarketBo(); |
| | | |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | String asks = redisValueMap.get("asks").toString(); |
| | | String bids = redisValueMap.get("bids").toString(); |
| | | Object asksObj = redisValueMap.get("asks"); |
| | | Object bidsObj = redisValueMap.get("bids"); |
| | | |
| | | if(!asks.equals("[]") && !StringUtils.isEmpty(asks)){ |
| | | Gson gson = new Gson(); |
| | | if (asksObj instanceof List && !((List<?>) asksObj).isEmpty()) { |
| | | List<?> asksList = (List<?>) asksObj; |
| | | Map<?, ?> asksMap = (Map<?, ?>) asksList.get(0); |
| | | AsksBo asksBo = new AsksBo(); |
| | | Market[] asksDataArray = gson.fromJson(asks, Market[].class); |
| | | Market asksElement = asksDataArray[0]; |
| | | asksBo.setP(asksElement.getP()); |
| | | asksBo.setV(asksElement.getV()); |
| | | asksBo.setP(new BigDecimal(asksMap.get("p").toString())); |
| | | asksBo.setV(new BigDecimal(asksMap.get("v").toString())); |
| | | marketBo.setAsks(asksBo); |
| | | } |
| | | |
| | | if(!bids.equals("[]") && !StringUtils.isEmpty(bids)){ |
| | | if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) { |
| | | List<?> bidsList = (List<?>) bidsObj; |
| | | Map<?, ?> bidsMap = (Map<?, ?>) bidsList.get(bidsList.size() - 1); |
| | | BidsBo bidsBo = new BidsBo(); |
| | | Market[] bidsDataArray = gson.fromJson(bids, Market[].class); |
| | | Market bidsElement = bidsDataArray[bidsDataArray.length-1]; |
| | | bidsBo.setP(bidsElement.getP()); |
| | | bidsBo.setV(bidsElement.getV()); |
| | | bidsBo.setP(new BigDecimal(bidsMap.get("p").toString())); |
| | | bidsBo.setV(new BigDecimal(bidsMap.get("v").toString())); |
| | | marketBo.setBids(bidsBo); |
| | | } |
| | | marketBo.setKey(key.replaceAll("mexc","")); |
| | | |
| | | marketBo.setKey(key.replaceAll("mexc", "")); |
| | | marketBo.setExchange("mexc"); |
| | | mexcList.add(marketBo); |
| | | } |
| | | } |
| | | |
| | | |
| | | for (String key : gateSet) { |
| | | private void processMarketData(Set<String> set, List<MarketBo> list, String exchangeName) { |
| | | for (String key : set) { |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | String asks = redisValueMap.get("asks").toString(); |
| | | String bids = redisValueMap.get("bids").toString(); |
| | | |
| | | Object asksObj = redisValueMap.get("asks"); |
| | | Object bidsObj = redisValueMap.get("bids"); |
| | | MarketBo marketBo = new MarketBo(); |
| | | if(!asks.equals("[]") && !StringUtils.isEmpty(asks)){ |
| | | String[][] dataArray = gson.fromJson(asks, String[][].class); |
| | | |
| | | if (asksObj instanceof List && !((List<?>) asksObj).isEmpty()) { |
| | | List<?> asksList = (List<?>) asksObj; |
| | | String[][] dataArray = gson.fromJson(gson.toJson(asksList), String[][].class); |
| | | String[] asksData = dataArray[0]; |
| | | AsksBo asksBo = new AsksBo(); |
| | | asksBo.setP(new BigDecimal(asksData[0])); |
| | |
| | | marketBo.setAsks(asksBo); |
| | | } |
| | | |
| | | if(!bids.equals("[]") && !StringUtils.isEmpty(bids)){ |
| | | String[][] dataArray = gson.fromJson(bids, String[][].class); |
| | | String[] bidsData = dataArray[dataArray.length-1]; |
| | | if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) { |
| | | List<?> bidsList = (List<?>) bidsObj; |
| | | String[][] dataArray = gson.fromJson(gson.toJson(bidsList), String[][].class); |
| | | String[] bidsData = dataArray[bidsList.size() - 1]; |
| | | BidsBo bidsBo = new BidsBo(); |
| | | bidsBo.setP(new BigDecimal(bidsData[0])); |
| | | bidsBo.setV(new BigDecimal(bidsData[1])); |
| | | marketBo.setBids(bidsBo); |
| | | } |
| | | marketBo.setKey(key.replaceAll("gate","")); |
| | | marketBo.setExchange("gate"); |
| | | gateList.add(marketBo); |
| | | |
| | | marketBo.setKey(key.replaceAll(exchangeName, "")); |
| | | marketBo.setExchange(exchangeName); |
| | | list.add(marketBo); |
| | | } |
| | | |
| | | |
| | | for (String key : bitgetSet) { |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | String asks = redisValueMap.get("asks").toString(); |
| | | String bids = redisValueMap.get("bids").toString(); |
| | | |
| | | MarketBo marketBo = new MarketBo(); |
| | | if(!asks.equals("[]") && !StringUtils.isEmpty(asks)){ |
| | | String[][] dataArray = gson.fromJson(asks, String[][].class); |
| | | String[] asksData = dataArray[0]; |
| | | AsksBo asksBo = new AsksBo(); |
| | | asksBo.setP(new BigDecimal(asksData[0])); |
| | | asksBo.setV(new BigDecimal(asksData[1])); |
| | | marketBo.setAsks(asksBo); |
| | | } |
| | | |
| | | if(!bids.equals("[]") && !StringUtils.isEmpty(bids)){ |
| | | String[][] dataArray = gson.fromJson(bids, String[][].class); |
| | | String[] bidsData = dataArray[dataArray.length-1]; |
| | | BidsBo bidsBo = new BidsBo(); |
| | | bidsBo.setP(new BigDecimal(bidsData[0])); |
| | | bidsBo.setV(new BigDecimal(bidsData[1])); |
| | | marketBo.setBids(bidsBo); |
| | | } |
| | | marketBo.setKey(key.replaceAll("bitget","")); |
| | | marketBo.setExchange("bitget"); |
| | | bitgetList.add(marketBo); |
| | | } |
| | | |
| | | |
| | | for (String key : kucoinSet) { |
| | | String v = RedisUtil.get(key); |
| | | Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | String asks = redisValueMap.get("asks").toString(); |
| | | String bids = redisValueMap.get("bids").toString(); |
| | | |
| | | MarketBo marketBo = new MarketBo(); |
| | | if(!asks.equals("[]") && !StringUtils.isEmpty(asks)){ |
| | | String[][] dataArray = gson.fromJson(asks, String[][].class); |
| | | String[] asksData = dataArray[0]; |
| | | AsksBo asksBo = new AsksBo(); |
| | | asksBo.setP(new BigDecimal(asksData[0])); |
| | | asksBo.setV(new BigDecimal(asksData[1])); |
| | | marketBo.setAsks(asksBo); |
| | | } |
| | | |
| | | if(!bids.equals("[]") && !StringUtils.isEmpty(bids)){ |
| | | String[][] dataArray = gson.fromJson(bids, String[][].class); |
| | | String[] bidsData = dataArray[dataArray.length-1]; |
| | | BidsBo bidsBo = new BidsBo(); |
| | | bidsBo.setP(new BigDecimal(bidsData[0])); |
| | | bidsBo.setV(new BigDecimal(bidsData[1])); |
| | | marketBo.setBids(bidsBo); |
| | | } |
| | | marketBo.setKey(key.replaceAll("kucoin","")); |
| | | marketBo.setExchange("kucoin"); |
| | | kucoinList.add(marketBo); |
| | | } |
| | | testDemo(); |
| | | } |
| | | |
| | | // 计算利润百分比 |
| | |
| | | return profitPercentage; |
| | | } |
| | | |
| | | private static List<String> findProfitablePairs(List<MarketBo>... exchangeLists) { |
| | | List<String> result = new ArrayList<>(); |
| | | private void findProfitablePairs(List<MarketBo>... exchangeLists) { |
| | | Map<String, Map<String, MarketBo>> marketMap = new HashMap<>(); |
| | | List<MarketDataOut> marketDataOuts = Collections.synchronizedList(new ArrayList<>()); // 使用线程安全的集合 |
| | | |
| | | // 按币种名称和交易所分组市场数据 |
| | | for (List<MarketBo> exchangeList : exchangeLists) { |
| | | for (MarketBo market : exchangeList) { |
| | | String coinName = market.getKey(); |
| | | String exchangeName = market.getExchange(); // 假设有获取交易所名称的方法 getExchange() |
| | | |
| | | // 如果该币种在该交易所不存在,则创建新的列表 |
| | | if (!marketMap.containsKey(coinName)) { |
| | | marketMap.put(coinName, new HashMap<>()); |
| | | } |
| | | if (!marketMap.get(coinName).containsKey(exchangeName)) { |
| | | marketMap.get(coinName).put(exchangeName, new MarketBo()); |
| | | } |
| | | |
| | | // 将市场数据添加到对应的交易所列表中 |
| | | marketMap.get(coinName).put(exchangeName,market); |
| | | String exchangeName = market.getExchange(); |
| | | marketMap.computeIfAbsent(coinName, k -> new HashMap<>()) |
| | | .put(exchangeName, market); |
| | | } |
| | | } |
| | | marketMap = marketMap.entrySet().stream() |
| | | .filter(entry -> entry.getValue().size() != 1) |
| | | .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
| | | // 遍历每个币种 |
| | | for (String coinName : marketMap.keySet()) { |
| | | Map<String, MarketBo> exchangeMap = marketMap.get(coinName); |
| | | // 遍历每个交易所 |
| | | for (String exchangeName : exchangeMap.keySet()) { |
| | | MarketBo markets1 = exchangeMap.get(exchangeName); |
| | | // 在同一交易所中寻找有差价的交易对 |
| | | for (String exchangeName2 : exchangeMap.keySet()) { |
| | | MarketBo markets2 = exchangeMap.get(exchangeName2); |
| | | if(markets1.getExchange().equals(markets2.getExchange())){ |
| | | continue; |
| | | } |
| | | |
| | | if(null != markets1.getBids() && null != markets2.getAsks()){ |
| | | BigDecimal buyPrice = markets1.getBids().getP(); |
| | | BigDecimal sellPrice = markets2.getAsks().getP(); |
| | | // 计算利润百分比 |
| | | BigDecimal profitPercentage = calculateProfitPercentage(buyPrice, sellPrice); |
| | | // 过滤出至少在两个交易所都有的币种 |
| | | marketMap.values().removeIf(exchangeMap -> exchangeMap.size() == 1); |
| | | |
| | | // 如果利润大于零,则添加到结果列表 |
| | | if (profitPercentage.compareTo(BigDecimal.ZERO) > 0) { |
| | | // 准备输出字符串 |
| | | String pair = String.format("%s: 在 %s 以 %s 买入,在 %s 以 %s 卖出,利润为 %.4f%%", |
| | | coinName, markets1.getExchange(), buyPrice, markets2.getExchange(), sellPrice, profitPercentage); |
| | | result.add(pair); |
| | | // 并行计算利润百分比 |
| | | ExecutorService executor = Executors.newFixedThreadPool(Math.min(8, marketMap.size())); // 根据需要调整线程池大小 |
| | | |
| | | List<CompletableFuture<Void>> futures = new ArrayList<>(); |
| | | |
| | | // 获取当前时间,用于后续所有 MarketDataOut 的时间戳 |
| | | LocalDateTime currentTime = LocalDateTime.now(); |
| | | String formattedDateTime = currentTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 格式化时间字符串 |
| | | |
| | | for (Map.Entry<String, Map<String, MarketBo>> entry : marketMap.entrySet()) { |
| | | String coinName = entry.getKey(); |
| | | Map<String, MarketBo> exchangeMap = entry.getValue(); |
| | | String[] exchanges = exchangeMap.keySet().toArray(new String[0]); |
| | | |
| | | for (int i = 0; i < exchanges.length; i++) { |
| | | MarketBo markets1 = exchangeMap.get(exchanges[i]); |
| | | if (markets1.getBids() == null) continue; |
| | | |
| | | for (int j = 0; j < exchanges.length; j++) { |
| | | if (i == j) continue; |
| | | |
| | | MarketBo markets2 = exchangeMap.get(exchanges[j]); |
| | | if (markets2.getAsks() == null) continue; |
| | | |
| | | CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { |
| | | try { |
| | | BigDecimal buyPrice = markets1.getBids().getP(); |
| | | BigDecimal sellPrice = markets2.getAsks().getP(); |
| | | BigDecimal profitPercentage = calculateProfitPercentage(buyPrice, sellPrice); |
| | | |
| | | // 只有在利润百分比大于0时才组装市场数据 |
| | | if (profitPercentage.compareTo(BigDecimal.ZERO) > 0) { |
| | | assembleMarketDataOut(coinName, markets1, markets2, profitPercentage, buyPrice, sellPrice, marketDataOuts, formattedDateTime); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); // 输出异常信息 |
| | | } |
| | | } |
| | | }, executor); |
| | | |
| | | futures.add(future); // 收集所有 Future |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | |
| | | // 等待所有任务完成 |
| | | CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); |
| | | allFutures.join(); |
| | | |
| | | executor.shutdown(); // 关闭线程池 |
| | | |
| | | pushWs(marketDataOuts); // 推送结果 |
| | | } |
| | | |
| | | private static void assembleMarketDataOut(String coinName, MarketBo markets1, MarketBo markets2, BigDecimal profitPercentage, BigDecimal buyPrice, BigDecimal sellPrice, List<MarketDataOut> marketDataOuts, String formattedDateTime) { |
| | | MarketDataOut marketDataOut = new MarketDataOut(); |
| | | marketDataOut.setBaseAsset(coinName.replaceAll("USDT","").toLowerCase()); // 设置基础资产 |
| | | marketDataOut.setBuyingPlatform(markets1.getExchange()); // 设置买入平台 |
| | | marketDataOut.setSellPlatform(markets2.getExchange()); // 设置卖出平台 |
| | | marketDataOut.setSpread(profitPercentage.toString()); // 设置利润百分比 |
| | | marketDataOut.setBuyPrice(buyPrice.toString()); // 设置买入价格 |
| | | marketDataOut.setSellPrice(sellPrice.toString()); // 设置卖出价格 |
| | | marketDataOut.setBuyNumber(markets1.getBids().getV().toString()); // 设置买入数量 |
| | | marketDataOut.setSellNumber(markets2.getAsks().getV().toString()); // 设置卖出数量 |
| | | marketDataOut.setBuyTotalPrice(markets1.getBids().getP().multiply(markets1.getBids().getV()).toString()); // 设置买入总价 |
| | | marketDataOut.setSellTotalPrice(markets2.getAsks().getP().multiply(markets2.getAsks().getV()).toString()); // 设置卖出总价 |
| | | marketDataOut.setServceTime(formattedDateTime); // 设置服务时间 |
| | | marketDataOuts.add(marketDataOut); // 添加到输出列表 |
| | | } |
| | | |
| | | |
| | | public void testDemo(){ |
| | | List<String> profitablePairs = findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); |
| | | |
| | | // 输出结果 |
| | | for (String pair : profitablePairs) { |
| | | System.out.println(pair); |
| | | private void pushWs(List<MarketDataOut> marketDataOuts) { |
| | | // String key = "MARKET_Date"; |
| | | // // 创建 Gson 对象 |
| | | Gson gson = new GsonBuilder().setPrettyPrinting().create(); |
| | | // 将列表转换为 JSON 字符串 |
| | | String json = gson.toJson(marketDataOuts); |
| | | // RedisUtil.set(key,json); |
| | | try { |
| | | // 全体发送 |
| | | wsServer.sendMessageToAll(json); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | public void quotationCalculation(){ |
| | | extracted(); |
| | | findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); |
| | | } |
| | | |
| | | public void scheduler(){ |
| | | ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); |
| | | |
| | | // 获取当前时间 |
| | | Calendar now = Calendar.getInstance(); |
| | | int currentHour = now.get(Calendar.HOUR_OF_DAY); |
| | | int currentMinute = now.get(Calendar.MINUTE); |
| | | |
| | | // 计算距离下一个1点的小时数和分钟数 |
| | | int hourToSchedule = 1; |
| | | int minuteToSchedule = 0; |
| | | if (currentHour > hourToSchedule || (currentHour == hourToSchedule && currentMinute >= minuteToSchedule)) { |
| | | now.add(Calendar.DAY_OF_MONTH, 1); // 如果当前时间已过1点,则将调度时间设为明天的1点 |
| | | } |
| | | now.set(Calendar.HOUR_OF_DAY, hourToSchedule); |
| | | now.set(Calendar.MINUTE, minuteToSchedule); |
| | | now.set(Calendar.SECOND, 0); |
| | | now.set(Calendar.MILLISECOND, 0); |
| | | |
| | | Date scheduledTime = now.getTime(); |
| | | long initialDelay = scheduledTime.getTime() - System.currentTimeMillis(); |
| | | |
| | | // 调度任务 |
| | | scheduler.scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS); |
| | | } |
| | | |
| | | Runnable task = new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | |
| | | // 获取所有 Redis 键 |
| | | Set<String> setKeys = RedisUtil.keys("*"); |
| | | keys = new ArrayList<>(setKeys); |
| | | log.info("---------------------更新交易对完成---------------------"); |
| | | } |
| | | }; |
| | | } |