From 5ce072de78ae263b52dc0e2be5c7391b23c093cf Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sun, 27 Apr 2025 18:29:22 +0800
Subject: [PATCH] 1
---
src/main/java/com/nq/utils/task/stock/StockTask.java | 486 +++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 425 insertions(+), 61 deletions(-)
diff --git a/src/main/java/com/nq/utils/task/stock/StockTask.java b/src/main/java/com/nq/utils/task/stock/StockTask.java
index 9d11dee..08db5ed 100644
--- a/src/main/java/com/nq/utils/task/stock/StockTask.java
+++ b/src/main/java/com/nq/utils/task/stock/StockTask.java
@@ -1,26 +1,53 @@
package com.nq.utils.task.stock;
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.json.JSONArray;
import com.alibaba.fastjson2.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
-import com.nq.dao.*;
+import com.nq.common.ServerResponse;
+import com.nq.dao.StockMapper;
+import com.nq.dao.UserMapper;
+import com.nq.dao.UserPendingorderMapper;
+import com.nq.dao.UserPositionMapper;
+import com.nq.enums.EConfigKey;
import com.nq.enums.EStockType;
+import com.nq.enums.EUserAssets;
import com.nq.pojo.*;
-import com.nq.service.IMandatoryLiquidationService;
-import com.nq.service.IStockService;
+import com.nq.pojo.reponse.kResponse;
+import com.nq.service.*;
+import com.nq.service.impl.StockServiceImpl;
+import com.nq.service.impl.UserServiceImpl;
+import com.nq.utils.ConverterUtil;
+import com.nq.utils.KeyUtils;
import com.nq.utils.http.HttpClientRequest;
+import com.nq.utils.redis.RedisKeyConstant;
import com.nq.utils.redis.RedisKeyUtil;
-import com.nq.utils.stock.BuyAndSellUtils;
+import com.nq.utils.redis.RedisShardedPoolUtils;
+import com.nq.utils.stock.GeneratePosition;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import java.math.RoundingMode;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
@Component
@@ -30,77 +57,385 @@
@Autowired
StockMapper stockMapper;
+ @Autowired
+ IUserPositionService userPositionService;
@Autowired
+ UserPositionMapper userPositionMapper;
+
+ @Autowired
+ private UserPendingorderService userPendingorderService;
+ @Autowired
IMandatoryLiquidationService mandatoryLiquidationService;
+ @Autowired
+ IUserAssetsServices iUserAssetsServices;
+ @Autowired
+ IStockConfigServices iStockConfigServices;
+
+ @Autowired
+ IPriceServices priceServices;
+ @Autowired
+ UserMapper userMapper;
private static final Logger log = LoggerFactory.getLogger(StockTask.class);
+
+ private final AtomicBoolean syncINStockData = new AtomicBoolean(false);
+
+ private final Lock syncINStockDataLock = new ReentrantLock();
+
/**
* 同步系统所需要的股票
- * */
- @Scheduled(cron = "0 0/3 * * * ?")
+ */
+ @Scheduled(cron = "0 0/1 * * * ?")
public void syncINStockData() {
- loadAllStock(EStockType.US);
- loadAllStock(EStockType.MAS);
-// loadAllStock(EStockType.HK);
- }
-
-
-
-
- /**
- * 同步美国股票
- * */
- @Scheduled(cron = "0 0/30 * * * ?")
- public void loadStockCompanies(){
- loadAllCompanies();
- }
-
-
-
-
- /**
- * 加载公司信息
- * */
- public void loadAllCompanies(){
- List<Stock> list = stockMapper.findStockList();
- for (int i = 0; i <list.size() ; i++) {
- Stock stock = list.get(i);
- EStockType eStockType = EStockType.getEStockTypeByCode(stock.getStockType());
- String result = HttpClientRequest.doGet(eStockType.stockUrl+"companies?pid=+"+stock.getStockCode()+"+country_id="+eStockType.getContryId()+"&size=1&page=1&key="+eStockType.stockKey);
+ if (syncINStockData.get()) { // 判断任务是否在处理中
+ return;
+ }
+ if (syncINStockDataLock.tryLock()) {
try {
- JSONObject jsonObject = JSONObject.parseObject(result);
- JSONObject companiesInfo = jsonObject.getJSONArray("data").getJSONObject(0);
- RedisKeyUtil.setCacheCompanies(stock,new Gson().toJson(companiesInfo));
- }catch (Exception e){
- log.info("");
-
+ syncINStockData.set(true); // 设置处理中标识为true
+ loadAllStock(EStockType.ST);
+ } finally {
+ syncINStockDataLock.unlock();
+ syncINStockData.set(false); // 设置处理中标识为false
}
}
-
}
+
+
+ /**
+ * 挂单
+ */
+ @Scheduled(cron = "0/10 * * * * ?")
+ public void pendingOrder() {
+ ReentrantLock lock = new ReentrantLock();
+ if (lock.tryLock()) { // 尝试获取锁
+ try {
+ pending();
+ }catch (Exception e){
+ e.printStackTrace();
+ log.error("挂单定时任务报错:",e.getMessage());
+ }
+ finally {
+ lock.unlock(); // 释放锁
+ }
+ } else {
+ // 如果锁不可用,可以选择不执行或打印日志等
+ System.out.println("挂单定时任务 任务正在执行,跳过本次执行。");
+ }
+ }
+
+ public void pending(){
+ List<UserPendingorder> list = userPendingorderService.list(new LambdaQueryWrapper<>(UserPendingorder.class).eq(UserPendingorder::getPositionType, 1));
+ if(CollectionUtil.isNotEmpty(list)){
+ list.forEach(f->{
+ if(f.getStockGid().equals("ST")){
+ //获取当前价格
+ //股票类型 现价 数据源的处理
+ BigDecimal nowPrice = priceServices.getNowPrice(f.getStockCode());
+ if(f.getBuyOrderPrice().compareTo(nowPrice) <= 0){
+ stockTransferPositions(f);
+ }
+ }else{
+ String price = RedisShardedPoolUtils.get(RedisKeyConstant.getRedisKey(f.getStockName()));
+ if(f.getBuyOrderPrice().compareTo(new BigDecimal(price)) <= 0){
+ hjTransferPositions(f);
+ }
+ }
+ });
+ }
+ }
+ public void hjTransferPositions(UserPendingorder userPendingorder){
+ User user = userMapper.selectById(userPendingorder.getUserId());
+ BigDecimal siteSettingBuyFee = new BigDecimal(iStockConfigServices.queryByKey(EConfigKey.BUY_HANDLING_CHARGE.getCode()).getCValue()) ;
+ BigDecimal orderFree = siteSettingBuyFee.multiply(userPendingorder.getOrderTotalPrice());
+ UserPosition userPosition = ConverterUtil.convert(userPendingorder, UserPosition.class);
+ userPosition.setPositionType(user.getAccountType());
+ userPosition.setId(null);
+ userPendingorder.setPositionType(0);
+ userPendingorderService.updateById(userPendingorder);
+ userPositionMapper.insert(userPosition);
+ iUserAssetsServices.availablebalanceChange("USDT", user.getId(), EUserAssets.HANDLING_CHARGE, orderFree, "", "");
+ }
+
+ public void stockTransferPositions(UserPendingorder userPendingorder){
+ // 手续费率
+ BigDecimal siteSettingBuyFee = new BigDecimal(iStockConfigServices.queryByKey(EConfigKey.BUY_HANDLING_CHARGE.getCode()).getCValue()) ;
+ BigDecimal orderFree = siteSettingBuyFee.multiply(userPendingorder.getOrderTotalPrice());
+ User user = userMapper.selectById(userPendingorder.getUserId());
+ UserPosition userPosition = ConverterUtil.convert(userPendingorder, UserPosition.class);
+ userPosition.setPositionType(user.getAccountType());
+ userPosition.setId(null);
+ userPendingorder.setPositionType(0);
+ userPendingorderService.updateById(userPendingorder);
+ userPositionMapper.insert(userPosition);
+ //挂单成功扣除手续费
+ iUserAssetsServices.availablebalanceChange("ST", user.getId(), EUserAssets.HANDLING_CHARGE, orderFree, "", "");
+ }
+
+ //最新价格url
+ private static final String PRICE_URL = "https://quote.alltick.io/quote-b-api/trade-tick?token=794ea65dc03c5af582cddec476fbb0d7-c-app&query=";
+ //k线url
+ private static final String K_URL = "https://quote.alltick.io/quote-b-api/kline?token=794ea65dc03c5af582cddec476fbb0d7-c-app&query=";
+
+
+ public String getPriceData() {
+ // 创建查询数据
+ JSONObject data = new JSONObject();
+
+ // 用于存储符号的列表
+ JSONArray symbolList = new JSONArray();
+
+ // 需要查询的符号列表(按你的要求,这里符号从 ["857.HK", "UNH.US"] 开始)
+ String[] symbols = {"GOLD", "USOIL"};
+
+ // 遍历符号列表,构建 JSON 对象
+ for (String symbol : symbols) {
+ JSONObject symbolObject = new JSONObject();
+ symbolObject.put("code", symbol);
+ symbolList.put(symbolObject);
+ }
+
+ // 将 symbol_list 放入 data 对象中
+ data.put("symbol_list", symbolList);
+
+ // 创建查询请求对象
+ JSONObject query = new JSONObject();
+ query.put("trace", generateTraceCode()); // 假设 generateTraceCode() 是生成唯一追踪代码的方法
+ query.put("data", data);
+ // 返回构建的查询对象
+ return buildUrlWithQuery(query);
+ }
+
+ // 生成唯一追踪码
+ public static String generateTraceCode() {
+ // 获取当前时间戳
+ long timestamp = System.currentTimeMillis();
+
+ // 生成唯一的 UUID
+ String uuid = UUID.randomUUID().toString();
+
+ // 组合时间戳和 UUID,保证唯一性
+ return uuid + "-" + timestamp;
+ }
+
+ @Scheduled(cron = "0/2 * * * * ?") // 每6秒执行一次
+ public void sync() throws InterruptedException {
+ ReentrantLock lock = new ReentrantLock();
+ if (lock.tryLock()) { // 尝试获取锁
+ try {
+ gold();
+ Thread.sleep(2000);
+ getKDate();
+ } finally {
+ lock.unlock(); // 释放锁
+ }
+ } else {
+ // 如果锁不可用,可以选择不执行或打印日志等
+ System.out.println("任务正在执行,跳过本次执行。");
+ }
+ }
+
+ public void gold() {
+ try {
+ String url = PRICE_URL+getPriceData();
+ URL obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ int responseCode = con.getResponseCode();
+ BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+ String inputLine;
+ StringBuffer response = new StringBuffer();
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+ if(responseCode == 200){
+ // 创建 ObjectMapper 对象
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // 解析 JSON 字符串
+ JsonNode rootNode = objectMapper.readTree(response.toString());
+
+ // 获取 "data" 节点下的 "tick_list" 数组
+ JsonNode tickList = rootNode.path("data").path("tick_list");
+
+ // 遍历 tick_list 数组并提取 code 和 price
+ for (JsonNode tick : tickList) {
+ String code = tick.path("code").asText().equals("GOLD") ? "XAUUSD" : tick.path("code").asText();
+ String price = tick.path("price").asText();
+ RedisShardedPoolUtils.set(RedisKeyConstant.getRedisKey(code), price);
+ }
+ }else{
+ log.info("黄金原油获实时价格定时任务------没有接收到数据:"+response);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static String buildUrlWithQuery(JSONObject jsonObject) {
+ try {
+ // 将 JSON 对象转换为字符串
+ String jsonString = jsonObject.toString();
+ // 使用 URLEncoder 编码 JSON 字符串
+ String encodedJson = URLEncoder.encode(jsonString, "UTF-8");
+ return encodedJson;
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+
+ public void getKDate() throws InterruptedException {
+ // 创建Map对象
+ Map<String, Integer> map = new HashMap<>();
+
+ // 写死键值对
+ map.put("d", 8);
+ map.put("w", 9);
+ map.put("m", 10);
+ map.put("1", 1);
+ map.put("5", 2);
+ map.put("30", 4);
+
+ for (Map.Entry<String, Integer> entry : map.entrySet()) {
+ String gold = getResp(K_URL + getKQueryData(entry.getValue(), "XAUUSD"));
+ if(StringUtils.isNotEmpty(gold)){
+ if(entry.getKey().equals("d")){
+ priceLimit(gold,"XAUUSD");
+ }
+ RedisShardedPoolUtils.set("k_gold_"+entry.getKey(), gold);
+ }
+
+ Thread.sleep(2000);
+ String c = getResp(K_URL+getKQueryData(entry.getValue(),"USOIL"));
+ if(StringUtils.isNotEmpty(c)){
+ if(entry.getKey().equals("d")) {
+ priceLimit(c, "USOIL");
+ }
+ RedisShardedPoolUtils.set("k_crude_oil_"+entry.getKey(), c);
+ }
+ Thread.sleep(2000);
+ }
+ }
+
+ @lombok.Data
+ class kData {
+ long t;
+ String c;
+ String o;
+ String h;
+ String l;
+ String v;
+ String vo;
+ }
+ public void priceLimit(String data,String key){
+ List<kData> kDataList = new ArrayList<>();
+ // 使用 Gson 解析 JSON 字符串
+ Gson gson = new Gson();
+ kResponse kResponse = gson.fromJson(data, kResponse.class);
+
+ // 打印 kline_list 的内容
+ for (kResponse.KlineData item : kResponse.getData().getKlineList()) {
+ kData kData = new kData();
+ kData.setT(item.getTimestamp());
+ kData.setC(item.getClose_price());
+ kData.setO(item.getOpen_price());
+ kData.setH(item.getHigh_price());
+ kData.setL(item.getLow_price());
+ kData.setV(item.getVolume());
+ kData.setVo(item.getTurnover());
+ kDataList.add(kData);
+ }
+ double oneC = Double.valueOf(kDataList.get(kDataList.size() - 1).getC());
+ double twoC = Double.valueOf(kDataList.get(kDataList.size() - 2).getC());
+ String h = String.valueOf(((oneC - twoC) / oneC * 100));
+ BigDecimal bd = new BigDecimal(h);
+ bd = bd.setScale(2, RoundingMode.DOWN); // RoundingMode.DOWN 表示截断而非四舍五入
+ RedisShardedPoolUtils.set(key+"_H",bd.toString());
+ RedisShardedPoolUtils.set(key+"_H",bd.toString());
+ }
+
+ public String getResp(String url){
+ try {
+ URL obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ int responseCode = con.getResponseCode();
+ BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+ String inputLine;
+ StringBuffer response = new StringBuffer();
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+ if(responseCode == 200){
+ return response.toString();
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ log.error("k线请求错误!", e.getMessage());
+ }
+ return null;
+ }
+
+ //k线请求组装数据
+ public String getKQueryData(Integer k_data,String code){
+ JSONObject queryData = new JSONObject();
+ queryData.put("trace", generateTraceCode());
+ JSONObject data = new JSONObject();
+ data.put("code", code);//产品code
+ /**
+ * k线类型
+ * 1、1是1分钟K,2是5分钟K,3是15分钟K,4是30分钟K,5是小时K,6是2小时K(股票不支持2小时),7是4小时K(股票不支持4小时),8是日K,9是周K,10是月K (注:股票不支持2小时K、4小时K)
+ * 2、最短的k线只支持1分钟
+ * 3、查询昨日收盘价,kline_type 传8
+ */
+ data.put("kline_type", k_data);
+ /**
+ * 从指定时间往前查询K线
+ * 1、传0表示从当前最新的交易日往前查k线
+ * 2、指定时间请传时间戳,传时间戳表示从该时间戳往前查k线
+ * 3、只有外汇贵金属加密货币支持传时间戳,股票类的code不支持
+ */
+ data.put("kline_timestamp_end", 0);
+ /**
+ * 1、表示查询多少根K线,每次最大请求1000根,可根据时间戳循环往前请求
+ * 2、通过该字段可查询昨日收盘价,kline_type 传8,query_kline_num传2,返回2根k线数据中,时间戳较小的数据是昨日收盘价
+ */
+ data.put("query_kline_num", 200);
+ /*
+ 复权类型,对于股票类的code才有效,例如:0:除权,1:前复权,目前仅支持0
+ */
+ data.put("adjust_type", 0);
+ queryData.put("data", data);
+ return buildUrlWithQuery(queryData);
+ }
+
/**
* 加载所有股票数据
- * */
- public void loadAllStock(EStockType eStockType){
- log.info("同步股票 数据 {}",eStockType.getCode());
+ */
+ public void loadAllStock(EStockType eStockType) {
+ log.info("同步股票 数据 {}", eStockType.getCode());
List<DataStockBean> list = new ArrayList<>();
int totleStock = 1;
- int page =0;
+ int page = 0;
try {
- while (totleStock>list.size()){
- try{
- String result = HttpClientRequest.doGet(eStockType.stockUrl+"list?country_id="+eStockType.getContryId()+"&size=1000&page="+page+"&key="+eStockType.stockKey);
- ReponseBase reponseBase = new Gson().fromJson(result,ReponseBase.class);
+ while (totleStock > list.size()) {
+ try {
+ String result = HttpClientRequest.doGet(eStockType.stockUrl + "list?country_id=" + eStockType.getContryId() + "&size=1000&page=" + page + "&key=" + eStockType.stockKey);
+ ReponseBase reponseBase = new Gson().fromJson(result, ReponseBase.class);
list.addAll(reponseBase.getData());
page++;
totleStock = reponseBase.getTotal();
- }catch (Exception e){
+ } catch (Exception e) {
e.printStackTrace();
break;
}
@@ -112,9 +447,9 @@
stock.setStockCode(o.getId());
stock.setStockName(o.getName());
stock.setStockType(eStockType.getCode());
- if(o.getType() == null){
+ if (o.getType() == null) {
stock.setStockGid(eStockType.getCode());
- }else{
+ } else {
stock.setStockGid(o.getType());
}
stock.setStockSpell(o.getSymbol());
@@ -123,13 +458,13 @@
stock.setDataBase(0);
stock.setAddTime(new Date());
stockMapper.insert1(stock);
- }else{
+ } else {
stock.setStockCode(o.getId());
stock.setStockName(o.getName());
stock.setStockType(eStockType.getCode());
- if(o.getType() == null){
+ if (o.getType() == null) {
stock.setStockGid(eStockType.getCode());
- }else{
+ } else {
stock.setStockGid(o.getType());
}
stock.setStockSpell(o.getSymbol());
@@ -139,12 +474,41 @@
stock.setAddTime(new Date());
stockMapper.updateById(stock);
}
- RedisKeyUtil.setCaCheKeyBaseStock(eStockType,o);
+ RedisKeyUtil.setCaCheKeyBaseStock(eStockType, o);
}
- log.info("同步股票 数据 成功 {} 总共同步数据 {}",eStockType.getCode(),list.size());
+ log.info("同步股票 数据 成功 {} 总共同步数据 {}", eStockType.getCode(), list.size());
} catch (
Exception e) {
log.error("同步出错", e);
}
}
+
+ private final AtomicBoolean stockConstraint = new AtomicBoolean(false);
+
+ /**
+ * 强制平仓
+ */
+// @Scheduled(cron = "0/1 * * * * ?")
+// public void stockConstraint() {
+// if (stockConstraint.get()) { // 判断任务是否在处理中
+// return;
+// }
+// if (stockConstraintLock.tryLock()) {
+// try {
+// stockConstraint.set(true); // 设置处理中标识为true
+// List<UserPosition> userPositions = userPositionMapper.selectList(new LambdaQueryWrapper<UserPosition>().isNull(UserPosition::getSellOrderId));
+// if (CollectionUtils.isNotEmpty(userPositions)) {
+// userPositionService.stockConstraint(userPositions);
+// }
+// } catch (Exception e) {
+// e.printStackTrace();
+// log.error("强制平仓任务错误:" + e.getMessage());
+// } finally {
+// stockConstraintLock.unlock();
+// stockConstraint.set(false); // 设置处理中标识为false
+// }
+// } else {
+// log.info("强制平仓任务--------->上次任务还未执行完成,本次任务忽略");
+// }
+// }
}
--
Gitblit v1.9.3