新增websocket美国、墨西哥客户端
多线程执行同步股票、新闻咨询抓取
7 files modified
1 files added
333 ■■■■ changed files
src/main/java/com/nq/enums/EStockType.java 20 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/service/impl/SiteNewsServiceImpl.java 19 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/service/impl/UserStockSubscribeServiceImpl.java 10 ●●●● patch | view | raw | blame | history
src/main/java/com/nq/utils/task/stock/StockTask.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/ws/MXWebsocketRunClient.java 127 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/ws/WebSocketClientBeanConfig.java 56 ●●●●● patch | view | raw | blame | history
src/main/java/com/nq/ws/WebsocketRunClient.java 65 ●●●● patch | view | raw | blame | history
src/main/resources/application.properties 14 ●●●● patch | view | raw | blame | history
src/main/java/com/nq/enums/EStockType.java
@@ -12,10 +12,13 @@
    US("US","美国股票","5",PropertiesUtil.getProperty("US_HTTP_API"),PropertiesUtil.getProperty("US_KEY"),"USD","$"),
    HK("HK","香港股票","39",PropertiesUtil.getProperty("HK_HTTP_API"),PropertiesUtil.getProperty("HK_KEY"),"HKD","HK$"),
    MAS("MAS","马来西亚股票","42",PropertiesUtil.getProperty("MAS_HTTP_API"),PropertiesUtil.getProperty("MAS_KEY"),"MYR","RM"),
    IN("IN","印度股票","14", PropertiesUtil.getProperty("JS_IN_HTTP_URL"),PropertiesUtil.getProperty("JS_IN_KEY"),"INR","₹");
//    HK("HK","香港股票","39",PropertiesUtil.getProperty("HK_HTTP_API"),PropertiesUtil.getProperty("HK_KEY"),"HKD","HK$"),
//    MAS("MAS","马来西亚股票","42",PropertiesUtil.getProperty("MAS_HTTP_API"),PropertiesUtil.getProperty("MAS_KEY"),"MYR","RM"),
//
    IN("IN","印度股票","14", PropertiesUtil.getProperty("JS_IN_HTTP_URL"),PropertiesUtil.getProperty("JS_IN_KEY"),"INR","₹"),
    MX("MX","墨西哥股票","7",PropertiesUtil.getProperty("MX_HTTP_API"),PropertiesUtil.getProperty("MX_KEY"),"MXN","MXN$");
//    TH("TH","泰国股票","41",PropertiesUtil.getProperty("TH_HTTP_API"),PropertiesUtil.getProperty("TH_KEY")),
//    HG("HG","韩国股票","11",PropertiesUtil.getProperty("HG_HTTP_API"),PropertiesUtil.getProperty("HG_KEY")),
//    SZHB("SZHB","数字货币","41",PropertiesUtil.getProperty("SZHB_HTTP_API"),PropertiesUtil.getProperty("SZHB_KEY"));
@@ -44,15 +47,10 @@
    public static EStockType getEStockTypeByCode(String code){
        if(EStockType.US.getCode().equals(code)){
            return US;
        }else if(EStockType.HK.getCode().equals(code)){
            return  HK;
        }else if(EStockType.MAS.getCode().equals(code)){
            return  MAS;
        }else if(EStockType.IN.getCode().equals(code)){
            return  IN;
        }else if(EStockType.MX.getCode().equals(code)){
            return  MX;
        }else{
            return  MAS;
            return  US;
        }
    }
src/main/java/com/nq/service/impl/SiteNewsServiceImpl.java
@@ -24,6 +24,8 @@
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 新闻资讯
@@ -132,7 +134,22 @@
    @Override
    public void grabNews() {
        addNews(1, PropertiesUtil.getProperty("JS_IN_HTTP_URL") + "stock-markets?key=" + PropertiesUtil.getProperty("JS_IN_KEY") + "&type=6");
        // 创建固定大小的线程池,根据需求调整线程数量
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // 提交美国新闻抓取任务
            executor.submit(() ->
                    addNews(1, PropertiesUtil.getProperty("US_HTTP_API") + "stock-markets?key=" + PropertiesUtil.getProperty("US_KEY") + "&type=1")
            );
            // 提交墨西哥新闻抓取任务
            executor.submit(() ->
                    addNews(1, PropertiesUtil.getProperty("MX_HTTP_API") + "stock-markets?key=" + PropertiesUtil.getProperty("MX_KEY") + "&type=1")
            );
        } finally {
            // 关闭线程池
            executor.shutdown();
        }
    }
    private void addNews(Integer type, String url) {
src/main/java/com/nq/service/impl/UserStockSubscribeServiceImpl.java
@@ -191,7 +191,7 @@
//                   bound =  new BigDecimal(model.getApplyNums()).multiply(stockSubscribe.getPrice());
//               }
//
                    BigDecimal useEnaAmount = iUserAssetsServices.getAvailableBalance(EStockType.IN.getCode(), user.getId());
                    BigDecimal useEnaAmount = iUserAssetsServices.getAvailableBalance(EStockType.US.getCode(), user.getId());
                    if(useEnaAmount.compareTo(bound)<0){
                        return ServerResponse.createByErrorMsg("余额不足,配售失败",request);
                    }
@@ -210,7 +210,7 @@
                    UserStockSubscribe userStockSubscribe = Convert.convert(UserStockSubscribe.class, model);
                    userStockSubscribe.setNewStockId(stockSubscribe.getNewlistId());
                    ret = userStockSubscribeMapper.insert(userStockSubscribe);
                    iUserAssetsServices.availablebalanceChange(EStockType.IN.getCode(), user.getId(),EUserAssets.BUY,bound.negate(),"","");
                    iUserAssetsServices.availablebalanceChange(EStockType.US.getCode(), user.getId(),EUserAssets.BUY,bound.negate(),"","");
                    if (ret > 0) {
                        return ServerResponse.createBySuccessMsg("配售成功",request);
                    } else {
@@ -295,7 +295,7 @@
                    }
                    BigDecimal cCount = new BigDecimal(model.getApplyNums()-model.getApplyNumber());
                    BigDecimal tMoney = ((stockSubscribe.getMinPrice() != null ? stockSubscribe.getMinPrice() : stockSubscribe.getPrice())).multiply(cCount);
                    iUserAssetsServices.availablebalanceChange(EStockType.IN.getCode(),userStockSubscribe.getUserId(),
                    iUserAssetsServices.availablebalanceChange(EStockType.US.getCode(),userStockSubscribe.getUserId(),
                            EUserAssets.TOP_UP,tMoney,"","");
                    model.setBond((stockSubscribe.getMinPrice() != null ? stockSubscribe.getMinPrice() : stockSubscribe.getPrice()).multiply(BigDecimal.valueOf(model.getApplyNumber())));
                    model.setDbMoney(BigDecimal.ZERO);
@@ -499,14 +499,14 @@
                    userStockSubscribe.setSubmitTime(DateTimeUtil.getCurrentDate());
                    userStockSubscribe.setStatus(4);
                    BigDecimal bigDecimal =  iUserAssetsServices.
                            getAvailableBalance(EStockType.IN.getCode(),
                            getAvailableBalance(EStockType.US.getCode(),
                                    userStockSubscribe.getUserId());
                    BigDecimal multiply = userStockSubscribe.getBuyPrice().multiply(new BigDecimal(userStockSubscribe.getApplyNumber()));
                    if(bigDecimal.compareTo(multiply) <= 0){
                        return ServerResponse.createByErrorMsg("余额不足",request);
                    }
                    iUserAssetsServices.availablebalanceChange(EStockType.IN.getCode(),userStockSubscribe.getUserId(),
                    iUserAssetsServices.availablebalanceChange(EStockType.US.getCode(),userStockSubscribe.getUserId(),
                            EUserAssets.BUY,multiply.negate(),"","");
                    userStockSubscribe.setDbMoney(BigDecimal.ZERO);
                    userStockSubscribeMapper.update1(userStockSubscribe);
src/main/java/com/nq/utils/task/stock/StockTask.java
@@ -25,6 +25,9 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -67,10 +70,27 @@
            return;
        }
        if (syncINStockDataLock.tryLock()) {
            ExecutorService executor = Executors.newFixedThreadPool(2);
            Future<?> usFuture = null;
            Future<?> mxFuture = null;
            try {
                syncINStockData.set(true); // 设置处理中标识为true
                loadAllStock(EStockType.IN);
                // 并行执行US和MX的股票数据加载
                usFuture = executor.submit(() -> loadAllStock(EStockType.US));
                mxFuture = executor.submit(() -> loadAllStock(EStockType.MX));
                // 等待两个任务都完成
                usFuture.get();
                mxFuture.get();
            } catch (Exception e) {
                Thread.currentThread().interrupt();
                log.error("同步股票数据出错", e);
            } finally {
                // 关闭线程池
                if (executor != null) {
                    executor.shutdown();
                }
                syncINStockDataLock.unlock();
                syncINStockData.set(false); // 设置处理中标识为false
            }
src/main/java/com/nq/ws/MXWebsocketRunClient.java
New file
@@ -0,0 +1,127 @@
package com.nq.ws;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.utils.redis.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
@Slf4j
public class MXWebsocketRunClient  extends WebSocketClient {
    private EStockType eStockType;
    public MXWebsocketRunClient(URI serverUri,
                              EStockType eStockType
    ) {
        super(serverUri);
        this.eStockType = eStockType;
    }
    private static HttpClient httpClient = HttpClients.createDefault(); // 单例化 HttpClient
    private static HttpPost httpPost;
    static {
        httpPost = new HttpPost("http://127.0.0.1:8001/api/sendNotification"); // 初始化 HttpPost
    }
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
        Timer heartbeatTimer;
        // 启动心跳定时器
        heartbeatTimer = new Timer();
        heartbeatTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
            }
        }, 0, 3000); // 每3秒发送一次心跳消息
    }
    @Override
    public void onMessage(String message) {
        if (message.contains("身份验证成功") || message.contains("pong") || message.contains("身份验证失败")) {
            System.out.println("mx" + message);
            return;
        }
        System.out.println("mx2" + message);
        Map<String, String> stringObjectMap = jsonToMap(message);
        StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
        stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
        stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
        stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
        stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
        stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
        stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
        stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
        stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
        stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
        RedisKeyUtil.setCacheRealTimeStock(EStockType.MX,stockRealTimeBean);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            if(!stockRealTimeBean.getPcp().contains("-")){
                stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
            }
            String json = objectMapper.writeValueAsString(stockRealTimeBean);
            sendLoca(json);
            StockRealTimeBean stockDetailBean =  new Gson().fromJson(message, StockRealTimeBean.class);
            RedisKeyUtil.setCacheRealTimeStock(EStockType.MX,stockDetailBean);
        } catch (JsonProcessingException e) {
            log.error("websocket 墨西哥股票 消息错误:{}", e.getMessage());
        }
    }
    public static Map<String, String> jsonToMap(String json) {
        Gson gson = new Gson();
        Type type = new TypeToken<Map<String, String>>(){}.getType();
        return gson.fromJson(json, type);
    }
    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("websocket 墨西哥股票 关闭"+1);
    }
    @Override
    public void onError(Exception e) {
        log.info("websocket 墨西哥股票 错误");
    }
    public void sendLoca(String message) {
        try {
            // 准备 form-data 参数
            List<BasicNameValuePair> params = new ArrayList<>();
            params.add(new BasicNameValuePair("message", message));
            // 设置编码格式为 UTF-8
            UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
            httpPost.setEntity(entity); // 设置 HttpPost 对象的参数
            // 发送请求
            HttpResponse response = httpClient.execute(httpPost);
            // 处理响应
            int statusCode = response.getStatusLine().getStatusCode();
        } catch (IOException e) {
            log.error("Http 请求错误", e);
        }
    }
}
src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
@@ -22,26 +22,48 @@
        Map<String, WebSocketClient> retMap = new HashMap<>(2);
        try {
            WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(PropertiesUtil.getProperty("JS_IN_WS_URL")),EStockType.IN);
            websocketRunClient.connect();
            websocketRunClient.setConnectionLostTimeout(0);
            new Thread(() -> {
                while (true) {
            WebsocketRunClient usWebsocketRunClient = new WebsocketRunClient(new URI(PropertiesUtil.getProperty("US_WS_URL")),EStockType.US);
            usWebsocketRunClient.connect();
            usWebsocketRunClient.setConnectionLostTimeout(0);
            startHeartbeatThread(usWebsocketRunClient);
            retMap.put(EStockType.US.getStockKey(), usWebsocketRunClient);
        } catch (Exception e) {
            log.error("usWebsocketRunClient 异常: {}", e.getMessage());
        }
        try {
            MXWebsocketRunClient mxWebsocketRunClient = new MXWebsocketRunClient(new URI(PropertiesUtil.getProperty("MX_WS_URL")),EStockType.MX);
            mxWebsocketRunClient.connect();
            mxWebsocketRunClient.setConnectionLostTimeout(0);
            startHeartbeatThread(mxWebsocketRunClient);
            retMap.put(EStockType.MX.getStockKey(), mxWebsocketRunClient);
        } catch (Exception e) {
            log.error("mxWebsocketRunClient 异常: {}", e.getMessage());
        }
        return retMap;
    }
    private void startHeartbeatThread(WebSocketClient client) {
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(8000);
                    if (client.isOpen()) { // 先检查连接状态
                        client.send("heartbeat".getBytes());
                    } else {
                        client.reconnect();
                        client.setConnectionLostTimeout(0);
                    }
                } catch (Exception e) {
                    log.error("心跳线程异常, 尝试重连: {}", e.getMessage());
                    try {
                        Thread.sleep(8000);
                        websocketRunClient.send("heartbeat".getBytes());
                    } catch (Exception e) {
                        websocketRunClient.reconnect();
                        websocketRunClient.setConnectionLostTimeout(0);
                        client.reconnect();
                        client.setConnectionLostTimeout(0);
                    } catch (Exception re) {
                        log.error("重连失败: {}", re.getMessage());
                    }
                }
            }).start();
        } catch (Exception e) {
        }
        return retMap;
            }
        }).start();
    }
src/main/java/com/nq/ws/WebsocketRunClient.java
@@ -6,9 +6,6 @@
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.service.IMandatoryLiquidationService;
import com.nq.service.impl.MandatoryLiquidationService;
import com.nq.utils.ApplicationContextRegisterUtil;
import com.nq.utils.redis.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
@@ -61,35 +58,37 @@
    }
    @Override
    public void onMessage(String s) {
        if(!s.equals("pong") && !s.equals("身份验证成功")){
            Map<String, String> stringObjectMap = jsonToMap(s);
            StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
            stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
            stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
            stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
            stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
            stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
            stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
            stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
            stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
            stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
            RedisKeyUtil.setCacheRealTimeStock(EStockType.IN,stockRealTimeBean);
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                if(!stockRealTimeBean.getPcp().contains("-")){
                    stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
                }
                String json = objectMapper.writeValueAsString(stockRealTimeBean);
                sendLoca(json);
                StockRealTimeBean stockDetailBean =  new Gson().fromJson(s, StockRealTimeBean.class);
                RedisKeyUtil.setCacheRealTimeStock(EStockType.IN,stockDetailBean);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
    public void onMessage(String message) {
        if (message.contains("身份验证成功") || message.contains("pong") || message.contains("身份验证失败")) {
            System.out.println("us" + message);
            return;
        }
        System.out.println("us2" + message);
        Map<String, String> stringObjectMap = jsonToMap(message);
        StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
        stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
        stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
        stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
        stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
        stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
        stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
        stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
        stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
        stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
        RedisKeyUtil.setCacheRealTimeStock(EStockType.US,stockRealTimeBean);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            System.out.println("sdd");
            if(!stockRealTimeBean.getPcp().contains("-")){
                stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
            }
        }else{
            log.error("websocket  消息错误:"+s);
            String json = objectMapper.writeValueAsString(stockRealTimeBean);
            sendLoca(json);
            StockRealTimeBean stockDetailBean =  new Gson().fromJson(message, StockRealTimeBean.class);
            System.out.println(stockDetailBean);
            RedisKeyUtil.setCacheRealTimeStock(EStockType.US,stockDetailBean);
        } catch (JsonProcessingException e) {
            log.error("websocket 美国股票 消息错误:{}", e.getMessage());
        }
    }
@@ -101,12 +100,12 @@
    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("websocket  印度股票  关闭"+1);
        log.info("websocket 美国股票 关闭"+1);
    }
    @Override
    public void onError(Exception e) {
        log.info("websocket 错误");
        log.info("websocket 美国股票 错误" + e.getMessage());
    }
    public void sendLoca(String message) {
src/main/resources/application.properties
@@ -52,14 +52,22 @@
JS_IN_WS_URL = ws://api-in-pro-ws.js-stock.top
JS_IN_KEY = xKChgi47AP1NMwMeYI3c
US_HTTP_API = http://api-us.js-stock.top/
US_WS_URL = ws://ws-us.js-stock.top
US_KEY = jZFrku4RGQjP87Hmq5tm
#US_HTTP_API = http://api-us.js-stock.top/
#US_WS_URL = ws://ws-us.js-stock.top
#US_KEY = jZFrku4RGQjP87Hmq5tm
HK_HTTP_API = http://test.js-stock.top/
HK_WS_URL = ws://test-ws.js-stock.top
HK_KEY = mG8QQDdjGuLjLnrryd0B
US_HTTP_API = http://api-us-v2.js-stock.top/
US_WS_URL = ws://api-us-v2-ws.js-stock.top
US_KEY = x45TBc52rI0nH9Hsyqeo
MX_HTTP_API = http://api-mx.js-stock.top/
MX_WS_URL = ws://api-mx-ws.js-stock.top
MX_KEY = nFQivDtnjHZliFGPF1Gu
#HK_HTTP_API = http://api-v1.js-stock.top/
#HK_WS_URL = ws://api-v1-ws.js-stock.top