kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
@@ -49,10 +49,9 @@ public void kucoinWebsocketRunClientMap() throws Exception { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin")); if (!CollectionUtils.isEmpty(mexc)) { // String result = doPost(); // JSONObject jsonObject = new JSONObject(result); // String token = jsonObject.getJSONObject("data").getString("token"); String token = "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJwAOaCRcyD1ZqgQMAVRBiG7sPpujjw91tNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L0vRGCvQlyDu8hXEVeVU824=.c2e16bAGh64wOMyJm_jKxQ=="; String result = doPost(); JSONObject jsonObject = new JSONObject(result); String token = jsonObject.getJSONObject("data").getString("token"); int batchSize = 100; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 websocketSerivce/src/main/java/org/example/controller/AdminLogin.java
File was deleted websocketSerivce/src/main/java/org/example/controller/RunController.java
New file @@ -0,0 +1,47 @@
package org.example.controller;

import lombok.extern.slf4j.Slf4j;
import org.example.WsServerApplication;
import org.example.common.ServerResponse;
import org.example.pojo.User;
import org.example.util.JwtUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;

/**
 * Administrative endpoint that restarts the application in-process.
 *
 * <p>NOTE(review): the controller allows every CORS origin; confirm that is intended for an
 * endpoint that can restart the service.
 */
@RestController
@RequestMapping("/api")
@CrossOrigin(origins = "*")
@Slf4j
public class RunController {

    /** Delay before shutting down, so the HTTP response can be written first. */
    private static final long RESPONSE_FLUSH_DELAY_MS = 1000L;

    @Autowired
    private ConfigurableApplicationContext context;

    /**
     * Restarts the application when the caller's token belongs to a root user.
     *
     * <p>The restart itself runs on a separate non-daemon thread after a short delay; this
     * method returns as soon as that thread has been started.
     *
     * @param request current HTTP request; the JWT is read from its {@code token} header
     * @return an error response when the token is missing, cannot be resolved to a user, or the
     *     user is not root; otherwise a success response
     */
    @PostMapping("/restart")
    public ServerResponse restart(HttpServletRequest request) {
        String token = request.getHeader("token");
        if (token == null || token.isEmpty()) {
            // No credentials at all: refuse instead of handing null to the verifier.
            return ServerResponse.createByErrorMsg("没有重启权限");
        }

        User user = JwtUtil.verify(token);
        // NOTE(review): assumes getIsRoot() is never null for a persisted user - confirm.
        if (user == null || user.getIsRoot() != 1) {
            return ServerResponse.createByErrorMsg("没有重启权限");
        }

        // Must be read now: the context is closed by the time the new instance is launched.
        String[] launchArgs = currentLaunchArgs();

        Thread restartThread = new Thread(() -> restartApplication(launchArgs), "app-restart");
        // Non-daemon so the JVM stays alive between closing the old context and starting the new one.
        restartThread.setDaemon(false);
        restartThread.start();
        return ServerResponse.createBySuccess("重启成功");
    }

    /** Returns the arguments the application was started with, or an empty array if unavailable. */
    private String[] currentLaunchArgs() {
        try {
            return context.getBean(ApplicationArguments.class).getSourceArgs();
        } catch (RuntimeException e) {
            log.warn("Could not read the original launch arguments; restarting without them", e);
            return new String[0];
        }
    }

    /** Waits briefly, closes the current context and boots a fresh application instance. */
    private void restartApplication(String[] launchArgs) {
        try {
            Thread.sleep(RESPONSE_FLUSH_DELAY_MS);
        } catch (InterruptedException e) {
            // Restore the flag and abandon the restart rather than shutting down half-way.
            Thread.currentThread().interrupt();
            log.error("重启失败", e);
            return;
        }
        try {
            SpringApplication.exit(context, () -> 0);
            SpringApplication.run(WsServerApplication.class, launchArgs);
        } catch (Exception e) {
            log.error("重启失败", e);
        }
    }
}
websocketSerivce/src/main/java/org/example/controller/UserController.java
@@ -59,11 +59,11 @@ try { User user = userService.getOne(new LambdaQueryWrapper<User>().eq(User::getAccount, account)); if(null == user){ return ServerResponse.createBySuccessMsg("用户不存在"); return ServerResponse.createByErrorMsg("用户不存在"); } if (!MD5Util.verify(password, user.getPassword())) { return ServerResponse.createBySuccessMsg("密码错误"); return ServerResponse.createByErrorMsg("密码错误"); } List<Menu> menus = menuMapper.selectList(new LambdaQueryWrapper<Menu>()); if(user.getIsRoot() == 1){ @@ -72,17 +72,18 @@ map.put("token",token); map.put("user",user); map.put("menu",menus); extracted(account, request); return ServerResponse.createBySuccess(map); } //判断是否锁定 if(user.getIsLock() == 1){ return ServerResponse.createBySuccessMsg("账号已被锁定"); return ServerResponse.createByErrorMsg("账号已被锁定"); } //判断是否到期 if(new java.util.Date().after(user.getEndTime())){ return ServerResponse.createBySuccessMsg("账号已到期"); return ServerResponse.createByErrorMsg("账号已到期"); } String token = JwtUtil.getToken(user); @@ -93,24 +94,25 @@ map.put("user",user); String key = "user_"; RedisUtil.set(key+user.getId(),token); String ip = IpAddressUtil.getIpAddress(request); String address = null; address = IpAddressUtil.getIpPossessionByFile(ip); if(null == address){ address = IpAddressUtil.getIpAddressByOnline(ip); extracted(account, request); return ServerResponse.createBySuccess(map); }catch (Exception e){ e.printStackTrace(); log.error("登录异常:"+e.getMessage()); } return ServerResponse.createByErrorMsg("系统异常"); } private void extracted(String account, HttpServletRequest request) { String ip = IpAddressUtil.getIpAddress(request); String address = IpAddressUtil.getIpPossessionByFile(ip); Log log = new Log(); log.setIp(ip); log.setAccount(account); log.setLoginTime(new java.util.Date()); log.setAddress(address); logMapper.insert(log); return ServerResponse.createBySuccess(map); }catch (Exception e){ e.printStackTrace(); log.error("登录异常:"+e.getMessage()); } return ServerResponse.createBySuccessMsg("系统异常"); } 
@PostMapping("/saveConfig") @@ -193,13 +195,12 @@ } @PostMapping("/updateUser") public ServerResponse deleteUser(UpdateUserVo updateUserVo) { public ServerResponse deleteUser(@RequestBody UpdateUserVo updateUserVo) { User user = userService.getById(updateUserVo.getId()); if(null == user || user.getIsRoot() == 1){ return ServerResponse.createByErrorMsg("用户不存在"); } user.setAccount(updateUserVo.getAccount()); user.setPassword(MD5Util.encrypt(updateUserVo.getPassword())); user.setEndTime(updateUserVo.getEndTime()); user.setIsLock(updateUserVo.getIsLock()); userService.updateById(user); websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java
@@ -39,4 +39,10 @@ //过滤数据 private String buyAndSell; //当前页 private Integer current = 1; //条数 private Integer sizes = 10; } websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java
@@ -26,11 +26,6 @@ private String account; /** * 密码 */ private String password; /** * 到期时间 */ private Date endTime; websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -217,7 +217,10 @@ allFutures.join(); executor.shutdown(); // 关闭线程池 mexcList.clear(); gateList.clear(); bitgetList.clear(); kucoinList.clear(); pushWs(marketDataOuts); // 推送结果 } @@ -256,6 +259,8 @@ public void quotationCalculation(){ extracted(); findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); } public void scheduler(){ websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java
@@ -11,11 +11,9 @@ import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.URL; import java.net.*; import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -37,31 +35,23 @@ * 获取IP地址: */ public static String getIpAddress(HttpServletRequest request) { String ipAddress = null; try { ipAddress = request.getHeader("X-Forwarded-For"); if (ipAddress != null && ipAddress.length() != 0 && !"unknown".equalsIgnoreCase(ipAddress)) { // 多次反向代理后会有多个ip值,第一个ip才是真实ip if (ipAddress.contains(",")) { ipAddress = ipAddress.split(",")[0]; Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); while (interfaces.hasMoreElements()) { NetworkInterface networkInterface = interfaces.nextElement(); Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); while (addresses.hasMoreElements()) { InetAddress address = addresses.nextElement(); if (!address.isSiteLocalAddress() && !address.isLoopbackAddress() && address instanceof Inet4Address) { System.out.println("External IP: " + address.getHostAddress()); return address.getHostAddress(); } } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("WL-Proxy-Client-IP"); } catch (SocketException e) { e.printStackTrace(); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("HTTP_CLIENT_IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getRemoteAddr(); } } catch (Exception e) { log.error("获取IP地址异常,{}", 
e.getMessage()); } return ipAddress; return null; } /** @@ -239,8 +229,8 @@ public static void main(String[] args) { String ip = "183.162.252.0";// 国内IP String abroadIp = "48.119.248.100"; // 国外IP String ip = "26.26.26.1";// 国内IP String abroadIp = "26.26.26.1"; // 国外IP System.out.println("方法一(国内):" + getIpPossessionByFile(ip)); System.out.println("方法二(国内):" + getCityInfoByVectorIndex(ip)); websocketSerivce/src/main/java/org/example/util/RedisUtil.java
@@ -5,6 +5,7 @@ import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.Protocol; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -36,6 +37,13 @@ } } public static List<String> mget(List<String> keys) { try (Jedis jedis = jedisPool.getResource()) { List<String> values = jedis.mget(keys.toArray(new String[0])); return values; } } public static void hset(String key, String field, String value) { try (Jedis jedis = jedisPool.getResource()) { jedis.hset(key, field, value); websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -1,5 +1,6 @@ package org.example.websocket.server; import cn.hutool.core.util.NumberUtil; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -9,6 +10,7 @@ import com.google.gson.GsonBuilder; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.example.pojo.ConfigCurrency; import org.example.pojo.MarketDataOut; import org.example.pojo.bo.WsBo; @@ -143,6 +145,8 @@ try { schedulePushMessage(session, message); } catch (Exception e) { e.printStackTrace(); closeSession(session, "发送消息异常,断开链接"); log.error("发送消息时出现异常: {}", e.getMessage()); } finally { sessionLock.unlock(); @@ -172,6 +176,7 @@ long currentTime = System.currentTimeMillis(); long lastMessageTime = lastMessageTimeMap.getOrDefault(session, 0L); int time = wsBo.getTime(); message = megFiltration(wsBo,message); if (currentTime - lastMessageTime >= time * 1000) { // 时间间隔达到要求,可以发送消息 @@ -179,14 +184,14 @@ lastMessageTimeMap.put(session, currentTime); // 更新最后发送时间 } else { // 时间间隔未达到,不发送消息,可以记录日志或者其他操作 log.info("距离上次发送消息时间未达到指定间隔,不发送消息。"); // log.info("距离上次发送消息时间未达到指定间隔,不发送消息。"); } } } private static final Gson gson = new Gson(); private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException { List<MarketDataOut> redisValueMap = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType()); Map<String,Object> map = new HashMap<>(); String key = "config_"; String value = RedisUtil.get(key + wsBo.getUserId()); List<ConfigCurrency> currencies = null; @@ -203,7 +208,7 @@ //查询币种 if(null != wsBo.getCurrency()){ if(StringUtils.isNotEmpty(wsBo.getCurrency())){ redisValueMap = redisValueMap.stream() .filter(data -> wsBo.getCurrency().equals(data.getBaseAsset())) .collect(Collectors.toList()); @@ -242,10 +247,36 @@ .filter(data -> list.contains(data.getBuyAndSell())) .forEach(data -> data.setMarker(true)); } 
map.put("current",wsBo.getCurrent()); map.put("sizes",wsBo.getSizes()); map.put("total",redisValueMap.size()); sortBySpread(redisValueMap); Integer current = 0; if(wsBo.getCurrent() != 1){ current = (wsBo.getCurrent() - 1) * wsBo.getSizes(); } // 确保 startIndex 在有效范围内 current = Math.min(current, redisValueMap.size()); // 计算子列表的结束索引 int endIndex = Math.min(current + wsBo.getSizes(), redisValueMap.size()); // 根据计算出的索引获取子列表 redisValueMap = redisValueMap.subList(current, endIndex); map.put("data",redisValueMap); Gson gson = new GsonBuilder().setPrettyPrinting().create(); String json = gson.toJson(redisValueMap); String json = gson.toJson(map); return json; } public static void sortBySpread(List<MarketDataOut> marketDataList) { Collections.sort(marketDataList, new Comparator<MarketDataOut>() { @Override public int compare(MarketDataOut a, MarketDataOut b) { BigDecimal spreadA = new BigDecimal(a.getSpread()); BigDecimal spreadB = new BigDecimal(b.getSpread()); return spreadB.compareTo(spreadA); } }); } private void pushMessage(Session session, String message) { try { websocketSerivce/src/main/resources/application.properties
@@ -1,5 +1,6 @@ XDB_PATH=/www/wwwroot/csdn-ip2region.xdb #XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb redis1.ip=localhost redis1.port=6379