1
zj
2024-10-29 c8b1fc4dd96f24215cb5ed633a5b5c59c405f910
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package com.nq.ws;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.utils.redis.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
/**
 * @program: dabaogp
 * @description:
 * @create: 2024-10-25 14:44
 **/
@Slf4j
public class USWebsocketRunClient  extends WebSocketClient {
 
    private EStockType eStockType;
    public USWebsocketRunClient(URI serverUri,
                              EStockType eStockType
    ) {
        super(serverUri);
        this.eStockType = eStockType;
    }
 
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
    }
 
    @Override
    public void onMessage(String s) {
        try {
            if(!s.equals("pong") && !s.equals("身份验证成功")){
                Map<String, String> stringObjectMap = jsonToMap(s);
                StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
                stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
                stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
                stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
                stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
                stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
                stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
                stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
                stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
                stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
                ObjectMapper objectMapper = new ObjectMapper();
                try {
                    if(!stockRealTimeBean.getPcp().contains("-")){
                        stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
                    }
                    String json = objectMapper.writeValueAsString(stockRealTimeBean);
                    RedisKeyUtil.setCacheRealTimeStock(eStockType,stockRealTimeBean);
                    send(json);
                } catch (
                        JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            }
        }catch (Exception e){
            log.error("ws 消息接收错误:"+s);
        }
    }
 
    public static Map<String, String> jsonToMap(String json) {
        Gson gson = new Gson();
        Type type = new TypeToken<Map<String, String>>(){}.getType();
        return gson.fromJson(json, type);
    }
 
    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("websocket  美股  关闭"+1);
    }
 
    @Override
    public void onError(Exception e) {
        log.info("websocket 错误");
    }
 
 
    private HttpClient httpClient;  //  声明一个成员变量
 
    private HttpPost httpPost = new HttpPost("http://127.0.0.1:8002/api/sendNotification");  //  创建一个HttpPost对象
 
    @Override
    public void send(String message) {
 
        try {
 
            if (httpClient == null) {
                httpClient = HttpClients.createDefault();  //  使用单例模式创建HttpClient对象
            }
 
            //  准备  form-data  参数
            List<BasicNameValuePair> params = new ArrayList<>();
            params.add(new BasicNameValuePair("message", message));
 
            //  设置编码格式为  UTF-8
            UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
            httpPost.setEntity(entity);  //  设置HttpPost对象的参数
 
            //  发送请求
            HttpResponse response = httpClient.execute(httpPost);
 
            //  处理响应
            int statusCode = response.getStatusLine().getStatusCode();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
}