From 7bcae85b0f7ea58b927b7441f61b7f33b11646c2 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Thu, 16 May 2024 17:54:12 +0800
Subject: [PATCH] 1

---
 .idea/inspectionProfiles/Project_Default.xml            |   91 +
 src/main/java/com/nq/ws/WebsocketRunClient.java         |  121 +-
 src/main/java/com/nq/ws/parser/Parser.java              |   82 +
 src/main/java/com/nq/enums/EStockType.java              |    2 
 src/main/java/com/nq/ws/client/Url.java                 |   84 +
 src/main/java/com/nq/ws/hasbinary/HasBinary.java        |   63 +
 src/main/java/com/nq/ws/client/Ack.java                 |   11 
 src/main/java/com/nq/ws/parser/DecodingException.java   |    7 
 src/main/java/com/nq/ws/client/AckWithTimeout.java      |   35 
 src/main/java/com/nq/ws/client/SocketOptionBuilder.java |  196 ++++
 pom.xml                                                 |   16 
 src/main/java/com/nq/ws/WsClientConfig.java             |  147 +++
 src/main/java/com/nq/ws/client/Socket.java              |  574 ++++++++++++
 src/main/resources/application.properties               |    4 
 src/main/java/com/nq/ws/WebSocketClientBeanConfig.java  |   96 +-
 src/main/java/com/nq/ws/backo/Backoff.java              |   66 +
 src/main/java/com/nq/ws/client/On.java                  |   23 
 src/main/java/com/nq/ws/parser/IOParser.java            |  263 +++++
 src/main/java/com/nq/ws/parser/Binary.java              |  127 ++
 src/main/java/com/nq/ws/client/Manager.java             |  575 ++++++++++++
 src/main/java/com/nq/ws/client/SocketIOException.java   |   20 
 src/main/java/com/nq/ws/parser/Packet.java              |   22 
 src/main/java/com/nq/ws/client/IO.java                  |  118 ++
 23 files changed, 2,639 insertions(+), 104 deletions(-)

diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 6560a98..4cc2f57 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -1,6 +1,61 @@
 <component name="InspectionProjectProfileManager">
   <profile version="1.0">
     <option name="myName" value="Project Default" />
+    <inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAbstractClassShouldStartWithAbstractNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAbstractMethodOrInterfaceMethodMustUseJavadoc" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidApacheBeanUtilsCopy" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidCallStaticSimpleDateFormat" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidCommentBehindStatement" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidComplexCondition" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidConcurrentCompetitionRandom" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidDoubleOrFloatEqualCompare" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidManuallyCreateThread" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidMissUseOfMathRandom" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidNegationOperator" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidNewDateGetTime" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidPatternCompileInMethod" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidReturnInFinally" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidStartWithDollarAndUnderLineNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaAvoidUseTimer" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaBigDecimalAvoidDoubleConstructor" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaBooleanPropertyShouldNotStartWithIs" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaClassCastExceptionWithSubListToArrayList" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaClassCastExceptionWithToArray" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaClassMustHaveAuthor" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaClassNamingShouldBeCamel" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaCollectionInitShouldAssignCapacity" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaCommentsMustBeJavadocFormat" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaConcurrentExceptionWithModifyOriginSubList" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaConstantFieldShouldBeUpperCase" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaCountDownShouldInFinally" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaDontModifyInForeachCircle" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaEnumConstantsMustHaveComment" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaExceptionClassShouldEndWithException" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaIbatisMethodQueryForList" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaLockShouldWithTryFinally" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaLowerCamelCaseVariableNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaMethodReturnWrapperType" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaMethodTooLong" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaPackageNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaPojoMustOverrideToString" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaPojoMustUsePrimitiveField" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaPojoNoDefaultValue" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaRemoveCommentedCode" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaServiceOrDaoClassShouldEndWithImpl" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaSneakyThrowsWithoutExceptionType" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaStringConcat" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaSwitchExpression" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaSwitchStatement" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaTestClassShouldEndWithTestNaming" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaThreadLocalShouldRemove" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaThreadPoolCreation" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaThreadShouldSetName" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaTransactionMustHaveRollback" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaUndefineMagicConstant" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaUnsupportedExceptionWithModifyAsList" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaUseQuietReferenceNotation" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AlibabaUseRightCaseForDateFormat" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="JavaDoc" enabled="true" level="WARNING" enabled_by_default="true">
       <option name="TOP_LEVEL_CLASS_OPTIONS">
         <value>
@@ -32,5 +87,41 @@
       <option name="IGNORE_POINT_TO_ITSELF" value="false" />
       <option name="myAdditionalJavadocTags" value="date" />
     </inspection_tool>
+    <inspection_tool class="JavadocDeclaration" enabled="true" level="WARNING" enabled_by_default="true">
+      <option name="ADDITIONAL_TAGS" value="date" />
+    </inspection_tool>
+    <inspection_tool class="MissingJavadoc" enabled="true" level="WARNING" enabled_by_default="true">
+      <option name="PACKAGE_SETTINGS">
+        <Options>
+          <option name="ENABLED" value="false" />
+        </Options>
+      </option>
+      <option name="MODULE_SETTINGS">
+        <Options>
+          <option name="ENABLED" value="false" />
+        </Options>
+      </option>
+      <option name="TOP_LEVEL_CLASS_SETTINGS">
+        <Options>
+          <option name="ENABLED" value="false" />
+        </Options>
+      </option>
+      <option name="INNER_CLASS_SETTINGS">
+        <Options>
+          <option name="ENABLED" value="false" />
+        </Options>
+      </option>
+      <option name="METHOD_SETTINGS">
+        <Options>
+          <option name="REQUIRED_TAGS" value="@return@param@throws or @exception" />
+          <option name="ENABLED" value="false" />
+        </Options>
+      </option>
+      <option name="FIELD_SETTINGS">
+        <Options>
+          <option name="ENABLED" value="false" />
+        </Options>
+      </option>
+    </inspection_tool>
   </profile>
 </component>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 84a2b55..058a9cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,7 +197,21 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>io.socket</groupId>
+            <artifactId>engine.io-client</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>20090211</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.8</version>
+        </dependency>
 
 
     </dependencies>
diff --git a/src/main/java/com/nq/enums/EStockType.java b/src/main/java/com/nq/enums/EStockType.java
index 23029e6..6cc3f17 100644
--- a/src/main/java/com/nq/enums/EStockType.java
+++ b/src/main/java/com/nq/enums/EStockType.java
@@ -15,7 +15,7 @@
     HK("HK","香港股票","39",PropertiesUtil.getProperty("HK_HTTP_API"),PropertiesUtil.getProperty("HK_KEY"),"HKD","HK$"),
     MAS("MAS","马来西亚股票","42",PropertiesUtil.getProperty("MAS_HTTP_API"),PropertiesUtil.getProperty("MAS_KEY"),"MYR","RM"),
 
-    IN("IN","印度股票","14", PropertiesUtil.getProperty("IN_HTTP_API"),PropertiesUtil.getProperty("IN_KEY"),"INR","₹");
+    IN("IN","印度股票","14", PropertiesUtil.getProperty("IN_HTTP_API"),PropertiesUtil.getProperty("JS_IN_KEY"),"INR","₹");
 //    TH("TH","泰国股票","41",PropertiesUtil.getProperty("TH_HTTP_API"),PropertiesUtil.getProperty("TH_KEY")),
 //    HG("HG","韩国股票","11",PropertiesUtil.getProperty("HG_HTTP_API"),PropertiesUtil.getProperty("HG_KEY")),
 //    SZHB("SZHB","数字货币","41",PropertiesUtil.getProperty("SZHB_HTTP_API"),PropertiesUtil.getProperty("SZHB_KEY"));
diff --git a/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java b/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
index 53bf924..355774e 100644
--- a/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
+++ b/src/main/java/com/nq/ws/WebSocketClientBeanConfig.java
@@ -1,48 +1,48 @@
-package com.nq.ws;
-
-
-import com.nq.enums.EStockType;
-import com.nq.utils.PropertiesUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.java_websocket.client.WebSocketClient;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-@Slf4j
-@Configuration
-public class WebSocketClientBeanConfig {
-
-
-    @Bean
-    public Map<String, WebSocketClient> websocketRunClientMap() {
-
-        Map<String, WebSocketClient> retMap = new HashMap<>(2);
-        try {
-            WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(PropertiesUtil.getProperty("IN_WS_URL")),EStockType.IN);
-            websocketRunClient.connect();
-            websocketRunClient.setConnectionLostTimeout(0);
-            new Thread(() -> {
-                while (true) {
-                    try {
-                        Thread.sleep(8000);
-                        websocketRunClient.send("heartbeat".getBytes());
-                    } catch (Exception e) {
-                        websocketRunClient.reconnect();
-                        websocketRunClient.setConnectionLostTimeout(0);
-                    }
-                }
-            }).start();
-        } catch (Exception e) {
-        }
-
-
-
-        return retMap;
-    }
-
-
-}
+//package com.nq.ws;
+//
+//
+//import com.nq.enums.EStockType;
+//import com.nq.utils.PropertiesUtil;
+//import lombok.extern.slf4j.Slf4j;
+//import org.java_websocket.client.WebSocketClient;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//
+//import java.net.URI;
+//import java.util.HashMap;
+//import java.util.Map;
+//
+//@Slf4j
+//@Configuration
+//public class WebSocketClientBeanConfig {
+//
+//
+//    @Bean
+//    public Map<String, WebSocketClient> websocketRunClientMap() {
+//
+//        Map<String, WebSocketClient> retMap = new HashMap<>(2);
+//        try {
+//            WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(PropertiesUtil.getProperty("JS_IN_WS_URL")),EStockType.IN);
+//            websocketRunClient.connect();
+//            websocketRunClient.setConnectionLostTimeout(0);
+//            new Thread(() -> {
+//                while (true) {
+//                    try {
+//                        Thread.sleep(8000);
+//                        websocketRunClient.send("heartbeat".getBytes());
+//                    } catch (Exception e) {
+//                        websocketRunClient.reconnect();
+//                        websocketRunClient.setConnectionLostTimeout(0);
+//                    }
+//                }
+//            }).start();
+//        } catch (Exception e) {
+//        }
+//
+//
+//
+//        return retMap;
+//    }
+//
+//
+//}
diff --git a/src/main/java/com/nq/ws/WebsocketRunClient.java b/src/main/java/com/nq/ws/WebsocketRunClient.java
index 2030fdc..b782a65 100644
--- a/src/main/java/com/nq/ws/WebsocketRunClient.java
+++ b/src/main/java/com/nq/ws/WebsocketRunClient.java
@@ -1,54 +1,67 @@
-package com.nq.ws;
-
-import com.google.gson.Gson;
-import com.nq.enums.EStockType;
-import com.nq.pojo.StockRealTimeBean;
-import com.nq.service.IMandatoryLiquidationService;
-import com.nq.service.impl.MandatoryLiquidationService;
-import com.nq.utils.ApplicationContextRegisterUtil;
-import com.nq.utils.redis.RedisKeyUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-import org.springframework.context.ApplicationContext;
-
-import java.net.URI;
-
-@Slf4j
-public class WebsocketRunClient  extends WebSocketClient {
-
-    private EStockType eStockType;
-    public WebsocketRunClient(URI serverUri,
-    EStockType eStockType
-    ) {
-        super(serverUri);
-        this.eStockType = eStockType;
-    }
-
-    @Override
-    public void onOpen(ServerHandshake serverHandshake) {
-        send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
-    }
-
-    @Override
-    public void onMessage(String s) {
-        try {
-            ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
-            MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
-            StockRealTimeBean stockDetailBean =  new Gson().fromJson(s, StockRealTimeBean.class);
-            liquidationService.RealTimeDataProcess(eStockType,stockDetailBean);
-        }catch (Exception e){
-
-        }
-    }
-
-    @Override
-    public void onClose(int i, String s, boolean b) {
-        log.info("websocket  马来西亚  关闭"+1);
-    }
-
-    @Override
-    public void onError(Exception e) {
-        log.info("websocket 错误");
-    }
-}
+//package com.nq.ws;
+//
+//import com.google.gson.Gson;
+//import com.google.gson.reflect.TypeToken;
+//import com.nq.enums.EStockType;
+//import com.nq.pojo.StockRealTimeBean;
+//import com.nq.service.IMandatoryLiquidationService;
+//import com.nq.service.impl.MandatoryLiquidationService;
+//import com.nq.utils.ApplicationContextRegisterUtil;
+//import com.nq.utils.redis.RedisKeyUtil;
+//import lombok.extern.slf4j.Slf4j;
+//import org.java_websocket.client.WebSocketClient;
+//import org.java_websocket.handshake.ServerHandshake;
+//import org.springframework.context.ApplicationContext;
+//
+//import java.lang.reflect.Type;
+//import java.net.URI;
+//import java.util.Map;
+//
+//@Slf4j
+//public class WebsocketRunClient  extends WebSocketClient {
+//
+//    private EStockType eStockType;
+//    public WebsocketRunClient(URI serverUri,
+//    EStockType eStockType
+//    ) {
+//        super(serverUri);
+//        this.eStockType = eStockType;
+//    }
+//
+//    @Override
+//    public void onOpen(ServerHandshake serverHandshake) {
+//        send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
+//    }
+//
+//    @Override
+//    public void onMessage(String s) {
+//        Map<String, Object> map = jsonToMap(s);
+//        if(map.get("pid").equals("00000001")){
+//            System.out.println(s);
+//        }
+//        try {
+//            ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
+//            MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
+//            StockRealTimeBean stockDetailBean =  new Gson().fromJson(s, StockRealTimeBean.class);
+//            liquidationService.RealTimeDataProcess(eStockType,stockDetailBean);
+//        }catch (Exception e){
+//
+//        }
+//    }
+//
+//    public static Map<String, Object> jsonToMap(String json) {
+//        Gson gson = new Gson();
+//        Type type = new TypeToken<Map<String, Object>>(){}.getType();
+//        return gson.fromJson(json, type);
+//    }
+//
+//    @Override
+//    public void onClose(int i, String s, boolean b) {
+//        log.info("websocket  马来西亚  关闭"+1);
+//    }
+//
+//    @Override
+//    public void onError(Exception e) {
+//        log.info("websocket 错误");
+//    }
+//}
diff --git a/src/main/java/com/nq/ws/WsClientConfig.java b/src/main/java/com/nq/ws/WsClientConfig.java
new file mode 100644
index 0000000..61a2f51
--- /dev/null
+++ b/src/main/java/com/nq/ws/WsClientConfig.java
@@ -0,0 +1,147 @@
+package com.nq.ws;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import com.nq.enums.EStockType;
+import com.nq.pojo.StockRealTimeBean;
+import com.nq.service.IMandatoryLiquidationService;
+import com.nq.service.impl.MandatoryLiquidationService;
+import com.nq.utils.ApplicationContextRegisterUtil;
+import com.nq.utils.PropertiesUtil;
+import com.nq.ws.client.IO;
+import com.nq.ws.client.Socket;
+import io.socket.emitter.Emitter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Configuration
+public class WsClientConfig {
+
+    private static final String SERVER_URL = PropertiesUtil.getProperty("JS_IN_HTTP_API");
+    private static final String AUTH_KEY = PropertiesUtil.getProperty("JS_IN_KEY");
+    private static final String ROOM_ID = "14";
+    private static Socket socket;
+
+    @Bean
+    public void websocketRunClientMap() {
+        connectToServer();
+    }
+
+    private static void connectToServer() {
+        IO.Options options = new IO.Options();
+        options.reconnection = true;
+        options.reconnectionDelay = 1000;
+
+        try {
+            socket = IO.socket(new URI(SERVER_URL), options);
+        } catch (URISyntaxException e) {
+            log.error("Invalid URI", e);
+            return;
+        }
+
+        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
+            @Override
+            public void call(Object... args) {
+                authenticate();
+            }
+        });
+
+        socket.on("marketData", new Emitter.Listener() {
+            @Override
+            public void call(Object... args) {
+                    Map<String, Object> map = jsonToMap(args[0].toString());
+                    if(map.get("pid").equals("00000001")){
+                        System.out.println("接收时间:"  +  new SimpleDateFormat("HH:mm:ss").format(new  Date())  +  "  "  +  args[0].toString());
+                    }
+                try {
+                    ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext();
+                    MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class);
+                    StockRealTimeBean stockDetailBean =  new Gson().fromJson(args[0].toString(), StockRealTimeBean.class);
+                    liquidationService.RealTimeDataProcess(EStockType.IN,stockDetailBean);
+                }catch (Exception e){
+                    log.error("socket数据存入缓存错误:", e.getMessage());
+                }
+            }
+        });
+
+        socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
+            @Override
+            public void call(Object... args) {
+                log.error("socketIo连接错误: " + args[0]);
+            }
+        });
+
+        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
+            @Override
+            public void call(Object... args) {
+                log.error("socketIo 断开连接: " + args[0]);
+                if (!socket.connected()) {
+                    log.error("socketIo 开始重连: " + args[0]);
+                    reconnect();
+                }
+            }
+        });
+
+        socket.connect();
+    }
+
+    public static Map<String, Object> jsonToMap(String json) {
+        Gson gson = new Gson();
+        Type type = new TypeToken<Map<String, Object>>(){}.getType();
+        return gson.fromJson(json, type);
+    }
+
+    private static void reconnect() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!socket.connected()) {
+                    try {
+                        log.error("socketIo 开始重连");
+                        socket.connect();
+                        Thread.sleep(5000); // 重连间隔,单位毫秒
+                    } catch (InterruptedException e) {
+                        log.error("重连被中断", e);
+                    }
+                }
+            }
+        }).start();
+    }
+
+    private static void authenticate() {
+        log.info("socketIo---->开始连接");
+        if (socket != null && socket.connected()) {
+            JSONObject authData = new JSONObject();
+            try {
+                authData.put("key", AUTH_KEY);
+                authData.put("roomId", ROOM_ID);
+                socket.emit("authenticate", authData);
+                log.info("socketIo---->连接成功");
+            } catch (JSONException e) {
+                log.error("socketIo认证错误:"+e.getMessage(), e);
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/nq/ws/backo/Backoff.java b/src/main/java/com/nq/ws/backo/Backoff.java
new file mode 100644
index 0000000..4f440af
--- /dev/null
+++ b/src/main/java/com/nq/ws/backo/Backoff.java
@@ -0,0 +1,66 @@
+package com.nq.ws.backo;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * Imported from https://github.com/mokesmokes/backo
+ */
+public class Backoff {
+
+    private long ms = 100;
+    private long max = 10000;
+    private int factor = 2;
+    private double jitter;
+    private int attempts;
+
+    public Backoff() {}
+
+    public long duration() {
+        BigInteger ms = BigInteger.valueOf(this.ms)
+                .multiply(BigInteger.valueOf(this.factor).pow(this.attempts++));
+        if (jitter != 0.0) {
+            double rand = Math.random();
+            BigInteger deviation = BigDecimal.valueOf(rand)
+                    .multiply(BigDecimal.valueOf(jitter))
+                    .multiply(new BigDecimal(ms)).toBigInteger();
+            ms = (((int) Math.floor(rand * 10)) & 1) == 0 ? ms.subtract(deviation) : ms.add(deviation);
+        }
+        return ms
+                .min(BigInteger.valueOf(this.max))
+                .max(BigInteger.valueOf(this.ms))
+                .longValue();
+    }
+
+    public void reset() {
+        this.attempts = 0;
+    }
+
+    public Backoff setMin(long min) {
+        this.ms = min;
+        return this;
+    }
+
+    public Backoff setMax(long max) {
+        this.max = max;
+        return this;
+    }
+
+    public Backoff setFactor(int factor) {
+        this.factor = factor;
+        return this;
+    }
+
+    public Backoff setJitter(double jitter) {
+        boolean isValid = jitter >= 0 && jitter < 1;
+        if (!isValid) {
+            throw new IllegalArgumentException("jitter must be between 0 and 1");
+        }
+        this.jitter = jitter;
+        return this;
+    }
+
+    public int getAttempts() {
+        return this.attempts;
+    }
+}
diff --git a/src/main/java/com/nq/ws/client/Ack.java b/src/main/java/com/nq/ws/client/Ack.java
new file mode 100644
index 0000000..d055cfc
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Ack.java
@@ -0,0 +1,11 @@
+package com.nq.ws.client;
+
+/**
+ * Acknowledgement.
+ */
+public interface Ack {
+
+    void call(Object... args);
+
+}
+
diff --git a/src/main/java/com/nq/ws/client/AckWithTimeout.java b/src/main/java/com/nq/ws/client/AckWithTimeout.java
new file mode 100644
index 0000000..e7374ff
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/AckWithTimeout.java
@@ -0,0 +1,35 @@
+package com.nq.ws.client;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public abstract class AckWithTimeout implements Ack {
+    private final long timeout;
+    private final Timer timer = new Timer();
+
+    /**
+     *
+     * @param timeout delay in milliseconds
+     */
+    public AckWithTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    @Override
+    public final void call(Object... args) {
+        this.timer.cancel();
+        this.onSuccess(args);
+    }
+
+    public final void schedule(TimerTask task) {
+        this.timer.schedule(task, this.timeout);
+    }
+
+    public final void cancelTimer() {
+        this.timer.cancel();
+    }
+
+    public abstract void onSuccess(Object... args);
+    public abstract void onTimeout();
+
+}
diff --git a/src/main/java/com/nq/ws/client/IO.java b/src/main/java/com/nq/ws/client/IO.java
new file mode 100644
index 0000000..ae4abc7
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/IO.java
@@ -0,0 +1,118 @@
+package com.nq.ws.client;
+
+
+import com.nq.ws.parser.Parser;
+import okhttp3.Call;
+import okhttp3.WebSocket;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+public class IO {
+
+    private static final Logger logger = Logger.getLogger(IO.class.getName());
+
+    private static final ConcurrentHashMap<String, Manager> managers = new ConcurrentHashMap<>();
+
+    /**
+     * Protocol version.
+     */
+    public static int protocol = Parser.protocol;
+
+    public static void setDefaultOkHttpWebSocketFactory(WebSocket.Factory factory) {
+        Manager.defaultWebSocketFactory = factory;
+    }
+
+    public static void setDefaultOkHttpCallFactory(Call.Factory factory) {
+        Manager.defaultCallFactory = factory;
+    }
+
+    private IO() {}
+
+    public static Socket socket(String uri) throws URISyntaxException {
+        return socket(uri, null);
+    }
+
+    public static Socket socket(String uri, Options opts) throws URISyntaxException {
+        return socket(new URI(uri), opts);
+    }
+
+    public static Socket socket(URI uri) {
+        return socket(uri, null);
+    }
+
+    /**
+     * Initializes a {@link Socket} from an existing {@link Manager} for multiplexing.
+     *
+     * @param uri uri to connect.
+     * @param opts options for socket.
+     * @return {@link Socket} instance.
+     */
+    public static Socket socket(URI uri, Options opts) {
+        if (opts == null) {
+            opts = new Options();
+        }
+
+        Url.ParsedURI parsed = Url.parse(uri);
+        URI source = parsed.uri;
+        String id = parsed.id;
+
+        boolean sameNamespace = managers.containsKey(id)
+                && managers.get(id).nsps.containsKey(source.getPath());
+        boolean newConnection = opts.forceNew || !opts.multiplex || sameNamespace;
+        Manager io;
+
+        String query = source.getQuery();
+        if (query != null && (opts.query == null || opts.query.isEmpty())) {
+            opts.query = query;
+        }
+
+        if (newConnection) {
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine(String.format("ignoring socket cache for %s", source));
+            }
+            io = new Manager(source, opts);
+        } else {
+            if (!managers.containsKey(id)) {
+                if (logger.isLoggable(Level.FINE)) {
+                    logger.fine(String.format("new io instance for %s", source));
+                }
+                managers.putIfAbsent(id, new Manager(source, opts));
+            }
+            io = managers.get(id);
+        }
+
+        return io.socket(source.getPath(), opts);
+    }
+
+
+    public static class Options extends Manager.Options {
+
+        public boolean forceNew;
+
+        /**
+         * Whether to enable multiplexing. Default is true.
+         */
+        public boolean multiplex = true;
+
+        /**
+         * <p>
+         *   Retrieve new builder class that helps creating socket option as builder pattern.
+         *   This method returns exactly same result as :
+         * </p>
+         * <code>
+         * SocketOptionBuilder builder = SocketOptionBuilder.builder();
+         * </code>
+         *
+         * @return builder class that helps creating socket option as builder pattern.
+         * @see SocketOptionBuilder#builder()
+         */
+        public static SocketOptionBuilder builder() {
+            return SocketOptionBuilder.builder();
+        }
+    }
+}
diff --git a/src/main/java/com/nq/ws/client/Manager.java b/src/main/java/com/nq/ws/client/Manager.java
new file mode 100644
index 0000000..75e2562
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Manager.java
@@ -0,0 +1,575 @@
+package com.nq.ws.client;
+
+import com.nq.ws.backo.Backoff;
+import com.nq.ws.parser.DecodingException;
+import com.nq.ws.parser.IOParser;
+import com.nq.ws.parser.Packet;
+import com.nq.ws.parser.Parser;
+import io.socket.emitter.Emitter;
+import io.socket.thread.EventThread;
+import okhttp3.Call;
+import okhttp3.WebSocket;
+
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manager class represents a connection to a given Socket.IO server.
+ */
+public class Manager extends Emitter {
+
+    private static final Logger logger = Logger.getLogger(Manager.class.getName());
+
+    /*package*/ enum ReadyState {
+        CLOSED, OPENING, OPEN
+    }
+
+    /**
+     * Called on a successful connection.
+     */
+    public static final String EVENT_OPEN = "open";
+
+    /**
+     * Called on a disconnection.
+     */
+    public static final String EVENT_CLOSE = "close";
+
+    public static final String EVENT_PACKET = "packet";
+    public static final String EVENT_ERROR = "error";
+
+    /**
+     * Called on a successful reconnection.
+     */
+    public static final String EVENT_RECONNECT = "reconnect";
+
+    /**
+     * Called on a reconnection attempt error.
+     */
+    public static final String EVENT_RECONNECT_ERROR = "reconnect_error";
+
+    public static final String EVENT_RECONNECT_FAILED = "reconnect_failed";
+
+    public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";
+
+    /**
+     * Called when a new transport is created. (experimental)
+     */
+    public static final String EVENT_TRANSPORT = Engine.EVENT_TRANSPORT;
+
+    /*package*/ static WebSocket.Factory defaultWebSocketFactory;
+    /*package*/ static Call.Factory defaultCallFactory;
+
+    /*package*/ ReadyState readyState;
+
+    private boolean _reconnection;
+    private boolean skipReconnect;
+    private boolean reconnecting;
+    private boolean encoding;
+    private int _reconnectionAttempts;
+    private long _reconnectionDelay;
+    private long _reconnectionDelayMax;
+    private double _randomizationFactor;
+    private Backoff backoff;
+    private long _timeout;
+    private URI uri;
+    private List<Packet> packetBuffer;
+    private Queue<On.Handle> subs;
+    private Options opts;
+    /*package*/ io.socket.engineio.client.Socket engine;
+    private Parser.Encoder encoder;
+    private Parser.Decoder decoder;
+
+    /**
+     * This HashMap can be accessed from outside of EventThread.
+     */
+    /*package*/ ConcurrentHashMap<String, Socket> nsps;
+
+
+    public Manager() {
+        this(null, null);
+    }
+
+    public Manager(URI uri) {
+        this(uri, null);
+    }
+
+    public Manager(Options opts) {
+        this(null, opts);
+    }
+
+    public Manager(URI uri, Options opts) {
+        if (opts == null) {
+            opts = new Options();
+        }
+        if (opts.path == null) {
+            opts.path = "/socket.io";
+        }
+        if (opts.webSocketFactory == null) {
+            opts.webSocketFactory = defaultWebSocketFactory;
+        }
+        if (opts.callFactory == null) {
+            opts.callFactory = defaultCallFactory;
+        }
+        this.opts = opts;
+        this.nsps = new ConcurrentHashMap<>();
+        this.subs = new LinkedList<>();
+        this.reconnection(opts.reconnection);
+        this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
+        this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
+        this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
+        this.randomizationFactor(opts.randomizationFactor != 0.0 ? opts.randomizationFactor : 0.5);
+        this.backoff = new Backoff()
+                .setMin(this.reconnectionDelay())
+                .setMax(this.reconnectionDelayMax())
+                .setJitter(this.randomizationFactor());
+        this.timeout(opts.timeout);
+        this.readyState = ReadyState.CLOSED;
+        this.uri = uri;
+        this.encoding = false;
+        this.packetBuffer = new ArrayList<>();
+        this.encoder = opts.encoder != null ? opts.encoder : new IOParser.Encoder();
+        this.decoder = opts.decoder != null ? opts.decoder : new IOParser.Decoder();
+    }
+
+    public boolean reconnection() {
+        return this._reconnection;
+    }
+
+    public Manager reconnection(boolean v) {
+        this._reconnection = v;
+        return this;
+    }
+
+    public boolean isReconnecting() {
+        return reconnecting;
+    }
+
+    public int reconnectionAttempts() {
+        return this._reconnectionAttempts;
+    }
+
+    public Manager reconnectionAttempts(int v) {
+        this._reconnectionAttempts = v;
+        return this;
+    }
+
+    public final long reconnectionDelay() {
+        return this._reconnectionDelay;
+    }
+
+    public Manager reconnectionDelay(long v) {
+        this._reconnectionDelay = v;
+        if (this.backoff != null) {
+            this.backoff.setMin(v);
+        }
+        return this;
+    }
+
+    public final double randomizationFactor() {
+        return this._randomizationFactor;
+    }
+
+    public Manager randomizationFactor(double v) {
+        this._randomizationFactor = v;
+        if (this.backoff != null) {
+            this.backoff.setJitter(v);
+        }
+        return this;
+    }
+
+    public final long reconnectionDelayMax() {
+        return this._reconnectionDelayMax;
+    }
+
+    public Manager reconnectionDelayMax(long v) {
+        this._reconnectionDelayMax = v;
+        if (this.backoff != null) {
+            this.backoff.setMax(v);
+        }
+        return this;
+    }
+
+    public long timeout() {
+        return this._timeout;
+    }
+
+    public Manager timeout(long v) {
+        this._timeout = v;
+        return this;
+    }
+
+    private void maybeReconnectOnOpen() {
+        // Only try to reconnect if it's the first time we're connecting
+        if (!this.reconnecting && this._reconnection && this.backoff.getAttempts() == 0) {
+            this.reconnect();
+        }
+    }
+
+    public Manager open(){
+        return open(null);
+    }
+
+    /**
+     * Connects the client.
+     *
+     * @param fn callback.
+     * @return a reference to this object.
+     */
+    public Manager open(final OpenCallback fn) {
+        EventThread.exec(new Runnable() {
+            @Override
+            public void run() {
+                if (logger.isLoggable(Level.FINE)) {
+                    logger.fine(String.format("readyState %s", Manager.this.readyState));
+                }
+                if (Manager.this.readyState == ReadyState.OPEN || Manager.this.readyState == ReadyState.OPENING) return;
+
+                if (logger.isLoggable(Level.FINE)) {
+                    logger.fine(String.format("opening %s", Manager.this.uri));
+                }
+                Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
+                final io.socket.engineio.client.Socket socket = Manager.this.engine;
+                final Manager self = Manager.this;
+                Manager.this.readyState = ReadyState.OPENING;
+                Manager.this.skipReconnect = false;
+
+                // propagate transport event.
+                socket.on(Engine.EVENT_TRANSPORT, new Listener() {
+                    @Override
+                    public void call(Object... args) {
+                        self.emit(Manager.EVENT_TRANSPORT, args);
+                    }
+                });
+
+                final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
+                    @Override
+                    public void call(Object... objects) {
+                        self.onopen();
+                        if (fn != null) fn.call(null);
+                    }
+                });
+
+                On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
+                    @Override
+                    public void call(Object... objects) {
+                        Object data = objects.length > 0 ? objects[0] : null;
+                        logger.fine("connect_error");
+                        self.cleanup();
+                        self.readyState = ReadyState.CLOSED;
+                        self.emit(EVENT_ERROR, data);
+                        if (fn != null) {
+                            Exception err = new SocketIOException("Connection error",
+                                    data instanceof Exception ? (Exception) data : null);
+                            fn.call(err);
+                        } else {
+                            // Only do this if there is no fn to handle the error
+                            self.maybeReconnectOnOpen();
+                        }
+                    }
+                });
+
+                final long timeout = Manager.this._timeout;
+                final Runnable onTimeout = new Runnable() {
+                    @Override
+                    public void run() {
+                        logger.fine(String.format("connect attempt timed out after %d", timeout));
+                        openSub.destroy();
+                        socket.close();
+                        socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
+                    }
+                };
+
+                if (timeout == 0) {
+                    EventThread.exec(onTimeout);
+                    return;
+                } else if (Manager.this._timeout > 0) {
+                    logger.fine(String.format("connection attempt will timeout after %d", timeout));
+
+                    final Timer timer = new Timer();
+                    timer.schedule(new TimerTask() {
+                        @Override
+                        public void run() {
+                            EventThread.exec(onTimeout);
+                        }
+                    }, timeout);
+
+                    Manager.this.subs.add(new On.Handle() {
+                        @Override
+                        public void destroy() {
+                            timer.cancel();
+                        }
+                    });
+                }
+
+                Manager.this.subs.add(openSub);
+                Manager.this.subs.add(errorSub);
+
+                Manager.this.engine.open();
+            }
+        });
+        return this;
+    }
+
+    private void onopen() {
+        logger.fine("open");
+
+        this.cleanup();
+
+        this.readyState = ReadyState.OPEN;
+        this.emit(EVENT_OPEN);
+
+        final io.socket.engineio.client.Socket socket = this.engine;
+        this.subs.add(On.on(socket, Engine.EVENT_DATA, new Listener() {
+            @Override
+            public void call(Object... objects) {
+                Object data = objects[0];
+                try {
+                    if (data instanceof String) {
+                        Manager.this.decoder.add((String) data);
+                    } else if (data instanceof byte[]) {
+                        Manager.this.decoder.add((byte[]) data);
+                    }
+                } catch (DecodingException e) {
+                    logger.fine("error while decoding the packet: " + e.getMessage());
+                }
+            }
+        }));
+        this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
+            @Override
+            public void call(Object... objects) {
+                Manager.this.onerror((Exception)objects[0]);
+            }
+        }));
+        this.subs.add(On.on(socket, Engine.EVENT_CLOSE, new Listener() {
+            @Override
+            public void call(Object... objects) {
+                Manager.this.onclose((String)objects[0]);
+            }
+        }));
+        this.decoder.onDecoded(new Parser.Decoder.Callback() {
+            @Override
+            public void call (Packet packet) {
+                Manager.this.ondecoded(packet);
+            }
+        });
+    }
+
+    private void ondecoded(Packet packet) {
+        this.emit(EVENT_PACKET, packet);
+    }
+
+    private void onerror(Exception err) {
+        logger.log(Level.FINE, "error", err);
+        this.emit(EVENT_ERROR, err);
+    }
+
+    /**
+     * Initializes {@link Socket} instances for each namespaces.
+     *
+     * @param nsp namespace.
+     * @param opts options.
+     * @return a socket instance for the namespace.
+     */
+    public Socket socket(final String nsp, Options opts) {
+        synchronized (this.nsps) {
+            Socket socket = this.nsps.get(nsp);
+            if (socket == null) {
+                socket = new Socket(this, nsp, opts);
+                this.nsps.put(nsp, socket);
+            }
+            return socket;
+        }
+    }
+
+    public Socket socket(String nsp) {
+        return socket(nsp, null);
+    }
+
+    /*package*/ void destroy() {
+        synchronized (this.nsps) {
+            for (Socket socket : this.nsps.values()) {
+                if (socket.isActive()) {
+                    logger.fine("socket is still active, skipping close");
+                    return;
+                }
+            }
+
+            this.close();
+        }
+    }
+
+    /*package*/ void packet(Packet packet) {
+        if (logger.isLoggable(Level.FINE)) {
+            logger.fine(String.format("writing packet %s", packet));
+        }
+        final Manager self = this;
+
+        if (!self.encoding) {
+            self.encoding = true;
+            this.encoder.encode(packet, new Parser.Encoder.Callback() {
+                @Override
+                public void call(Object[] encodedPackets) {
+                    for (Object packet : encodedPackets) {
+                        if (packet instanceof String) {
+                            self.engine.write((String)packet);
+                        } else if (packet instanceof byte[]) {
+                            self.engine.write((byte[])packet);
+                        }
+                    }
+                    self.encoding = false;
+                    self.processPacketQueue();
+                }
+            });
+        } else {
+            self.packetBuffer.add(packet);
+        }
+    }
+
+    private void processPacketQueue() {
+        if (!this.packetBuffer.isEmpty() && !this.encoding) {
+            Packet pack = this.packetBuffer.remove(0);
+            this.packet(pack);
+        }
+    }
+
+    private void cleanup() {
+        logger.fine("cleanup");
+
+        On.Handle sub;
+        while ((sub = this.subs.poll()) != null) sub.destroy();
+        this.decoder.onDecoded(null);
+
+        this.packetBuffer.clear();
+        this.encoding = false;
+
+        this.decoder.destroy();
+    }
+
+    /*package*/ void close() {
+        logger.fine("disconnect");
+        this.skipReconnect = true;
+        this.reconnecting = false;
+        if (this.readyState != ReadyState.OPEN) {
+            // `onclose` will not fire because
+            // an open event never happened
+            this.cleanup();
+        }
+        this.backoff.reset();
+        this.readyState = ReadyState.CLOSED;
+        if (this.engine != null) {
+            this.engine.close();
+        }
+    }
+
+    private void onclose(String reason) {
+        logger.fine("onclose");
+        this.cleanup();
+        this.backoff.reset();
+        this.readyState = ReadyState.CLOSED;
+        this.emit(EVENT_CLOSE, reason);
+
+        if (this._reconnection && !this.skipReconnect) {
+            this.reconnect();
+        }
+    }
+
+    private void reconnect() {
+        if (this.reconnecting || this.skipReconnect) return;
+
+        final Manager self = this;
+
+        if (this.backoff.getAttempts() >= this._reconnectionAttempts) {
+            logger.fine("reconnect failed");
+            this.backoff.reset();
+            this.emit(EVENT_RECONNECT_FAILED);
+            this.reconnecting = false;
+        } else {
+            long delay = this.backoff.duration();
+            logger.fine(String.format("will wait %dms before reconnect attempt", delay));
+
+            this.reconnecting = true;
+            final Timer timer = new Timer();
+            timer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    EventThread.exec(new Runnable() {
+                        @Override
+                        public void run() {
+                            if (self.skipReconnect) return;
+
+                            logger.fine("attempting reconnect");
+                            int attempts = self.backoff.getAttempts();
+                            self.emit(EVENT_RECONNECT_ATTEMPT, attempts);
+
+                            // check again for the case socket closed in above events
+                            if (self.skipReconnect) return;
+
+                            self.open(new OpenCallback() {
+                                @Override
+                                public void call(Exception err) {
+                                    if (err != null) {
+                                        logger.fine("reconnect attempt error");
+                                        self.reconnecting = false;
+                                        self.reconnect();
+                                        self.emit(EVENT_RECONNECT_ERROR, err);
+                                    } else {
+                                        logger.fine("reconnect success");
+                                        self.onreconnect();
+                                    }
+                                }
+                            });
+                        }
+                    });
+                }
+            }, delay);
+
+            this.subs.add(new On.Handle() {
+                @Override
+                public void destroy() {
+                    timer.cancel();
+                }
+            });
+        }
+    }
+
+    private void onreconnect() {
+        int attempts = this.backoff.getAttempts();
+        this.reconnecting = false;
+        this.backoff.reset();
+        this.emit(EVENT_RECONNECT, attempts);
+    }
+
+
+    public interface OpenCallback {
+
+        void call(Exception err);
+    }
+
+
+    private static class Engine extends io.socket.engineio.client.Socket {
+
+        Engine(URI uri, Options opts) {
+            super(uri, opts);
+        }
+    }
+
+    public static class Options extends io.socket.engineio.client.Socket.Options {
+
+        public boolean reconnection = true;
+        public int reconnectionAttempts;
+        public long reconnectionDelay;
+        public long reconnectionDelayMax;
+        public double randomizationFactor;
+        public Parser.Encoder encoder;
+        public Parser.Decoder decoder;
+        public Map<String, String> auth;
+
+        /**
+         * Connection timeout (ms). Set -1 to disable.
+         */
+        public long timeout = 20000;
+    }
+}
diff --git a/src/main/java/com/nq/ws/client/On.java b/src/main/java/com/nq/ws/client/On.java
new file mode 100644
index 0000000..2cf4dad
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/On.java
@@ -0,0 +1,23 @@
+package com.nq.ws.client;
+
+import io.socket.emitter.Emitter;
+
+public class On {
+
+    private On() {}
+
+    public static Handle on(final Emitter obj, final String ev, final Emitter.Listener fn) {
+        obj.on(ev, fn);
+        return new Handle() {
+            @Override
+            public void destroy() {
+                obj.off(ev, fn);
+            }
+        };
+    }
+
+    public interface Handle {
+
+        void destroy();
+    }
+}
diff --git a/src/main/java/com/nq/ws/client/Socket.java b/src/main/java/com/nq/ws/client/Socket.java
new file mode 100644
index 0000000..facda91
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Socket.java
@@ -0,0 +1,574 @@
+package com.nq.ws.client;
+
+import com.nq.ws.parser.Packet;
+import com.nq.ws.parser.Parser;
+import io.socket.emitter.Emitter;
+import io.socket.thread.EventThread;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The socket class for Socket.IO Client.
+ */
+public class Socket extends Emitter {
+
+    private static final Logger logger = Logger.getLogger(Socket.class.getName());
+
+    /**
+     * Called on a connection.
+     */
+    public static final String EVENT_CONNECT = "connect";
+
+    /**
+     * Called on a disconnection.
+     */
+    public static final String EVENT_DISCONNECT = "disconnect";
+
+    /**
+     * Called on a connection error.
+     *
+     * <p>Parameters:</p>
+     * <ul>
+     *   <li>(Exception) error data.</li>
+     * </ul>
+     */
+    public static final String EVENT_CONNECT_ERROR = "connect_error";
+
+    static final String EVENT_MESSAGE = "message";
+
+    protected static Map<String, Integer> RESERVED_EVENTS = new HashMap<String, Integer>() {{
+        put(EVENT_CONNECT, 1);
+        put(EVENT_CONNECT_ERROR, 1);
+        put(EVENT_DISCONNECT, 1);
+        // used on the server-side
+        put("disconnecting", 1);
+        put("newListener", 1);
+        put("removeListener", 1);
+    }};
+
+    /*package*/ String id;
+
+    private volatile boolean connected;
+    private int ids;
+    private String nsp;
+    private Manager io;
+    private Map<String, String> auth;
+    private Map<Integer, Ack> acks = new HashMap<>();
+    private Queue<On.Handle> subs;
+    private final Queue<List<Object>> receiveBuffer = new LinkedList<>();
+    private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<>();
+
+    private ConcurrentLinkedQueue<Listener> onAnyIncomingListeners = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Listener> onAnyOutgoingListeners = new ConcurrentLinkedQueue<>();
+
+    public Socket(Manager io, String nsp, Manager.Options opts) {
+        this.io = io;
+        this.nsp = nsp;
+        if (opts != null) {
+            this.auth = opts.auth;
+        }
+    }
+
+    private void subEvents() {
+        if (this.subs != null) return;
+
+        final Manager io = Socket.this.io;
+        Socket.this.subs = new LinkedList<On.Handle>() {{
+            add(On.on(io, Manager.EVENT_OPEN, new Listener() {
+                @Override
+                public void call(Object... args) {
+                    Socket.this.onopen();
+                }
+            }));
+            add(On.on(io, Manager.EVENT_PACKET, new Listener() {
+                @Override
+                public void call(Object... args) {
+                    Socket.this.onpacket((Packet<?>) args[0]);
+                }
+            }));
+            add(On.on(io, Manager.EVENT_ERROR, new Listener() {
+                @Override
+                public void call(Object... args) {
+                    if (!Socket.this.connected) {
+                        Socket.super.emit(EVENT_CONNECT_ERROR, args[0]);
+                    }
+                }
+            }));
+            add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
+                @Override
+                public void call(Object... args) {
+                    Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
+                }
+            }));
+        }};
+    }
+
+    public boolean isActive() {
+        return this.subs != null;
+    }
+
+    /**
+     * Connects the socket.
+     */
+    public Socket open() {
+        EventThread.exec(new Runnable() {
+            @Override
+            public void run() {
+                if (Socket.this.connected || Socket.this.io.isReconnecting()) return;
+
+                Socket.this.subEvents();
+                Socket.this.io.open(); // ensure open
+                if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
+            }
+        });
+        return this;
+    }
+
+    /**
+     * Connects the socket.
+     */
+    public Socket connect() {
+        return this.open();
+    }
+
+    /**
+     * Send messages.
+     *
+     * @param args data to send.
+     * @return a reference to this object.
+     */
+    public Socket send(final Object... args) {
+        EventThread.exec(new Runnable() {
+            @Override
+            public void run() {
+                Socket.this.emit(EVENT_MESSAGE, args);
+            }
+        });
+        return this;
+    }
+
+    /**
+     * Emits an event. When you pass {@link Ack} at the last argument, then the acknowledge is done.
+     *
+     * @param event an event name.
+     * @param args data to send.
+     * @return a reference to this object.
+     */
+    @Override
+    public Emitter emit(final String event, final Object... args) {
+        if (RESERVED_EVENTS.containsKey(event)) {
+            throw new RuntimeException("'" + event + "' is a reserved event name");
+        }
+
+        EventThread.exec(new Runnable() {
+            @Override
+            public void run() {
+                Ack ack;
+                Object[] _args;
+                int lastIndex = args.length - 1;
+
+                if (args.length > 0 && args[lastIndex] instanceof Ack) {
+                    _args = new Object[lastIndex];
+                    for (int i = 0; i < lastIndex; i++) {
+                        _args[i] = args[i];
+                    }
+                    ack = (Ack) args[lastIndex];
+                } else {
+                    _args = args;
+                    ack = null;
+                }
+
+                emit(event, _args, ack);
+            }
+        });
+        return this;
+    }
+
+    /**
+     * Emits an event with an acknowledge.
+     *
+     * @param event an event name
+     * @param args data to send.
+     * @param ack the acknowledgement to be called
+     * @return a reference to this object.
+     */
+    public Emitter emit(final String event, final Object[] args, final Ack ack) {
+        EventThread.exec(new Runnable() {
+            @Override
+            public void run() {
+                JSONArray jsonArgs = new JSONArray();
+                jsonArgs.put(event);
+
+                if (args != null) {
+                    for (Object arg : args) {
+                        jsonArgs.put(arg);
+                    }
+                }
+
+                Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);
+
+                if (ack != null) {
+                    final int ackId = Socket.this.ids;
+
+                    logger.fine(String.format("emitting packet with ack id %d", ackId));
+
+                    if (ack instanceof AckWithTimeout) {
+                        final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack;
+                        ackWithTimeout.schedule(new TimerTask() {
+                            @Override
+                            public void run() {
+                                // remove the ack from the map (to prevent an actual acknowledgement)
+                                acks.remove(ackId);
+
+                                // remove the packet from the buffer (if applicable)
+                                Iterator<Packet<JSONArray>> iterator = sendBuffer.iterator();
+                                while (iterator.hasNext()) {
+                                    if (iterator.next().id == ackId) {
+                                        iterator.remove();
+                                    }
+                                }
+
+                                ackWithTimeout.onTimeout();
+                            }
+                        });
+                    }
+
+                    Socket.this.acks.put(ackId, ack);
+                    packet.id = ids++;
+                }
+
+                if (Socket.this.connected) {
+                    Socket.this.packet(packet);
+                } else {
+                    Socket.this.sendBuffer.add(packet);
+                }
+            }
+        });
+        return this;
+    }
+
+    private void packet(Packet packet) {
+        if (packet.type == Parser.EVENT) {
+            if (!onAnyOutgoingListeners.isEmpty()) {
+                Object[] argsAsArray = toArray((JSONArray) packet.data);
+                for (Listener listener : onAnyOutgoingListeners) {
+                    listener.call(argsAsArray);
+                }
+            }
+        }
+        packet.nsp = this.nsp;
+        this.io.packet(packet);
+    }
+
+    private void onopen() {
+        logger.fine("transport is open - connecting");
+
+        if (this.auth != null) {
+            this.packet(new Packet<>(Parser.CONNECT, new JSONObject(this.auth)));
+        } else {
+            this.packet(new Packet<>(Parser.CONNECT));
+        }
+    }
+
+    private void onclose(String reason) {
+        if (logger.isLoggable(Level.FINE)) {
+            logger.fine(String.format("close (%s)", reason));
+        }
+        this.connected = false;
+        this.id = null;
+        super.emit(EVENT_DISCONNECT, reason);
+    }
+
+    private void onpacket(Packet<?> packet) {
+        if (!this.nsp.equals(packet.nsp)) return;
+
+        switch (packet.type) {
+            case Parser.CONNECT: {
+                if (packet.data instanceof JSONObject && ((JSONObject) packet.data).has("sid")) {
+                    try {
+                        this.onconnect(((JSONObject) packet.data).getString("sid"));
+                        return;
+                    } catch (JSONException e) {}
+                } else {
+                    super.emit(EVENT_CONNECT_ERROR, new SocketIOException("It seems you are trying to reach a Socket.IO server in v2.x with a v3.x client, which is not possible"));
+                }
+                break;
+            }
+
+            case Parser.EVENT: {
+                @SuppressWarnings("unchecked")
+                Packet<JSONArray> p = (Packet<JSONArray>) packet;
+                this.onevent(p);
+                break;
+            }
+
+            case Parser.BINARY_EVENT: {
+                @SuppressWarnings("unchecked")
+                Packet<JSONArray> p = (Packet<JSONArray>) packet;
+                this.onevent(p);
+                break;
+            }
+
+            case Parser.ACK: {
+                @SuppressWarnings("unchecked")
+                Packet<JSONArray> p = (Packet<JSONArray>) packet;
+                this.onack(p);
+                break;
+            }
+
+            case Parser.BINARY_ACK: {
+                @SuppressWarnings("unchecked")
+                Packet<JSONArray> p = (Packet<JSONArray>) packet;
+                this.onack(p);
+                break;
+            }
+
+            case Parser.DISCONNECT:
+                this.ondisconnect();
+                break;
+
+            case Parser.CONNECT_ERROR:
+                this.destroy();
+                super.emit(EVENT_CONNECT_ERROR, packet.data);
+                break;
+        }
+    }
+
+    private void onevent(Packet<JSONArray> packet) {
+        List<Object> args = new ArrayList<>(Arrays.asList(toArray(packet.data)));
+        if (logger.isLoggable(Level.FINE)) {
+            logger.fine(String.format("emitting event %s", args));
+        }
+
+        if (packet.id >= 0) {
+            logger.fine("attaching ack callback to event");
+            args.add(this.ack(packet.id));
+        }
+
+        if (this.connected) {
+            if (args.isEmpty()) return;
+            if (!this.onAnyIncomingListeners.isEmpty()) {
+                Object[] argsAsArray = args.toArray();
+                for (Listener listener : this.onAnyIncomingListeners) {
+                    listener.call(argsAsArray);
+                }
+            }
+            String event = args.remove(0).toString();
+            super.emit(event, args.toArray());
+        } else {
+            this.receiveBuffer.add(args);
+        }
+    }
+
+    private Ack ack(final int id) {
+        final Socket self = this;
+        final boolean[] sent = new boolean[] {false};
+        return new Ack() {
+            @Override
+            public void call(final Object... args) {
+                EventThread.exec(new Runnable() {
+                    @Override
+                    public void run() {
+                        if (sent[0]) return;
+                        sent[0] = true;
+                        if (logger.isLoggable(Level.FINE)) {
+                            logger.fine(String.format("sending ack %s", args.length != 0 ? args : null));
+                        }
+
+                        JSONArray jsonArgs = new JSONArray();
+                        for (Object arg : args) {
+                            jsonArgs.put(arg);
+                        }
+
+                        Packet<JSONArray> packet = new Packet<>(Parser.ACK, jsonArgs);
+                        packet.id = id;
+                        self.packet(packet);
+                    }
+                });
+            }
+        };
+    }
+
+    private void onack(Packet<JSONArray> packet) {
+        Ack fn = this.acks.remove(packet.id);
+        if (fn != null) {
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine(String.format("calling ack %s with %s", packet.id, packet.data));
+            }
+            fn.call(toArray(packet.data));
+        } else {
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine(String.format("bad ack %s", packet.id));
+            }
+        }
+    }
+
+    private void onconnect(String id) {
+        this.connected = true;
+        this.id = id;
+        this.emitBuffered();
+        super.emit(EVENT_CONNECT);
+    }
+
+    private void emitBuffered() {
+        List<Object> data;
+        while ((data = this.receiveBuffer.poll()) != null) {
+            String event = (String)data.get(0);
+            super.emit(event, data.toArray());
+        }
+        this.receiveBuffer.clear();
+
+        Packet<JSONArray> packet;
+        while ((packet = this.sendBuffer.poll()) != null) {
+            this.packet(packet);
+        }
+        this.sendBuffer.clear();
+    }
+
+    private void ondisconnect() {
+        if (logger.isLoggable(Level.FINE)) {
+            logger.fine(String.format("server disconnect (%s)", this.nsp));
+        }
+        this.destroy();
+        this.onclose("io server disconnect");
+    }
+
+    private void destroy() {
+        if (this.subs != null) {
+            // clean subscriptions to avoid reconnection
+            for (On.Handle sub : this.subs) {
+                sub.destroy();
+            }
+            this.subs = null;
+        }
+
+        for (Ack ack : acks.values()) {
+            if (ack instanceof AckWithTimeout) {
+                ((AckWithTimeout) ack).cancelTimer();
+            }
+        }
+
+        this.io.destroy();
+    }
+
+    /**
+     * Disconnects the socket.
+     *
+     * @return a reference to this object.
+     */
+    public Socket close() {
+        EventThread.exec(new Runnable() {
+            @Override
+            public void run() {
+                if (Socket.this.connected) {
+                    if (logger.isLoggable(Level.FINE)) {
+                        logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp));
+                    }
+                    Socket.this.packet(new Packet(Parser.DISCONNECT));
+                }
+
+                Socket.this.destroy();
+
+                if (Socket.this.connected) {
+                    Socket.this.onclose("io client disconnect");
+                }
+            }
+        });
+        return this;
+    }
+
+    /**
+     * Disconnects the socket.
+     *
+     * @return a reference to this object.
+     */
+    public Socket disconnect() {
+        return this.close();
+    }
+
+    public Manager io() {
+        return this.io;
+    }
+
+    public boolean connected() {
+        return this.connected;
+    }
+
+    /**
+     * A property on the socket instance that is equal to the underlying engine.io socket id.
+     *
+     * The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
+     *
+     * @return a socket id
+     */
+    public String id() {
+        return this.id;
+    }
+
+    private static Object[] toArray(JSONArray array) {
+        int length = array.length();
+        Object[] data = new Object[length];
+        for (int i = 0; i < length; i++) {
+            Object v;
+            try {
+                v = array.get(i);
+            } catch (JSONException e) {
+                logger.log(Level.WARNING, "An error occured while retrieving data from JSONArray", e);
+                v = null;
+            }
+            data[i] = JSONObject.NULL.equals(v) ? null : v;
+        }
+        return data;
+    }
+
+    public Socket onAnyIncoming(Listener fn) {
+        this.onAnyIncomingListeners.add(fn);
+        return this;
+    }
+
+    public Socket offAnyIncoming() {
+        this.onAnyIncomingListeners.clear();
+        return this;
+    }
+
+    public Socket offAnyIncoming(Listener fn) {
+        Iterator<Listener> it = this.onAnyIncomingListeners.iterator();
+        while (it.hasNext()) {
+            Listener listener = it.next();
+            if (listener == fn) {
+                it.remove();
+                break;
+            }
+        }
+        return this;
+    }
+
+    public Socket onAnyOutgoing(Listener fn) {
+        this.onAnyOutgoingListeners.add(fn);
+        return this;
+    }
+
+    public Socket offAnyOutgoing() {
+        this.onAnyOutgoingListeners.clear();
+        return this;
+    }
+
+    public Socket offAnyOutgoing(Listener fn) {
+        Iterator<Listener> it = this.onAnyOutgoingListeners.iterator();
+        while (it.hasNext()) {
+            Listener listener = it.next();
+            if (listener == fn) {
+                it.remove();
+                break;
+            }
+        }
+        return this;
+    }
+}
+
diff --git a/src/main/java/com/nq/ws/client/SocketIOException.java b/src/main/java/com/nq/ws/client/SocketIOException.java
new file mode 100644
index 0000000..0808387
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/SocketIOException.java
@@ -0,0 +1,20 @@
+package com.nq.ws.client;
+
+public class SocketIOException extends Exception {
+
+    public SocketIOException() {
+        super();
+    }
+
+    public SocketIOException(String message) {
+        super(message);
+    }
+
+    public SocketIOException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SocketIOException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/src/main/java/com/nq/ws/client/SocketOptionBuilder.java b/src/main/java/com/nq/ws/client/SocketOptionBuilder.java
new file mode 100644
index 0000000..6df7eab
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/SocketOptionBuilder.java
@@ -0,0 +1,196 @@
+package com.nq.ws.client;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Convenient builder class that helps creating
+ * {@link io.socket.client.IO.Options Client Option} object as builder pattern.
+ * Finally, you can get option object with call {@link #build()} method.
+ *
+ * @author junbong
+ */
+public class SocketOptionBuilder {
+    /**
+     * Construct new builder with default preferences.
+     *
+     * @return new builder object
+     * @see SocketOptionBuilder#builder(IO.Options)
+     */
+    public static SocketOptionBuilder builder() {
+        return new SocketOptionBuilder();
+    }
+
+
+    /**
+     * Construct this builder from specified option object.
+     * The option that returned from {@link #build()} method
+     * is not equals with given option.
+     * In other words, builder creates new option object
+     * and copy all preferences from given option.
+     *
+     * @param options option object which to copy preferences
+     * @return new builder object
+     */
+    public static SocketOptionBuilder builder(IO.Options options) {
+        return new SocketOptionBuilder(options);
+    }
+
+
+    private final IO.Options options = new IO.Options();
+
+
+    /**
+     * Construct new builder with default preferences.
+     */
+    protected SocketOptionBuilder() {
+        this(null);
+    }
+
+
+    /**
+     * Construct this builder from specified option object.
+     * The option that returned from {@link #build()} method
+     * is not equals with given option.
+     * In other words, builder creates new option object
+     * and copy all preferences from given option.
+     *
+     * @param options option object which to copy preferences. Null-ok.
+     */
+    protected SocketOptionBuilder(IO.Options options) {
+        if (options != null) {
+            this.setForceNew(options.forceNew)
+                .setMultiplex(options.multiplex)
+                .setReconnection(options.reconnection)
+                .setReconnectionAttempts(options.reconnectionAttempts)
+                .setReconnectionDelay(options.reconnectionDelay)
+                .setReconnectionDelayMax(options.reconnectionDelayMax)
+                .setRandomizationFactor(options.randomizationFactor)
+                .setTimeout(options.timeout)
+                .setTransports(options.transports)
+                .setUpgrade(options.upgrade)
+                .setRememberUpgrade(options.rememberUpgrade)
+                .setHost(options.host)
+                .setHostname(options.hostname)
+                .setPort(options.port)
+                .setPolicyPort(options.policyPort)
+                .setSecure(options.secure)
+                .setPath(options.path)
+                .setQuery(options.query)
+                .setAuth(options.auth)
+                .setExtraHeaders(options.extraHeaders);
+        }
+    }
+
+    public SocketOptionBuilder setForceNew(boolean forceNew) {
+        this.options.forceNew = forceNew;
+        return this;
+    }
+
+    public SocketOptionBuilder setMultiplex(boolean multiplex) {
+        this.options.multiplex = multiplex;
+        return this;
+    }
+
+    public SocketOptionBuilder setReconnection(boolean reconnection) {
+        this.options.reconnection = reconnection;
+        return this;
+    }
+
+    public SocketOptionBuilder setReconnectionAttempts(int reconnectionAttempts) {
+        this.options.reconnectionAttempts = reconnectionAttempts;
+        return this;
+    }
+
+    public SocketOptionBuilder setReconnectionDelay(long reconnectionDelay) {
+        this.options.reconnectionDelay = reconnectionDelay;
+        return this;
+    }
+
+    public SocketOptionBuilder setReconnectionDelayMax(long reconnectionDelayMax) {
+        this.options.reconnectionDelayMax = reconnectionDelayMax;
+        return this;
+    }
+
+
+    public SocketOptionBuilder setRandomizationFactor(double randomizationFactor) {
+        this.options.randomizationFactor = randomizationFactor;
+        return this;
+    }
+
+    public SocketOptionBuilder setTimeout(long timeout) {
+        this.options.timeout = timeout;
+        return this;
+    }
+
+    public SocketOptionBuilder setTransports(String[] transports) {
+        this.options.transports = transports;
+        return this;
+    }
+
+    public SocketOptionBuilder setUpgrade(boolean upgrade) {
+        this.options.upgrade = upgrade;
+        return this;
+    }
+
+    public SocketOptionBuilder setRememberUpgrade(boolean rememberUpgrade) {
+        this.options.rememberUpgrade = rememberUpgrade;
+        return this;
+    }
+
+    public SocketOptionBuilder setHost(String host) {
+        this.options.host = host;
+        return this;
+    }
+
+    public SocketOptionBuilder setHostname(String hostname) {
+        this.options.hostname = hostname;
+        return this;
+    }
+
+    public SocketOptionBuilder setPort(int port) {
+        this.options.port = port;
+        return this;
+    }
+
+    public SocketOptionBuilder setPolicyPort(int policyPort) {
+        this.options.policyPort = policyPort;
+        return this;
+    }
+
+    public SocketOptionBuilder setQuery(String query) {
+        this.options.query = query;
+        return this;
+    }
+
+    public SocketOptionBuilder setSecure(boolean secure) {
+        this.options.secure = secure;
+        return this;
+    }
+
+    public SocketOptionBuilder setPath(String path) {
+        this.options.path = path;
+        return this;
+    }
+
+    public SocketOptionBuilder setAuth(Map<String, String> auth) {
+        this.options.auth = auth;
+        return this;
+    }
+
+    public SocketOptionBuilder setExtraHeaders(Map<String, List<String>> extraHeaders) {
+        this.options.extraHeaders = extraHeaders;
+        return this;
+    }
+
+    /**
+     * Finally retrieve {@link io.socket.client.IO.Options} object
+     * from this builder.
+     *
+     * @return option that built from this builder
+     */
+    public IO.Options build() {
+        return this.options;
+    }
+}
diff --git a/src/main/java/com/nq/ws/client/Url.java b/src/main/java/com/nq/ws/client/Url.java
new file mode 100644
index 0000000..5efa349
--- /dev/null
+++ b/src/main/java/com/nq/ws/client/Url.java
@@ -0,0 +1,84 @@
+package com.nq.ws.client;
+
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Url {
+
+    /**
+     * Expected format: "[id:password@]host[:port]"
+     */
+    private static Pattern PATTERN_AUTHORITY = Pattern.compile("^(.*@)?([^:]+)(:\\d+)?$");
+
+    private Url() {}
+
+    static class ParsedURI {
+        public final URI uri;
+        public final String id;
+
+        public ParsedURI(URI uri, String id) {
+            this.uri = uri;
+            this.id = id;
+        }
+    }
+
+    public static ParsedURI parse(URI uri) {
+        String protocol = uri.getScheme();
+        if (protocol == null || !protocol.matches("^https?|wss?$")) {
+            protocol = "https";
+        }
+
+        int port = uri.getPort();
+        if (port == -1) {
+            if ("http".equals(protocol) || "ws".equals(protocol)) {
+                port = 80;
+            } else if ("https".equals(protocol) || "wss".equals(protocol)) {
+                port = 443;
+            }
+        }
+
+        String path = uri.getRawPath();
+        if (path == null || path.length() == 0) {
+            path = "/";
+        }
+
+        String userInfo = uri.getRawUserInfo();
+        String query = uri.getRawQuery();
+        String fragment = uri.getRawFragment();
+        String _host = uri.getHost();
+        if (_host == null) {
+            // might happen on some of Samsung Devices such as S4.
+            _host = extractHostFromAuthorityPart(uri.getRawAuthority());
+        }
+        URI completeUri = URI.create(protocol + "://"
+                + (userInfo != null ? userInfo + "@" : "")
+                + _host
+                + (port != -1 ? ":" + port : "")
+                + path
+                + (query != null ? "?" + query : "")
+                + (fragment != null ? "#" + fragment : ""));
+        String id = protocol + "://" + _host + ":" + port;
+
+        return new ParsedURI(completeUri, id);
+    }
+
+
+    private static String extractHostFromAuthorityPart(String authority)
+    {
+        if (authority == null) {
+            throw new RuntimeException("unable to parse the host from the authority");
+        }
+
+        Matcher matcher = PATTERN_AUTHORITY.matcher(authority);
+
+        // If the authority part does not match the expected format.
+        if (!matcher.matches()) {
+            throw new RuntimeException("unable to parse the host from the authority");
+        }
+
+        // Return the host part.
+        return matcher.group(2);
+    }
+
+}
diff --git a/src/main/java/com/nq/ws/hasbinary/HasBinary.java b/src/main/java/com/nq/ws/hasbinary/HasBinary.java
new file mode 100644
index 0000000..08c4e58
--- /dev/null
+++ b/src/main/java/com/nq/ws/hasbinary/HasBinary.java
@@ -0,0 +1,63 @@
+package com.nq.ws.hasbinary;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class HasBinary {
+	
+    private static final Logger logger = Logger.getLogger(HasBinary.class.getName());
+	
+    private HasBinary() {}
+
+    public static boolean hasBinary(Object data) {
+        return _hasBinary(data);
+    }
+
+    private static boolean _hasBinary(Object obj) {
+        if (obj == null) return false;
+
+        if (obj instanceof byte[]) {
+            return true;
+        }
+
+        if (obj instanceof JSONArray) {
+            JSONArray _obj = (JSONArray)obj;
+            int length = _obj.length();
+            for (int i = 0; i < length; i++) {
+                Object v;
+                try {
+                    v = _obj.isNull(i) ? null : _obj.get(i);
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while retrieving data from JSONArray", e);
+                    return false;
+                }
+                if (_hasBinary(v)) {
+                    return true;
+                }
+            }
+        } else if (obj instanceof JSONObject) {
+            JSONObject _obj = (JSONObject)obj;
+            Iterator keys = _obj.keys();
+            while (keys.hasNext()) {
+                String key = (String)keys.next();
+                Object v;
+                try {
+                    v = _obj.get(key);
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while retrieving data from JSONObject", e);
+                    return false;       
+                }
+                if (_hasBinary(v)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+}
diff --git a/src/main/java/com/nq/ws/parser/Binary.java b/src/main/java/com/nq/ws/parser/Binary.java
new file mode 100644
index 0000000..ac2618f
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/Binary.java
@@ -0,0 +1,127 @@
+package com.nq.ws.parser;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class Binary {
+
+    private static final String KEY_PLACEHOLDER = "_placeholder";
+
+    private static final String KEY_NUM = "num";
+    
+    private static final Logger logger = Logger.getLogger(Binary.class.getName());
+
+    @SuppressWarnings("unchecked")
+    public static DeconstructedPacket deconstructPacket(Packet packet) {
+        List<byte[]> buffers = new ArrayList<>();
+
+        packet.data = _deconstructPacket(packet.data, buffers);
+        packet.attachments = buffers.size();
+
+        DeconstructedPacket result = new DeconstructedPacket();
+        result.packet = packet;
+        result.buffers = buffers.toArray(new byte[buffers.size()][]);
+        return result;
+    }
+
+    private static Object _deconstructPacket(Object data, List<byte[]> buffers) {
+        if (data == null) return null;
+
+        if (data instanceof byte[]) {
+            JSONObject placeholder = new JSONObject();
+            try {
+                placeholder.put(KEY_PLACEHOLDER, true);
+                placeholder.put(KEY_NUM, buffers.size());
+            } catch (JSONException e) {
+                logger.log(Level.WARNING, "An error occured while putting data to JSONObject", e);
+                return null;
+            }
+            buffers.add((byte[])data);
+            return placeholder;
+        } else if (data instanceof JSONArray) {
+            JSONArray newData = new JSONArray();
+            JSONArray _data = (JSONArray)data;
+            int len = _data.length();
+            for (int i = 0; i < len; i ++) {
+                try {
+                    newData.put(i, _deconstructPacket(_data.get(i), buffers));
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while putting packet data to JSONObject", e);
+                    return null;
+                }
+            }
+            return newData;
+        } else if (data instanceof JSONObject) {
+            JSONObject newData = new JSONObject();
+            JSONObject _data = (JSONObject)data;
+            Iterator<?> iterator = _data.keys();
+            while (iterator.hasNext()) {
+                String key = (String)iterator.next();
+                try {
+                    newData.put(key, _deconstructPacket(_data.get(key), buffers));
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while putting data to JSONObject", e);
+                    return null;
+                }
+            }
+            return newData;
+        }
+        return data;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Packet reconstructPacket(Packet packet, byte[][] buffers) {
+        packet.data = _reconstructPacket(packet.data, buffers);
+        packet.attachments = -1;
+       return packet;
+    }
+
+    private static Object _reconstructPacket(Object data, byte[][] buffers) {
+        if (data instanceof JSONArray) {
+            JSONArray _data = (JSONArray)data;
+            int len = _data.length();
+            for (int i = 0; i < len; i ++) {
+                try {
+                    _data.put(i, _reconstructPacket(_data.get(i), buffers));
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while putting packet data to JSONObject", e);
+                    return null;
+                }
+            }
+            return _data;
+        } else if (data instanceof JSONObject) {
+            JSONObject _data = (JSONObject)data;
+            if (_data.optBoolean(KEY_PLACEHOLDER)) {
+                int num = _data.optInt(KEY_NUM, -1);
+                return num >= 0 && num < buffers.length ? buffers[num] : null;
+            }
+            Iterator<?> iterator = _data.keys();
+            while (iterator.hasNext()) {
+                String key = (String)iterator.next();
+                try {
+                    _data.put(key, _reconstructPacket(_data.get(key), buffers));
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while putting data to JSONObject", e);
+                    return null;
+                }
+            }
+            return _data;
+        }
+        return data;
+    }
+
+    public static class DeconstructedPacket {
+
+        public Packet packet;
+        public byte[][] buffers;
+    }
+}
+
+
diff --git a/src/main/java/com/nq/ws/parser/DecodingException.java b/src/main/java/com/nq/ws/parser/DecodingException.java
new file mode 100644
index 0000000..67806ea
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/DecodingException.java
@@ -0,0 +1,7 @@
+package com.nq.ws.parser;
+
+public class DecodingException extends RuntimeException {
+    public DecodingException(String message) {
+        super(message);
+    }
+}
diff --git a/src/main/java/com/nq/ws/parser/IOParser.java b/src/main/java/com/nq/ws/parser/IOParser.java
new file mode 100644
index 0000000..ec93478
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/IOParser.java
@@ -0,0 +1,263 @@
+package com.nq.ws.parser;
+
+import com.nq.ws.hasbinary.HasBinary;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+final public class IOParser implements Parser {
+
+    private static final Logger logger = Logger.getLogger(IOParser.class.getName());
+
+    private IOParser() {}
+
+    final public static class Encoder implements Parser.Encoder {
+
+        public Encoder() {}
+
+        @Override
+        public void encode(Packet obj, Callback callback) {
+            if ((obj.type == EVENT || obj.type == ACK) && HasBinary.hasBinary(obj.data)) {
+                obj.type = obj.type == EVENT ? BINARY_EVENT : BINARY_ACK;
+            }
+
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine(String.format("encoding packet %s", obj));
+            }
+
+            if (BINARY_EVENT == obj.type || BINARY_ACK == obj.type) {
+                encodeAsBinary(obj, callback);
+            } else {
+                String encoding = encodeAsString(obj);
+                callback.call(new String[] {encoding});
+            }
+        }
+
+        private String encodeAsString(Packet obj) {
+            StringBuilder str = new StringBuilder("" + obj.type);
+
+            if (BINARY_EVENT == obj.type || BINARY_ACK == obj.type) {
+                str.append(obj.attachments);
+                str.append("-");
+            }
+
+            if (obj.nsp != null && obj.nsp.length() != 0 && !"/".equals(obj.nsp)) {
+                str.append(obj.nsp);
+                str.append(",");
+            }
+
+            if (obj.id >= 0) {
+                str.append(obj.id);
+            }
+
+            if (obj.data != null) {
+                str.append(obj.data);
+            }
+
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine(String.format("encoded %s as %s", obj, str));
+            }
+            return str.toString();
+        }
+
+        private void encodeAsBinary(Packet obj, Callback callback) {
+            Binary.DeconstructedPacket deconstruction = Binary.deconstructPacket(obj);
+            String pack = encodeAsString(deconstruction.packet);
+            List<Object> buffers = new ArrayList<Object>(Arrays.asList(deconstruction.buffers));
+
+            buffers.add(0, pack);
+            callback.call(buffers.toArray());
+        }
+    }
+
+    final public static class Decoder implements Parser.Decoder {
+
+        /*package*/ BinaryReconstructor reconstructor;
+
+        private Decoder.Callback onDecodedCallback;
+
+        public Decoder() {
+            this.reconstructor = null;
+        }
+
+        @Override
+        public void add(String obj) {
+            Packet packet = decodeString(obj);
+            if (BINARY_EVENT == packet.type || BINARY_ACK == packet.type) {
+                this.reconstructor = new BinaryReconstructor(packet);
+
+                if (this.reconstructor.reconPack.attachments == 0) {
+                    if (this.onDecodedCallback != null) {
+                        this.onDecodedCallback.call(packet);
+                    }
+                }
+            } else {
+                if (this.onDecodedCallback != null) {
+                    this.onDecodedCallback.call(packet);
+                }
+            }
+        }
+
+        @Override
+        public void add(byte[] obj) {
+            if (this.reconstructor == null) {
+                throw new RuntimeException("got binary data when not reconstructing a packet");
+            } else {
+                Packet packet = this.reconstructor.takeBinaryData(obj);
+                if (packet != null) {
+                    this.reconstructor = null;
+                    if (this.onDecodedCallback != null) {
+                        this.onDecodedCallback.call(packet);
+                    }
+                }
+            }
+        }
+
+        private static Packet decodeString(String str) {
+            int i = 0;
+            int length = str.length();
+
+            Packet<Object> p = new Packet<>(Character.getNumericValue(str.charAt(0)));
+
+            if (p.type < 0 || p.type > types.length - 1) {
+                throw new DecodingException("unknown packet type " + p.type);
+            }
+
+            if (BINARY_EVENT == p.type || BINARY_ACK == p.type) {
+                if (!str.contains("-") || length <= i + 1) {
+                    throw new DecodingException("illegal attachments");
+                }
+                StringBuilder attachments = new StringBuilder();
+                while (str.charAt(++i) != '-') {
+                    attachments.append(str.charAt(i));
+                }
+                p.attachments = Integer.parseInt(attachments.toString());
+            }
+
+            if (length > i + 1 && '/' == str.charAt(i + 1)) {
+                StringBuilder nsp = new StringBuilder();
+                while (true) {
+                    ++i;
+                    char c = str.charAt(i);
+                    if (',' == c) break;
+                    nsp.append(c);
+                    if (i + 1 == length) break;
+                }
+                p.nsp = nsp.toString();
+            } else {
+                p.nsp = "/";
+            }
+
+            if (length > i + 1){
+                Character next = str.charAt(i + 1);
+                if (Character.getNumericValue(next) > -1) {
+                    StringBuilder id = new StringBuilder();
+                    while (true) {
+                        ++i;
+                        char c = str.charAt(i);
+                        if (Character.getNumericValue(c) < 0) {
+                            --i;
+                            break;
+                        }
+                        id.append(c);
+                        if (i + 1 == length) break;
+                    }
+                    try {
+                        p.id = Integer.parseInt(id.toString());
+                    } catch (NumberFormatException e){
+                        throw new DecodingException("invalid payload");
+                    }
+                }
+            }
+
+            if (length > i + 1){
+                try {
+                    str.charAt(++i);
+                    p.data = new JSONTokener(str.substring(i)).nextValue();
+                } catch (JSONException e) {
+                    logger.log(Level.WARNING, "An error occured while retrieving data from JSONTokener", e);
+                    throw new DecodingException("invalid payload");
+                }
+                if (!isPayloadValid(p.type, p.data)) {
+                    throw new DecodingException("invalid payload");
+                }
+            }
+
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine(String.format("decoded %s as %s", str, p));
+            }
+            return p;
+        }
+
+        private static boolean isPayloadValid(int type, Object payload) {
+            switch (type) {
+                case Parser.CONNECT:
+                case Parser.CONNECT_ERROR:
+                    return payload instanceof JSONObject;
+                case Parser.DISCONNECT:
+                    return payload == null;
+                case Parser.EVENT:
+                case Parser.BINARY_EVENT:
+                    return payload instanceof JSONArray
+                            && ((JSONArray) payload).length() > 0
+                            && !((JSONArray) payload).isNull(0);
+                case Parser.ACK:
+                case Parser.BINARY_ACK:
+                    return payload instanceof JSONArray;
+                default:
+                    return false;
+            }
+        }
+
+        @Override
+        public void destroy() {
+            if (this.reconstructor != null) {
+                this.reconstructor.finishReconstruction();
+            }
+            this.onDecodedCallback = null;
+        }
+
+        @Override
+        public void onDecoded (Callback callback) {
+            this.onDecodedCallback = callback;
+        }
+    }
+
+
+    /*package*/ static class BinaryReconstructor {
+
+        public Packet reconPack;
+
+        /*package*/ List<byte[]> buffers;
+
+        BinaryReconstructor(Packet packet) {
+            this.reconPack = packet;
+            this.buffers = new ArrayList<>();
+        }
+
+        public Packet takeBinaryData(byte[] binData) {
+            this.buffers.add(binData);
+            if (this.buffers.size() == this.reconPack.attachments) {
+                Packet packet = Binary.reconstructPacket(this.reconPack,
+                        this.buffers.toArray(new byte[this.buffers.size()][]));
+                this.finishReconstruction();
+                return packet;
+            }
+            return null;
+        }
+
+        public void finishReconstruction () {
+            this.reconPack = null;
+            this.buffers = new ArrayList<>();
+        }
+    }
+}
+
+
diff --git a/src/main/java/com/nq/ws/parser/Packet.java b/src/main/java/com/nq/ws/parser/Packet.java
new file mode 100644
index 0000000..e9be057
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/Packet.java
@@ -0,0 +1,22 @@
+package com.nq.ws.parser;
+
+
+public class Packet<T> {
+
+    public int type = -1;
+    public int id = -1;
+    public String nsp;
+    public T data;
+    public int attachments;
+
+    public Packet() {}
+
+    public Packet(int type) {
+        this.type = type;
+    }
+
+    public Packet(int type, T data) {
+        this.type = type;
+        this.data = data;
+    }
+}
diff --git a/src/main/java/com/nq/ws/parser/Parser.java b/src/main/java/com/nq/ws/parser/Parser.java
new file mode 100644
index 0000000..f5f0c32
--- /dev/null
+++ b/src/main/java/com/nq/ws/parser/Parser.java
@@ -0,0 +1,82 @@
+package com.nq.ws.parser;
+
+public interface Parser {
+
+    /**
+     * Packet type `connect`.
+     */
+    int CONNECT = 0;
+
+    /**
+     * Packet type `disconnect`.
+     */
+    int DISCONNECT = 1;
+
+    /**
+     * Packet type `event`.
+     */
+    int EVENT = 2;
+
+    /**
+     * Packet type `ack`.
+     */
+    int ACK = 3;
+
+    /**
+     * Packet type `error`.
+     */
+    int CONNECT_ERROR = 4;
+
+    /**
+     * Packet type `binary event`.
+     */
+    int BINARY_EVENT = 5;
+
+    /**
+     * Packet type `binary ack`.
+     */
+    int BINARY_ACK = 6;
+
+    int protocol = 5;
+
+    /**
+     * Packet types.
+     */
+    String[] types = new String[] {
+        "CONNECT",
+        "DISCONNECT",
+        "EVENT",
+        "ACK",
+        "ERROR",
+        "BINARY_EVENT",
+        "BINARY_ACK"
+    };
+
+    interface Encoder {
+
+        void encode(Packet obj, Callback callback);
+
+        interface Callback {
+
+            void call(Object[] data);
+        }
+    }
+
+    interface Decoder {
+
+        void add(String obj);
+
+        void add(byte[] obj);
+
+        void destroy();
+
+        void onDecoded(Callback callback);
+
+        interface Callback {
+
+            void call(Packet packet);
+        }
+    }
+}
+
+
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 7179edc..59b9a9a 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -45,6 +45,10 @@
 IN_WS_URL =ws://ws.is4vc.com:8001/websocket-server
 IN_KEY = r3ZAgtcYzuBizmqge2hK
 
+JS_IN_HTTP_API = http://api-in-2-socket.js-stock.top
+JS_IN_WS_URL =ws://api-in-2-ws.js-stock.top
+JS_IN_KEY = eVKtHt7aG4m6ozwWL9qG
+
 US_HTTP_API = http://api-us.js-stock.top/
 US_WS_URL = ws://ws-us.js-stock.top
 US_KEY = jZFrku4RGQjP87Hmq5tm

--
Gitblit v1.9.3