package org.example.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.example.common.MexcMarketDataClient; import org.example.pojo.Currency; import org.example.pojo.ExchangeInfo; import org.example.pojo.bo.CurrencyKucoin; import org.example.pojo.bo.CurrencyMexcBo; import org.example.server.impl.CurrencySerivceImpl; import org.example.util.ConverterUtil; import org.example.util.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** * @program: demo * @description: mexc交易所 * @create: 2024-07-15 17:22 **/ @Component @Slf4j public class MexcStock { @Autowired private CurrencySerivceImpl currencyService; private final Lock syncCurrencyLock = new ReentrantLock(); /** * 同步mexc交易所交易对 */ @Scheduled(cron = "0 0/45 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") public void syncCurrency() { // 使用Lock来确保同步 syncCurrencyLock.lock(); log.info("【同步mexc交易所交易对】---->开始"); try { sync(); } catch (Exception e) { log.error("【同步mexc交易所交易对】出现异常: " + e.getMessage()); e.printStackTrace(); } finally { syncCurrencyLock.unlock(); log.info("【同步mexc交易所交易对】---->结束"); } } public void sync() throws JsonProcessingException { Map symbolParams = new HashMap<>(); ObjectMapper objectMapper = new ObjectMapper(); // 调用外部接口获取数据 ExchangeInfo exchangeInfo = exchangeInfo(symbolParams); String json = objectMapper.writeValueAsString(exchangeInfo); // 对返回数据格式进行校验 if (json != null && !json.isEmpty()) { Map map = objectMapper.readValue(json, new TypeReference>() { }); // 获取接口返回的symbols数据 String symbolsJson = objectMapper.writeValueAsString(map.get("symbols")); Gson gson = new Gson(); List getList = gson.fromJson(symbolsJson, new TypeToken>() { }.getType()); // 获取数据库中已有的symbol列表 List dbList = currencyService.list(new LambdaQueryWrapper().eq(Currency::getSource, "mexc")); // 删除已经下架的币种 Set symbolSet = getList.stream().map(CurrencyMexcBo::getSymbol).collect(Collectors.toSet()); List removeList = dbList.stream() .filter(currency -> !symbolSet.contains(currency.getSymbol())) .collect(Collectors.toList()); if(CollectionUtils.isNotEmpty(removeList)){ removeList.forEach(f->{ RedisUtil.delete("mexc"+f.getSymbol()); currencyService.remove(new LambdaQueryWrapper().eq(Currency::getSymbol,f.getSymbol())); }); } // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 Set loclSymbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); List saveList = getList.stream() .filter(CurrencyMexcBo -> !loclSymbolSet.contains(CurrencyMexcBo.getSymbol())) .collect(Collectors.toList()); List currencies = ConverterUtil.convertToList(saveList, Currency.class); // 批量保存新增数据到数据库 if(CollectionUtils.isNotEmpty(currencies)){ currencyService.saveBatch(currencies); } } else { log.info("外部接口返回数据为空"); } } public static ExchangeInfo exchangeInfo(Map params) { return MexcMarketDataClient.get("/api/v3/exchangeInfo", params, new TypeReference() { }); } }