package org.example.mexcclient.WsBean; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.example.mexcclient.MexcClientApplication; import org.example.mexcclient.pojo.Currency; import org.example.mexcclient.server.impl.CurrencySerivceImpl; import org.example.mexcclient.wsClient.MexcClient; import org.json.JSONException; import org.json.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * @ClassDescription: 客户端请求类 * @JdkVersion: 1.8 * @Created: 2023/8/31 16:13 */ @Slf4j @Configuration public class MexcWsBean { @Autowired private CurrencySerivceImpl currencyService; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Bean public void mexcWebsocketRunClientMap() throws InterruptedException { List mexc = currencyService.list(new LambdaQueryWrapper().eq(Currency::getSource, "mexc").eq(Currency::getBaseAsset,"USBT")); if (!CollectionUtils.isEmpty(mexc)) { int batchSize = 30; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 int count = 0; for (int i = 0; i < threadCount; i++) { int fromIndex = i * batchSize; int toIndex = Math.min(fromIndex + batchSize, totalSize); List sublist = mexc.subList(fromIndex, toIndex); count++; if(count % 15 == 0 && count != 0){ count = 0; Thread.sleep(1500); } // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(() -> { new MexcClient(sublist).start(); }); } } } }