1
zyy
2 days ago 01b3fecb2d1b03861a72d53a7afea6ca557a209c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.nq.ws;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.utils.redis.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
 
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
 
@Slf4j
public class USWebsocketRunClient extends WebSocketClient {
 
    private EStockType eStockType;
    public USWebsocketRunClient(URI serverUri,
                                EStockType eStockType
    ) {
        super(serverUri);
        this.eStockType = eStockType;
    }
 
    private static HttpClient httpClient = HttpClients.createDefault(); // 单例化 HttpClient
 
 
    private static HttpPost httpPost;
    static {
        httpPost = new HttpPost("http://127.0.0.1:8001/api/sendNotification"); // 初始化 HttpPost
    }
 
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
        Timer heartbeatTimer;
        // 启动心跳定时器
        heartbeatTimer = new Timer();
        heartbeatTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (isOpen()) {
                    //send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
                    send("heartbeat");
                }
            }
        }, 0, 3000); // 每3秒发送一次心跳消息
    }
 
    @Override
    public void onMessage(String message) {
        if (message.contains("身份验证成功") || message.contains("pong") || message.contains("身份验证失败") || message.contains("ws连接点只能有一个")) {
            return;
        }
        Map<String, String> stringObjectMap = jsonToMap(message);
        StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
        stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
        stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
        stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
        stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
        stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
        stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
        stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
        stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
        stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
        RedisKeyUtil.setCacheRealTimeStock(EStockType.US,stockRealTimeBean);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            if(!stockRealTimeBean.getPcp().contains("-")){
                stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
            }
            String json = objectMapper.writeValueAsString(stockRealTimeBean);
            sendLoca(json);
            StockRealTimeBean stockDetailBean =  new Gson().fromJson(message, StockRealTimeBean.class);
            RedisKeyUtil.setCacheRealTimeStock(EStockType.US,stockDetailBean);
        } catch (JsonProcessingException e) {
            log.error("websocket 美股股票 消息错误:{}", e.getMessage());
        }
    }
 
    public static Map<String, String> jsonToMap(String json) {
        Gson gson = new Gson();
        Type type = new TypeToken<Map<String, String>>(){}.getType();
        return gson.fromJson(json, type);
    }
 
    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("websocket 美股股票 关闭 {} ", i);
    }
 
    @Override
    public void onError(Exception e) {
        log.info("websocket 美股股票 错误{}", e.getMessage());
    }
 
    public void sendLoca(String message) {
        try {
            // 准备 form-data 参数
            List<BasicNameValuePair> params = new ArrayList<>();
            params.add(new BasicNameValuePair("message", message));
 
            // 设置编码格式为 UTF-8
            UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
            httpPost.setEntity(entity); // 设置 HttpPost 对象的参数
 
            // 发送请求
            HttpResponse response = httpClient.execute(httpPost);
 
            // 处理响应
            int statusCode = response.getStatusLine().getStatusCode();
        } catch (IOException e) {
            log.error("Http 请求错误", e);
        }
    }
}