| | |
| | | package org.example.task; |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.core.type.TypeReference; |
| | | import com.google.common.collect.ImmutableMap; |
| | | import com.google.common.collect.Maps; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.google.common.reflect.TypeToken; |
| | | import com.google.gson.Gson; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.example.common.MarketDataClient; |
| | | import org.example.common.MexcMarketDataClient; |
| | | import org.example.pojo.Currency; |
| | | import org.example.pojo.ExchangeInfo; |
| | | import org.example.util.JsonUtil; |
| | | import org.example.pojo.bo.CurrencyMexcBo; |
| | | import org.example.server.impl.CurrencySerivceImpl; |
| | | import org.example.util.ConverterUtil; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.*; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import static com.alibaba.druid.sql.ast.SQLPartitionValue.Operator.List; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | | * @program: demo |
| | | * @description: mexc交易所 |
| | | * @create: 2024-07-15 17:22 |
| | | * @create: 2024-07-15 17:22 |
| | | **/ |
| | | @Component |
| | | @Slf4j |
| | | public class MexcStock { |
| | | |
| | | private final AtomicBoolean syncCurrency = new AtomicBoolean(false); |
| | | @Autowired |
| | | private CurrencySerivceImpl currencyService; |
| | | |
| | | private final Lock syncCurrencyLock = new ReentrantLock(); |
| | | |
| | | /** |
| | | * 同步mexc交易所交易对 |
| | | */ |
| | | @Scheduled(cron = "0 0/5 * * * ?") |
| | | @Scheduled(cron = "0 0/30 * * * ?") |
| | | // @Scheduled(cron = "0/10 * * * * ?") |
| | | public void syncCurrency() { |
| | | if (syncCurrency.get()) { |
| | | return; |
| | | } |
| | | if (syncCurrencyLock.tryLock()) { |
| | | System.out.println("【同步mexc交易所交易对】---->开市"); |
| | | try { |
| | | syncCurrency.set(true); |
| | | sync(); |
| | | } catch (Exception e) { |
| | | System.err.println("【同步mexc交易所交易对】出现异常: " + e.getMessage()); |
| | | } finally { |
| | | syncCurrencyLock.unlock(); |
| | | syncCurrency.set(false); |
| | | System.out.println("【同步mexc交易所交易对】---->结束"); |
| | | } |
| | | // 使用Lock来确保同步 |
| | | syncCurrencyLock.lock(); |
| | | log.info("【同步mexc交易所交易对】---->开始"); |
| | | try { |
| | | sync(); |
| | | } catch (Exception e) { |
| | | log.error("【同步mexc交易所交易对】出现异常: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | } finally { |
| | | syncCurrencyLock.unlock(); |
| | | log.info("【同步mexc交易所交易对】---->结束"); |
| | | } |
| | | } |
| | | |
| | | public void sync() { |
| | | HashMap<String, String> symbolParams = Maps.newHashMap(ImmutableMap.<String, String>builder() |
| | | .build()); |
| | | String json = JsonUtil.toJson(exchangeInfo(symbolParams)); |
| | | Gson gson = new Gson(); |
| | | Map map = gson.fromJson(json, Map.class); |
| | | String symbols = JsonUtil.toJson(map.get("symbols")); |
| | | ArrayList arrayList = gson.fromJson(symbols, ArrayList.class); |
| | | public void sync() throws JsonProcessingException { |
| | | Map<String, String> symbolParams = new HashMap<>(); |
| | | ObjectMapper objectMapper = new ObjectMapper(); |
| | | // 调用外部接口获取数据 |
| | | ExchangeInfo exchangeInfo = exchangeInfo(symbolParams); |
| | | String json = objectMapper.writeValueAsString(exchangeInfo); |
| | | |
| | | // 对返回数据格式进行校验 |
| | | if (json != null && !json.isEmpty()) { |
| | | Map<String, Object> map = objectMapper.readValue(json, new TypeReference<Map<String, Object>>() { |
| | | }); |
| | | |
| | | // 获取接口返回的symbols数据 |
| | | String symbolsJson = objectMapper.writeValueAsString(map.get("symbols")); |
| | | Gson gson = new Gson(); |
| | | List<CurrencyMexcBo> getList = gson.fromJson(symbolsJson, new TypeToken<List<CurrencyMexcBo>>() { |
| | | }.getType()); |
| | | |
| | | // 获取数据库中已有的symbol列表 |
| | | List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc")); |
| | | Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); |
| | | |
| | | // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 |
| | | List<CurrencyMexcBo> saveList = getList.stream() |
| | | .filter(CurrencyMexcBo -> !symbolSet.contains(CurrencyMexcBo.getSymbol())) |
| | | .collect(Collectors.toList()); |
| | | List<Currency> currencies = ConverterUtil.convertToList(saveList, Currency.class); |
| | | // 批量保存新增数据到数据库 |
| | | if(CollectionUtils.isNotEmpty(currencies)){ |
| | | currencyService.saveBatch(currencies); |
| | | } |
| | | } else { |
| | | log.info("外部接口返回数据为空"); |
| | | } |
| | | } |
| | | |
| | | public static ExchangeInfo exchangeInfo(Map<String, String> params) { |
| | | return MarketDataClient.get("/api/v3/exchangeInfo", params, new TypeReference<ExchangeInfo>() { |
| | | return MexcMarketDataClient.get("/api/v3/exchangeInfo", params, new TypeReference<ExchangeInfo>() { |
| | | }); |
| | | } |
| | | |
| | | } |
| | | } |