1
zj
2024-08-05 a522f170fe22ffeb03e2b6bc81ae40243fb4cecb
1
9 files modified
1 files added
1 files deleted
255 ■■■■■ changed files
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java 7 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/controller/AdminLogin.java 52 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/controller/RunController.java 47 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/controller/UserController.java 39 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java 6 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java 5 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java 7 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java 44 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/util/RedisUtil.java 8 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java 39 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/resources/application.properties 1 ●●●● patch | view | raw | blame | history
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
@@ -49,10 +49,9 @@
    public void kucoinWebsocketRunClientMap() throws Exception {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
        if (!CollectionUtils.isEmpty(mexc)) {
//            String result = doPost();
//            JSONObject jsonObject = new JSONObject(result);
//            String token = jsonObject.getJSONObject("data").getString("token");
            String token = "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJwAOaCRcyD1ZqgQMAVRBiG7sPpujjw91tNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L0vRGCvQlyDu8hXEVeVU824=.c2e16bAGh64wOMyJm_jKxQ==";
            String result = doPost();
            JSONObject jsonObject = new JSONObject(result);
            String token = jsonObject.getJSONObject("data").getString("token");
            int batchSize = 100; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
websocketSerivce/src/main/java/org/example/controller/AdminLogin.java
File was deleted
websocketSerivce/src/main/java/org/example/controller/RunController.java
New file
@@ -0,0 +1,47 @@
package org.example.controller;
import lombok.extern.slf4j.Slf4j;
import org.example.WsServerApplication;
import org.example.common.ServerResponse;
import org.example.pojo.User;
import org.example.util.JwtUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
@RestController
@RequestMapping("/api")
@CrossOrigin(origins = "*")
@Slf4j
public class RunController {
    @Autowired
    private ConfigurableApplicationContext context;
    @PostMapping("/restart")
    @ResponseBody
    public ServerResponse restart(HttpServletRequest request) {
        String token = request.getHeader("token");
        User user = JwtUtil.verify(token);
        if(user.getIsRoot() != 1){
            return ServerResponse.createByErrorMsg("没有重启权限");
        }
        Thread restartThread = new Thread(() -> {
            try {
                Thread.sleep(1000); // 等待1秒钟,确保接口返回成功响应
                SpringApplication.exit(context, () -> 0);
                SpringApplication.run(WsServerApplication.class); // 替换成你的Spring Boot主类
            } catch (Exception e) {
                e.printStackTrace();
                log.error("重启失败");
            }
        });
        restartThread.setDaemon(false);
        restartThread.start();
        return ServerResponse.createBySuccess("重启成功");
    }
}
websocketSerivce/src/main/java/org/example/controller/UserController.java
@@ -59,11 +59,11 @@
        try {
            User user = userService.getOne(new LambdaQueryWrapper<User>().eq(User::getAccount, account));
            if(null == user){
                return ServerResponse.createBySuccessMsg("用户不存在");
                return ServerResponse.createByErrorMsg("用户不存在");
            }
            if (!MD5Util.verify(password, user.getPassword())) {
                return ServerResponse.createBySuccessMsg("密码错误");
                return ServerResponse.createByErrorMsg("密码错误");
            }
            List<Menu> menus = menuMapper.selectList(new LambdaQueryWrapper<Menu>());
            if(user.getIsRoot() == 1){
@@ -72,17 +72,18 @@
                map.put("token",token);
                map.put("user",user);
                map.put("menu",menus);
                extracted(account, request);
                return ServerResponse.createBySuccess(map);
            }
            //判断是否锁定
            if(user.getIsLock() == 1){
                return ServerResponse.createBySuccessMsg("账号已被锁定");
                return ServerResponse.createByErrorMsg("账号已被锁定");
            }
            //判断是否到期
            if(new java.util.Date().after(user.getEndTime())){
                return ServerResponse.createBySuccessMsg("账号已到期");
                return ServerResponse.createByErrorMsg("账号已到期");
            }
            String token = JwtUtil.getToken(user);
@@ -93,24 +94,25 @@
            map.put("user",user);
            String key = "user_";
            RedisUtil.set(key+user.getId(),token);
            String ip = IpAddressUtil.getIpAddress(request);
            String address = null;
            address = IpAddressUtil.getIpPossessionByFile(ip);
            if(null == address){
                address = IpAddressUtil.getIpAddressByOnline(ip);
            }
            Log log = new Log();
            log.setIp(ip);
            log.setAccount(account);
            log.setLoginTime(new java.util.Date());
            log.setAddress(address);
            logMapper.insert(log);
            extracted(account, request);
            return ServerResponse.createBySuccess(map);
        }catch (Exception e){
            e.printStackTrace();
            log.error("登录异常:"+e.getMessage());
        }
        return ServerResponse.createBySuccessMsg("系统异常");
        return ServerResponse.createByErrorMsg("系统异常");
    }
    private void extracted(String account, HttpServletRequest request) {
        String ip = IpAddressUtil.getIpAddress(request);
        String address = IpAddressUtil.getIpPossessionByFile(ip);
        Log log = new Log();
        log.setIp(ip);
        log.setAccount(account);
        log.setLoginTime(new java.util.Date());
        log.setAddress(address);
        logMapper.insert(log);
    }
    @PostMapping("/saveConfig")
@@ -193,13 +195,12 @@
    }
    @PostMapping("/updateUser")
    public ServerResponse deleteUser(UpdateUserVo updateUserVo) {
    public ServerResponse deleteUser(@RequestBody UpdateUserVo updateUserVo) {
        User user = userService.getById(updateUserVo.getId());
        if(null == user || user.getIsRoot() == 1){
            return ServerResponse.createByErrorMsg("用户不存在");
        }
        user.setAccount(updateUserVo.getAccount());
        user.setPassword(MD5Util.encrypt(updateUserVo.getPassword()));
        user.setEndTime(updateUserVo.getEndTime());
        user.setIsLock(updateUserVo.getIsLock());
        userService.updateById(user);
websocketSerivce/src/main/java/org/example/pojo/bo/WsBo.java
@@ -39,4 +39,10 @@
    //过滤数据
    private String buyAndSell;
    //当前页
    private Integer current = 1;
    //条数
    private Integer sizes = 10;
}
websocketSerivce/src/main/java/org/example/pojo/vo/UpdateUserVo.java
@@ -26,11 +26,6 @@
    private String account;
    /**
     * 密码
     */
    private String password;
    /**
     * 到期时间
     */
    private Date endTime;
websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -217,7 +217,10 @@
        allFutures.join();
        executor.shutdown(); // 关闭线程池
        mexcList.clear();
        gateList.clear();
        bitgetList.clear();
        kucoinList.clear();
        pushWs(marketDataOuts); // 推送结果
    }
@@ -256,6 +259,8 @@
    public void quotationCalculation(){
        extracted();
        findProfitablePairs(mexcList, gateList, bitgetList, kucoinList);
    }
    public void scheduler(){
websocketSerivce/src/main/java/org/example/util/IpAddressUtil.java
@@ -11,11 +11,9 @@
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.URL;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -37,31 +35,23 @@
     * 获取IP地址:
     */
    public static String getIpAddress(HttpServletRequest request) {
        String ipAddress = null;
        try {
            ipAddress = request.getHeader("X-Forwarded-For");
            if (ipAddress != null && ipAddress.length() != 0 && !"unknown".equalsIgnoreCase(ipAddress)) {
                // 多次反向代理后会有多个ip值,第一个ip才是真实ip
                if (ipAddress.contains(",")) {
                    ipAddress = ipAddress.split(",")[0];
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            while (interfaces.hasMoreElements()) {
                NetworkInterface networkInterface = interfaces.nextElement();
                Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if (!address.isSiteLocalAddress() && !address.isLoopbackAddress() && address instanceof Inet4Address) {
                        System.out.println("External IP: " + address.getHostAddress());
                        return address.getHostAddress();
                    }
                }
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("WL-Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("HTTP_CLIENT_IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getRemoteAddr();
            }
        } catch (Exception e) {
            log.error("获取IP地址异常,{}", e.getMessage());
        } catch (SocketException e) {
            e.printStackTrace();
        }
        return ipAddress;
        return null;
    }
    /**
@@ -239,8 +229,8 @@
    public static void main(String[] args) {
        String ip = "183.162.252.0";// 国内IP
        String abroadIp = "48.119.248.100"; // 国外IP
        String ip = "26.26.26.1";// 国内IP
        String abroadIp = "26.26.26.1"; // 国外IP
        System.out.println("方法一(国内):" + getIpPossessionByFile(ip));
        System.out.println("方法二(国内):" + getCityInfoByVectorIndex(ip));
websocketSerivce/src/main/java/org/example/util/RedisUtil.java
@@ -5,6 +5,7 @@
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -36,6 +37,13 @@
        }
    }
    public static List<String> mget(List<String> keys) {
        try (Jedis jedis = jedisPool.getResource()) {
            List<String> values = jedis.mget(keys.toArray(new String[0]));
           return values;
        }
    }
    public static void hset(String key, String field, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.hset(key, field, value);
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -1,5 +1,6 @@
package org.example.websocket.server;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -9,6 +10,7 @@
import com.google.gson.GsonBuilder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.example.pojo.ConfigCurrency;
import org.example.pojo.MarketDataOut;
import org.example.pojo.bo.WsBo;
@@ -143,6 +145,8 @@
                        try {
                            schedulePushMessage(session, message);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeSession(session, "发送消息异常,断开链接");
                            log.error("发送消息时出现异常: {}", e.getMessage());
                        } finally {
                            sessionLock.unlock();
@@ -172,6 +176,7 @@
            long currentTime = System.currentTimeMillis();
            long lastMessageTime = lastMessageTimeMap.getOrDefault(session, 0L);
            int time = wsBo.getTime();
            message = megFiltration(wsBo,message);
            if (currentTime - lastMessageTime >= time * 1000) {
                // 时间间隔达到要求,可以发送消息
@@ -179,14 +184,14 @@
                lastMessageTimeMap.put(session, currentTime); // 更新最后发送时间
            } else {
                // 时间间隔未达到,不发送消息,可以记录日志或者其他操作
                log.info("距离上次发送消息时间未达到指定间隔,不发送消息。");
//                log.info("距离上次发送消息时间未达到指定间隔,不发送消息。");
            }
        }
    }
    private static final Gson gson = new Gson();
    private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
        List<MarketDataOut> redisValueMap = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType());
        Map<String,Object> map = new HashMap<>();
        String key = "config_";
        String value = RedisUtil.get(key + wsBo.getUserId());
        List<ConfigCurrency> currencies = null;
@@ -203,7 +208,7 @@
        //查询币种
        if(null != wsBo.getCurrency()){
        if(StringUtils.isNotEmpty(wsBo.getCurrency())){
            redisValueMap = redisValueMap.stream()
                    .filter(data -> wsBo.getCurrency().equals(data.getBaseAsset()))
                    .collect(Collectors.toList());
@@ -242,10 +247,36 @@
                    .filter(data -> list.contains(data.getBuyAndSell()))
                    .forEach(data -> data.setMarker(true));
        }
        map.put("current",wsBo.getCurrent());
        map.put("sizes",wsBo.getSizes());
        map.put("total",redisValueMap.size());
        sortBySpread(redisValueMap);
        Integer current = 0;
        if(wsBo.getCurrent() != 1){
            current = (wsBo.getCurrent() - 1) * wsBo.getSizes();
        }
        // 确保 startIndex 在有效范围内
        current = Math.min(current, redisValueMap.size());
        // 计算子列表的结束索引
        int endIndex = Math.min(current + wsBo.getSizes(), redisValueMap.size());
        // 根据计算出的索引获取子列表
        redisValueMap = redisValueMap.subList(current, endIndex);
        map.put("data",redisValueMap);
        Gson gson = new GsonBuilder().setPrettyPrinting().create();
        String json = gson.toJson(redisValueMap);
        String json = gson.toJson(map);
        return json;
    }
    public static void sortBySpread(List<MarketDataOut> marketDataList) {
        Collections.sort(marketDataList, new Comparator<MarketDataOut>() {
            @Override
            public int compare(MarketDataOut a, MarketDataOut b) {
                BigDecimal spreadA = new BigDecimal(a.getSpread());
                BigDecimal spreadB = new BigDecimal(b.getSpread());
                return spreadB.compareTo(spreadA);
            }
        });
    }
    private void pushMessage(Session session, String message) {
        try {
websocketSerivce/src/main/resources/application.properties
@@ -1,5 +1,6 @@
XDB_PATH=/www/wwwroot/csdn-ip2region.xdb
#XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb
redis1.ip=localhost
redis1.port=6379