1
zj
2024-07-10 c0750bad4e7ab9acdef951d50f67cb488639b4de
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.nq.ws;
 
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.service.IMandatoryLiquidationService;
import com.nq.service.impl.MandatoryLiquidationService;
import com.nq.utils.ApplicationContextRegisterUtil;
import com.nq.utils.PropertiesUtil;
import com.nq.ws.client.IO;
import com.nq.ws.client.Socket;
import io.socket.emitter.Emitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
 
@Slf4j
@Configuration
public class WsClientConfig {
 
    private static final String SERVER_URL = PropertiesUtil.getProperty("JP_HTTP_API");
    private static final String AUTH_KEY = PropertiesUtil.getProperty("JP_KEY");
    private static final String ROOM_ID = "14";
    private static Socket socket;
 
    @Bean
    public void websocketRunClientMap() {
        connectToServer();
    }
 
    private static void connectToServer() {
        IO.Options options = new IO.Options();
        options.reconnection = true;
        options.reconnectionDelay = 1000;
 
        try {
            socket = IO.socket(new URI(SERVER_URL), options);
        } catch (URISyntaxException e) {
            log.error("Invalid URI", e);
            return;
        }
 
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                authenticate();
            }
        });
 
        socket.on("marketData", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                    Map<String, Object> map = jsonToMap(args[0].toString());
                    if(map.get("pid").equals("00000001")){
                        System.out.println("接收时间:"  +  new SimpleDateFormat("HH:mm:ss").format(new  Date())  +  "  "  +  args[0].toString());
                    }
                try {
                    ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
                    MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
                    StockRealTimeBean stockDetailBean =  new Gson().fromJson(args[0].toString(), StockRealTimeBean.class);
                    liquidationService.RealTimeDataProcess(EStockType.JP,stockDetailBean);
                }catch (Exception e){
                    log.error("socket数据存入缓存错误:", e.getMessage());
                }
            }
        });
 
        socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.error("socketIo连接错误: " + args[0]);
            }
        });
 
        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.error("socketIo 断开连接: " + args[0]);
                if (!socket.connected()) {
                    log.error("socketIo 开始重连: " + args[0]);
                    reconnect();
                }
            }
        });
 
        socket.connect();
    }
 
    public static Map<String, Object> jsonToMap(String json) {
        Gson gson = new Gson();
        Type type = new TypeToken<Map<String, Object>>(){}.getType();
        return gson.fromJson(json, type);
    }
 
    private static void reconnect() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!socket.connected()) {
                    try {
                        log.error("socketIo 开始重连");
                        socket.connect();
                        Thread.sleep(5000); // 重连间隔,单位毫秒
                    } catch (InterruptedException e) {
                        log.error("重连被中断", e);
                    }
                }
            }
        }).start();
    }
 
    private static void authenticate() {
        log.info("socketIo---->开始连接");
        if (socket != null && socket.connected()) {
            JSONObject authData = new JSONObject();
            try {
                authData.put("key", AUTH_KEY);
                authData.put("roomId", ROOM_ID);
                socket.emit("authenticate", authData);
                log.info("socketIo---->连接成功");
            } catch (JSONException e) {
                log.error("socketIo认证错误:"+e.getMessage(), e);
            }
        }
    }
}