| | |
| | | ExecutorService executor = Executors.newFixedThreadPool(20); // 适当增加线程数以匹配处理的交易所数量 |
| | | |
| | | // 异步处理每个交易所的数据 |
| | | CompletableFuture<Void> mexcFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("mexc"), mexcList, "mexc"), executor); |
| | | CompletableFuture<Void> gateFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("gate"), gateList, "gate"), executor); |
| | | CompletableFuture<Void> bitgetFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("bitget"), bitgetList, "bitget"), executor); |
| | | CompletableFuture<Void> kucoinFuture = CompletableFuture.runAsync(() -> processMarketData(RedisUtil.keys("kucoin"), kucoinList, "kucoin"), executor); |
| | | CompletableFuture<Void> mexcFuture = checkAndProcess("mexc", mexcList, executor); |
| | | CompletableFuture<Void> gateFuture = checkAndProcess("gate", gateList, executor); |
| | | CompletableFuture<Void> bitgetFuture = checkAndProcess("bitget", bitgetList, executor); |
| | | CompletableFuture<Void> kucoinFuture = checkAndProcess("kucoin", kucoinList, executor); |
| | | |
| | | // 等待所有任务完成 |
| | | CompletableFuture.allOf(mexcFuture, gateFuture, bitgetFuture, kucoinFuture).join(); |
| | | |
| | | executor.shutdown(); // 关闭线程池 |
| | | } |
| | | |
| | | /** |
| | | * Submits an asynchronous task to {@code executor} that looks up the Redis keys for |
| | | * {@code exchangeName} through {@code RedisUtil.keys} and, only when that lookup returns a |
| | | * non-null, non-empty set, forwards the keys to {@code processMarketData} together with |
| | | * {@code marketList} and {@code exchangeName}. |
| | | * |
| | | * @param exchangeName exchange identifier used for the key lookup and passed on to processMarketData |
| | | * @param marketList list handed to processMarketData (presumably filled by it - confirm against that method) |
| | | * @param executor executor on which the task runs |
| | | * @return a future that completes once the task has finished |
| | | */ |
| | | private CompletableFuture<Void> checkAndProcess(String exchangeName, List<MarketBo> marketList, ExecutorService executor) { |
| | |     Runnable exchangeTask = () -> { |
| | |         Set<String> exchangeKeys = RedisUtil.keys(exchangeName); |
| | |         // No keys returned for this exchange: there is nothing to process. |
| | |         if (exchangeKeys == null || exchangeKeys.isEmpty()) { |
| | |             return; |
| | |         } |
| | |         processMarketData(exchangeKeys, marketList, exchangeName); |
| | |     }; |
| | |     return CompletableFuture.runAsync(exchangeTask, executor); |
| | | } |
| | | |
| | | private void processMexc() { |
| | |
| | | |
| | | /** |
| | | * Runs {@code extracted()} and then calls {@code findProfitablePairs} exactly once with the |
| | | * four exchange lists, provided at least one of them is non-empty. |
| | | */ |
| | | public void quotationCalculation(){ |
| | |     extracted(); |
| | |     // Only search when at least one exchange list has data. An additional unguarded call |
| | |     // would run the search on all-empty input and run it twice whenever data is present. |
| | |     if (!mexcList.isEmpty() || !gateList.isEmpty() || !bitgetList.isEmpty() || !kucoinList.isEmpty()) { |
| | |         findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); |
| | |     } |
| | | } |
| | | |
| | | public void scheduler(){ |
| | |
| | | /** |
| | | * 同步bitget交易所交易对 |
| | | */ |
| | | @Scheduled(cron = "0 0/45 * * * ?") |
| | | @Scheduled(cron = "0 0/30 * * * ?") |
| | | // @Scheduled(cron = "0/10 * * * * ?") |
| | | public void syncCurrency() { |
| | | // 使用Lock来确保同步 |
| | |
| | | |
| | | getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-"))); |
| | | |
| | | |
| | | List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"bitget")); |
| | | |
| | | // 删除已经下架的币种 |
| | | Set<String> symbolSet = getList.stream().map(CurrencyBitgetBo::getSymbol).collect(Collectors.toSet()); |
| | | List<Currency> removeList = dbList.stream() |
| | | .filter(currency -> !symbolSet.contains(currency.getSymbol())) |
| | |
| | | if(CollectionUtils.isNotEmpty(removeList)){ |
| | | removeList.forEach(f->{ |
| | | RedisUtil.delete("bitget"+f.getSymbol()); |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSymbol,f.getSymbol())); |
| | | }); |
| | | } |
| | | |
| | | // 获取数据库中已有的symbol列表 |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"bitget")); |
| | | |
| | | // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 |
| | | Set<String> loclSymbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | List<Currency> saveList = getList.stream() |
| | | .filter(currency -> !loclSymbolSet.contains(currency.getSymbol())) |
| | | .map(currency -> { |
| | | Currency newCurrency = new Currency(); |
| | | newCurrency.setSymbol(currency.getSymbol()); |
| | |
| | | return newCurrency; |
| | | }) |
| | | .collect(Collectors.toList()); |
| | | |
| | | // 批量保存新增数据到数据库 |
| | | if (CollectionUtils.isNotEmpty(saveList)) { |
| | | currencyService.saveBatch(saveList); |
| | |
| | | /** |
| | | * 同步gate交易所交易对 |
| | | */ |
| | | @Scheduled(cron = "0 0/40 * * * ?") |
| | | @Scheduled(cron = "0 0/35 * * * ?") |
| | | // @Scheduled(cron = "0/10 * * * * ?") |
| | | public void syncCurrency() { |
| | | // 使用Lock来确保同步 |
| | |
| | | getList.parallelStream().forEach(person -> person.setId(StringUtils.remove(person.getId(), "_"))); |
| | | |
| | | List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate")); |
| | | Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | |
| | | // 删除已经下架的币种 |
| | | Set<String> symbolSet = getList.stream().map(CurrencyGateBo::getId).collect(Collectors.toSet()); |
| | | List<Currency> removeList = dbList.stream() |
| | | .filter(currency -> !symbolSet.contains(currency.getSymbol())) |
| | | .collect(Collectors.toList()); |
| | |
| | | if(CollectionUtils.isNotEmpty(removeList)){ |
| | | removeList.forEach(f->{ |
| | | RedisUtil.delete("gate"+f.getSymbol()); |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSymbol,f.getSymbol())); |
| | | }); |
| | | } |
| | | |
| | | |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"gate")); |
| | | |
| | | // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 |
| | | Set<String> loclSymbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | List<Currency> saveList = getList.stream() |
| | | .filter(currency -> !loclSymbolSet.contains(currency.getId())) |
| | | .map(currency -> { |
| | | Currency newCurrency = new Currency(); |
| | | newCurrency.setSymbol(currency.getId()); |
| | |
| | | return newCurrency; |
| | | }) |
| | | .collect(Collectors.toList()); |
| | | |
| | | // 批量保存新增数据到数据库 |
| | | if(CollectionUtils.isNotEmpty(saveList)){ |
| | | currencyService.saveBatch(saveList); |
| | |
| | | import org.apache.commons.lang.StringUtils; |
| | | import org.example.common.MarketDataClient; |
| | | import org.example.pojo.Currency; |
| | | import org.example.pojo.bo.CurrencyGateBo; |
| | | import org.example.pojo.bo.CurrencyKucoin; |
| | | import org.example.server.impl.CurrencySerivceImpl; |
| | | import org.example.util.ConverterUtil; |
| | |
| | | /** |
| | | * 同步kucoin交易所交易对 |
| | | */ |
| | | @Scheduled(cron = "0 0/35 * * * ?") |
| | | @Scheduled(cron = "0 0/40 * * * ?") |
| | | // @Scheduled(cron = "0/10 * * * * ?") |
| | | public void syncCurrency() { |
| | | // 使用Lock来确保同步 |
| | |
| | | }.getType()); |
| | | |
| | | getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-"))); |
| | | |
| | | List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin")); |
| | | Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | |
| | | // 删除已经下架的币种 |
| | | Set<String> symbolSet = getList.stream().map(CurrencyKucoin::getSymbol).collect(Collectors.toSet()); |
| | | List<Currency> removeList = dbList.stream() |
| | | .filter(currency -> !symbolSet.contains(currency.getSymbol())) |
| | | .collect(Collectors.toList()); |
| | |
| | | if(CollectionUtils.isNotEmpty(removeList)){ |
| | | removeList.forEach(f->{ |
| | | RedisUtil.delete("kucoin"+f.getSymbol()); |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSymbol,f.getSymbol())); |
| | | }); |
| | | } |
| | | |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"kucoin")); |
| | | |
| | | |
| | | |
| | | // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 |
| | | Set<String> loclSymbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | List<Currency> saveList = getList.stream() |
| | | .filter(currency -> !loclSymbolSet.contains(currency.getSymbol())) |
| | | .map(currency -> { |
| | | Currency newCurrency = new Currency(); |
| | | newCurrency.setSymbol(currency.getSymbol()); |
| | |
| | | import org.example.common.MexcMarketDataClient; |
| | | import org.example.pojo.Currency; |
| | | import org.example.pojo.ExchangeInfo; |
| | | import org.example.pojo.bo.CurrencyKucoin; |
| | | import org.example.pojo.bo.CurrencyMexcBo; |
| | | import org.example.server.impl.CurrencySerivceImpl; |
| | | import org.example.util.ConverterUtil; |
| | |
| | | /** |
| | | * 同步mexc交易所交易对 |
| | | */ |
| | | @Scheduled(cron = "0 0/30 * * * ?") |
| | | @Scheduled(cron = "0 0/45 * * * ?") |
| | | // @Scheduled(cron = "0/10 * * * * ?") |
| | | public void syncCurrency() { |
| | | // 使用Lock来确保同步 |
| | |
| | | Gson gson = new Gson(); |
| | | List<CurrencyMexcBo> getList = gson.fromJson(symbolsJson, new TypeToken<List<CurrencyMexcBo>>() { |
| | | }.getType()); |
| | | |
| | | // 获取数据库中已有的symbol列表 |
| | | List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc")); |
| | | Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | |
| | | // 删除已经下架的币种 |
| | | Set<String> symbolSet = getList.stream().map(CurrencyMexcBo::getSymbol).collect(Collectors.toSet()); |
| | | List<Currency> removeList = dbList.stream() |
| | | .filter(currency -> !symbolSet.contains(currency.getSymbol())) |
| | | .collect(Collectors.toList()); |
| | |
| | | if(CollectionUtils.isNotEmpty(removeList)){ |
| | | removeList.forEach(f->{ |
| | | RedisUtil.delete("mexc"+f.getSymbol()); |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSymbol,f.getSymbol())); |
| | | }); |
| | | } |
| | | currencyService.remove(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"mexc")); |
| | | List<Currency> currencies = ConverterUtil.convertToList(getList, Currency.class); |
| | | |
| | | |
| | | |
| | | |
| | | // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 |
| | | Set<String> loclSymbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | List<CurrencyMexcBo> saveList = getList.stream() |
| | | .filter(CurrencyMexcBo -> !loclSymbolSet.contains(CurrencyMexcBo.getSymbol())) |
| | | .collect(Collectors.toList()); |
| | | List<Currency> currencies = ConverterUtil.convertToList(saveList, Currency.class); |
| | | // 批量保存新增数据到数据库 |
| | | if(CollectionUtils.isNotEmpty(currencies)){ |
| | | currencyService.saveBatch(currencies); |