.idea/vcs.xml
New file @@ -0,0 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <project version="4"> <component name="VcsDirectoryMappings"> <mapping directory="" vcs="Git" /> </component> </project> websocketSerivce/src/main/java/org/example/constant/StockConstant.java
@@ -22,6 +22,7 @@ public static String US_KEY = "F03fXyNJKeFiTGsaoXHg"; public static String IPO_HTTP_API = "http://test.js-stock.top/"; } websocketSerivce/src/main/java/org/example/controller/ApiController.java
@@ -20,24 +20,6 @@ @RequestMapping("/api/all") public class ApiController { @Autowired JournalismMapper journalismMapper; @Autowired StockMarketNewMapper stockMarketNewMapper; @GetMapping("JournalismAll") public ServerResponse JournalismAll(){ LambdaQueryWrapper<Journalism> queryWrapper = new LambdaQueryWrapper<>(); return ServerResponse.createBySuccess(journalismMapper.selectList(queryWrapper)); } @GetMapping("StockMarketNew") public ServerResponse StockMarketNew(){ LambdaQueryWrapper<StockMarketNew> queryWrapper = new LambdaQueryWrapper<>(); return ServerResponse.createBySuccess(stockMarketNewMapper.selectList(queryWrapper)); } /*查询股票日线*/ @RequestMapping({"getKData.do"}) @@ -48,9 +30,9 @@ @RequestParam("stockType") String stockType ) { EStockType eStockType = null; if(stockType.equals("US")){ if (stockType.equals("US")) { eStockType = EStockType.US; }else{ } else { eStockType = EStockType.IN; } return HttpUtil.get(eStockType.stockUrl + "kline?pid=" + pid + "&interval=" + interval + "&key=" + eStockType.stockKey); websocketSerivce/src/main/java/org/example/controller/JournalismController.java
New file @@ -0,0 +1,37 @@ package org.example.controller; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import org.example.common.ServerResponse; import org.example.dao.JournalismMapper; import org.example.pojo.Journalism; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @program: webSocketProject * @description: 新闻 * @create: 2024-03-27 14:03 **/ @RestController @RequestMapping("/api/news/") public class JournalismController { @Autowired JournalismMapper journalismMapper; @GetMapping("getNewsListAll.do") public ServerResponse JournalismAll(){ LambdaQueryWrapper<Journalism> queryWrapper = new LambdaQueryWrapper<>(); return ServerResponse.createBySuccess(journalismMapper.selectList(queryWrapper)); } @GetMapping("getNewsList.do") public ServerResponse JournalismNumber(){ LambdaQueryWrapper<Journalism> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.orderByDesc(Journalism::getTime).last("limit 20"); return ServerResponse.createBySuccess(journalismMapper.selectList(queryWrapper)); } } websocketSerivce/src/main/java/org/example/controller/StockMarketNewController.java
New file @@ -0,0 +1,31 @@ //package org.example.controller; // //import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; //import org.example.common.ServerResponse; //import org.example.dao.StockMarketNewMapper; //import org.example.pojo.StockMarketNew; //import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.web.bind.annotation.GetMapping; //import org.springframework.web.bind.annotation.RequestMapping; //import org.springframework.web.bind.annotation.RestController; // ///** // * @program: webSocketProject // * @description: // * @create: 2024-03-27 14:07 // **/ //@RestController //@RequestMapping("/api/StockMarketNew") //public class StockMarketNewController { // // // @Autowired // StockMarketNewMapper stockMarketNewMapper; // // @GetMapping("StockMarketNewAll") // public ServerResponse StockMarketNew(){ // LambdaQueryWrapper<StockMarketNew> queryWrapper = new LambdaQueryWrapper<>(); // return ServerResponse.createBySuccess(stockMarketNewMapper.selectList(queryWrapper)); // } // //} websocketSerivce/src/main/java/org/example/controller/StockNewShareController.java
New file @@ -0,0 +1,68 @@ package org.example.controller; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.common.ServerResponse; import org.example.dao.StockMarketNewMapper; import org.example.dao.StockNewShareMapper; import org.example.enums.EStockType; import org.example.pojo.StockMarketNew; import org.example.pojo.StockNewShare; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.List; /** * @program: webSocketProject * @description: 新股 * @create: 2024-03-27 15:27 **/ @RestController @RequestMapping("/api/stock") public class StockNewShareController { @Autowired StockNewShareMapper stockNewShareMapper; @Autowired StockMarketNewMapper stockMarketNewMapper; @GetMapping("getStock.do") public ServerResponse StockMarketNew(@RequestParam(value = "stockType", required = false) String stockType){ // 将输入的股票类型转换为大写 String upperCase = stockType.toUpperCase(); // 根据代码获取对应的枚举类型 EStockType code = EStockType.getEsByCode(upperCase); if(code == null){ return ServerResponse.createBySuccessMsg("请输入正确的stockType"); } // 根据枚举类型进行不同的操作 switch(code){ case XG: // 查询新股市场数据 return getStockData(stockNewShareMapper); case IN: // 查询股票数据 return getStockData(stockMarketNewMapper); default: return ServerResponse.createBySuccessMsg("未找到对应的股票数据"); } } // 通用方法,根据传入的mapper查询数据 private <T> ServerResponse getStockData(BaseMapper<T> mapper){ LambdaQueryWrapper<T> wrapper = new LambdaQueryWrapper<>(); List<T> list = mapper.selectList(wrapper); if(list.isEmpty()){ return ServerResponse.createByErrorMsg("查询结果为空"); } return ServerResponse.createBySuccess(list); } } websocketSerivce/src/main/java/org/example/dao/DataServiceKeyMapper.java
New file @@ -0,0 +1,9 @@ package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; import org.example.pojo.DataServiceKey; @Mapper public interface DataServiceKeyMapper extends BaseMapper<DataServiceKey> { } websocketSerivce/src/main/java/org/example/dao/StockNewShareMapper.java
New file @@ -0,0 +1,9 @@ package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; import org.example.pojo.StockNewShare; @Mapper public interface StockNewShareMapper extends BaseMapper<StockNewShare> { } websocketSerivce/src/main/java/org/example/enums/EStockType.java
@@ -11,6 +11,7 @@ IN("IN","印度股票","14", StockConstant.HTTP_API, StockConstant.KEY), XG("XG","新股","14", StockConstant.HTTP_API, StockConstant.KEY), US("US","美国股票","5",StockConstant.US_API_URL,StockConstant.US_KEY); private String code; private String typeDesc; @@ -36,6 +37,15 @@ } } public static EStockType getEsByCode(String code) { for (EStockType type : EStockType.values()) { if (type.getCode().equals(code)) { return type; } } return null; } public String getContryId() { return contryId; } websocketSerivce/src/main/java/org/example/pojo/DataServiceKey.java
New file @@ -0,0 +1,58 @@ package org.example.pojo; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.io.Serializable; import java.util.Date; /** * @program: webSocketProject * @description: * @create: 2024-03-27 17:01 **/ @Data @TableName("data_service_key") public class DataServiceKey implements Serializable { private static final long serialVersionUID = 1L; /** * id */ @TableId(type = IdType.AUTO) private Integer id; /** * key */ private String tokenKey; /** * 是否可用 0:否 1: 是 */ private Integer isAvailable; /** * 到期时间 */ private Date expirationTime; /** * 添加时间 */ private Date startTime; /** * 修改时间 */ private Date updateTime; /** * 备注 */ private String remark; public DataServiceKey() {} } websocketSerivce/src/main/java/org/example/pojo/StockNewShare.java
New file @@ -0,0 +1,59 @@ package org.example.pojo; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import java.io.Serializable; /** * @program: webSocketProject * @description: * @create: 2024-03-27 13:34 **/ @Data @TableName("stock_new_share") public class StockNewShare implements Serializable { private static final long serialVersionUID = 1L; /** * id */ @TableId(type = IdType.AUTO) private Integer id; /** * 产品id */ private String pid; /** * 发现价格 */ private String ipoPrice; /** * 发行市价 */ private String ipoValue; /** * 交易平台 */ private String exchange; /** * 公司名称 */ private String company; /** * 上市时间 */ private String iopListing; public StockNewShare() {} } websocketSerivce/src/main/java/org/example/timedTask/JournalismTask.java
File was renamed from websocketSerivce/src/main/java/org/example/timedTask/NewsTask.java @@ -1,6 +1,5 @@ package org.example.timedTask; import org.example.enums.EStockType; import org.example.server.ISiteNewsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,8 +13,8 @@ @Component public class NewsTask { private static final Logger log = LoggerFactory.getLogger(NewsTask.class); public class JournalismTask { private static final Logger log = LoggerFactory.getLogger(JournalismTask.class); @Autowired ISiteNewsService iSiteNewsService; websocketSerivce/src/main/java/org/example/timedTask/NewShareTask.java
New file @@ -0,0 +1,80 @@ package org.example.timedTask; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSONArray; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.example.dao.StockNewShareMapper; import org.example.enums.EStockType; import org.example.pojo.StockNewShare; import org.example.util.HttpClientRequest; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; /** * @program: webSocketProject * @description: 新股 * @create: 2024-03-27 13:14 **/ @Component @Slf4j public class NewShareTask { private final Lock lock = new ReentrantLock(); @Autowired StockNewShareMapper stockNewShareMapper; /* * ipo、新股日历抓取 * */ @Scheduled(cron = "0 0/1 * * * ? ") public void get() { if (lock.tryLock()) { log.info("ipo、新股日历抓取--------->开始"); try { newShare(EStockType.IN); } finally { lock.unlock(); log.info("ipo、新股日历抓取--------->结束"); } } else { log.info("ipo、新股日历抓取--------->上次任务还未执行完成,本次任务忽略"); } } public void newShare(EStockType e) { String result = HttpClientRequest.doGet(e.stockUrl + "new-stock?country_id=" + e.getContryId() + "&key=" + e.stockKey); JSONArray jsonArray = JSONArray.parseArray(result); List<StockNewShare> list = jsonArray.stream().map(stock -> JSONUtil.toBean(stock.toString(), StockNewShare.class)).collect(Collectors.toList()); List<String> pidList = list.stream().map(StockNewShare::getPid).collect(Collectors.toList()); List<StockNewShare> shareList = stockNewShareMapper.selectList(new LambdaQueryWrapper<StockNewShare>().in(StockNewShare::getPid, pidList)); Map<String, StockNewShare> resultMap = shareList.stream() .collect(Collectors.toMap(StockNewShare::getPid, Function.identity())); list.forEach(f -> { StockNewShare share = resultMap.get(f.getPid()); if (share == null) { stockNewShareMapper.insert(f); } else { f.setId(share.getId()); stockNewShareMapper.updateById(f); } }); } } websocketSerivce/src/main/java/org/example/util/ApplicationContextRegisterUtil.java
New file @@ -0,0 +1,27 @@ package org.example.util; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; /** * @program: webSocketProject * @description: * @create: 2024-03-27 17:55 **/ @Component @Lazy(false) public class ApplicationContextRegisterUtil implements ApplicationContextAware { private static ApplicationContext APPLICATION_CONTEXT; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { APPLICATION_CONTEXT = applicationContext; } public static ApplicationContext getApplicationContext() { return APPLICATION_CONTEXT; } } websocketSerivce/src/main/java/org/example/websocket/controller/GenerateKey.java
New file @@ -0,0 +1,50 @@ package org.example.websocket.controller; import cn.hutool.core.date.DateUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import org.example.common.ServerResponse; import org.example.dao.DataServiceKeyMapper; import org.example.pojo.DataServiceKey; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.UUID; /** * @program: webSocketProject * @description: 生成key * @create: 2024-03-27 19:38 **/ @RestController @RequestMapping("/api") public class GenerateKey { @Autowired DataServiceKeyMapper dataServiceKeyMapper; @PostMapping("/creationKey") public ServerResponse sendNotification(@RequestParam("time") Date time) { String randomKey = UUID.randomUUID().toString(); try { Long count = dataServiceKeyMapper.selectCount(new LambdaQueryWrapper<DataServiceKey>().eq(DataServiceKey::getTokenKey, randomKey)); if(count > 0){ return ServerResponse.createByErrorMsg("请重新生成"); } System.out.println(randomKey); DataServiceKey dataServiceKey = new DataServiceKey(); dataServiceKey.setTokenKey(randomKey); dataServiceKey.setExpirationTime(time); dataServiceKey.setStartTime(DateUtil.date()); dataServiceKeyMapper.insert(dataServiceKey); } catch (Exception e) { e.printStackTrace(); } return ServerResponse.createBySuccessMsg(randomKey); } } websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -1,15 +1,35 @@ package org.example.websocket.server; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.io.unit.DataUnit; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.example.dao.DataServiceKeyMapper; import org.example.pojo.DataServiceKey; import org.example.util.ApplicationContextRegisterUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @ClassDescription: websocket服务端 @@ -18,12 +38,11 @@ */ @Slf4j @Component //@RestController @ServerEndpoint("/websocket-server") //@ServerEndpoint("/") public class WsServer { private Session session; /** * 记录在线连接客户端数量 */ @@ -31,41 +50,127 @@ /** * 存放每个连接进来的客户端对应的websocketServer对象,用于后面群发消息 */ private static CopyOnWriteArrayList<WsServer> wsServers = new CopyOnWriteArrayList<>(); private static ConcurrentHashMap<String, Session> webSocketMap = new ConcurrentHashMap<>(); private final Lock lock = new ReentrantLock(); /** * 关闭过期链接 */ @Scheduled(cron = "0 0/1 * * * ?") public void clearExpiration() { if (lock.tryLock()) { log.info("webSocket关闭过期链接--------->开始"); try { ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext(); DataServiceKeyMapper mapper = act.getBean(DataServiceKeyMapper.class); LambdaQueryWrapper<DataServiceKey> queryWrapper = new LambdaQueryWrapper<>(); List<DataServiceKey> keyList = mapper.selectList(queryWrapper.lt(DataServiceKey::getExpirationTime, DateUtil.date()) .or().eq(DataServiceKey::getIsAvailable, 0)); keyList.forEach(f -> { // 在锁内操作 Session sessions = webSocketMap.get(f.getTokenKey()); if (null != sessions) { try { sendInfo(sessions, 0); f.setIsAvailable(0); mapper.updateById(f); } catch (Exception e) { throw new RuntimeException(e); } } log.info("webSocket关闭过期链接---key:"+f.getTokenKey()); }); } catch (Exception e) { throw new RuntimeException(e); } finally { lock.unlock(); log.info("webSocket关闭过期链接--------->结束"); } } else { log.info("webSocket关闭过期链接--------->上次任务还未执行完成,本次任务忽略"); } } /** * 服务端与客户端连接成功时执行 * * @param session 会话 */ @OnOpen public void onOpen(Session session){ this.session = session; //接入的客户端+1 int count = onlineCount.incrementAndGet(); //集合中存入客户端对象+1 wsServers.add(this); log.info("与客户端连接成功,当前连接的客户端数量为:{}", count); public void onOpen(Session session) throws Exception { try { this.session = session; //查询该用户有没有权限 Map<String, List<String>> params = session.getRequestParameterMap(); List<String> funcTypes = params.get("key"); // 取出funcType参数的值 if (null == funcTypes) { sendInfo(session, 1); } String key = funcTypes.get(0); if (StringUtils.isEmpty(key)) { sendInfo(session, 1); } ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext(); DataServiceKeyMapper mapper = act.getBean(DataServiceKeyMapper.class); DataServiceKey dataServiceKey = mapper.selectOne(new LambdaQueryWrapper<DataServiceKey>() .eq(DataServiceKey::getTokenKey, key) .gt(DataServiceKey::getExpirationTime, DateUtil.date()) .eq(DataServiceKey::getIsAvailable, 1)); if (null != dataServiceKey) { if (webSocketMap.containsKey(key)) { webSocketMap.put(key, session); } else { //接入的客户端+1 onlineCount.incrementAndGet(); webSocketMap.put(key, session); } log.info("与客户端连接成功,当前连接的客户端数量为:{}", onlineCount); } else { sendInfo(session, 1); } } catch (Exception e) { sendInfo(session, 1); log.error("客户端连接错误:" + e.getMessage()); } } /** * 关闭连接 */ public void sendInfo(Session session, int type) throws Exception { if (session.getBasicRemote() != null) { if (type == 1) { session.getBasicRemote().sendText("订阅key不存在!"); } session.close(); } } /** * 收到客户端的消息时执行 * * @param message 消息 * @param session 会话 */ @OnMessage public void onMessage(String message, Session session){ public void onMessage(Session session, String message) { log.info("收到来自客户端的消息,客户端地址:{},消息内容:{}", session.getMessageHandlers(), message); //业务逻辑,对消息的处理 // sendMessageToAll("群发消息的内容"); } /** * 连接发生报错时执行 * @param session 会话 * * @param session 会话 * @param throwable 报错 */ @OnError public void onError(Session session, @NonNull Throwable throwable){ public void onError(Session session, @NonNull Throwable throwable) { log.error("连接发生报错"); throwable.printStackTrace(); } @@ -74,53 +179,36 @@ * 连接断开时执行 */ @OnClose public void onClose(){ public void onClose() { //接入客户端连接数-1 int count = onlineCount.decrementAndGet(); if (webSocketMap.get(this.session) != null) { onlineCount.decrementAndGet(); } //集合中的客户端对象-1 wsServers.remove(this); log.info("服务端断开连接,当前连接的客户端数量为:{}", count); webSocketMap.remove(this); log.info("服务端断开连接,当前连接的客户端数量为:{}", onlineCount.get()); } /** * 向客户端推送消息 * @param message 消息 */ public void sendMessage(String message){ this.session.getAsyncRemote().sendText(message); log.info("推送消息给客户端:{},消息内容为:{}", this.session.getMessageHandlers(), message); } // @PostMapping("/send2c") // public void sendMessage1(@RequestBody String message){ // this.session.getAsyncRemote().sendText(message); // try { // this.session.getBasicRemote().sendText(message); // } catch (IOException e) { // throw new RuntimeException(e); // } // log.info("推送消息给客户端,消息内容为:{}", message); // } /** * 群发消息 * * @param message 消息 */ public void sendMessageToAll(String message){ CopyOnWriteArrayList<WsServer> ws = wsServers; for (WsServer wsServer : ws){ wsServer.sendMessage(message); } public void sendMessageToAll(String message) { // 发送消息给所有客户端 webSocketMap.values().forEach(session -> { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("发送消息给客户端失败,客户端ID为:{}", session.getId(), e); } }); log.info("推送消息给所有客户端,消息内容为:{}", message); } @PostMapping("/send2AllC") public void sendMessageToAll1(@RequestBody String message){ CopyOnWriteArrayList<WsServer> ws = wsServers; for (WsServer wsServer : ws){ wsServer.sendMessage(message); } public void sendMessageToAll1(@RequestBody String message) { // 发送消息给所有客户端 sendMessageToAll(message); }