package org.example.task; import cn.hutool.core.convert.Convert; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.example.common.*; import org.example.pojo.Currency; import org.example.pojo.bo.CurrencyBitgetBo; import org.example.server.impl.CurrencySerivceImpl; import org.example.util.ConverterUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.sql.SQLOutput; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** * @program: demo * @description: bitget交易所 * @create: 2024-07-15 17:24 **/ @Component @Slf4j public class BitgetStock { @Autowired private CurrencySerivceImpl currencyService; private final Lock syncCurrencyLock = new ReentrantLock(); /** * 同步bitget交易所交易对 */ @Scheduled(cron = "0 0/45 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") public void syncCurrency() { // 使用Lock来确保同步 syncCurrencyLock.lock(); log.info("【同步bitget交易所交易对】---->开始"); try { sync(); } catch (Exception e) { log.error("【同步bitget交易所交易对】出现异常: " + e.getMessage()); e.printStackTrace(); } finally { syncCurrencyLock.unlock(); log.info("【同步bitget交易所交易对】---->结束"); } } public void sync() throws IOException { // 调用外部接口获取数据 String json = doGet(); // 对返回数据格式进行校验 if (json != null && !json.isEmpty()) { ObjectMapper objectMapper = new ObjectMapper(); Map map = objectMapper.readValue(json, new TypeReference>() { }); String symbolsJson = objectMapper.writeValueAsString(map.get("data")); Gson gson = new Gson(); List getList = gson.fromJson(symbolsJson, new TypeToken>() { }.getType()); getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-"))); // 获取数据库中已有的symbol列表 currencyService.remove(new LambdaQueryWrapper().eq(Currency::getSource,"bitget")); // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 List saveList = getList.stream() .map(currency -> { Currency newCurrency = new Currency(); newCurrency.setSymbol(currency.getSymbol()); newCurrency.setBaseAsset(currency.getBaseCoin()); newCurrency.setQuoteAsset(currency.getQuoteCoin()); newCurrency.setSource(currency.getSource()); return newCurrency; }) .collect(Collectors.toList()); // 批量保存新增数据到数据库 if (CollectionUtils.isNotEmpty(saveList)) { currencyService.saveBatch(saveList); } } else { log.info("同步bitget交易所交易对,外部接口返回数据为空"); } } public static String doGet() throws IOException { HttpClient httpClient = HttpClients.createDefault(); HttpGet request = new HttpGet("https://api.bitget.com/api/v2/spot/public/symbols"); HttpResponse response = httpClient.execute(request); try { // 处理响应内容 HttpEntity entity = response.getEntity(); String responseBody = EntityUtils.toString(entity); return responseBody; } finally { // 确保释放资源 EntityUtils.consume(response.getEntity()); } } }