增加 StockStrategy 服务,数据表建立及批量更新方法重写。解决多数据源配置 mapper-locations

无法识别并读取到对应数据源的 mapper.xml 及其 statement 方法的问题
This commit is contained in:
2026-01-10 14:26:04 +08:00
parent 52f4dd5e83
commit b0093ccccb
27 changed files with 653 additions and 280 deletions

View File

@@ -14,11 +14,25 @@ import java.lang.annotation.Target;
* <p>
* 例:
* {@code StrategyAndPoolService#updateByQueryResponse(JsonNode)}
* @see quant.rich.emoney.service.RequestResponseInspectService
*/
@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface ResponseDecodeExtension {
/**
* 指定的 protocolId
* @return
*/
String protocolId();
/**
* inspect 的排序
* @return
*/
int order() default -1;
/**
* 是否在模拟客户端请求和返回时启用, 默认 true
* @return
*/
boolean client() default true;
}

View File

@@ -22,6 +22,7 @@ import quant.rich.emoney.exception.EmoneyDecodeException;
import quant.rich.emoney.exception.EmoneyIllegalRequestParamException;
import quant.rich.emoney.exception.EmoneyRequestException;
import quant.rich.emoney.exception.EmoneyResponseException;
import quant.rich.emoney.service.RequestResponseInspectService;
import quant.rich.emoney.service.sqlite.RequestInfoService;
import quant.rich.emoney.util.EncryptUtils;
import quant.rich.emoney.util.SpringContextHolder;
@@ -63,9 +64,11 @@ public class EmoneyClient implements Cloneable {
private static final String RELOGIN_X_PROTOCOL_ID = "user%2Fauth%2FReLogin";
private static volatile RequestInfoService requestInfoService;
private static volatile RequestResponseInspectService requestResponseInspectService;
/**
* 根据 protocolId 返回 URL
* <p>益盟操盘手对于不同的 protocolId 有不同的 URL 负责。在进行某类请求之前,请先通过调试 APP 进行确认,否则可能无法获取到相应内容
* @param protocolId
* @return
*/
@@ -87,7 +90,9 @@ public class EmoneyClient implements Cloneable {
}
/**
* 从 Spring 上下文中获取载入的请求配置
* 从 Spring 上下文中获取由 RequestInfoService 管理的默认请求配置
* @see RequestInfo
* @see RequestInfoService
* @return
*/
private static RequestInfo getDefaultRequestInfo() {
@@ -95,14 +100,32 @@ public class EmoneyClient implements Cloneable {
synchronized (EmoneyClient.class) {
requestInfoService = SpringContextHolder.getBean(RequestInfoService.class);
}
}
if (requestInfoService == null) {
log.warn("获取 RequestInfoService 实例失败");
return null;
}
}
return requestInfoService.getDefaultRequestInfo();
}
/**
* 从 Spring 上下文中获取 RequestResponseInspectService
* @see RequestResponseInspectService
* @return
*/
private static RequestResponseInspectService getRequestResponseInspectService() {
if (requestResponseInspectService == null) {
synchronized (EmoneyClient.class) {
requestResponseInspectService = SpringContextHolder.getBean(RequestResponseInspectService.class);
}
if (requestResponseInspectService == null) {
log.warn("获取 RequestResponseInspectService 实例失败");
return null;
}
}
return requestResponseInspectService;
}
private EmoneyClient() {}
/**
@@ -293,7 +316,7 @@ public class EmoneyClient implements Cloneable {
* @param nanoRequest
* @return
*/
public static <T extends MessageNano> BaseResponse.Base_Response post(
protected static <T extends MessageNano> BaseResponse.Base_Response post(
T nanoRequest,
Serializable xProtocolId,
Serializable xRequestId) {
@@ -373,6 +396,12 @@ public class EmoneyClient implements Cloneable {
U nanoResponse = (U) MessageNano.mergeFrom(
(MessageNano) clazz.getDeclaredConstructor().newInstance(), baseResponse.detail.getValue());
log.debug("执行 emoney 请求成功");
try {
getRequestResponseInspectService().inspectResponse(xProtocolId.toString(), nanoResponse, true);
}
catch (Exception e) {
log.warn("执行 Inspect response 时发生错误, protocolId={}", xProtocolId, e);
}
return nanoResponse;
} catch (Exception e) {
throw new EmoneyDecodeException("试图将返回数据解析成 " + clazz.getSimpleName() + " 时失败", e);

View File

@@ -1,13 +1,19 @@
package quant.rich.emoney.config;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import com.baomidou.mybatisplus.annotation.DbType;
@@ -22,12 +28,27 @@ public class PostgreMybatisConfig {
public static final String POSTGRE_TRANSACTION_MANAGER = "postgreTransactionManager";
@Value("${mybatis-plus.mapper-locations}")
private String[] mapperLocations;
@Bean("postgreSqlSessionFactory")
public SqlSessionFactory postgreSqlSessionFactory(
@Qualifier("postgreDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSource);
List<Resource> resources = new ArrayList<>();
for (String mapperLocation : mapperLocations) {
for (Resource resource : new PathMatchingResourcePatternResolver()
.getResources(mapperLocation)) {
resources.add(resource);
}
}
factory.setMapperLocations(
resources.toArray(new Resource[0])
);
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.POSTGRE_SQL));
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());

View File

@@ -4,15 +4,21 @@ import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import com.baomidou.mybatisplus.annotation.DbType;
@@ -34,6 +40,10 @@ public class SqliteMybatisConfig {
public static final String SQLITE_TRANSACTION_MANAGER = "sqliteTransactionManager";
@Value("${mybatis-plus.mapper-locations}")
private String[] mapperLocations;
/**
* 配置数据库连接
* <ul>
@@ -107,6 +117,17 @@ public class SqliteMybatisConfig {
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSource);
List<Resource> resources = new ArrayList<>();
for (String mapperLocation : mapperLocations) {
for (Resource resource : new PathMatchingResourcePatternResolver().getResources(mapperLocation)) {
resources.add(resource);
}
}
factory.setMapperLocations(
resources.toArray(new Resource[0])
);
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.SQLITE));
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());

View File

@@ -1,18 +1,27 @@
package quant.rich.emoney.controller.api;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.reflections.Reflections;
import lombok.extern.slf4j.Slf4j;
import quant.rich.emoney.interfaces.IQueryableEnum;
import quant.rich.emoney.service.postgre.StockStrategyService;
@RestController
@RequestMapping("/api/v1/common")
@@ -22,6 +31,20 @@ public class CommonAbilityControllerV1 {
@Autowired
Reflections reflections;
@Autowired
ObjectMapper objectMapper;
@Autowired
StockStrategyService stockStrategyService;
@GetMapping("/test")
void test() throws IOException, InterruptedException, ExecutionException {
String str = Files.readString(Path.of("C:\\Users\\Administrator\\Desktop\\stock_strategy.json"));
JsonNode node = objectMapper.readTree(str);
Future<Boolean> future = stockStrategyService.updateByQueryResponseAsync(node);
future.get();
}
@GetMapping("/getIQueryableEnum")
public Map<String, String> getIQueryableEnum(String enumName) {
Set<Class<? extends IQueryableEnum>> enums = reflections.getSubTypesOf(IQueryableEnum.class);

View File

@@ -1,16 +1,7 @@
package quant.rich.emoney.controller.api;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.NonNull;
import org.springframework.web.bind.annotation.PostMapping;
@@ -22,23 +13,15 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.nano.MessageNano;
import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.reflections.Reflections;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nano.BaseResponse.Base_Response;
import quant.rich.emoney.annotation.ResponseDecodeExtension;
import quant.rich.emoney.entity.sqlite.ProtocolMatch;
import quant.rich.emoney.exception.RException;
import quant.rich.emoney.pojo.dto.EmoneyConvertResult;
import quant.rich.emoney.pojo.dto.EmoneyProtobufBody;
import quant.rich.emoney.service.RequestResponseInspectService;
import quant.rich.emoney.service.sqlite.ProtocolMatchService;
import quant.rich.emoney.util.SpringBeanDetector;
import quant.rich.emoney.util.SpringContextHolder;
/**
* 益盟 ProtocolBuf 报文解析 API 控制器
@@ -52,66 +35,10 @@ public class ProtoDecodeControllerV1 {
ProtocolMatchService protocolMatchService;
@Autowired
Reflections reflections;
RequestResponseInspectService requestResponseInspectService;
Map<String, List<MethodInfo>> responseDecodeExtensions = new HashMap<String, List<MethodInfo>>();
@Data
@RequiredArgsConstructor
private static class MethodInfo {
final Method method;
final Class<?> declaringClass;
final Integer order;
Object instance;
}
@PostConstruct
void postConstruct() {
// Reflections 扫描所有注解并根据 protocolId 和 order 排序
Set<Method> methods = reflections.getMethodsAnnotatedWith(ResponseDecodeExtension.class);
for (Method m : methods) {
MethodInfo info;
ResponseDecodeExtension ex = m.getAnnotation(ResponseDecodeExtension.class);
String protocolId = ex.protocolId();
Integer order = ex.order();
// 判断 method 是否为单参数接受 JsonNode 的方法
Class<?>[] parameterTypes = m.getParameterTypes();
Class<?> declaringClass = m.getDeclaringClass();
if (parameterTypes.length != 1 || parameterTypes[0] != JsonNode.class) {
log.warn("方法 {}#{} 不为类型为 JsonNode 的单参数方法,暂不支持作为解码额外选项",
declaringClass.getSimpleName(), m.getName(), declaringClass.getSimpleName());
continue;
}
// 判断 method 是否是静态
if (Modifier.isStatic(m.getModifiers())) {
info = new MethodInfo(m, null, order);
}
else {
if (!SpringBeanDetector.isSpringManagedClass(declaringClass)) {
log.warn("方法 {} 所属类 {} 不归属于 Spring 管理,目前暂不支持作为解码额外选项",
m.getName(), declaringClass.getSimpleName());
continue;
}
info = new MethodInfo(m, declaringClass, order);
}
List<MethodInfo> list = responseDecodeExtensions.get(protocolId);
if (list == null) {
list = new ArrayList<>();
list.add(info);
responseDecodeExtensions.put(protocolId, list);
}
else {
list.add(info);
}
}
for (List<MethodInfo> list : responseDecodeExtensions.values()) {
list.sort(Comparator.comparingInt(info -> info.getOrder()));
}
log.debug("ResponseDecodeExtension: 共载入 {} 个 ProtocolID 的 {} 个方法",
responseDecodeExtensions.keySet().size(), responseDecodeExtensions.values().size());
}
@Autowired
ObjectMapper objectMapper;
/**
* 解析 emoney protobuf 的请求
@@ -174,7 +101,7 @@ public class ProtoDecodeControllerV1 {
U nano = (U)MessageNano.mergeFrom((MessageNano)
clazz.getDeclaredConstructor().newInstance(), buf);
return EmoneyConvertResult
.ok(new ObjectMapper().valueToTree(nano))
.ok(objectMapper.valueToTree(nano))
.setProtocolId(protocolId)
.setSupposedClassName(className);
}
@@ -269,27 +196,13 @@ public class ProtoDecodeControllerV1 {
(MessageNano)clazz.getDeclaredConstructor().newInstance(),
baseResponse.detail.getValue());
JsonNode jo = new ObjectMapper().valueToTree(nano);
JsonNode jo = objectMapper.valueToTree(nano);
// 查找 ResponseDecodeExtension
List<MethodInfo> methodInfos = responseDecodeExtensions.get(protocolId.toString());
if (methodInfos != null) {
for (MethodInfo methodInfo : methodInfos) {
if (methodInfo.getInstance() != null) {
// instance 不为 null 则说明是已经取到的 spring bean, 直接调用
methodInfo.getMethod().invoke(methodInfo.getInstance(), jo);
}
else if (methodInfo.getDeclaringClass() != null) {
// 获取 spring 管理的实例类
Object instance = SpringContextHolder.getBean(methodInfo.getDeclaringClass());
methodInfo.getMethod().invoke(instance, jo);
methodInfo.setInstance(instance);
}
else {
// 静态方法直接 invoke
methodInfo.getMethod().invoke(null, jo);
}
try {
requestResponseInspectService.inspectResponse(protocolId.toString(), nano, false);
}
catch (Exception e) {
log.warn("执行 Inspect response 时发生错误, protocolId={}", protocolId, e);
}
return EmoneyConvertResult

View File

@@ -2,6 +2,8 @@ package quant.rich.emoney.entity.postgre;
import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -10,47 +12,21 @@ import lombok.experimental.Accessors;
*/
@Data
@Accessors(chain=true)
@TableName("stock_strategy")
public class StockStrategy {
private String tsCode;
private Integer goodsId ;
private LocalDateTime date;
private String strategyName;
private Integer strategyId;
private Integer poolId;
private String poolName;
private Integer strategyId;
private String strategyName;
private String type;
private Integer poolId;
public StockStrategy setTsCodeFromGoodsId(Integer goodsId) {
// 自动将益盟 goodsId 转换成 tsCode
// 1301325 -> 301325.SZ
// 600325 -> 600325.SH
// 1920009 -> 920009.BJ
String goodsIdStr = goodsId.toString();
RuntimeException e = new RuntimeException("无法将 goodsId " + goodsIdStr + " 转换为 tsCode");
if (goodsIdStr.length() == 6) {
// SH
return setTsCode(goodsIdStr + ".SH");
}
else if (goodsIdStr.length() == 7) {
if (goodsIdStr.charAt(0) != '1') {
throw e;
}
if (goodsIdStr.charAt(1) == '9') {
// BJ
return setTsCode(goodsIdStr.substring(1) + ".BJ");
}
// SZ
return setTsCode(goodsIdStr.substring(1) + ".SZ");
}
throw e;
}
}

View File

@@ -20,6 +20,15 @@ import lombok.experimental.Accessors;
import quant.rich.emoney.mybatis.typehandler.CommaListTypeHandler;
import quant.rich.emoney.mybatis.typehandler.JsonStringTypeHandler;
/**
* 目前能想到的计划任务有三种
* <ul>
* <li>指标的抓取
* <li>个股策略(/strategy, id=9400)的抓取
* <li>选股策略的抓取
* </ul>
* 未来可能存在更多的计划任务, 所以需要考虑如何设计才能更好地兼容后续添加的任务类型
*/
@Data
@Accessors(chain = true)
@TableName(value = "plan", autoResultMap = true)

View File

@@ -451,6 +451,11 @@ public class RequestInfo extends Model<RequestInfo> {
return node;
}
/**
* 当前请求配置是否是匿名配置
* <p>当用户名和密码任一为 blank 时视为匿名
* @return
*/
public boolean isAnonymous() {
return StringUtils.isAnyBlank(getUsername(), getPassword());
}

View File

@@ -1,12 +1,13 @@
package quant.rich.emoney.mapper.postgre;
import java.util.Collection;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import quant.rich.emoney.entity.postgre.EmoneyIndex;
import quant.rich.emoney.entity.postgre.StockStrategy;
@Component
@@ -14,4 +15,8 @@ import quant.rich.emoney.entity.postgre.StockStrategy;
@DS("postgre")
public interface StockStrategyMapper extends BaseMapper<StockStrategy> {
int insertOrUpdateBatch(
@Param("list") Collection<StockStrategy> list,
@Param("batchSize") int batchSize
);
}

View File

@@ -0,0 +1,158 @@
package quant.rich.emoney.service;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.reflections.Reflections;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.nano.MessageNano;
import jakarta.annotation.PostConstruct;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import quant.rich.emoney.annotation.ResponseDecodeExtension;
import quant.rich.emoney.util.SpringBeanDetector;
import quant.rich.emoney.util.SpringContextHolder;
/**
* 负责对请求和返回进行拦截和进一步处理
* @see ResponseDecodeExtension
* @see quant.rich.emoney.client.EmoneyClient
* @see quant.rich.emoney.controller.api.ProtoDecodeController
*/
@Service
@Slf4j
public class RequestResponseInspectService {
@Autowired
Reflections reflections;
@Autowired
ObjectMapper objectMapper;
Map<String, List<MethodInfo>> responseDecodeExtensions = new HashMap<String, List<MethodInfo>>();
@Data
@RequiredArgsConstructor
private static class MethodInfo {
/**
* 对应的 Inspect 方法
*/
final Method method;
/**
* 声明了 Inspect 方法的类
*/
final Class<?> declaringClass;
/**
* Inspect 顺序
*/
final Integer order;
/**
* 声明了 Inspect 方法的类的实例。当方法是成员方法且实例受 Spring 管理时,通过实例调用
*/
Object instance;
/**
* 是否在模拟客户端中启用。该参数从注解处填充
*/
boolean isClient = true;
}
@PostConstruct
void postConstruct() {
// Reflections 扫描所有注解并根据 protocolId 和 order 排序
Set<Method> methods = reflections.getMethodsAnnotatedWith(ResponseDecodeExtension.class);
for (Method m : methods) {
MethodInfo info;
ResponseDecodeExtension ex = m.getAnnotation(ResponseDecodeExtension.class);
String protocolId = ex.protocolId();
Integer order = ex.order();
// 判断 method 是否为单参数接受 JsonNode 的方法
Class<?>[] parameterTypes = m.getParameterTypes();
Class<?> declaringClass = m.getDeclaringClass();
if (parameterTypes.length != 1 || parameterTypes[0] != JsonNode.class) {
log.warn("方法 {}#{} 不为类型为 JsonNode 的单参数方法,暂不支持作为解码额外选项",
declaringClass.getSimpleName(), m.getName(), declaringClass.getSimpleName());
continue;
}
// 判断 method 是否是静态
if (Modifier.isStatic(m.getModifiers())) {
info = new MethodInfo(m, null, order);
}
else {
if (!SpringBeanDetector.isSpringManagedClass(declaringClass)) {
log.warn("方法 {} 所属类 {} 不归属于 Spring 管理,目前暂不支持作为解码额外选项",
m.getName(), declaringClass.getSimpleName());
continue;
}
info = new MethodInfo(m, declaringClass, order);
}
info.setClient(ex.client());
List<MethodInfo> list = responseDecodeExtensions.get(protocolId);
if (list == null) {
list = new ArrayList<>();
list.add(info);
responseDecodeExtensions.put(protocolId, list);
}
else {
list.add(info);
}
}
for (List<MethodInfo> list : responseDecodeExtensions.values()) {
list.sort(Comparator.comparingInt(info -> info.getOrder()));
}
log.debug("RequestResponseInspectService: 共载入 {} 个 ProtocolID 的 {} 个方法",
responseDecodeExtensions.keySet().size(), responseDecodeExtensions.values().size());
}
/**
* 对 Response 进行 inspect
* @param protocolId
* @param nano
* @param fromClient 该 inspect 来源是否是 client.
* @return
* @throws InvocationTargetException
* @throws IllegalAccessException
*/
public <U extends MessageNano> void inspectResponse(String protocolId, U nano, boolean fromClient) throws IllegalAccessException, InvocationTargetException {
// 查找 ResponseDecodeExtension
List<MethodInfo> methodInfos = responseDecodeExtensions.get(protocolId.toString());
JsonNode jo = objectMapper.valueToTree(nano);
if (methodInfos != null) {
for (MethodInfo methodInfo : methodInfos) {
if (fromClient && !methodInfo.isClient()) {
// 来自客户端请求但该方法未开启客户端 inspect, 忽略
continue;
}
if (methodInfo.getInstance() != null) {
// instance 不为 null 则说明是已经取到的 spring bean, 直接调用
methodInfo.getMethod().invoke(methodInfo.getInstance(), jo);
}
else if (methodInfo.getDeclaringClass() != null) {
// 获取 spring 管理的实例类
Object instance = SpringContextHolder.getBean(methodInfo.getDeclaringClass());
methodInfo.setInstance(instance);
methodInfo.getMethod().invoke(instance, jo);
}
else {
// 静态方法直接 invoke
methodInfo.getMethod().invoke(null, jo);
}
}
}
}
}

View File

@@ -1,15 +1,106 @@
package quant.rich.emoney.service.postgre;
import org.springframework.stereotype.Service;
import com.baomidou.dynamic.datasource.annotation.DS;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import lombok.extern.slf4j.Slf4j;
import quant.rich.emoney.annotation.ResponseDecodeExtension;
import quant.rich.emoney.config.PostgreMybatisConfig;
import quant.rich.emoney.entity.postgre.StockStrategy;
import quant.rich.emoney.mapper.postgre.StockStrategyMapper;
import quant.rich.emoney.util.DateUtils;
@DS("postgre")
@Service
@Slf4j
public class StockStrategyService extends PostgreServiceImpl<StockStrategyMapper, StockStrategy> {
/**
* 从请求响应的 JsonNode 转换出个股日策略标记信息列表
* @param jsonNode
* @return
*/
public List<StockStrategy> parseJsonToList(JsonNode jsonNode) {
if (jsonNode == null) {
log.warn("试图获取 StockStrategy list, 但提供的响应 json 为空");
return List.of();
}
JsonNode goodsIdNode = jsonNode.at("/data/input/goodsId");
if (goodsIdNode.isMissingNode() || !goodsIdNode.isInt()) {
log.warn("试图获取 StockStrategy list, 但 goodsId node 缺失或不为 integer");
return List.of();
}
Integer goodsId = goodsIdNode.asInt();
log.debug("StockStrategy json with goodsId: {}", goodsId);
JsonNode output = jsonNode.at("/data/output");
if (output.getNodeType() != JsonNodeType.ARRAY) {
log.warn("试图获取 StockStrategy list, 但提供的响应 json.data.output 为 null 或不为 ARRAY");
return List.of();
}
List<StockStrategy> stockStrategies = new ArrayList<>();
String[] types = new String[] {"band", "tech", "val"}; // 波段/技术/基本面(价值)
for (JsonNode node : output) {
JsonNode dateNode = node.get("date");
if (dateNode == null) {
log.warn("试图获取 StockStrategy node, 但缺失日期 date 节点");
continue;
}
LocalDateTime date = DateUtils.parse(String.valueOf(dateNode.asInt()), "yyyyMMdd").atStartOfDay();
for (String type : types) {
JsonNode strategies = node.get(type);
if (strategies != null && strategies.getNodeType() == JsonNodeType.ARRAY) {
for (JsonNode simpleStrategy : strategies) {
StockStrategy stockStrategy = new StockStrategy();
stockStrategy.setGoodsId(goodsId);
stockStrategy.setDate(date);
stockStrategy.setStrategyName(simpleStrategy.get("strategyName").asText());
stockStrategy.setStrategyId(simpleStrategy.get("strategyId").asInt());
stockStrategy.setPoolName(simpleStrategy.get("poolName").asText());
stockStrategy.setPoolId(simpleStrategy.get("poolId").asInt());
stockStrategy.setType(type);
stockStrategies.add(stockStrategy);
}
}
}
}
log.debug("共载入 {} 条 StockStrategy 数据", stockStrategies.size());
return stockStrategies;
}
/**
* 从 jsonNode 节点直接更新到数据库内
* @param jsonNode
* @return
*/
@ResponseDecodeExtension(protocolId="9400")
@Async
public Future<Boolean> updateByQueryResponseAsync(JsonNode jsonNode) {
List<StockStrategy> stockStrategies = parseJsonToList(jsonNode);
return CompletableFuture.completedFuture(saveOrUpdateBatch(stockStrategies));
}
@Override
@Transactional(transactionManager = PostgreMybatisConfig.POSTGRE_TRANSACTION_MANAGER, rollbackFor = Exception.class)
public boolean saveOrUpdateBatch(Collection<StockStrategy> entityList, int batchSize) {
return SqlHelper.retBool(baseMapper.insertOrUpdateBatch(entityList, batchSize));
}
}

View File

@@ -4,6 +4,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.baomidou.dynamic.datasource.annotation.DS;
@@ -24,7 +26,8 @@ import quant.rich.emoney.mapper.sqlite.StrategyAndPoolMapper;
@Slf4j
public class StrategyAndPoolService extends SqliteServiceImpl<StrategyAndPoolMapper, StrategyAndPool> {
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Autowired
ObjectMapper objectMapper;
/**
* 从数据库中导入所有已记录的策略和池,形成可供益盟 Fiddler 插件使用的 prepared_strategies.json
@@ -35,7 +38,7 @@ public class StrategyAndPoolService extends SqliteServiceImpl<StrategyAndPoolMap
Collections.sort(all);
Integer currentStrategyId = null;
ArrayNode preparedStrategies = OBJECT_MAPPER.createArrayNode();
ArrayNode preparedStrategies = objectMapper.createArrayNode();
ObjectNode current = null;
for (StrategyAndPool strategyAndPool : all) {
@@ -47,7 +50,7 @@ public class StrategyAndPoolService extends SqliteServiceImpl<StrategyAndPoolMap
currentStrategyId = strategyAndPool.getStrategyId();
// 意味着要新增一个 current 来容纳同 strategyId 的所有 pool
current = OBJECT_MAPPER.createObjectNode();
current = objectMapper.createObjectNode();
current.put("code", "strategy_" + strategyAndPool.getStrategyId());
current.put("name", strategyAndPool.getStrategyName() + "策略");
current.put("type", 0);
@@ -58,7 +61,7 @@ public class StrategyAndPoolService extends SqliteServiceImpl<StrategyAndPoolMap
current.put("endDate", 0);
current.putArray("fields");
}
ObjectNode pool = OBJECT_MAPPER.createObjectNode();
ObjectNode pool = objectMapper.createObjectNode();
pool.put("code", "pool_" + strategyAndPool.getPoolId());
pool.put("name", strategyAndPool.getPoolName());
pool.put("isLocked", false);
@@ -78,8 +81,8 @@ public class StrategyAndPoolService extends SqliteServiceImpl<StrategyAndPoolMap
*/
@ResponseDecodeExtension(protocolId="9400")
@Async
public void updateByQueryResponse(JsonNode jsonNode) {
// jsonNode.output[].band[]/.tech[]
public void updateByQueryResponseAsync(JsonNode jsonNode) {
// jsonNode.output[].band[]/.tech[]/.val[]
if (jsonNode == null) {
log.warn("试图更新 StrategyAndPool, 但提供的响应 json 为空");
@@ -94,7 +97,7 @@ public class StrategyAndPoolService extends SqliteServiceImpl<StrategyAndPoolMap
Set<StrategyAndPool> set = new HashSet<>();
String[] types = new String[] {"band", "tech", "val"}; // 波段/技术/基本面
String[] types = new String[] {"band", "tech", "val"}; // 波段/技术/基本面(价值)
for (JsonNode node : output) {
for (String type : types) {
JsonNode strategies = node.get(type);

View File

@@ -0,0 +1,58 @@
package quant.rich.emoney.util;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public final class DateUtils {
// 缓存 formatter行为接近 fastjson2
private static final Map<String, DateTimeFormatter> FORMATTER_CACHE =
new ConcurrentHashMap<>();
private DateUtils() {
}
/**
* 格式化 LocalDateTime
*
* @param dateTime LocalDateTime
* @param pattern 日期格式,如 yyyy-MM-dd HH:mm:ss
* @return 格式化后的字符串
*/
public static String format(LocalDateTime dateTime, String pattern) {
if (dateTime == null) {
return null;
}
if (pattern == null || pattern.isEmpty()) {
throw new IllegalArgumentException("pattern must not be null or empty");
}
DateTimeFormatter formatter =
FORMATTER_CACHE.computeIfAbsent(pattern, DateTimeFormatter::ofPattern);
return dateTime.format(formatter);
}
/**
* 将给定字符串转换成 LocalDate
* @param value 如 "20260101"
* @param pattern 日期格式,如 yyyyMMdd
* @return
*/
public static LocalDate parse(String value, String pattern) {
if (value == null) {
return null;
}
if (pattern == null || pattern.isEmpty()) {
throw new IllegalArgumentException("pattern must not be null or empty");
}
DateTimeFormatter formatter =
FORMATTER_CACHE.computeIfAbsent(pattern, DateTimeFormatter::ofPattern);
LocalDate date = LocalDate.parse(value, formatter);
return date;
}
}

View File

@@ -0,0 +1,90 @@
package quant.rich.emoney.util;
public class StockCodeUtils {
/**
* 从 goodsId 转换为 tsCode
* @param goodsId
* @return
*/
public static String goodsIdToTsCode(Integer goodsId) {
if (goodsId == null || goodsId < 0) {
throw new IllegalArgumentException("goodsId 不能为空或为负数");
}
final String s = goodsId.toString();
final int len = s.length();
switch (len) {
case 6:
// 6 位默认 SH
return s + ".SH";
case 7:
// 7 位默认 SZ 或 BJ
if (!s.startsWith("1")) {
throw new IllegalArgumentException("无法将 goodsId " + s + " 转换为 tsCode");
}
char second = s.charAt(1);
String core = s.substring(1);
if (second == '9') {
return core + ".BJ";
}
return core + ".SZ";
default:
throw new IllegalArgumentException("无法将 goodsId " + s + " 转换为 tsCode");
}
}
/**
* 从 tsCode 转换成 goodsId
* @param tsCode
* @return
*/
public static Integer tsCodeToGoodsId(String tsCode) {
if (tsCode == null || tsCode.isBlank()) {
throw new IllegalArgumentException("tsCode 不能为空");
}
String s = tsCode.trim().toUpperCase();
// 期望格式XXX.SZ / XXX.SH / XXX.BJ
int dot = s.indexOf('.');
if (dot <= 0 || dot == s.length() - 1) {
throw new IllegalArgumentException("tsCode 格式无效: " + tsCode);
}
String code = s.substring(0, dot);
String market = s.substring(dot + 1);
if (!code.matches("\\d+")) {
throw new IllegalArgumentException("tsCode 数字部分无效: " + tsCode);
}
switch (market) {
case "SH":
if (code.length() == 6) {
return Integer.valueOf(code);
}
break;
case "SZ":
if (code.length() == 6) {
return Integer.valueOf("1" + code);
}
break;
case "BJ":
if (code.length() == 6) {
return Integer.valueOf("1" + code);
}
break;
}
throw new IllegalArgumentException("无法将 tsCode " + tsCode + " 转换为 goodsId");
}
}

View File

@@ -1,6 +1,6 @@
spring:
datasource:
url: jdbc:postgresql://localhost:5432/verich
url: jdbc:postgresql://localhost:5432/emoney
username: postgres
password: 123456
driver-class-name: org.postgresql.Driver

View File

@@ -2,7 +2,7 @@ spring:
cache.type: simple
datasource:
postgre:
jdbc-url: jdbc:postgresql://localhost:5432/verich
jdbc-url: jdbc:postgresql://localhost:5432/emoney
username: postgres
password: 123456
driver-class-name: org.postgresql.Driver

View File

@@ -40,9 +40,7 @@ mybatis-plus-join.banner: false
mybatis-plus:
global-config:
banner: false
mapper-locations:
- classpath*:mapper/postgre/*.xml
- classpath*:mapper/sqlite/*.xml
mapper-locations: classpath*:mapper/postgre/*.xml,classpath*:mapper/sqlite/*.xml
type-aliases-package:
- quant.rich.emoney.entity.postgre
- quant.rich.emoney.entity.sqlite

Binary file not shown.

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"https://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace="quant.rich.emoney.mapper.postgre.StockStrategyMapper">
<insert id="insertOrUpdateBatch">
<bind name="total" value="0"/>
<foreach collection="list" item="item" index="idx" separator=",">
<if test="idx % batchSize == 0">
INSERT INTO stock_strategy
(goods_id, date, pool_id, pool_name, strategy_id, strategy_name, type)
VALUES
</if>
(#{item.goodsId},
#{item.date},
#{item.poolId},
#{item.poolName},
#{item.strategyId},
#{item.strategyName},
#{item.type})
<if test="(idx + 1) % batchSize == 0 or (idx + 1) == list.size()">
ON CONFLICT (goods_id, date, pool_id)
DO UPDATE SET
pool_name = EXCLUDED.pool_name,
strategy_id = EXCLUDED.strategy_id,
strategy_name = EXCLUDED.strategy_name,
type = EXCLUDED.type
</if>
</foreach>
</insert>
</mapper>

View File

@@ -2,7 +2,6 @@ package quant.rich;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -14,7 +13,6 @@ import lombok.extern.slf4j.Slf4j;
import nano.CandleStickNewWithIndexExResponse.CandleStickNewWithIndexEx_Response;
import nano.CandleStickRequest.CandleStick_Request;
import nano.CandleStickWithIndexRequest.CandleStickWithIndex_Request;
import nano.CandleStickWithIndexRequest.CandleStickWithIndex_Request.IndexInfo;
import quant.rich.emoney.EmoneyAutoApplication;
import quant.rich.emoney.client.EmoneyClient;

View File

@@ -14,7 +14,9 @@ import nano.StrategyMarkRequest.StrategyMark_Request;
import nano.StrategyMarkResponse.StrategyMark_Response;
import quant.rich.emoney.client.EmoneyClient;
/**
* 测试获取策略标记数据。可以用这个做一个 MVP
*/
@SpringBootTest
@ContextConfiguration(classes = EmoneyAutoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)

View File

@@ -0,0 +1,44 @@
package quant.rich.emoney;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import quant.rich.emoney.service.postgre.StockStrategyService;
/**
* 测试从本地读取包含 stock strategy 的 json, 转换为列表并更新到数据库中
*/
@SpringBootTest
@ContextConfiguration(classes = EmoneyAutoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@Slf4j
public class StockStrategyJsonParseAndUpdateLocalTest {
@Autowired
ObjectMapper objectMapper;
@Autowired
StockStrategyService stockStrategyService;
@Test
void test() throws IOException, InterruptedException, ExecutionException {
String str = Files.readString(Path.of("C:\\Users\\Administrator\\Desktop\\stock_strategy.json"));
JsonNode node = objectMapper.readTree(str);
Future<Boolean> future = stockStrategyService.updateByQueryResponseAsync(node);
future.get();
}
}

View File

@@ -13,6 +13,10 @@ import lombok.extern.slf4j.Slf4j;
import quant.rich.emoney.tushare.StockInfo;
import quant.rich.emoney.tushare.TushareDataServiceClient;
/**
* 测试通过 Feign 从 tushare-data-service 获取数据
* @see TushareDataServiceClient
*/
@SpringBootTest
@ContextConfiguration(classes = EmoneyAutoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)

View File

@@ -1,20 +0,0 @@
spring:
datasource:
url: jdbc:postgresql://localhost:5432/verich
username: postgres
password: 123456
driver-class-name: org.postgresql.Driver
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 5
connection-test-query: SELECT 1
maximum-pool-size: 2000
auto-commit: true
idle-timeout: 30000
pool-name: SpringBootDemoHikariCP
max-lifetime: 60000
connection-timeout: 30000
sql:
init:
mode: always
continue-on-error: true

View File

@@ -1,29 +0,0 @@
spring:
cache.type: simple
datasource:
postgre:
jdbc-url: jdbc:postgresql://localhost:5432/verich
username: postgres
password: 123456
driver-class-name: org.postgresql.Driver
type: com.zaxxer.hikari.HikariDataSource
mapper-locations: classpath*:mapper/postgre/*.xml
type-aliases-package: quant.rich.emoney.entity.postgre
hikari:
minimum-idle: 5
connection-test-query: SELECT 1
maximum-pool-size: 2000
auto-commit: true
idle-timeout: 30000
pool-name: SpringBootDemoHikariCP
max-lifetime: 60000
connection-timeout: 30000
sqlite:
jdbc-url: jdbc:sqlite:E:/eclipse-workspace/emo-grab/src/main/resources/database.db
driver-class-name: org.sqlite.JDBC
mapper-locations: classpath*:mapper/sqlite/*.xml
type-aliases-package: quant.rich.emoney.entity.sqlite
sql:
init:
mode: always
continue-on-error: true

View File

@@ -1,75 +0,0 @@
server:
port: 7790
compression: #开启gzip压缩返回内容大于2k的才会进行压缩
enabled: true
mime-types: application/javascript,text/css,application/json,application/xml,text/html,text/xml,text/plain
min-response-size: 2048
logging.level:
'[quant.rich]': debug
'[org.springframework.security]': info
'[org.springframework.boot.devtools.restart]': debug
spring:
cache:
type: redis
redis:
key-prefix: 'emoney-auto:'
use-key-prefix: true
devtools:
restart:
enabled: true
additional-exclude:
- '**/*.html'
- '**/*.js'
- '**/*.css'
additional-paths: lib/
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: Asia/Shanghai
profiles.active: remote
session.timeout: 86400
thymeleaf:
prefix: classpath:/webpage/
suffix: .html
mode: HTML
encoding: UTF-8
cache: false
mybatis-plus-join.banner: false
mybatis-plus:
global-config:
banner: false
mapper-locations:
- classpath*:mapper/postgre/*.xml
- classpath*:mapper/sqlite/*.xml
type-aliases-package:
- quant.rich.emoney.entity.postgre
- quant.rich.emoney.entity.sqlite
type-handlers-package: quant.rich.emoney.mybatis.typehandler
configuration:
map-underscore-to-camel-case: true
default-enum-type-handler: org.apache.ibatis.type.EnumOrdinalTypeHandler
kaptcha:
border: "no"
image:
width: 130
height: 38
textproducer:
char:
length: 5
font:
color: 35,37,38,80
size: 30
names: Times New Roman,Sans,Microsoft Yahei UI,Consolas
session:
key: code
noise:
color: 35,37,38,80
# 程序默认配置,部分内容可以通过数据库覆写
# 当数据库不存在配置时默认加载文件内配置
emoney-auto-config:
username: admin
password: Em0nY_4u70~!