| | |
| | | package org.example.websocket.server; |
| | | |
| | | import cn.hutool.json.JSONUtil; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.core.type.TypeReference; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.google.common.reflect.TypeToken; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.reflect.TypeToken; |
| | | import com.google.gson.GsonBuilder; |
| | | import lombok.NonNull; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.example.pojo.ConfigCurrency; |
| | | import org.example.pojo.MarketDataOut; |
| | | import org.example.pojo.bo.WsBo; |
| | | import org.example.util.RedisUtil; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.CollectionUtils; |
| | | import org.springframework.web.bind.annotation.PostMapping; |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | | |
| | |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.io.IOException; |
| | | import java.lang.reflect.Type; |
| | | import java.math.BigDecimal; |
| | | import java.nio.ByteBuffer; |
| | | import java.text.SimpleDateFormat; |
| | | import java.util.*; |
| | |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.stream.Collectors; |
| | | import java.util.zip.Deflater; |
| | | import java.util.zip.DeflaterOutputStream; |
| | | |
| | |
| | | |
| | | /** |
| | |  * Handles a newly opened WebSocket session. |
| | |  * |
| | |  * <p>Reads the value stored in Redis under {@code "user_" + userId} and calls |
| | |  * {@code closeSession} with the not-logged-in reason when that value is missing, |
| | |  * empty, or different from the token carried by the session's {@link WsBo}. |
| | |  * Regardless of that check, the session is then stored on this endpoint, the online |
| | |  * counter is incremented and this endpoint is added to {@code wsServers}. |
| | |  * |
| | |  * @param session the session that was just opened |
| | |  */ |
| | | @OnOpen |
| | | public void onOpen(Session session) { |
| | |     WsBo wsBo = getWsBoForSession(session.getId()); |
| | |     String cachedToken = RedisUtil.get("user_" + wsBo.getUserId()); |
| | |     // Every condition must reject on its own. The former "a || b && c" form parsed as |
| | |     // "a || (b && c)", so a non-empty cached token was never compared with the client's. |
| | |     // Calling equals on cachedToken also keeps the check safe when the client token is null. |
| | |     if (null == cachedToken || cachedToken.isEmpty() || !cachedToken.equals(wsBo.getToken())) { |
| | |         closeSession(session, "用户未登录"); |
| | |     } |
| | |     this.session = session; |
| | |     int count = onlineCount.incrementAndGet(); |
| | |     wsServers.add(this); |
| | |
| | | |
| | | /** |
| | |  * Reports whether a {@link WsBo} is currently associated with the given session. |
| | |  * |
| | |  * @param session session whose id is used for the lookup |
| | |  * @return {@code true} when {@code getWsBoForSession} yields a non-null value for the session id |
| | |  */ |
| | | private boolean hasReceivedSubscription(Session session) { |
| | |     // Simplified logic: a single return; the duplicated return statement was unreachable. |
| | |     return getWsBoForSession(session.getId()) != null; |
| | | } |
| | | |
| | | @OnError |
| | |
| | | ScheduledFuture<?> future = scheduledTasks.remove(this.session.getId()); |
| | | if (future != null) { |
| | | future.cancel(true); // 取消定时任务 |
| | | } |
| | | ScheduledFuture<?> task = pushTasks.remove(this.session.getId()); |
| | | if (task != null) { |
| | | task.cancel(true); // 取消推送任务 |
| | | } |
| | | } |
| | | |
| | |
| | | Lock sessionLock = getSessionLock(session.getId()); |
| | | sessionLock.lock(); |
| | | try { |
| | | WsBo wsBo = getWsBoForSession(session.getId()); |
| | | if (wsBo != null) { |
| | | int intervalSeconds = wsBo.getTime(); |
| | | pushMessageWithInterval(session, message, intervalSeconds); |
| | | } |
| | | schedulePushMessage(session, message); |
| | | } catch (Exception e) { |
| | | log.error("发送消息时出现异常: {}", e.getMessage()); |
| | | } finally { |
| | |
| | | return threadLocalData.get(sessionId); |
| | | } |
| | | |
| | | private Map<String, ScheduledFuture<?>> pushTasks = new ConcurrentHashMap<>(); /* push-task futures keyed by session id */ |
| | | ScheduledExecutorService pushScheduler = Executors.newScheduledThreadPool(1); /* single-thread scheduler used for the periodic push task; NOTE(review): instance field with no shutdown visible here - confirm it is shut down elsewhere */ |
| | | private Map<Session, Long> lastMessageTimeMap = new ConcurrentHashMap<>(); /* System.currentTimeMillis() value recorded when a message was last sent to each session */ |
| | | |
| | | private void pushMessageWithInterval(Session session, String message, int intervalSeconds) { /* NOTE(review): the lines of this method appear interleaved with schedulePushMessage below (likely a merge/diff artifact) - confirm against version control before changing code here */ |
| | | // Create an AtomicBoolean that controls whether the task keeps running |
| | | intervalSeconds = 5; /* NOTE(review): overwrites the caller-supplied interval with a fixed 5 seconds - confirm this is intended */ |
| | | AtomicBoolean isActive = new AtomicBoolean(true); |
| | | ScheduledFuture<?> future = pushScheduler.scheduleAtFixedRate(() -> { |
| | | if (isActive.get()) { // check whether the task should keep executing |
| | | try { |
| | | pushMessage(session, message); |
| | | } catch (Exception e) { |
| | | log.error("推送消息时出现异常: {}", e.getMessage()); |
| | | isActive.set(false); // stop the task once an exception has occurred |
| | | } |
| | | private void schedulePushMessage(Session session, String message) throws JsonProcessingException { |
| | | WsBo wsBo = getWsBoForSession(session.getId()); |
| | | if (wsBo != null) { |
| | | long currentTime = System.currentTimeMillis(); |
| | | long lastMessageTime = lastMessageTimeMap.getOrDefault(session, 0L); |
| | | int time = wsBo.getTime(); |
| | | message = megFiltration(wsBo,message); |
| | | if (currentTime - lastMessageTime >= time * 1000) { |
| | | // The required interval has elapsed, so the message can be sent |
| | | pushMessage(session, message); |
| | | lastMessageTimeMap.put(session, currentTime); // update the last-sent timestamp |
| | | } else { |
| | | // The interval has not elapsed yet: do not send; only a log entry is written here |
| | | log.info("距离上次发送消息时间未达到指定间隔,不发送消息。"); |
| | | } |
| | | }, 0, intervalSeconds, TimeUnit.SECONDS); |
| | | } |
| | | } |
| | | private static final Gson gson = new Gson(); |
| | | // Serializers are built once and reused instead of being re-created for every message. |
| | | private static final Gson PRETTY_GSON = new GsonBuilder().setPrettyPrinting().create(); |
| | | private static final ObjectMapper CONFIG_MAPPER = new ObjectMapper(); |
| | | |
| | | /** |
| | |  * Filters a JSON array of {@link MarketDataOut} rows using the user's stored configuration |
| | |  * and the options carried by the given {@link WsBo}, then returns the remaining rows as |
| | |  * pretty-printed JSON. |
| | |  * |
| | |  * <p>Stages, applied in order: |
| | |  * <ol> |
| | |  *   <li>Stored configuration (Redis key {@code "config_" + userId}, parsed as a list of |
| | |  *       {@link ConfigCurrency}): when non-empty, a row is kept only if its base asset, |
| | |  *       buying platform and selling platform are all absent from the configured |
| | |  *       currency / buy / sell values.</li> |
| | |  *   <li>Currency: when set, only rows whose base asset equals it are kept.</li> |
| | |  *   <li>Spread: when greater than zero, only rows whose spread is at least that value are kept.</li> |
| | |  *   <li>Minimum amount: when set, only rows whose sell total price is at least that amount are kept.</li> |
| | |  *   <li>Platform list (comma separated): rows whose buying or selling platform is listed are dropped.</li> |
| | |  *   <li>Marker list (comma separated): rows whose base asset is listed get {@code setMarker(true)}.</li> |
| | |  * </ol> |
| | |  * |
| | |  * @param wsBo    per-session options that drive the filtering |
| | |  * @param message JSON array of {@link MarketDataOut} rows |
| | |  * @return the filtered rows serialized as pretty-printed JSON |
| | |  * @throws JsonProcessingException if the stored configuration cannot be parsed |
| | |  */ |
| | | private String megFiltration(WsBo wsBo, String message) throws JsonProcessingException { |
| | |     List<MarketDataOut> rows = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType()); |
| | | |
| | |     // Exclusions taken from the user's stored configuration. |
| | |     String value = RedisUtil.get("config_" + wsBo.getUserId()); |
| | |     List<ConfigCurrency> currencies = null; |
| | |     if (null != value && !value.isEmpty()) { |
| | |         currencies = CONFIG_MAPPER.readValue(value, new TypeReference<List<ConfigCurrency>>() {}); |
| | |     } |
| | |     if (!CollectionUtils.isEmpty(currencies)) { |
| | |         // Sets give constant-time membership checks inside the filter below. |
| | |         Set<String> currency = currencies.stream().map(ConfigCurrency::getCurrency).collect(Collectors.toSet()); |
| | |         Set<String> buy = currencies.stream().map(ConfigCurrency::getBuy).collect(Collectors.toSet()); |
| | |         Set<String> sell = currencies.stream().map(ConfigCurrency::getSell).collect(Collectors.toSet()); |
| | |         rows = rows.stream() |
| | |                 .filter(data -> !currency.contains(data.getBaseAsset()) |
| | |                         && !buy.contains(data.getBuyingPlatform()) |
| | |                         && !sell.contains(data.getSellPlatform())) |
| | |                 .collect(Collectors.toList()); |
| | |     } |
| | | |
| | |     // Requested currency |
| | |     if (null != wsBo.getCurrency()) { |
| | |         rows = rows.stream() |
| | |                 .filter(data -> wsBo.getCurrency().equals(data.getBaseAsset())) |
| | |                 .collect(Collectors.toList()); |
| | |     } |
| | |     // Minimum spread |
| | |     if (wsBo.getSpread() > 0) { |
| | |         rows = rows.stream() |
| | |                 .filter(data -> Double.parseDouble(data.getSpread()) >= wsBo.getSpread()) |
| | |                 .collect(Collectors.toList()); |
| | |     } |
| | |     // Minimum amount |
| | |     if (null != wsBo.getMinAmount()) { |
| | |         // The threshold is the same for every row, so it is converted only once. |
| | |         BigDecimal minAmount = new BigDecimal(wsBo.getMinAmount()); |
| | |         rows = rows.stream() |
| | |                 .filter(data -> new BigDecimal(data.getSellTotalPrice()).compareTo(minAmount) >= 0) |
| | |                 .collect(Collectors.toList()); |
| | |     } |
| | |     // Excluded platforms |
| | |     if (null != wsBo.getPlatformList()) { |
| | |         Set<String> excludedPlatforms = new HashSet<>(Arrays.asList(wsBo.getPlatformList().split(","))); |
| | |         rows = rows.stream() |
| | |                 .filter(data -> !excludedPlatforms.contains(data.getBuyingPlatform()) |
| | |                         && !excludedPlatforms.contains(data.getSellPlatform())) |
| | |                 .collect(Collectors.toList()); |
| | |     } |
| | |     // Watch-list marker |
| | |     if (null != wsBo.getIsMarker()) { |
| | |         Set<String> markedAssets = new HashSet<>(Arrays.asList(wsBo.getIsMarker().split(","))); |
| | |         rows.stream() |
| | |                 .filter(data -> markedAssets.contains(data.getBaseAsset())) |
| | |                 .forEach(data -> data.setMarker(true)); |
| | |     } |
| | |     return PRETTY_GSON.toJson(rows); |
| | | } |
| | | |
| | | public void pushMessage(Session session, String message) throws IOException { |
| | | session.getBasicRemote().sendText(message); |
| | | private void pushMessage(Session session, String message) { |
| | | try { |
| | | if (session != null && session.isOpen()) { |
| | | session.getBasicRemote().sendText(message); |
| | | } else { |
| | | log.error("会话不存在或已关闭,无法推送消息"); |
| | | } |
| | | } catch (IOException e) { |
| | | log.error("推送消息时出现IO异常: {}", e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | // 关闭会话的方法 |