package org.example.WsBean; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.example.pojo.Currency; import org.example.server.impl.CurrencySerivceImpl; import org.example.wsClient.BitgetClient; import org.example.wsClient.GateClient; import org.example.wsClient.KucoinClient; import org.example.wsClient.MexcClient; import org.json.JSONException; import org.json.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * @ClassDescription: 客户端请求类 * @JdkVersion: 1.8 * @Created: 2023/8/31 16:13 */ @Slf4j @Configuration public class MexcWsBean { @Autowired private CurrencySerivceImpl currencyService; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; // @Bean // public void mexcWebsocketRunClientMap() { // List mexc = currencyService.list(new LambdaQueryWrapper().eq(Currency::getSource, "mexc")); // if (!CollectionUtils.isEmpty(mexc)) { // int batchSize = 30; // 每个线程处理的数据量 // int totalSize = mexc.size(); // int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 // // for (int i = 0; i < threadCount; i++) { // int fromIndex = i * batchSize; // int toIndex = Math.min(fromIndex + batchSize, totalSize); // List sublist = mexc.subList(fromIndex, toIndex); // // // 使用自定义线程池提交任务 // threadPoolTaskExecutor.execute(new MexcClient(sublist)::start); // } // // } // } // @Bean // public void gateWebsocketRunClientMap() { // List mexc = currencyService.list(new LambdaQueryWrapper().eq(Currency::getSource, "gate")); // if (!CollectionUtils.isEmpty(mexc)) { // int batchSize = 100; // 每个线程处理的数据量 // int totalSize = mexc.size(); // int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 // // for (int i = 0; i < threadCount; i++) { // int fromIndex = i * batchSize; // int toIndex = Math.min(fromIndex + batchSize, totalSize); // List sublist = mexc.subList(fromIndex, toIndex); // // // 使用自定义线程池提交任务 // threadPoolTaskExecutor.execute(new GateClient(sublist)::start); // } // // } // } @Bean public void bitgetWebsocketRunClientMap() throws JSONException, JsonProcessingException { List mexc = currencyService.list(new LambdaQueryWrapper().eq(Currency::getSource, "bitget")); if (!CollectionUtils.isEmpty(mexc)) { int batchSize = 100; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 for (int i = 0; i < threadCount; i++) { int fromIndex = i * batchSize; int toIndex = Math.min(fromIndex + batchSize, totalSize); List sublist = mexc.subList(fromIndex, toIndex); String parameter = getParameter(sublist); // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start); } } } // // @Bean // public void kucoinWebsocketRunClientMap() throws Exception { // List mexc = currencyService.list(new LambdaQueryWrapper().eq(Currency::getSource, "kucoin")); // if (!CollectionUtils.isEmpty(mexc)) { // String result = doPost(); // JSONObject jsonObject = new JSONObject(result); // String token = jsonObject.getJSONObject("data").getString("token"); // int batchSize = 100; // 每个线程处理的数据量 // int totalSize = mexc.size(); // int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 // // for (int i = 0; i < threadCount; i++) { // int fromIndex = i * batchSize; // int toIndex = Math.min(fromIndex + batchSize, totalSize); // List sublist = mexc.subList(fromIndex, toIndex); // // // 使用自定义线程池提交任务 // threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start); // } // // } // } public static String doPost() throws Exception { String url = "https://api.kucoin.com/api/v1/bullet-public"; HttpPost httpPost = new HttpPost(url); DefaultHttpClient defaultHttpClient = new DefaultHttpClient(); List nvps = new ArrayList(); httpPost.setEntity(new UrlEncodedFormEntity(nvps)); HttpResponse response = defaultHttpClient.execute(httpPost); HttpEntity respEntity = response.getEntity(); String text = EntityUtils.toString(respEntity, "UTF-8"); defaultHttpClient.getConnectionManager().shutdown(); return text; } public String getParameter(List list) throws JsonProcessingException, JSONException { // 创建一个ObjectMapper实例 ObjectMapper mapper = new ObjectMapper(); List symbolList = list.stream().map(Currency::getSymbol).collect(Collectors.toList()); // 使用Map构建JSON对象 Map jsonMap = new HashMap<>(); jsonMap.put("op", "subscribe"); List> mapList = new ArrayList<>(); symbolList.forEach(f->{ Map argsMap = new HashMap<>(); argsMap.put("instType", "SPOT"); argsMap.put("channel", "books15"); argsMap.put("instId", f); mapList.add(argsMap); }); jsonMap.put("args", mapList); // 将Map转换为JSON字符串 String jsonString = mapper.writeValueAsString(jsonMap); return jsonString; } }