1
zj
2024-09-29 8bb76f405fb9e5ee135231618c7da357946dc2f8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package com.nq.ws;
 
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.service.IMandatoryLiquidationService;
import com.nq.service.impl.MandatoryLiquidationService;
import com.nq.utils.ApplicationContextRegisterUtil;
import com.nq.utils.PropertiesUtil;
import com.nq.utils.redis.RedisKeyUtil;
import com.nq.ws.client.IO;
import com.nq.ws.client.Socket;
import io.socket.emitter.Emitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * soket-io
 */
@Slf4j
@Configuration
public class WsClientConfig {
 
    private static final String SERVER_URL = PropertiesUtil.getProperty("JS_IN_HTTP_API");
    private static final String AUTH_KEY = PropertiesUtil.getProperty("JS_IN_KEY");
    private static final String ROOM_ID = "14";
    private static Socket socket;
 
    private static HttpClient httpClient = HttpClients.createDefault(); // 单例化 HttpClient
 
 
    private static HttpPost httpPost;
    static {
        httpPost = new HttpPost("http://127.0.0.1:8001/api/sendNotification"); // 初始化 HttpPost
    }
    @Bean
    public void websocketRunClientMap() {
        connectToServer();
    }
 
    private static void connectToServer() {
        IO.Options options = new IO.Options();
        options.reconnection = true;
        options.reconnectionDelay = 1000;
 
        try {
            socket = IO.socket(new URI(SERVER_URL), options);
        } catch (URISyntaxException e) {
            log.error("Invalid URI", e);
            return;
        }
 
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                authenticate();
            }
        });
 
        socket.on("marketData", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                send(args[0].toString());
                StockRealTimeBean stockDetailBean =  new Gson().fromJson(args[0].toString(), StockRealTimeBean.class);
                RedisKeyUtil.setCacheRealTimeStock(EStockType.IN,stockDetailBean);
            }
        });
 
        socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.error("socketIo连接错误: " + args[0]);
            }
        });
 
        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.error("socketIo 断开连接: " + args[0]);
                if (!socket.connected()) {
                    log.error("socketIo 开始重连: " + args[0]);
                    reconnect();
                }
            }
        });
 
        socket.connect();
    }
 
    public static Map<String, Object> jsonToMap(String json) {
        Gson gson = new Gson();
        Type type = new TypeToken<Map<String, Object>>(){}.getType();
        return gson.fromJson(json, type);
    }
 
    private static void reconnect() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!socket.connected()) {
                    try {
                        log.error("socketIo 开始重连");
                        socket.connect();
                        Thread.sleep(5000); // 重连间隔,单位毫秒
                    } catch (InterruptedException e) {
                        log.error("重连被中断", e);
                    }
                }
            }
        }).start();
    }
 
    private static void authenticate() {
        log.info("socketIo---->开始连接");
        if (socket != null && socket.connected()) {
            JSONObject authData = new JSONObject();
            try {
                authData.put("key", AUTH_KEY);
                authData.put("roomId", ROOM_ID);
                socket.emit("authenticate", authData);
                log.info("socketIo---->连接成功");
            } catch (JSONException e) {
                log.error("socketIo认证错误:"+e.getMessage(), e);
            }
        }
    }
 
    private static void send(String message) {
        try {
            // 准备 form-data 参数
            List<BasicNameValuePair> params = new ArrayList<>();
            params.add(new BasicNameValuePair("message", message));
 
            // 设置编码格式为 UTF-8
            UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
            httpPost.setEntity(entity); // 设置 HttpPost 对象的参数
 
            // 发送请求
            HttpResponse response = httpClient.execute(httpPost);
 
            // 处理响应
            int statusCode = response.getStatusLine().getStatusCode();
        } catch (IOException e) {
            log.error("Http 请求错误", e);
        }
    }
}