From a522f170fe22ffeb03e2b6bc81ae40243fb4cecb Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Mon, 05 Aug 2024 02:09:36 +0800
Subject: [PATCH] 1
---
websocketSerivce/src/main/java/org/example/util/RedisUtil.java | 8 +
/dev/null | 52 ----------
websocketSerivce/src/main/java/org/example/controller/RunController.java | 47 +++++++++
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java | 7
websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java | 44 +++-----
websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java | 7 +
websocketSerivce/src/main/resources/application.properties | 1
websocketSerivce/src/main/java/org/example/controller/UserController.java | 39 ++++---
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java | 39 +++++++
websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java | 6 +
websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java | 5 -
11 files changed, 143 insertions(+), 112 deletions(-)
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
index 6883e45..408184b 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
@@ -49,10 +49,9 @@
public void kucoinWebsocketRunClientMap() throws Exception {
List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
if (!CollectionUtils.isEmpty(mexc)) {
-// String result = doPost();
-// JSONObject jsonObject = new JSONObject(result);
-// String token = jsonObject.getJSONObject("data").getString("token");
- String token = "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJwAOaCRcyD1ZqgQMAVRBiG7sPpujjw91tNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L0vRGCvQlyDu8hXEVeVU824=.c2e16bAGh64wOMyJm_jKxQ==";
+ String result = doPost();
+ JSONObject jsonObject = new JSONObject(result);
+ String token = jsonObject.getJSONObject("data").getString("token");
int batchSize = 100; // 每个线程处理的数据量
int totalSize = mexc.size();
int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
diff --git a/websocketSerivce/src/main/java/org/example/controller/AdminLogin.java b/websocketSerivce/src/main/java/org/example/controller/AdminLogin.java
deleted file mode 100644
index 64aa642..0000000
--- a/websocketSerivce/src/main/java/org/example/controller/AdminLogin.java
+++ /dev/null
@@ -1,52 +0,0 @@
-//package org.example.controller;
-//
-//import cn.hutool.json.JSONUtil;
-//import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-//import org.example.common.ServerResponse;
-//import org.example.pojo.User;
-//import org.example.server.impl.UserServiceImpl;
-//import org.example.util.JwtUtil;
-//import org.example.util.MD5Util;
-//import org.example.util.RedisUtil;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.web.bind.annotation.PostMapping;
-//import org.springframework.web.bind.annotation.RequestMapping;
-//import org.springframework.web.bind.annotation.RequestParam;
-//import org.springframework.web.bind.annotation.RestController;
-//
-//import java.security.SecureRandom;
-//import java.util.Base64;
-//import java.util.Date;
-//import java.util.HashMap;
-//import java.util.Map;
-//
-///**
-// * @program: demo
-// * @description:
-// * @create: 2024-07-29 11:42
-// **/
-//@RestController
-//@RequestMapping("/admin")
-//public class AdminLogin {
-//
-// @Autowired
-// private UserServiceImpl userService;
-//
-// @PostMapping("/login")
-// public ServerResponse saveUser(@RequestParam("account") String account
-// , @RequestParam("password") String password) {
-//
-// User user = userService.getOne(new LambdaQueryWrapper<User>().eq(User::getAccount, account).eq(User::getIsRoot,1));
-// if(null == user){
-// return ServerResponse.createBySuccessMsg("管理员账号不存在");
-// }
-//
-// if (!MD5Util.verify(password, user.getPassword())) {
-// return ServerResponse.createBySuccessMsg("密码错误");
-// }
-// String token = JwtUtil.getToken(user);
-// Map<String,String> map = new HashMap<>();
-// map.put("token",token);
-// return ServerResponse.createBySuccess(map);
-// }
-//}
diff --git a/websocketSerivce/src/main/java/org/example/controller/RunController.java b/websocketSerivce/src/main/java/org/example/controller/RunController.java
new file mode 100644
index 0000000..649fabd
--- /dev/null
+++ b/websocketSerivce/src/main/java/org/example/controller/RunController.java
@@ -0,0 +1,47 @@
+package org.example.controller;
+
+import lombok.extern.slf4j.Slf4j;
+import org.example.WsServerApplication;
+import org.example.common.ServerResponse;
+import org.example.pojo.User;
+import org.example.util.JwtUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletRequest;
+
+@RestController
+@RequestMapping("/api")
+@CrossOrigin(origins = "*")
+@Slf4j
+public class RunController {
+
+ @Autowired
+ private ConfigurableApplicationContext context;
+
+ @PostMapping("/restart")
+ @ResponseBody
+ public ServerResponse restart(HttpServletRequest request) {
+ String token = request.getHeader("token");
+ User user = JwtUtil.verify(token);
+ if(user.getIsRoot() != 1){
+ return ServerResponse.createByErrorMsg("没有重启权限");
+ }
+ Thread restartThread = new Thread(() -> {
+ try {
+ Thread.sleep(1000); // 等待1秒钟,确保接口返回成功响应
+ SpringApplication.exit(context, () -> 0);
+ SpringApplication.run(WsServerApplication.class); // 替换成你的Spring Boot主类
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("重启失败");
+ }
+ });
+ restartThread.setDaemon(false);
+ restartThread.start();
+ return ServerResponse.createBySuccess("重启成功");
+ }
+
+}
diff --git a/websocketSerivce/src/main/java/org/example/controller/UserController.java b/websocketSerivce/src/main/java/org/example/controller/UserController.java
index 556a38d..e5d11b1 100644
--- a/websocketSerivce/src/main/java/org/example/controller/UserController.java
+++ b/websocketSerivce/src/main/java/org/example/controller/UserController.java
@@ -59,11 +59,11 @@
try {
User user = userService.getOne(new LambdaQueryWrapper<User>().eq(User::getAccount, account));
if(null == user){
- return ServerResponse.createBySuccessMsg("用户不存在");
+ return ServerResponse.createByErrorMsg("用户不存在");
}
if (!MD5Util.verify(password, user.getPassword())) {
- return ServerResponse.createBySuccessMsg("密码错误");
+ return ServerResponse.createByErrorMsg("密码错误");
}
List<Menu> menus = menuMapper.selectList(new LambdaQueryWrapper<Menu>());
if(user.getIsRoot() == 1){
@@ -72,17 +72,18 @@
map.put("token",token);
map.put("user",user);
map.put("menu",menus);
+ extracted(account, request);
return ServerResponse.createBySuccess(map);
}
//判断是否锁定
if(user.getIsLock() == 1){
- return ServerResponse.createBySuccessMsg("账号已被锁定");
+ return ServerResponse.createByErrorMsg("账号已被锁定");
}
//判断是否到期
if(new java.util.Date().after(user.getEndTime())){
- return ServerResponse.createBySuccessMsg("账号已到期");
+ return ServerResponse.createByErrorMsg("账号已到期");
}
String token = JwtUtil.getToken(user);
@@ -93,24 +94,25 @@
map.put("user",user);
String key = "user_";
RedisUtil.set(key+user.getId(),token);
- String ip = IpAddressUtil.getIpAddress(request);
- String address = null;
- address = IpAddressUtil.getIpPossessionByFile(ip);
- if(null == address){
- address = IpAddressUtil.getIpAddressByOnline(ip);
- }
- Log log = new Log();
- log.setIp(ip);
- log.setAccount(account);
- log.setLoginTime(new java.util.Date());
- log.setAddress(address);
- logMapper.insert(log);
+
+ extracted(account, request);
return ServerResponse.createBySuccess(map);
}catch (Exception e){
e.printStackTrace();
log.error("登录异常:"+e.getMessage());
}
- return ServerResponse.createBySuccessMsg("系统异常");
+ return ServerResponse.createByErrorMsg("系统异常");
+ }
+
+ private void extracted(String account, HttpServletRequest request) {
+ String ip = IpAddressUtil.getIpAddress(request);
+ String address = IpAddressUtil.getIpPossessionByFile(ip);
+ Log log = new Log();
+ log.setIp(ip);
+ log.setAccount(account);
+ log.setLoginTime(new java.util.Date());
+ log.setAddress(address);
+ logMapper.insert(log);
}
@PostMapping("/saveConfig")
@@ -193,13 +195,12 @@
}
@PostMapping("/updateUser")
- public ServerResponse deleteUser(UpdateUserVo updateUserVo) {
+ public ServerResponse deleteUser(@RequestBody UpdateUserVo updateUserVo) {
User user = userService.getById(updateUserVo.getId());
if(null == user || user.getIsRoot() == 1){
return ServerResponse.createByErrorMsg("用户不存在");
}
user.setAccount(updateUserVo.getAccount());
- user.setPassword(MD5Util.encrypt(updateUserVo.getPassword()));
user.setEndTime(updateUserVo.getEndTime());
user.setIsLock(updateUserVo.getIsLock());
userService.updateById(user);
diff --git a/websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java b/websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java
index 1266329..fbb5582 100644
--- a/websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java
+++ b/websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java
@@ -39,4 +39,10 @@
//过滤数据
private String buyAndSell;
+
+ //当前页
+ private Integer current = 1;
+
+ //条数
+ private Integer sizes = 10;
}
diff --git a/websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java b/websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java
index 580f333..5e3ce39 100644
--- a/websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java
+++ b/websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java
@@ -26,11 +26,6 @@
private String account;
/**
- * 密码
- */
- private String password;
-
- /**
* 到期时间
*/
private Date endTime;
diff --git a/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
index 1362f03..c49739d 100644
--- a/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -217,7 +217,10 @@
allFutures.join();
executor.shutdown(); // 关闭线程池
-
+ mexcList.clear();
+ gateList.clear();
+ bitgetList.clear();
+ kucoinList.clear();
pushWs(marketDataOuts); // 推送结果
}
@@ -256,6 +259,8 @@
public void quotationCalculation(){
extracted();
findProfitablePairs(mexcList, gateList, bitgetList, kucoinList);
+
+
}
public void scheduler(){
diff --git a/websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java b/websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java
index 533ac69..152b88b 100644
--- a/websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java
+++ b/websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java
@@ -11,11 +11,9 @@
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.URL;
+import java.net.*;
import java.nio.charset.StandardCharsets;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -37,31 +35,23 @@
* 获取IP地址:
*/
public static String getIpAddress(HttpServletRequest request) {
- String ipAddress = null;
try {
- ipAddress = request.getHeader("X-Forwarded-For");
- if (ipAddress != null && ipAddress.length() != 0 && !"unknown".equalsIgnoreCase(ipAddress)) {
- // 多次反向代理后会有多个ip值,第一个ip才是真实ip
- if (ipAddress.contains(",")) {
- ipAddress = ipAddress.split(",")[0];
+ Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+ while (interfaces.hasMoreElements()) {
+ NetworkInterface networkInterface = interfaces.nextElement();
+ Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
+ while (addresses.hasMoreElements()) {
+ InetAddress address = addresses.nextElement();
+ if (!address.isSiteLocalAddress() && !address.isLoopbackAddress() && address instanceof Inet4Address) {
+ System.out.println("External IP: " + address.getHostAddress());
+ return address.getHostAddress();
+ }
}
}
- if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
- ipAddress = request.getHeader("Proxy-Client-IP");
- }
- if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
- ipAddress = request.getHeader("WL-Proxy-Client-IP");
- }
- if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
- ipAddress = request.getHeader("HTTP_CLIENT_IP");
- }
- if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
- ipAddress = request.getRemoteAddr();
- }
- } catch (Exception e) {
- log.error("获取IP地址异常,{}", e.getMessage());
+ } catch (SocketException e) {
+ e.printStackTrace();
}
- return ipAddress;
+ return null;
}
/**
@@ -239,8 +229,8 @@
public static void main(String[] args) {
- String ip = "183.162.252.0";// 国内IP
- String abroadIp = "48.119.248.100"; // 国外IP
+ String ip = "26.26.26.1";// 国内IP
+ String abroadIp = "26.26.26.1"; // 国外IP
System.out.println("方法一(国内):" + getIpPossessionByFile(ip));
System.out.println("方法二(国内):" + getCityInfoByVectorIndex(ip));
diff --git a/websocketSerivce/src/main/java/org/example/util/RedisUtil.java b/websocketSerivce/src/main/java/org/example/util/RedisUtil.java
index e1c6022..ac49bce 100644
--- a/websocketSerivce/src/main/java/org/example/util/RedisUtil.java
+++ b/websocketSerivce/src/main/java/org/example/util/RedisUtil.java
@@ -5,6 +5,7 @@
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -36,6 +37,13 @@
}
}
+ public static List<String> mget(List<String> keys) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ List<String> values = jedis.mget(keys.toArray(new String[0]));
+ return values;
+ }
+ }
+
public static void hset(String key, String field, String value) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(key, field, value);
diff --git a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
index 9f594a1..80ccfe9 100644
--- a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
+++ b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -1,5 +1,6 @@
package org.example.websocket.server;
+import cn.hutool.core.util.NumberUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -9,6 +10,7 @@
import com.google.gson.GsonBuilder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.example.pojo.ConfigCurrency;
import org.example.pojo.MarketDataOut;
import org.example.pojo.bo.WsBo;
@@ -143,6 +145,8 @@
try {
schedulePushMessage(session, message);
} catch (Exception e) {
+ e.printStackTrace();
+ closeSession(session, "发送消息异常,断开链接");
log.error("发送消息时出现异常: {}", e.getMessage());
} finally {
sessionLock.unlock();
@@ -172,6 +176,7 @@
long currentTime = System.currentTimeMillis();
long lastMessageTime = lastMessageTimeMap.getOrDefault(session, 0L);
int time = wsBo.getTime();
+
message = megFiltration(wsBo,message);
if (currentTime - lastMessageTime >= time * 1000) {
// 时间间隔达到要求,可以发送消息
@@ -179,14 +184,14 @@
lastMessageTimeMap.put(session, currentTime); // 更新最后发送时间
} else {
// 时间间隔未达到,不发送消息,可以记录日志或者其他操作
- log.info("距离上次发送消息时间未达到指定间隔,不发送消息。");
+// log.info("距离上次发送消息时间未达到指定间隔,不发送消息。");
}
}
}
private static final Gson gson = new Gson();
private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
List<MarketDataOut> redisValueMap = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType());
-
+ Map<String,Object> map = new HashMap<>();
String key = "config_";
String value = RedisUtil.get(key + wsBo.getUserId());
List<ConfigCurrency> currencies = null;
@@ -203,7 +208,7 @@
//查询币种
- if(null != wsBo.getCurrency()){
+ if(StringUtils.isNotEmpty(wsBo.getCurrency())){
redisValueMap = redisValueMap.stream()
.filter(data -> wsBo.getCurrency().equals(data.getBaseAsset()))
.collect(Collectors.toList());
@@ -242,10 +247,36 @@
.filter(data -> list.contains(data.getBuyAndSell()))
.forEach(data -> data.setMarker(true));
}
+ map.put("current",wsBo.getCurrent());
+ map.put("sizes",wsBo.getSizes());
+ map.put("total",redisValueMap.size());
+ sortBySpread(redisValueMap);
+ Integer current = 0;
+ if(wsBo.getCurrent() != 1){
+ current = (wsBo.getCurrent() - 1) * wsBo.getSizes();
+ }
+
+ // 确保 startIndex 在有效范围内
+ current = Math.min(current, redisValueMap.size());
+ // 计算子列表的结束索引
+ int endIndex = Math.min(current + wsBo.getSizes(), redisValueMap.size());
+ // 根据计算出的索引获取子列表
+ redisValueMap = redisValueMap.subList(current, endIndex);
+ map.put("data",redisValueMap);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String json = gson.toJson(redisValueMap);
+ String json = gson.toJson(map);
return json;
}
+ public static void sortBySpread(List<MarketDataOut> marketDataList) {
+ Collections.sort(marketDataList, new Comparator<MarketDataOut>() {
+ @Override
+ public int compare(MarketDataOut a, MarketDataOut b) {
+ BigDecimal spreadA = new BigDecimal(a.getSpread());
+ BigDecimal spreadB = new BigDecimal(b.getSpread());
+ return spreadB.compareTo(spreadA);
+ }
+ });
+ }
private void pushMessage(Session session, String message) {
try {
diff --git a/websocketSerivce/src/main/resources/application.properties b/websocketSerivce/src/main/resources/application.properties
index f43eb37..235f969 100644
--- a/websocketSerivce/src/main/resources/application.properties
+++ b/websocketSerivce/src/main/resources/application.properties
@@ -1,5 +1,6 @@
XDB_PATH=/www/wwwroot/csdn-ip2region.xdb
+#XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb
redis1.ip=localhost
redis1.port=6379
--
Gitblit v1.9.3