From 7bcae85b0f7ea58b927b7441f61b7f33b11646c2 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Thu, 16 May 2024 17:54:12 +0800
Subject: [PATCH] 1
---
.idea/inspectionProfiles/Project_Default.xml | 91 +
src/main/java/com/nq/ws/WebsocketRunClient.java | 121 +-
src/main/java/com/nq/ws/parser/Parser.java | 82 +
src/main/java/com/nq/enums/EStockType.java | 2
src/main/java/com/nq/ws/client/Url.java | 84 +
src/main/java/com/nq/ws/hasbinary/HasBinary.java | 63 +
src/main/java/com/nq/ws/client/Ack.java | 11
src/main/java/com/nq/ws/parser/DecodingException.java | 7
src/main/java/com/nq/ws/client/AckWithTimeout.java | 35
src/main/java/com/nq/ws/client/SocketOptionBuilder.java | 196 ++++
pom.xml | 16
src/main/java/com/nq/ws/WsClientConfig.java | 147 +++
src/main/java/com/nq/ws/client/Socket.java | 574 ++++++++++++
src/main/resources/application.properties | 4
src/main/java/com/nq/ws/WebSocketClientBeanConfig.java | 96 +-
src/main/java/com/nq/ws/backo/Backoff.java | 66 +
src/main/java/com/nq/ws/client/On.java | 23
src/main/java/com/nq/ws/parser/IOParser.java | 263 +++++
src/main/java/com/nq/ws/parser/Binary.java | 127 ++
src/main/java/com/nq/ws/client/Manager.java | 575 ++++++++++++
src/main/java/com/nq/ws/client/SocketIOException.java | 20
src/main/java/com/nq/ws/parser/Packet.java | 22
src/main/java/com/nq/ws/client/IO.java | 118 ++
23 files changed, 2,639 insertions(+), 104 deletions(-)
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 6560a98..4cc2f57 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -1,6 +1,61 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
+ <inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAbstractClassShouldStartWithAbstractNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAbstractMethodOrInterfaceMethodMustUseJavadoc" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidApacheBeanUtilsCopy" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidCallStaticSimpleDateFormat" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidCommentBehindStatement" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidComplexCondition" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidConcurrentCompetitionRandom" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidDoubleOrFloatEqualCompare" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidManuallyCreateThread" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidMissUseOfMathRandom" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidNegationOperator" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidNewDateGetTime" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidPatternCompileInMethod" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidReturnInFinally" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidStartWithDollarAndUnderLineNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaAvoidUseTimer" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaBigDecimalAvoidDoubleConstructor" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaBooleanPropertyShouldNotStartWithIs" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaClassCastExceptionWithSubListToArrayList" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaClassCastExceptionWithToArray" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaClassMustHaveAuthor" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaClassNamingShouldBeCamel" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaCollectionInitShouldAssignCapacity" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaCommentsMustBeJavadocFormat" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaConcurrentExceptionWithModifyOriginSubList" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaConstantFieldShouldBeUpperCase" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaCountDownShouldInFinally" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaDontModifyInForeachCircle" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaEnumConstantsMustHaveComment" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaExceptionClassShouldEndWithException" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaIbatisMethodQueryForList" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaLockShouldWithTryFinally" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaLowerCamelCaseVariableNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaMethodReturnWrapperType" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaMethodTooLong" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaPackageNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaPojoMustOverrideToString" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaPojoMustUsePrimitiveField" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaPojoNoDefaultValue" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaRemoveCommentedCode" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaServiceOrDaoClassShouldEndWithImpl" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaSneakyThrowsWithoutExceptionType" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaStringConcat" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaSwitchExpression" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaSwitchStatement" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaTestClassShouldEndWithTestNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaThreadLocalShouldRemove" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaThreadPoolCreation" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaThreadShouldSetName" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaTransactionMustHaveRollback" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaUndefineMagicConstant" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaUnsupportedExceptionWithModifyAsList" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaUseQuietReferenceNotation" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AlibabaUseRightCaseForDateFormat" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="JavaDoc" enabled="true" level="WARNING" enabled_by_default="true">
<option name="TOP_LEVEL_CLASS_OPTIONS">
<value>
@@ -32,5 +87,41 @@
<option name="IGNORE_POINT_TO_ITSELF" value="false" />
<option name="myAdditionalJavadocTags" value="date" />
</inspection_tool>
+ <inspection_tool class="JavadocDeclaration" enabled="true" level="WARNING" enabled_by_default="true">
+ <option name="ADDITIONAL_TAGS" value="date" />
+ </inspection_tool>
+ <inspection_tool class="MissingJavadoc" enabled="true" level="WARNING" enabled_by_default="true">
+ <option name="PACKAGE_SETTINGS">
+ <Options>
+ <option name="ENABLED" value="false" />
+ </Options>
+ </option>
+ <option name="MODULE_SETTINGS">
+ <Options>
+ <option name="ENABLED" value="false" />
+ </Options>
+ </option>
+ <option name="TOP_LEVEL_CLASS_SETTINGS">
+ <Options>
+ <option name="ENABLED" value="false" />
+ </Options>
+ </option>
+ <option name="INNER_CLASS_SETTINGS">
+ <Options>
+ <option name="ENABLED" value="false" />
+ </Options>
+ </option>
+ <option name="METHOD_SETTINGS">
+ <Options>
+ <option name="REQUIRED_TAGS" value="@return@param@throws or @exception" />
+ <option name="ENABLED" value="false" />
+ </Options>
+ </option>
+ <option name="FIELD_SETTINGS">
+ <Options>
+ <option name="ENABLED" value="false" />
+ </Options>
+ </option>
+ </inspection_tool>
</profile>
</component>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 84a2b55..058a9cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,7 +197,21 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>io.socket</groupId>
+ <artifactId>engine.io-client</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20090211</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.8</version>
+ </dependency>
</dependencies>
diff --git a/src/main/java/com/nq/enums/EStockType.java b/src/main/java/com/nq/enums/EStockType.java
index 23029e6..6cc3f17 100644
--- a/src/main/java/com/nq/enums/EStockType.java
+++ b/src/main/java/com/nq/enums/EStockType.java
@@ -15,7 +15,7 @@
HK("HK","香港股票","39",PropertiesUtil.getProperty("HK_HTTP_API"),PropertiesUtil.getProperty("HK_KEY"),"HKD","HK$"),
MAS("MAS","马来西亚股票","42",PropertiesUtil.getProperty("MAS_HTTP_API"),PropertiesUtil.getProperty("MAS_KEY"),"MYR","RM"),
- IN("IN","印度股票","14", PropertiesUtil.getProperty("IN_HTTP_API"),PropertiesUtil.getProperty("IN_KEY"),"INR","₹");
+ IN("IN","印度股票","14", PropertiesUtil.getProperty("IN_HTTP_API"),PropertiesUtil.getProperty("JS_IN_KEY"),"INR","₹");
// TH("TH","泰国股票","41",PropertiesUtil.getProperty("TH_HTTP_API"),PropertiesUtil.getProperty("TH_KEY")),
// HG("HG","韩国股票","11",PropertiesUtil.getProperty("HG_HTTP_API"),PropertiesUtil.getProperty("HG_KEY")),
// SZHB("SZHB","数字货币","41",PropertiesUtil.getProperty("SZHB_HTTP_API"),PropertiesUtil.getProperty("SZHB_KEY"));
diff --git a/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java b/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
index 53bf924..355774e 100644
--- a/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
+++ b/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
@@ -1,48 +1,48 @@
-package com.nq.ws;
-
-
-import com.nq.enums.EStockType;
-import com.nq.utils.PropertiesUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.java_websocket.client.WebSocketClient;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-@Slf4j
-@Configuration
-public class WebSocketClientBeanConfig {
-
-
- @Bean
- public Map<String, WebSocketClient> websocketRunClientMap() {
-
- Map<String, WebSocketClient> retMap = new HashMap<>(2);
- try {
- WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(PropertiesUtil.getProperty("IN_WS_URL")),EStockType.IN);
- websocketRunClient.connect();
- websocketRunClient.setConnectionLostTimeout(0);
- new Thread(() -> {
- while (true) {
- try {
- Thread.sleep(8000);
- websocketRunClient.send("heartbeat".getBytes());
- } catch (Exception e) {
- websocketRunClient.reconnect();
- websocketRunClient.setConnectionLostTimeout(0);
- }
- }
- }).start();
- } catch (Exception e) {
- }
-
-
-
- return retMap;
- }
-
-
-}
+//package com.nq.ws;
+//
+//
+//import com.nq.enums.EStockType;
+//import com.nq.utils.PropertiesUtil;
+//import lombok.extern.slf4j.Slf4j;
+//import org.java_websocket.client.WebSocketClient;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//
+//import java.net.URI;
+//import java.util.HashMap;
+//import java.util.Map;
+//
+//@Slf4j
+//@Configuration
+//public class WebSocketClientBeanConfig {
+//
+//
+// @Bean
+// public Map<String, WebSocketClient> websocketRunClientMap() {
+//
+// Map<String, WebSocketClient> retMap = new HashMap<>(2);
+// try {
+// WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(PropertiesUtil.getProperty("JS_IN_WS_URL")),EStockType.IN);
+// websocketRunClient.connect();
+// websocketRunClient.setConnectionLostTimeout(0);
+// new Thread(() -> {
+// while (true) {
+// try {
+// Thread.sleep(8000);
+// websocketRunClient.send("heartbeat".getBytes());
+// } catch (Exception e) {
+// websocketRunClient.reconnect();
+// websocketRunClient.setConnectionLostTimeout(0);
+// }
+// }
+// }).start();
+// } catch (Exception e) {
+// }
+//
+//
+//
+// return retMap;
+// }
+//
+//
+//}
diff --git a/src/main/java/com/nq/ws/WebsocketRunClient.java b/src/main/java/com/nq/ws/WebsocketRunClient.java
index 2030fdc..b782a65 100644
--- a/src/main/java/com/nq/ws/WebsocketRunClient.java
+++ b/src/main/java/com/nq/ws/WebsocketRunClient.java
@@ -1,54 +1,67 @@
-package com.nq.ws;
-
-import com.google.gson.Gson;
-import com.nq.enums.EStockType;
-import com.nq.pojo.StockRealTimeBean;
-import com.nq.service.IMandatoryLiquidationService;
-import com.nq.service.impl.MandatoryLiquidationService;
-import com.nq.utils.ApplicationContextRegisterUtil;
-import com.nq.utils.redis.RedisKeyUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-import org.springframework.context.ApplicationContext;
-
-import java.net.URI;
-
-@Slf4j
-public class WebsocketRunClient extends WebSocketClient {
-
- private EStockType eStockType;
- public WebsocketRunClient(URI serverUri,
- EStockType eStockType
- ) {
- super(serverUri);
- this.eStockType = eStockType;
- }
-
- @Override
- public void onOpen(ServerHandshake serverHandshake) {
- send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
- }
-
- @Override
- public void onMessage(String s) {
- try {
- ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
- MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
- StockRealTimeBean stockDetailBean = new Gson().fromJson(s, StockRealTimeBean.class);
- liquidationService.RealTimeDataProcess(eStockType,stockDetailBean);
- }catch (Exception e){
-
- }
- }
-
- @Override
- public void onClose(int i, String s, boolean b) {
- log.info("websocket 马来西亚 关闭"+1);
- }
-
- @Override
- public void onError(Exception e) {
- log.info("websocket 错误");
- }
-}
+//package com.nq.ws;
+//
+//import com.google.gson.Gson;
+//import com.google.gson.reflect.TypeToken;
+//import com.nq.enums.EStockType;
+//import com.nq.pojo.StockRealTimeBean;
+//import com.nq.service.IMandatoryLiquidationService;
+//import com.nq.service.impl.MandatoryLiquidationService;
+//import com.nq.utils.ApplicationContextRegisterUtil;
+//import com.nq.utils.redis.RedisKeyUtil;
+//import lombok.extern.slf4j.Slf4j;
+//import org.java_websocket.client.WebSocketClient;
+//import org.java_websocket.handshake.ServerHandshake;
+//import org.springframework.context.ApplicationContext;
+//
+//import java.lang.reflect.Type;
+//import java.net.URI;
+//import java.util.Map;
+//
+//@Slf4j
+//public class WebsocketRunClient extends WebSocketClient {
+//
+// private EStockType eStockType;
+// public WebsocketRunClient(URI serverUri,
+// EStockType eStockType
+// ) {
+// super(serverUri);
+// this.eStockType = eStockType;
+// }
+//
+// @Override
+// public void onOpen(ServerHandshake serverHandshake) {
+// send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
+// }
+//
+// @Override
+// public void onMessage(String s) {
+// Map<String, Object> map = jsonToMap(s);
+// if(map.get("pid").equals("00000001")){
+// System.out.println(s);
+// }
+// try {
+// ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
+// MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
+// StockRealTimeBean stockDetailBean = new Gson().fromJson(s, StockRealTimeBean.class);
+// liquidationService.RealTimeDataProcess(eStockType,stockDetailBean);
+// }catch (Exception e){
+//
+// }
+// }
+//
+// public static Map<String, Object> jsonToMap(String json) {
+// Gson gson = new Gson();
+// Type type = new TypeToken<Map<String, Object>>(){}.getType();
+// return gson.fromJson(json, type);
+// }
+//
+// @Override
+// public void onClose(int i, String s, boolean b) {
+// log.info("websocket 马来西亚 关闭"+1);
+// }
+//
+// @Override
+// public void onError(Exception e) {
+// log.info("websocket 错误");
+// }
+//}
diff --git a/src/main/java/com/nq/ws/WsClientConfig.java b/src/main/java/com/nq/ws/WsClientConfig.java
new file mode 100644
index 0000000..61a2f51
--- /dev/null
+++ b/src/main/java/com/nq/ws/WsClientConfig.java
@@ -0,0 +1,147 @@
+package com.nq.ws;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import com.nq.enums.EStockType;
+import com.nq.pojo.StockRealTimeBean;
+import com.nq.service.IMandatoryLiquidationService;
+import com.nq.service.impl.MandatoryLiquidationService;
+import com.nq.utils.ApplicationContextRegisterUtil;
+import com.nq.utils.PropertiesUtil;
+import com.nq.ws.client.IO;
+import com.nq.ws.client.Socket;
+import io.socket.emitter.Emitter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Configuration
+public class WsClientConfig {
+
+ private static final String SERVER_URL = PropertiesUtil.getProperty("JS_IN_HTTP_API");
+ private static final String AUTH_KEY = PropertiesUtil.getProperty("JS_IN_KEY");
+ private static final String ROOM_ID = "14";
+ private static Socket socket;
+
+ @Bean
+ public void websocketRunClientMap() {
+ connectToServer();
+ }
+
+ private static void connectToServer() {
+ IO.Options options = new IO.Options();
+ options.reconnection = true;
+ options.reconnectionDelay = 1000;
+
+ try {
+ socket = IO.socket(new URI(SERVER_URL), options);
+ } catch (URISyntaxException e) {
+ log.error("Invalid URI", e);
+ return;
+ }
+
+ socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
+ @Override
+ public void call(Object... args) {
+ authenticate();
+ }
+ });
+
+ socket.on("marketData", new Emitter.Listener() {
+ @Override
+ public void call(Object... args) {
+ Map<String, Object> map = jsonToMap(args[0].toString());
+ if(map.get("pid").equals("00000001")){
+ System.out.println("接收时间:" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + " " + args[0].toString());
+ }
+ try {
+ ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
+ MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
+ StockRealTimeBean stockDetailBean = new Gson().fromJson(args[0].toString(), StockRealTimeBean.class);
+ liquidationService.RealTimeDataProcess(EStockType.IN,stockDetailBean);
+ }catch (Exception e){
+ log.error("socket数据存入缓存错误:", e.getMessage());
+ }
+ }
+ });
+
+ socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
+ @Override
+ public void call(Object... args) {
+ log.error("socketIo连接错误: " + args[0]);
+ }
+ });
+
+ socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
+ @Override
+ public void call(Object... args) {
+ log.error("socketIo 断开连接: " + args[0]);
+ if (!socket.connected()) {
+ log.error("socketIo 开始重连: " + args[0]);
+ reconnect();
+ }
+ }
+ });
+
+ socket.connect();
+ }
+
+ public static Map<String, Object> jsonToMap(String json) {
+ Gson gson = new Gson();
+ Type type = new TypeToken<Map<String, Object>>(){}.getType();
+ return gson.fromJson(json, type);
+ }
+
+ private static void reconnect() {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!socket.connected()) {
+ try {
+ log.error("socketIo 开始重连");
+ socket.connect();
+ Thread.sleep(5000); // 重连间隔,单位毫秒
+ } catch (InterruptedException e) {
+ log.error("重连被中断", e);
+ }
+ }
+ }
+ }).start();
+ }
+
+ private static void authenticate() {
+ log.info("socketIo---->开始连接");
+ if (socket != null && socket.connected()) {
+ JSONObject authData = new JSONObject();
+ try {
+ authData.put("key", AUTH_KEY);
+ authData.put("roomId", ROOM_ID);
+ socket.emit("authenticate", authData);
+ log.info("socketIo---->连接成功");
+ } catch (JSONException e) {
+ log.error("socketIo认证错误:"+e.getMessage(), e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/nq/ws/backo/Backoff.java b/src/main/java/com/nq/ws/backo/Backoff.java
new file mode 100644
index 0000000..4f440af
--- /dev/null
+++ b/src/main/java/com/nq/ws/backo/Backoff.java
@@ -0,0 +1,66 @@
+package com.nq.ws.backo;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * Imported from https://github.com/mokesmokes/backo
+ */
+public class Backoff {
+
+ private long ms = 100;
+ private long max = 10000;
+ private int factor = 2;
+ private double jitter;
+ private int attempts;
+
+ public Backoff() {}
+
+ public long duration() {
+ BigInteger ms = BigInteger.valueOf(this.ms)
+ .multiply(BigInteger.valueOf(this.factor).pow(this.attempts++));
+ if (jitter != 0.0) {
+ double rand = Math.random();
+ BigInteger deviation = BigDecimal.valueOf(rand)
+ .multiply(BigDecimal.valueOf(jitter))
+ .multiply(new BigDecimal(ms)).toBigInteger();
+ ms = (((int) Math.floor(rand * 10)) & 1) == 0 ? ms.subtract(deviation) : ms.add(deviation);
+ }
+ return ms
+ .min(BigInteger.valueOf(this.max))
+ .max(BigInteger.valueOf(this.ms))
+ .longValue();
+ }
+
+ public void reset() {
+ this.attempts = 0;
+ }
+
+ public Backoff setMin(long min) {
+ this.ms = min;
+ return this;
+ }
+
+ public Backoff setMax(long max) {
+ this.max = max;
+ return this;
+ }
+
+ public Backoff setFactor(int factor) {
+ this.factor = factor;
+ return this;
+ }
+
+ public Backoff setJitter(double jitter) {
+ boolean isValid = jitter >= 0 && jitter < 1;
+ if (!isValid) {
+ throw new IllegalArgumentException("jitter must be between 0 and 1");
+ }
+ this.jitter = jitter;
+ return this;
+ }
+
+ public int getAttempts() {
+ return this.attempts;
+ }
+}
diff --git a/src/main/java/com/nq/ws/client/Ack.java b/src/main/java/com/nq/ws/client/Ack.java
new file mode 100644
index 0000000..d055cfc
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Ack.java
@@ -0,0 +1,11 @@
+package com.nq.ws.client;
+
+/**
+ * Acknowledgement.
+ */
+public interface Ack {
+
+ void call(Object... args);
+
+}
+
diff --git a/src/main/java/com/nq/ws/client/AckWithTimeout.java b/src/main/java/com/nq/ws/client/AckWithTimeout.java
new file mode 100644
index 0000000..e7374ff
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/AckWithTimeout.java
@@ -0,0 +1,35 @@
+package com.nq.ws.client;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public abstract class AckWithTimeout implements Ack {
+ private final long timeout;
+ private final Timer timer = new Timer();
+
+ /**
+ *
+ * @param timeout delay in milliseconds
+ */
+ public AckWithTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ @Override
+ public final void call(Object... args) {
+ this.timer.cancel();
+ this.onSuccess(args);
+ }
+
+ public final void schedule(TimerTask task) {
+ this.timer.schedule(task, this.timeout);
+ }
+
+ public final void cancelTimer() {
+ this.timer.cancel();
+ }
+
+ public abstract void onSuccess(Object... args);
+ public abstract void onTimeout();
+
+}
diff --git a/src/main/java/com/nq/ws/client/IO.java b/src/main/java/com/nq/ws/client/IO.java
new file mode 100644
index 0000000..ae4abc7
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/IO.java
@@ -0,0 +1,118 @@
+package com.nq.ws.client;
+
+
+import com.nq.ws.parser.Parser;
+import okhttp3.Call;
+import okhttp3.WebSocket;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+public class IO {
+
+ private static final Logger logger = Logger.getLogger(IO.class.getName());
+
+ private static final ConcurrentHashMap<String, Manager> managers = new ConcurrentHashMap<>();
+
+ /**
+ * Protocol version.
+ */
+ public static int protocol = Parser.protocol;
+
+ public static void setDefaultOkHttpWebSocketFactory(WebSocket.Factory factory) {
+ Manager.defaultWebSocketFactory = factory;
+ }
+
+ public static void setDefaultOkHttpCallFactory(Call.Factory factory) {
+ Manager.defaultCallFactory = factory;
+ }
+
+ private IO() {}
+
+ public static Socket socket(String uri) throws URISyntaxException {
+ return socket(uri, null);
+ }
+
+ public static Socket socket(String uri, Options opts) throws URISyntaxException {
+ return socket(new URI(uri), opts);
+ }
+
+ public static Socket socket(URI uri) {
+ return socket(uri, null);
+ }
+
+ /**
+ * Initializes a {@link Socket} from an existing {@link Manager} for multiplexing.
+ *
+ * @param uri uri to connect.
+ * @param opts options for socket.
+ * @return {@link Socket} instance.
+ */
+ public static Socket socket(URI uri, Options opts) {
+ if (opts == null) {
+ opts = new Options();
+ }
+
+ Url.ParsedURI parsed = Url.parse(uri);
+ URI source = parsed.uri;
+ String id = parsed.id;
+
+ boolean sameNamespace = managers.containsKey(id)
+ && managers.get(id).nsps.containsKey(source.getPath());
+ boolean newConnection = opts.forceNew || !opts.multiplex || sameNamespace;
+ Manager io;
+
+ String query = source.getQuery();
+ if (query != null && (opts.query == null || opts.query.isEmpty())) {
+ opts.query = query;
+ }
+
+ if (newConnection) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("ignoring socket cache for %s", source));
+ }
+ io = new Manager(source, opts);
+ } else {
+ if (!managers.containsKey(id)) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("new io instance for %s", source));
+ }
+ managers.putIfAbsent(id, new Manager(source, opts));
+ }
+ io = managers.get(id);
+ }
+
+ return io.socket(source.getPath(), opts);
+ }
+
+
+ public static class Options extends Manager.Options {
+
+ public boolean forceNew;
+
+ /**
+ * Whether to enable multiplexing. Default is true.
+ */
+ public boolean multiplex = true;
+
+ /**
+ * <p>
+ * Retrieve new builder class that helps creating socket option as builder pattern.
+ * This method returns exactly same result as :
+ * </p>
+ * <code>
+ * SocketOptionBuilder builder = SocketOptionBuilder.builder();
+ * </code>
+ *
+ * @return builder class that helps creating socket option as builder pattern.
+ * @see SocketOptionBuilder#builder()
+ */
+ public static SocketOptionBuilder builder() {
+ return SocketOptionBuilder.builder();
+ }
+ }
+}
diff --git a/src/main/java/com/nq/ws/client/Manager.java b/src/main/java/com/nq/ws/client/Manager.java
new file mode 100644
index 0000000..75e2562
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Manager.java
@@ -0,0 +1,575 @@
+package com.nq.ws.client;
+
+import com.nq.ws.backo.Backoff;
+import com.nq.ws.parser.DecodingException;
+import com.nq.ws.parser.IOParser;
+import com.nq.ws.parser.Packet;
+import com.nq.ws.parser.Parser;
+import io.socket.emitter.Emitter;
+import io.socket.thread.EventThread;
+import okhttp3.Call;
+import okhttp3.WebSocket;
+
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manager class represents a connection to a given Socket.IO server.
+ */
+public class Manager extends Emitter {
+
+ private static final Logger logger = Logger.getLogger(Manager.class.getName());
+
+ /*package*/ enum ReadyState {
+ CLOSED, OPENING, OPEN
+ }
+
+ /**
+ * Called on a successful connection.
+ */
+ public static final String EVENT_OPEN = "open";
+
+ /**
+ * Called on a disconnection.
+ */
+ public static final String EVENT_CLOSE = "close";
+
+ public static final String EVENT_PACKET = "packet";
+ public static final String EVENT_ERROR = "error";
+
+ /**
+ * Called on a successful reconnection.
+ */
+ public static final String EVENT_RECONNECT = "reconnect";
+
+ /**
+ * Called on a reconnection attempt error.
+ */
+ public static final String EVENT_RECONNECT_ERROR = "reconnect_error";
+
+ public static final String EVENT_RECONNECT_FAILED = "reconnect_failed";
+
+ public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";
+
+ /**
+ * Called when a new transport is created. (experimental)
+ */
+ public static final String EVENT_TRANSPORT = Engine.EVENT_TRANSPORT;
+
+ /*package*/ static WebSocket.Factory defaultWebSocketFactory;
+ /*package*/ static Call.Factory defaultCallFactory;
+
+ /*package*/ ReadyState readyState;
+
+ private boolean _reconnection;
+ private boolean skipReconnect;
+ private boolean reconnecting;
+ private boolean encoding;
+ private int _reconnectionAttempts;
+ private long _reconnectionDelay;
+ private long _reconnectionDelayMax;
+ private double _randomizationFactor;
+ private Backoff backoff;
+ private long _timeout;
+ private URI uri;
+ private List<Packet> packetBuffer;
+ private Queue<On.Handle> subs;
+ private Options opts;
+ /*package*/ io.socket.engineio.client.Socket engine;
+ private Parser.Encoder encoder;
+ private Parser.Decoder decoder;
+
+ /**
+ * This HashMap can be accessed from outside of EventThread.
+ */
+ /*package*/ ConcurrentHashMap<String, Socket> nsps;
+
+
+ public Manager() {
+ this(null, null);
+ }
+
+ public Manager(URI uri) {
+ this(uri, null);
+ }
+
+ public Manager(Options opts) {
+ this(null, opts);
+ }
+
+ public Manager(URI uri, Options opts) {
+ if (opts == null) {
+ opts = new Options();
+ }
+ if (opts.path == null) {
+ opts.path = "/socket.io";
+ }
+ if (opts.webSocketFactory == null) {
+ opts.webSocketFactory = defaultWebSocketFactory;
+ }
+ if (opts.callFactory == null) {
+ opts.callFactory = defaultCallFactory;
+ }
+ this.opts = opts;
+ this.nsps = new ConcurrentHashMap<>();
+ this.subs = new LinkedList<>();
+ this.reconnection(opts.reconnection);
+ this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
+ this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
+ this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
+ this.randomizationFactor(opts.randomizationFactor != 0.0 ? opts.randomizationFactor : 0.5);
+ this.backoff = new Backoff()
+ .setMin(this.reconnectionDelay())
+ .setMax(this.reconnectionDelayMax())
+ .setJitter(this.randomizationFactor());
+ this.timeout(opts.timeout);
+ this.readyState = ReadyState.CLOSED;
+ this.uri = uri;
+ this.encoding = false;
+ this.packetBuffer = new ArrayList<>();
+ this.encoder = opts.encoder != null ? opts.encoder : new IOParser.Encoder();
+ this.decoder = opts.decoder != null ? opts.decoder : new IOParser.Decoder();
+ }
+
+ public boolean reconnection() {
+ return this._reconnection;
+ }
+
+ public Manager reconnection(boolean v) {
+ this._reconnection = v;
+ return this;
+ }
+
+ public boolean isReconnecting() {
+ return reconnecting;
+ }
+
+ public int reconnectionAttempts() {
+ return this._reconnectionAttempts;
+ }
+
+ public Manager reconnectionAttempts(int v) {
+ this._reconnectionAttempts = v;
+ return this;
+ }
+
+ public final long reconnectionDelay() {
+ return this._reconnectionDelay;
+ }
+
+ public Manager reconnectionDelay(long v) {
+ this._reconnectionDelay = v;
+ if (this.backoff != null) {
+ this.backoff.setMin(v);
+ }
+ return this;
+ }
+
+ public final double randomizationFactor() {
+ return this._randomizationFactor;
+ }
+
+ public Manager randomizationFactor(double v) {
+ this._randomizationFactor = v;
+ if (this.backoff != null) {
+ this.backoff.setJitter(v);
+ }
+ return this;
+ }
+
+ public final long reconnectionDelayMax() {
+ return this._reconnectionDelayMax;
+ }
+
+ public Manager reconnectionDelayMax(long v) {
+ this._reconnectionDelayMax = v;
+ if (this.backoff != null) {
+ this.backoff.setMax(v);
+ }
+ return this;
+ }
+
+ public long timeout() {
+ return this._timeout;
+ }
+
+ public Manager timeout(long v) {
+ this._timeout = v;
+ return this;
+ }
+
+ private void maybeReconnectOnOpen() {
+ // Only try to reconnect if it's the first time we're connecting
+ if (!this.reconnecting && this._reconnection && this.backoff.getAttempts() == 0) {
+ this.reconnect();
+ }
+ }
+
+ public Manager open(){
+ return open(null);
+ }
+
+ /**
+ * Connects the client.
+ *
+ * @param fn callback.
+ * @return a reference to this object.
+ */
+ public Manager open(final OpenCallback fn) {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("readyState %s", Manager.this.readyState));
+ }
+ if (Manager.this.readyState == ReadyState.OPEN || Manager.this.readyState == ReadyState.OPENING) return;
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("opening %s", Manager.this.uri));
+ }
+ Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
+ final io.socket.engineio.client.Socket socket = Manager.this.engine;
+ final Manager self = Manager.this;
+ Manager.this.readyState = ReadyState.OPENING;
+ Manager.this.skipReconnect = false;
+
+ // propagate transport event.
+ socket.on(Engine.EVENT_TRANSPORT, new Listener() {
+ @Override
+ public void call(Object... args) {
+ self.emit(Manager.EVENT_TRANSPORT, args);
+ }
+ });
+
+ final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
+ @Override
+ public void call(Object... objects) {
+ self.onopen();
+ if (fn != null) fn.call(null);
+ }
+ });
+
+ On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
+ @Override
+ public void call(Object... objects) {
+ Object data = objects.length > 0 ? objects[0] : null;
+ logger.fine("connect_error");
+ self.cleanup();
+ self.readyState = ReadyState.CLOSED;
+ self.emit(EVENT_ERROR, data);
+ if (fn != null) {
+ Exception err = new SocketIOException("Connection error",
+ data instanceof Exception ? (Exception) data : null);
+ fn.call(err);
+ } else {
+ // Only do this if there is no fn to handle the error
+ self.maybeReconnectOnOpen();
+ }
+ }
+ });
+
+ final long timeout = Manager.this._timeout;
+ final Runnable onTimeout = new Runnable() {
+ @Override
+ public void run() {
+ logger.fine(String.format("connect attempt timed out after %d", timeout));
+ openSub.destroy();
+ socket.close();
+ socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
+ }
+ };
+
+ if (timeout == 0) {
+ EventThread.exec(onTimeout);
+ return;
+ } else if (Manager.this._timeout > 0) {
+ logger.fine(String.format("connection attempt will timeout after %d", timeout));
+
+ final Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ EventThread.exec(onTimeout);
+ }
+ }, timeout);
+
+ Manager.this.subs.add(new On.Handle() {
+ @Override
+ public void destroy() {
+ timer.cancel();
+ }
+ });
+ }
+
+ Manager.this.subs.add(openSub);
+ Manager.this.subs.add(errorSub);
+
+ Manager.this.engine.open();
+ }
+ });
+ return this;
+ }
+
+ private void onopen() {
+ logger.fine("open");
+
+ this.cleanup();
+
+ this.readyState = ReadyState.OPEN;
+ this.emit(EVENT_OPEN);
+
+ final io.socket.engineio.client.Socket socket = this.engine;
+ this.subs.add(On.on(socket, Engine.EVENT_DATA, new Listener() {
+ @Override
+ public void call(Object... objects) {
+ Object data = objects[0];
+ try {
+ if (data instanceof String) {
+ Manager.this.decoder.add((String) data);
+ } else if (data instanceof byte[]) {
+ Manager.this.decoder.add((byte[]) data);
+ }
+ } catch (DecodingException e) {
+ logger.fine("error while decoding the packet: " + e.getMessage());
+ }
+ }
+ }));
+ this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
+ @Override
+ public void call(Object... objects) {
+ Manager.this.onerror((Exception)objects[0]);
+ }
+ }));
+ this.subs.add(On.on(socket, Engine.EVENT_CLOSE, new Listener() {
+ @Override
+ public void call(Object... objects) {
+ Manager.this.onclose((String)objects[0]);
+ }
+ }));
+ this.decoder.onDecoded(new Parser.Decoder.Callback() {
+ @Override
+ public void call (Packet packet) {
+ Manager.this.ondecoded(packet);
+ }
+ });
+ }
+
+ private void ondecoded(Packet packet) {
+ this.emit(EVENT_PACKET, packet);
+ }
+
+ private void onerror(Exception err) {
+ logger.log(Level.FINE, "error", err);
+ this.emit(EVENT_ERROR, err);
+ }
+
+ /**
+ * Initializes {@link Socket} instances for each namespaces.
+ *
+ * @param nsp namespace.
+ * @param opts options.
+ * @return a socket instance for the namespace.
+ */
+ public Socket socket(final String nsp, Options opts) {
+ synchronized (this.nsps) {
+ Socket socket = this.nsps.get(nsp);
+ if (socket == null) {
+ socket = new Socket(this, nsp, opts);
+ this.nsps.put(nsp, socket);
+ }
+ return socket;
+ }
+ }
+
+ public Socket socket(String nsp) {
+ return socket(nsp, null);
+ }
+
+ /*package*/ void destroy() {
+ synchronized (this.nsps) {
+ for (Socket socket : this.nsps.values()) {
+ if (socket.isActive()) {
+ logger.fine("socket is still active, skipping close");
+ return;
+ }
+ }
+
+ this.close();
+ }
+ }
+
+ /*package*/ void packet(Packet packet) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("writing packet %s", packet));
+ }
+ final Manager self = this;
+
+ if (!self.encoding) {
+ self.encoding = true;
+ this.encoder.encode(packet, new Parser.Encoder.Callback() {
+ @Override
+ public void call(Object[] encodedPackets) {
+ for (Object packet : encodedPackets) {
+ if (packet instanceof String) {
+ self.engine.write((String)packet);
+ } else if (packet instanceof byte[]) {
+ self.engine.write((byte[])packet);
+ }
+ }
+ self.encoding = false;
+ self.processPacketQueue();
+ }
+ });
+ } else {
+ self.packetBuffer.add(packet);
+ }
+ }
+
+ private void processPacketQueue() {
+ if (!this.packetBuffer.isEmpty() && !this.encoding) {
+ Packet pack = this.packetBuffer.remove(0);
+ this.packet(pack);
+ }
+ }
+
+ private void cleanup() {
+ logger.fine("cleanup");
+
+ On.Handle sub;
+ while ((sub = this.subs.poll()) != null) sub.destroy();
+ this.decoder.onDecoded(null);
+
+ this.packetBuffer.clear();
+ this.encoding = false;
+
+ this.decoder.destroy();
+ }
+
+ /*package*/ void close() {
+ logger.fine("disconnect");
+ this.skipReconnect = true;
+ this.reconnecting = false;
+ if (this.readyState != ReadyState.OPEN) {
+ // `onclose` will not fire because
+ // an open event never happened
+ this.cleanup();
+ }
+ this.backoff.reset();
+ this.readyState = ReadyState.CLOSED;
+ if (this.engine != null) {
+ this.engine.close();
+ }
+ }
+
+ private void onclose(String reason) {
+ logger.fine("onclose");
+ this.cleanup();
+ this.backoff.reset();
+ this.readyState = ReadyState.CLOSED;
+ this.emit(EVENT_CLOSE, reason);
+
+ if (this._reconnection && !this.skipReconnect) {
+ this.reconnect();
+ }
+ }
+
+ private void reconnect() {
+ if (this.reconnecting || this.skipReconnect) return;
+
+ final Manager self = this;
+
+ if (this.backoff.getAttempts() >= this._reconnectionAttempts) {
+ logger.fine("reconnect failed");
+ this.backoff.reset();
+ this.emit(EVENT_RECONNECT_FAILED);
+ this.reconnecting = false;
+ } else {
+ long delay = this.backoff.duration();
+ logger.fine(String.format("will wait %dms before reconnect attempt", delay));
+
+ this.reconnecting = true;
+ final Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ if (self.skipReconnect) return;
+
+ logger.fine("attempting reconnect");
+ int attempts = self.backoff.getAttempts();
+ self.emit(EVENT_RECONNECT_ATTEMPT, attempts);
+
+ // check again for the case socket closed in above events
+ if (self.skipReconnect) return;
+
+ self.open(new OpenCallback() {
+ @Override
+ public void call(Exception err) {
+ if (err != null) {
+ logger.fine("reconnect attempt error");
+ self.reconnecting = false;
+ self.reconnect();
+ self.emit(EVENT_RECONNECT_ERROR, err);
+ } else {
+ logger.fine("reconnect success");
+ self.onreconnect();
+ }
+ }
+ });
+ }
+ });
+ }
+ }, delay);
+
+ this.subs.add(new On.Handle() {
+ @Override
+ public void destroy() {
+ timer.cancel();
+ }
+ });
+ }
+ }
+
+ private void onreconnect() {
+ int attempts = this.backoff.getAttempts();
+ this.reconnecting = false;
+ this.backoff.reset();
+ this.emit(EVENT_RECONNECT, attempts);
+ }
+
+
+ public interface OpenCallback {
+
+ void call(Exception err);
+ }
+
+
+ private static class Engine extends io.socket.engineio.client.Socket {
+
+ Engine(URI uri, Options opts) {
+ super(uri, opts);
+ }
+ }
+
+ public static class Options extends io.socket.engineio.client.Socket.Options {
+
+ public boolean reconnection = true;
+ public int reconnectionAttempts;
+ public long reconnectionDelay;
+ public long reconnectionDelayMax;
+ public double randomizationFactor;
+ public Parser.Encoder encoder;
+ public Parser.Decoder decoder;
+ public Map<String, String> auth;
+
+ /**
+ * Connection timeout (ms). Set -1 to disable.
+ */
+ public long timeout = 20000;
+ }
+}
diff --git a/src/main/java/com/nq/ws/client/On.java b/src/main/java/com/nq/ws/client/On.java
new file mode 100644
index 0000000..2cf4dad
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/On.java
@@ -0,0 +1,23 @@
+package com.nq.ws.client;
+
+import io.socket.emitter.Emitter;
+
+public class On {
+
+ private On() {}
+
+ public static Handle on(final Emitter obj, final String ev, final Emitter.Listener fn) {
+ obj.on(ev, fn);
+ return new Handle() {
+ @Override
+ public void destroy() {
+ obj.off(ev, fn);
+ }
+ };
+ }
+
+ public interface Handle {
+
+ void destroy();
+ }
+}
diff --git a/src/main/java/com/nq/ws/client/Socket.java b/src/main/java/com/nq/ws/client/Socket.java
new file mode 100644
index 0000000..facda91
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Socket.java
@@ -0,0 +1,574 @@
+package com.nq.ws.client;
+
+import com.nq.ws.parser.Packet;
+import com.nq.ws.parser.Parser;
+import io.socket.emitter.Emitter;
+import io.socket.thread.EventThread;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The socket class for Socket.IO Client.
+ */
+public class Socket extends Emitter {
+
+ private static final Logger logger = Logger.getLogger(Socket.class.getName());
+
+ /**
+ * Called on a connection.
+ */
+ public static final String EVENT_CONNECT = "connect";
+
+ /**
+ * Called on a disconnection.
+ */
+ public static final String EVENT_DISCONNECT = "disconnect";
+
+ /**
+ * Called on a connection error.
+ *
+ * <p>Parameters:</p>
+ * <ul>
+ * <li>(Exception) error data.</li>
+ * </ul>
+ */
+ public static final String EVENT_CONNECT_ERROR = "connect_error";
+
+ static final String EVENT_MESSAGE = "message";
+
+ protected static Map<String, Integer> RESERVED_EVENTS = new HashMap<String, Integer>() {{
+ put(EVENT_CONNECT, 1);
+ put(EVENT_CONNECT_ERROR, 1);
+ put(EVENT_DISCONNECT, 1);
+ // used on the server-side
+ put("disconnecting", 1);
+ put("newListener", 1);
+ put("removeListener", 1);
+ }};
+
+ /*package*/ String id;
+
+ private volatile boolean connected;
+ private int ids;
+ private String nsp;
+ private Manager io;
+ private Map<String, String> auth;
+ private Map<Integer, Ack> acks = new HashMap<>();
+ private Queue<On.Handle> subs;
+ private final Queue<List<Object>> receiveBuffer = new LinkedList<>();
+ private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<>();
+
+ private ConcurrentLinkedQueue<Listener> onAnyIncomingListeners = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Listener> onAnyOutgoingListeners = new ConcurrentLinkedQueue<>();
+
+ public Socket(Manager io, String nsp, Manager.Options opts) {
+ this.io = io;
+ this.nsp = nsp;
+ if (opts != null) {
+ this.auth = opts.auth;
+ }
+ }
+
+ private void subEvents() {
+ if (this.subs != null) return;
+
+ final Manager io = Socket.this.io;
+ Socket.this.subs = new LinkedList<On.Handle>() {{
+ add(On.on(io, Manager.EVENT_OPEN, new Listener() {
+ @Override
+ public void call(Object... args) {
+ Socket.this.onopen();
+ }
+ }));
+ add(On.on(io, Manager.EVENT_PACKET, new Listener() {
+ @Override
+ public void call(Object... args) {
+ Socket.this.onpacket((Packet<?>) args[0]);
+ }
+ }));
+ add(On.on(io, Manager.EVENT_ERROR, new Listener() {
+ @Override
+ public void call(Object... args) {
+ if (!Socket.this.connected) {
+ Socket.super.emit(EVENT_CONNECT_ERROR, args[0]);
+ }
+ }
+ }));
+ add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
+ @Override
+ public void call(Object... args) {
+ Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
+ }
+ }));
+ }};
+ }
+
+ public boolean isActive() {
+ return this.subs != null;
+ }
+
+ /**
+ * Connects the socket.
+ */
+ public Socket open() {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ if (Socket.this.connected || Socket.this.io.isReconnecting()) return;
+
+ Socket.this.subEvents();
+ Socket.this.io.open(); // ensure open
+ if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
+ }
+ });
+ return this;
+ }
+
+ /**
+ * Connects the socket.
+ */
+ public Socket connect() {
+ return this.open();
+ }
+
+ /**
+ * Send messages.
+ *
+ * @param args data to send.
+ * @return a reference to this object.
+ */
+ public Socket send(final Object... args) {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ Socket.this.emit(EVENT_MESSAGE, args);
+ }
+ });
+ return this;
+ }
+
+ /**
+ * Emits an event. When you pass {@link Ack} at the last argument, then the acknowledge is done.
+ *
+ * @param event an event name.
+ * @param args data to send.
+ * @return a reference to this object.
+ */
+ @Override
+ public Emitter emit(final String event, final Object... args) {
+ if (RESERVED_EVENTS.containsKey(event)) {
+ throw new RuntimeException("'" + event + "' is a reserved event name");
+ }
+
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ Ack ack;
+ Object[] _args;
+ int lastIndex = args.length - 1;
+
+ if (args.length > 0 && args[lastIndex] instanceof Ack) {
+ _args = new Object[lastIndex];
+ for (int i = 0; i < lastIndex; i++) {
+ _args[i] = args[i];
+ }
+ ack = (Ack) args[lastIndex];
+ } else {
+ _args = args;
+ ack = null;
+ }
+
+ emit(event, _args, ack);
+ }
+ });
+ return this;
+ }
+
+ /**
+ * Emits an event with an acknowledge.
+ *
+ * @param event an event name
+ * @param args data to send.
+ * @param ack the acknowledgement to be called
+ * @return a reference to this object.
+ */
+ public Emitter emit(final String event, final Object[] args, final Ack ack) {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ JSONArray jsonArgs = new JSONArray();
+ jsonArgs.put(event);
+
+ if (args != null) {
+ for (Object arg : args) {
+ jsonArgs.put(arg);
+ }
+ }
+
+ Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);
+
+ if (ack != null) {
+ final int ackId = Socket.this.ids;
+
+ logger.fine(String.format("emitting packet with ack id %d", ackId));
+
+ if (ack instanceof AckWithTimeout) {
+ final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack;
+ ackWithTimeout.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ // remove the ack from the map (to prevent an actual acknowledgement)
+ acks.remove(ackId);
+
+ // remove the packet from the buffer (if applicable)
+ Iterator<Packet<JSONArray>> iterator = sendBuffer.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().id == ackId) {
+ iterator.remove();
+ }
+ }
+
+ ackWithTimeout.onTimeout();
+ }
+ });
+ }
+
+ Socket.this.acks.put(ackId, ack);
+ packet.id = ids++;
+ }
+
+ if (Socket.this.connected) {
+ Socket.this.packet(packet);
+ } else {
+ Socket.this.sendBuffer.add(packet);
+ }
+ }
+ });
+ return this;
+ }
+
+ private void packet(Packet packet) {
+ if (packet.type == Parser.EVENT) {
+ if (!onAnyOutgoingListeners.isEmpty()) {
+ Object[] argsAsArray = toArray((JSONArray) packet.data);
+ for (Listener listener : onAnyOutgoingListeners) {
+ listener.call(argsAsArray);
+ }
+ }
+ }
+ packet.nsp = this.nsp;
+ this.io.packet(packet);
+ }
+
+ private void onopen() {
+ logger.fine("transport is open - connecting");
+
+ if (this.auth != null) {
+ this.packet(new Packet<>(Parser.CONNECT, new JSONObject(this.auth)));
+ } else {
+ this.packet(new Packet<>(Parser.CONNECT));
+ }
+ }
+
+ private void onclose(String reason) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("close (%s)", reason));
+ }
+ this.connected = false;
+ this.id = null;
+ super.emit(EVENT_DISCONNECT, reason);
+ }
+
+ private void onpacket(Packet<?> packet) {
+ if (!this.nsp.equals(packet.nsp)) return;
+
+ switch (packet.type) {
+ case Parser.CONNECT: {
+ if (packet.data instanceof JSONObject && ((JSONObject) packet.data).has("sid")) {
+ try {
+ this.onconnect(((JSONObject) packet.data).getString("sid"));
+ return;
+ } catch (JSONException e) {}
+ } else {
+ super.emit(EVENT_CONNECT_ERROR, new SocketIOException("It seems you are trying to reach a Socket.IO server in v2.x with a v3.x client, which is not possible"));
+ }
+ break;
+ }
+
+ case Parser.EVENT: {
+ @SuppressWarnings("unchecked")
+ Packet<JSONArray> p = (Packet<JSONArray>) packet;
+ this.onevent(p);
+ break;
+ }
+
+ case Parser.BINARY_EVENT: {
+ @SuppressWarnings("unchecked")
+ Packet<JSONArray> p = (Packet<JSONArray>) packet;
+ this.onevent(p);
+ break;
+ }
+
+ case Parser.ACK: {
+ @SuppressWarnings("unchecked")
+ Packet<JSONArray> p = (Packet<JSONArray>) packet;
+ this.onack(p);
+ break;
+ }
+
+ case Parser.BINARY_ACK: {
+ @SuppressWarnings("unchecked")
+ Packet<JSONArray> p = (Packet<JSONArray>) packet;
+ this.onack(p);
+ break;
+ }
+
+ case Parser.DISCONNECT:
+ this.ondisconnect();
+ break;
+
+ case Parser.CONNECT_ERROR:
+ this.destroy();
+ super.emit(EVENT_CONNECT_ERROR, packet.data);
+ break;
+ }
+ }
+
+ private void onevent(Packet<JSONArray> packet) {
+ List<Object> args = new ArrayList<>(Arrays.asList(toArray(packet.data)));
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("emitting event %s", args));
+ }
+
+ if (packet.id >= 0) {
+ logger.fine("attaching ack callback to event");
+ args.add(this.ack(packet.id));
+ }
+
+ if (this.connected) {
+ if (args.isEmpty()) return;
+ if (!this.onAnyIncomingListeners.isEmpty()) {
+ Object[] argsAsArray = args.toArray();
+ for (Listener listener : this.onAnyIncomingListeners) {
+ listener.call(argsAsArray);
+ }
+ }
+ String event = args.remove(0).toString();
+ super.emit(event, args.toArray());
+ } else {
+ this.receiveBuffer.add(args);
+ }
+ }
+
+ private Ack ack(final int id) {
+ final Socket self = this;
+ final boolean[] sent = new boolean[] {false};
+ return new Ack() {
+ @Override
+ public void call(final Object... args) {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ if (sent[0]) return;
+ sent[0] = true;
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("sending ack %s", args.length != 0 ? args : null));
+ }
+
+ JSONArray jsonArgs = new JSONArray();
+ for (Object arg : args) {
+ jsonArgs.put(arg);
+ }
+
+ Packet<JSONArray> packet = new Packet<>(Parser.ACK, jsonArgs);
+ packet.id = id;
+ self.packet(packet);
+ }
+ });
+ }
+ };
+ }
+
+ private void onack(Packet<JSONArray> packet) {
+ Ack fn = this.acks.remove(packet.id);
+ if (fn != null) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("calling ack %s with %s", packet.id, packet.data));
+ }
+ fn.call(toArray(packet.data));
+ } else {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("bad ack %s", packet.id));
+ }
+ }
+ }
+
+ private void onconnect(String id) {
+ this.connected = true;
+ this.id = id;
+ this.emitBuffered();
+ super.emit(EVENT_CONNECT);
+ }
+
+ private void emitBuffered() {
+ List<Object> data;
+ while ((data = this.receiveBuffer.poll()) != null) {
+ String event = (String)data.get(0);
+ super.emit(event, data.toArray());
+ }
+ this.receiveBuffer.clear();
+
+ Packet<JSONArray> packet;
+ while ((packet = this.sendBuffer.poll()) != null) {
+ this.packet(packet);
+ }
+ this.sendBuffer.clear();
+ }
+
+ private void ondisconnect() {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("server disconnect (%s)", this.nsp));
+ }
+ this.destroy();
+ this.onclose("io server disconnect");
+ }
+
+ private void destroy() {
+ if (this.subs != null) {
+ // clean subscriptions to avoid reconnection
+ for (On.Handle sub : this.subs) {
+ sub.destroy();
+ }
+ this.subs = null;
+ }
+
+ for (Ack ack : acks.values()) {
+ if (ack instanceof AckWithTimeout) {
+ ((AckWithTimeout) ack).cancelTimer();
+ }
+ }
+
+ this.io.destroy();
+ }
+
+ /**
+ * Disconnects the socket.
+ *
+ * @return a reference to this object.
+ */
+ public Socket close() {
+ EventThread.exec(new Runnable() {
+ @Override
+ public void run() {
+ if (Socket.this.connected) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp));
+ }
+ Socket.this.packet(new Packet(Parser.DISCONNECT));
+ }
+
+ Socket.this.destroy();
+
+ if (Socket.this.connected) {
+ Socket.this.onclose("io client disconnect");
+ }
+ }
+ });
+ return this;
+ }
+
+ /**
+ * Disconnects the socket.
+ *
+ * @return a reference to this object.
+ */
+ public Socket disconnect() {
+ return this.close();
+ }
+
+ public Manager io() {
+ return this.io;
+ }
+
+ public boolean connected() {
+ return this.connected;
+ }
+
+ /**
+ * A property on the socket instance that is equal to the underlying engine.io socket id.
+ *
+ * The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
+ *
+ * @return a socket id
+ */
+ public String id() {
+ return this.id;
+ }
+
+ private static Object[] toArray(JSONArray array) {
+ int length = array.length();
+ Object[] data = new Object[length];
+ for (int i = 0; i < length; i++) {
+ Object v;
+ try {
+ v = array.get(i);
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while retrieving data from JSONArray", e);
+ v = null;
+ }
+ data[i] = JSONObject.NULL.equals(v) ? null : v;
+ }
+ return data;
+ }
+
+ public Socket onAnyIncoming(Listener fn) {
+ this.onAnyIncomingListeners.add(fn);
+ return this;
+ }
+
+ public Socket offAnyIncoming() {
+ this.onAnyIncomingListeners.clear();
+ return this;
+ }
+
+ public Socket offAnyIncoming(Listener fn) {
+ Iterator<Listener> it = this.onAnyIncomingListeners.iterator();
+ while (it.hasNext()) {
+ Listener listener = it.next();
+ if (listener == fn) {
+ it.remove();
+ break;
+ }
+ }
+ return this;
+ }
+
+ public Socket onAnyOutgoing(Listener fn) {
+ this.onAnyOutgoingListeners.add(fn);
+ return this;
+ }
+
+ public Socket offAnyOutgoing() {
+ this.onAnyOutgoingListeners.clear();
+ return this;
+ }
+
+ public Socket offAnyOutgoing(Listener fn) {
+ Iterator<Listener> it = this.onAnyOutgoingListeners.iterator();
+ while (it.hasNext()) {
+ Listener listener = it.next();
+ if (listener == fn) {
+ it.remove();
+ break;
+ }
+ }
+ return this;
+ }
+}
+
diff --git a/src/main/java/com/nq/ws/client/SocketIOException.java b/src/main/java/com/nq/ws/client/SocketIOException.java
new file mode 100644
index 0000000..0808387
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/SocketIOException.java
@@ -0,0 +1,20 @@
+package com.nq.ws.client;
+
+public class SocketIOException extends Exception {
+
+ public SocketIOException() {
+ super();
+ }
+
+ public SocketIOException(String message) {
+ super(message);
+ }
+
+ public SocketIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SocketIOException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/com/nq/ws/client/SocketOptionBuilder.java b/src/main/java/com/nq/ws/client/SocketOptionBuilder.java
new file mode 100644
index 0000000..6df7eab
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/SocketOptionBuilder.java
@@ -0,0 +1,196 @@
+package com.nq.ws.client;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Convenient builder class that helps creating
+ * {@link io.socket.client.IO.Options Client Option} object as builder pattern.
+ * Finally, you can get option object with call {@link #build()} method.
+ *
+ * @author junbong
+ */
+public class SocketOptionBuilder {
+ /**
+ * Construct new builder with default preferences.
+ *
+ * @return new builder object
+ * @see SocketOptionBuilder#builder(IO.Options)
+ */
+ public static SocketOptionBuilder builder() {
+ return new SocketOptionBuilder();
+ }
+
+
+ /**
+ * Construct this builder from specified option object.
+ * The option that returned from {@link #build()} method
+ * is not equals with given option.
+ * In other words, builder creates new option object
+ * and copy all preferences from given option.
+ *
+ * @param options option object which to copy preferences
+ * @return new builder object
+ */
+ public static SocketOptionBuilder builder(IO.Options options) {
+ return new SocketOptionBuilder(options);
+ }
+
+
+ private final IO.Options options = new IO.Options();
+
+
+ /**
+ * Construct new builder with default preferences.
+ */
+ protected SocketOptionBuilder() {
+ this(null);
+ }
+
+
+ /**
+ * Construct this builder from specified option object.
+ * The option that returned from {@link #build()} method
+ * is not equals with given option.
+ * In other words, builder creates new option object
+ * and copy all preferences from given option.
+ *
+ * @param options option object which to copy preferences. Null-ok.
+ */
+ protected SocketOptionBuilder(IO.Options options) {
+ if (options != null) {
+ this.setForceNew(options.forceNew)
+ .setMultiplex(options.multiplex)
+ .setReconnection(options.reconnection)
+ .setReconnectionAttempts(options.reconnectionAttempts)
+ .setReconnectionDelay(options.reconnectionDelay)
+ .setReconnectionDelayMax(options.reconnectionDelayMax)
+ .setRandomizationFactor(options.randomizationFactor)
+ .setTimeout(options.timeout)
+ .setTransports(options.transports)
+ .setUpgrade(options.upgrade)
+ .setRememberUpgrade(options.rememberUpgrade)
+ .setHost(options.host)
+ .setHostname(options.hostname)
+ .setPort(options.port)
+ .setPolicyPort(options.policyPort)
+ .setSecure(options.secure)
+ .setPath(options.path)
+ .setQuery(options.query)
+ .setAuth(options.auth)
+ .setExtraHeaders(options.extraHeaders);
+ }
+ }
+
+ public SocketOptionBuilder setForceNew(boolean forceNew) {
+ this.options.forceNew = forceNew;
+ return this;
+ }
+
+ public SocketOptionBuilder setMultiplex(boolean multiplex) {
+ this.options.multiplex = multiplex;
+ return this;
+ }
+
+ public SocketOptionBuilder setReconnection(boolean reconnection) {
+ this.options.reconnection = reconnection;
+ return this;
+ }
+
+ public SocketOptionBuilder setReconnectionAttempts(int reconnectionAttempts) {
+ this.options.reconnectionAttempts = reconnectionAttempts;
+ return this;
+ }
+
+ public SocketOptionBuilder setReconnectionDelay(long reconnectionDelay) {
+ this.options.reconnectionDelay = reconnectionDelay;
+ return this;
+ }
+
+ public SocketOptionBuilder setReconnectionDelayMax(long reconnectionDelayMax) {
+ this.options.reconnectionDelayMax = reconnectionDelayMax;
+ return this;
+ }
+
+
+ public SocketOptionBuilder setRandomizationFactor(double randomizationFactor) {
+ this.options.randomizationFactor = randomizationFactor;
+ return this;
+ }
+
+ public SocketOptionBuilder setTimeout(long timeout) {
+ this.options.timeout = timeout;
+ return this;
+ }
+
+ public SocketOptionBuilder setTransports(String[] transports) {
+ this.options.transports = transports;
+ return this;
+ }
+
+ public SocketOptionBuilder setUpgrade(boolean upgrade) {
+ this.options.upgrade = upgrade;
+ return this;
+ }
+
+ public SocketOptionBuilder setRememberUpgrade(boolean rememberUpgrade) {
+ this.options.rememberUpgrade = rememberUpgrade;
+ return this;
+ }
+
+ public SocketOptionBuilder setHost(String host) {
+ this.options.host = host;
+ return this;
+ }
+
+ public SocketOptionBuilder setHostname(String hostname) {
+ this.options.hostname = hostname;
+ return this;
+ }
+
+ public SocketOptionBuilder setPort(int port) {
+ this.options.port = port;
+ return this;
+ }
+
+ public SocketOptionBuilder setPolicyPort(int policyPort) {
+ this.options.policyPort = policyPort;
+ return this;
+ }
+
+ public SocketOptionBuilder setQuery(String query) {
+ this.options.query = query;
+ return this;
+ }
+
+ public SocketOptionBuilder setSecure(boolean secure) {
+ this.options.secure = secure;
+ return this;
+ }
+
+ public SocketOptionBuilder setPath(String path) {
+ this.options.path = path;
+ return this;
+ }
+
+ public SocketOptionBuilder setAuth(Map<String, String> auth) {
+ this.options.auth = auth;
+ return this;
+ }
+
+ public SocketOptionBuilder setExtraHeaders(Map<String, List<String>> extraHeaders) {
+ this.options.extraHeaders = extraHeaders;
+ return this;
+ }
+
+ /**
+ * Finally retrieve {@link io.socket.client.IO.Options} object
+ * from this builder.
+ *
+ * @return option that built from this builder
+ */
+ public IO.Options build() {
+ return this.options;
+ }
+}
diff --git a/src/main/java/com/nq/ws/client/Url.java b/src/main/java/com/nq/ws/client/Url.java
new file mode 100644
index 0000000..5efa349
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Url.java
@@ -0,0 +1,84 @@
+package com.nq.ws.client;
+
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Url {
+
+ /**
+ * Expected format: "[id:password@]host[:port]"
+ */
+ private static Pattern PATTERN_AUTHORITY = Pattern.compile("^(.*@)?([^:]+)(:\\d+)?$");
+
+ private Url() {}
+
+ static class ParsedURI {
+ public final URI uri;
+ public final String id;
+
+ public ParsedURI(URI uri, String id) {
+ this.uri = uri;
+ this.id = id;
+ }
+ }
+
+ public static ParsedURI parse(URI uri) {
+ String protocol = uri.getScheme();
+ if (protocol == null || !protocol.matches("^https?|wss?$")) {
+ protocol = "https";
+ }
+
+ int port = uri.getPort();
+ if (port == -1) {
+ if ("http".equals(protocol) || "ws".equals(protocol)) {
+ port = 80;
+ } else if ("https".equals(protocol) || "wss".equals(protocol)) {
+ port = 443;
+ }
+ }
+
+ String path = uri.getRawPath();
+ if (path == null || path.length() == 0) {
+ path = "/";
+ }
+
+ String userInfo = uri.getRawUserInfo();
+ String query = uri.getRawQuery();
+ String fragment = uri.getRawFragment();
+ String _host = uri.getHost();
+ if (_host == null) {
+ // might happen on some of Samsung Devices such as S4.
+ _host = extractHostFromAuthorityPart(uri.getRawAuthority());
+ }
+ URI completeUri = URI.create(protocol + "://"
+ + (userInfo != null ? userInfo + "@" : "")
+ + _host
+ + (port != -1 ? ":" + port : "")
+ + path
+ + (query != null ? "?" + query : "")
+ + (fragment != null ? "#" + fragment : ""));
+ String id = protocol + "://" + _host + ":" + port;
+
+ return new ParsedURI(completeUri, id);
+ }
+
+
+ private static String extractHostFromAuthorityPart(String authority)
+ {
+ if (authority == null) {
+ throw new RuntimeException("unable to parse the host from the authority");
+ }
+
+ Matcher matcher = PATTERN_AUTHORITY.matcher(authority);
+
+ // If the authority part does not match the expected format.
+ if (!matcher.matches()) {
+ throw new RuntimeException("unable to parse the host from the authority");
+ }
+
+ // Return the host part.
+ return matcher.group(2);
+ }
+
+}
diff --git a/src/main/java/com/nq/ws/hasbinary/HasBinary.java b/src/main/java/com/nq/ws/hasbinary/HasBinary.java
new file mode 100644
index 0000000..08c4e58
--- /dev/null
+++ b/src/main/java/com/nq/ws/hasbinary/HasBinary.java
@@ -0,0 +1,63 @@
+package com.nq.ws.hasbinary;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class HasBinary {
+
+ private static final Logger logger = Logger.getLogger(HasBinary.class.getName());
+
+ private HasBinary() {}
+
+ public static boolean hasBinary(Object data) {
+ return _hasBinary(data);
+ }
+
+ private static boolean _hasBinary(Object obj) {
+ if (obj == null) return false;
+
+ if (obj instanceof byte[]) {
+ return true;
+ }
+
+ if (obj instanceof JSONArray) {
+ JSONArray _obj = (JSONArray)obj;
+ int length = _obj.length();
+ for (int i = 0; i < length; i++) {
+ Object v;
+ try {
+ v = _obj.isNull(i) ? null : _obj.get(i);
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while retrieving data from JSONArray", e);
+ return false;
+ }
+ if (_hasBinary(v)) {
+ return true;
+ }
+ }
+ } else if (obj instanceof JSONObject) {
+ JSONObject _obj = (JSONObject)obj;
+ Iterator keys = _obj.keys();
+ while (keys.hasNext()) {
+ String key = (String)keys.next();
+ Object v;
+ try {
+ v = _obj.get(key);
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while retrieving data from JSONObject", e);
+ return false;
+ }
+ if (_hasBinary(v)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/src/main/java/com/nq/ws/parser/Binary.java b/src/main/java/com/nq/ws/parser/Binary.java
new file mode 100644
index 0000000..ac2618f
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/Binary.java
@@ -0,0 +1,127 @@
+package com.nq.ws.parser;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class Binary {
+
+ private static final String KEY_PLACEHOLDER = "_placeholder";
+
+ private static final String KEY_NUM = "num";
+
+ private static final Logger logger = Logger.getLogger(Binary.class.getName());
+
+ @SuppressWarnings("unchecked")
+ public static DeconstructedPacket deconstructPacket(Packet packet) {
+ List<byte[]> buffers = new ArrayList<>();
+
+ packet.data = _deconstructPacket(packet.data, buffers);
+ packet.attachments = buffers.size();
+
+ DeconstructedPacket result = new DeconstructedPacket();
+ result.packet = packet;
+ result.buffers = buffers.toArray(new byte[buffers.size()][]);
+ return result;
+ }
+
+ private static Object _deconstructPacket(Object data, List<byte[]> buffers) {
+ if (data == null) return null;
+
+ if (data instanceof byte[]) {
+ JSONObject placeholder = new JSONObject();
+ try {
+ placeholder.put(KEY_PLACEHOLDER, true);
+ placeholder.put(KEY_NUM, buffers.size());
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while putting data to JSONObject", e);
+ return null;
+ }
+ buffers.add((byte[])data);
+ return placeholder;
+ } else if (data instanceof JSONArray) {
+ JSONArray newData = new JSONArray();
+ JSONArray _data = (JSONArray)data;
+ int len = _data.length();
+ for (int i = 0; i < len; i ++) {
+ try {
+ newData.put(i, _deconstructPacket(_data.get(i), buffers));
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while putting packet data to JSONObject", e);
+ return null;
+ }
+ }
+ return newData;
+ } else if (data instanceof JSONObject) {
+ JSONObject newData = new JSONObject();
+ JSONObject _data = (JSONObject)data;
+ Iterator<?> iterator = _data.keys();
+ while (iterator.hasNext()) {
+ String key = (String)iterator.next();
+ try {
+ newData.put(key, _deconstructPacket(_data.get(key), buffers));
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while putting data to JSONObject", e);
+ return null;
+ }
+ }
+ return newData;
+ }
+ return data;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Packet reconstructPacket(Packet packet, byte[][] buffers) {
+ packet.data = _reconstructPacket(packet.data, buffers);
+ packet.attachments = -1;
+ return packet;
+ }
+
+ private static Object _reconstructPacket(Object data, byte[][] buffers) {
+ if (data instanceof JSONArray) {
+ JSONArray _data = (JSONArray)data;
+ int len = _data.length();
+ for (int i = 0; i < len; i ++) {
+ try {
+ _data.put(i, _reconstructPacket(_data.get(i), buffers));
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while putting packet data to JSONObject", e);
+ return null;
+ }
+ }
+ return _data;
+ } else if (data instanceof JSONObject) {
+ JSONObject _data = (JSONObject)data;
+ if (_data.optBoolean(KEY_PLACEHOLDER)) {
+ int num = _data.optInt(KEY_NUM, -1);
+ return num >= 0 && num < buffers.length ? buffers[num] : null;
+ }
+ Iterator<?> iterator = _data.keys();
+ while (iterator.hasNext()) {
+ String key = (String)iterator.next();
+ try {
+ _data.put(key, _reconstructPacket(_data.get(key), buffers));
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while putting data to JSONObject", e);
+ return null;
+ }
+ }
+ return _data;
+ }
+ return data;
+ }
+
+ public static class DeconstructedPacket {
+
+ public Packet packet;
+ public byte[][] buffers;
+ }
+}
+
+
diff --git a/src/main/java/com/nq/ws/parser/DecodingException.java b/src/main/java/com/nq/ws/parser/DecodingException.java
new file mode 100644
index 0000000..67806ea
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/DecodingException.java
@@ -0,0 +1,7 @@
+package com.nq.ws.parser;
+
+public class DecodingException extends RuntimeException {
+ public DecodingException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/nq/ws/parser/IOParser.java b/src/main/java/com/nq/ws/parser/IOParser.java
new file mode 100644
index 0000000..ec93478
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/IOParser.java
@@ -0,0 +1,263 @@
+package com.nq.ws.parser;
+
+import com.nq.ws.hasbinary.HasBinary;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+final public class IOParser implements Parser {
+
+ private static final Logger logger = Logger.getLogger(IOParser.class.getName());
+
+ private IOParser() {}
+
+ final public static class Encoder implements Parser.Encoder {
+
+ public Encoder() {}
+
+ @Override
+ public void encode(Packet obj, Callback callback) {
+ if ((obj.type == EVENT || obj.type == ACK) && HasBinary.hasBinary(obj.data)) {
+ obj.type = obj.type == EVENT ? BINARY_EVENT : BINARY_ACK;
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("encoding packet %s", obj));
+ }
+
+ if (BINARY_EVENT == obj.type || BINARY_ACK == obj.type) {
+ encodeAsBinary(obj, callback);
+ } else {
+ String encoding = encodeAsString(obj);
+ callback.call(new String[] {encoding});
+ }
+ }
+
+ private String encodeAsString(Packet obj) {
+ StringBuilder str = new StringBuilder("" + obj.type);
+
+ if (BINARY_EVENT == obj.type || BINARY_ACK == obj.type) {
+ str.append(obj.attachments);
+ str.append("-");
+ }
+
+ if (obj.nsp != null && obj.nsp.length() != 0 && !"/".equals(obj.nsp)) {
+ str.append(obj.nsp);
+ str.append(",");
+ }
+
+ if (obj.id >= 0) {
+ str.append(obj.id);
+ }
+
+ if (obj.data != null) {
+ str.append(obj.data);
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("encoded %s as %s", obj, str));
+ }
+ return str.toString();
+ }
+
+ private void encodeAsBinary(Packet obj, Callback callback) {
+ Binary.DeconstructedPacket deconstruction = Binary.deconstructPacket(obj);
+ String pack = encodeAsString(deconstruction.packet);
+ List<Object> buffers = new ArrayList<Object>(Arrays.asList(deconstruction.buffers));
+
+ buffers.add(0, pack);
+ callback.call(buffers.toArray());
+ }
+ }
+
+ final public static class Decoder implements Parser.Decoder {
+
+ /*package*/ BinaryReconstructor reconstructor;
+
+ private Decoder.Callback onDecodedCallback;
+
+ public Decoder() {
+ this.reconstructor = null;
+ }
+
+ @Override
+ public void add(String obj) {
+ Packet packet = decodeString(obj);
+ if (BINARY_EVENT == packet.type || BINARY_ACK == packet.type) {
+ this.reconstructor = new BinaryReconstructor(packet);
+
+ if (this.reconstructor.reconPack.attachments == 0) {
+ if (this.onDecodedCallback != null) {
+ this.onDecodedCallback.call(packet);
+ }
+ }
+ } else {
+ if (this.onDecodedCallback != null) {
+ this.onDecodedCallback.call(packet);
+ }
+ }
+ }
+
+ @Override
+ public void add(byte[] obj) {
+ if (this.reconstructor == null) {
+ throw new RuntimeException("got binary data when not reconstructing a packet");
+ } else {
+ Packet packet = this.reconstructor.takeBinaryData(obj);
+ if (packet != null) {
+ this.reconstructor = null;
+ if (this.onDecodedCallback != null) {
+ this.onDecodedCallback.call(packet);
+ }
+ }
+ }
+ }
+
+ private static Packet decodeString(String str) {
+ int i = 0;
+ int length = str.length();
+
+ Packet<Object> p = new Packet<>(Character.getNumericValue(str.charAt(0)));
+
+ if (p.type < 0 || p.type > types.length - 1) {
+ throw new DecodingException("unknown packet type " + p.type);
+ }
+
+ if (BINARY_EVENT == p.type || BINARY_ACK == p.type) {
+ if (!str.contains("-") || length <= i + 1) {
+ throw new DecodingException("illegal attachments");
+ }
+ StringBuilder attachments = new StringBuilder();
+ while (str.charAt(++i) != '-') {
+ attachments.append(str.charAt(i));
+ }
+ p.attachments = Integer.parseInt(attachments.toString());
+ }
+
+ if (length > i + 1 && '/' == str.charAt(i + 1)) {
+ StringBuilder nsp = new StringBuilder();
+ while (true) {
+ ++i;
+ char c = str.charAt(i);
+ if (',' == c) break;
+ nsp.append(c);
+ if (i + 1 == length) break;
+ }
+ p.nsp = nsp.toString();
+ } else {
+ p.nsp = "/";
+ }
+
+ if (length > i + 1){
+ Character next = str.charAt(i + 1);
+ if (Character.getNumericValue(next) > -1) {
+ StringBuilder id = new StringBuilder();
+ while (true) {
+ ++i;
+ char c = str.charAt(i);
+ if (Character.getNumericValue(c) < 0) {
+ --i;
+ break;
+ }
+ id.append(c);
+ if (i + 1 == length) break;
+ }
+ try {
+ p.id = Integer.parseInt(id.toString());
+ } catch (NumberFormatException e){
+ throw new DecodingException("invalid payload");
+ }
+ }
+ }
+
+ if (length > i + 1){
+ try {
+ str.charAt(++i);
+ p.data = new JSONTokener(str.substring(i)).nextValue();
+ } catch (JSONException e) {
+ logger.log(Level.WARNING, "An error occured while retrieving data from JSONTokener", e);
+ throw new DecodingException("invalid payload");
+ }
+ if (!isPayloadValid(p.type, p.data)) {
+ throw new DecodingException("invalid payload");
+ }
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(String.format("decoded %s as %s", str, p));
+ }
+ return p;
+ }
+
+ private static boolean isPayloadValid(int type, Object payload) {
+ switch (type) {
+ case Parser.CONNECT:
+ case Parser.CONNECT_ERROR:
+ return payload instanceof JSONObject;
+ case Parser.DISCONNECT:
+ return payload == null;
+ case Parser.EVENT:
+ case Parser.BINARY_EVENT:
+ return payload instanceof JSONArray
+ && ((JSONArray) payload).length() > 0
+ && !((JSONArray) payload).isNull(0);
+ case Parser.ACK:
+ case Parser.BINARY_ACK:
+ return payload instanceof JSONArray;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (this.reconstructor != null) {
+ this.reconstructor.finishReconstruction();
+ }
+ this.onDecodedCallback = null;
+ }
+
+ @Override
+ public void onDecoded (Callback callback) {
+ this.onDecodedCallback = callback;
+ }
+ }
+
+
+ /*package*/ static class BinaryReconstructor {
+
+ public Packet reconPack;
+
+ /*package*/ List<byte[]> buffers;
+
+ BinaryReconstructor(Packet packet) {
+ this.reconPack = packet;
+ this.buffers = new ArrayList<>();
+ }
+
+ public Packet takeBinaryData(byte[] binData) {
+ this.buffers.add(binData);
+ if (this.buffers.size() == this.reconPack.attachments) {
+ Packet packet = Binary.reconstructPacket(this.reconPack,
+ this.buffers.toArray(new byte[this.buffers.size()][]));
+ this.finishReconstruction();
+ return packet;
+ }
+ return null;
+ }
+
+ public void finishReconstruction () {
+ this.reconPack = null;
+ this.buffers = new ArrayList<>();
+ }
+ }
+}
+
+
diff --git a/src/main/java/com/nq/ws/parser/Packet.java b/src/main/java/com/nq/ws/parser/Packet.java
new file mode 100644
index 0000000..e9be057
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/Packet.java
@@ -0,0 +1,22 @@
+package com.nq.ws.parser;
+
+
+public class Packet<T> {
+
+ public int type = -1;
+ public int id = -1;
+ public String nsp;
+ public T data;
+ public int attachments;
+
+ public Packet() {}
+
+ public Packet(int type) {
+ this.type = type;
+ }
+
+ public Packet(int type, T data) {
+ this.type = type;
+ this.data = data;
+ }
+}
diff --git a/src/main/java/com/nq/ws/parser/Parser.java b/src/main/java/com/nq/ws/parser/Parser.java
new file mode 100644
index 0000000..f5f0c32
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/Parser.java
@@ -0,0 +1,82 @@
+package com.nq.ws.parser;
+
+public interface Parser {
+
+ /**
+ * Packet type `connect`.
+ */
+ int CONNECT = 0;
+
+ /**
+ * Packet type `disconnect`.
+ */
+ int DISCONNECT = 1;
+
+ /**
+ * Packet type `event`.
+ */
+ int EVENT = 2;
+
+ /**
+ * Packet type `ack`.
+ */
+ int ACK = 3;
+
+ /**
+ * Packet type `error`.
+ */
+ int CONNECT_ERROR = 4;
+
+ /**
+ * Packet type `binary event`.
+ */
+ int BINARY_EVENT = 5;
+
+ /**
+ * Packet type `binary ack`.
+ */
+ int BINARY_ACK = 6;
+
+ int protocol = 5;
+
+ /**
+ * Packet types.
+ */
+ String[] types = new String[] {
+ "CONNECT",
+ "DISCONNECT",
+ "EVENT",
+ "ACK",
+ "ERROR",
+ "BINARY_EVENT",
+ "BINARY_ACK"
+ };
+
+ interface Encoder {
+
+ void encode(Packet obj, Callback callback);
+
+ interface Callback {
+
+ void call(Object[] data);
+ }
+ }
+
+ interface Decoder {
+
+ void add(String obj);
+
+ void add(byte[] obj);
+
+ void destroy();
+
+ void onDecoded(Callback callback);
+
+ interface Callback {
+
+ void call(Packet packet);
+ }
+ }
+}
+
+
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 7179edc..59b9a9a 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -45,6 +45,10 @@
IN_WS_URL =ws://ws.is4vc.com:8001/websocket-server
IN_KEY = r3ZAgtcYzuBizmqge2hK
+JS_IN_HTTP_API = http://api-in-2-socket.js-stock.top
+JS_IN_WS_URL =ws://api-in-2-ws.js-stock.top
+JS_IN_KEY = eVKtHt7aG4m6ozwWL9qG
+
US_HTTP_API = http://api-us.js-stock.top/
US_WS_URL = ws://ws-us.js-stock.top
US_KEY = jZFrku4RGQjP87Hmq5tm
--
Gitblit v1.9.3