集成rocketMq消息队列收发消息

This commit is contained in:
海言
2025-10-13 15:21:33 +08:00
parent 800dc8f878
commit 56fdc15090
8 changed files with 117 additions and 6 deletions

View File

@@ -22,7 +22,7 @@
<jwt.version>3.19.2</jwt.version>
<commons-pool2.version>2.11.1</commons-pool2.version>
<fastjson.version>2.0.8</fastjson.version>
<logstash.version> 5.3</logstash.version>
<logstash.version>5.3</logstash.version>
<spring-cloud-alibaba.version>2023.0.1.0</spring-cloud-alibaba.version>
<spring-cloud.version>2023.0.1</spring-cloud.version>
<elasticsearch.version>8.16.0</elasticsearch.version>
@@ -128,6 +128,12 @@
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- rocketMQ 消息队列 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>
</dependencies>
<build>

View File

@@ -1,10 +1,13 @@
package cn.xf.basedemo.config;
import io.swagger.v3.oas.models.Components;
import io.swagger.v3.oas.models.ExternalDocumentation;
import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import io.swagger.v3.oas.models.info.License;
import io.swagger.v3.oas.models.security.SecurityRequirement;
import io.swagger.v3.oas.models.security.SecurityScheme;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@@ -21,6 +24,8 @@ public class SwaggerGroupApi {
@Bean
public OpenAPI springShopOpenAPI() {
// 定义全局安全方案名称
String securitySchemeName = "Authorization";
return new OpenAPI()
.info(new Info().title("Spring boot脚手架 API")
.description("开箱即用的Spring boot脚手架 API")
@@ -29,7 +34,16 @@ public class SwaggerGroupApi {
.license(new License().name("Apache 2.0").url("http://springdoc.org")))
.externalDocs(new ExternalDocumentation()
.description("Spring boot脚手架 Wiki Documentation")
.url("https://springshop.wiki.github.org/docs"));
.url("https://springshop.wiki.github.org/docs")) // 添加安全组件
.components(new Components()
.addSecuritySchemes(securitySchemeName,
new SecurityScheme()
.type(SecurityScheme.Type.HTTP)
.scheme("bearer")
.bearerFormat("JWT")
))
// 将安全方案应用到全局
.addSecurityItem(new SecurityRequirement().addList(securitySchemeName));
}
@Bean

View File

@@ -27,28 +27,35 @@ public class UserController {
@Operation(summary = "用户登录", description = "用户登录")
@PostMapping("/login")
public RetObj login(@RequestBody LoginInfoRes res){
public RetObj login(@RequestBody LoginInfoRes res) {
return userService.login(res);
}
@Operation(summary = "用户信息", description = "用户信息")
@PostMapping("/info")
public RetObj info(){
public RetObj info() {
LoginUser loginUser = SessionContext.getInstance().get();
return RetObj.success(loginUser);
}
@Operation(summary = "es同步用户信息", description = "用户信息")
@GetMapping("/syncEs")
public RetObj syncEs(Long userId){
public RetObj syncEs(Long userId) {
return userService.syncEs(userId);
}
@Operation(summary = "es查询用户信息", description = "用户信息")
@GetMapping("/getEsId")
public RetObj getEsId(Long userId){
public RetObj getEsId(Long userId) {
return userService.getEsId(userId);
}
//发送队列消息
@Operation(summary = "发送队列消息", description = "发送队列消息")
@GetMapping("/sendMsg")
public RetObj sendMsg(String msg) {
return userService.sendMQMsg(msg);
}
}

View File

@@ -49,6 +49,12 @@ public class TokenInterceptor implements HandlerInterceptor {
token = request.getParameter("token");
if (StringUtils.isEmpty(token)) {
throw new LoginException("请先登录");
}else {
//验证token
if (!token.startsWith("Bearer ")) {
throw new LoginException(ResponseCode.USER_INPUT_ERROR);
}
token = token.substring(7);
}
String value = (String) redisTemplate.opsForValue().get("token:" + token);
if (StringUtils.isEmpty(value)) {

View File

@@ -0,0 +1,25 @@
package cn.xf.basedemo.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* RocketMqMsgComsumer
*
* @author 海言
* @date 2025/10/13
* @time 14:37
* @Description
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "user-topic",consumerGroup = "consumer-group")
public class RocketMqMsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接收到消息---------{}",s);
}
}

View File

@@ -0,0 +1,39 @@
package cn.xf.basedemo.mq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
/**
* RocketMqMsgProducer
*
* @author 海言
* @date 2025/10/13
* @time 14:34
* @Description
*/
@Slf4j
@Service
public class RocketMqMsgProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
//发送普通消息
public void sendMsg(String topic, String msg) {
rocketMQTemplate.convertAndSend(topic, msg);
log.info("发送普通消息:{}", msg);
}
//发送带标签的消息
public void sendMsg(String topic, String tag, String msg) {
rocketMQTemplate.convertAndSend(topic + ":" + tag, msg);
}
//发送延迟消息
public void sendDelayMsg(String topic, String msg, int delayLevel) {
rocketMQTemplate.syncSendDelayTimeMills(topic, msg, delayLevel);
}
}

View File

@@ -17,4 +17,6 @@ public interface UserService {
RetObj syncEs(Long userId);
RetObj getEsId(Long userId);
RetObj sendMQMsg(String msg);
}

View File

@@ -12,16 +12,19 @@ import cn.xf.basedemo.config.GlobalConfig;
import cn.xf.basedemo.mappers.UserMapper;
import cn.xf.basedemo.model.domain.User;
import cn.xf.basedemo.model.res.LoginInfoRes;
import cn.xf.basedemo.mq.RocketMqMsgProducer;
import cn.xf.basedemo.service.UserService;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RestController;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -49,6 +52,9 @@ public class UserServiceImpl implements UserService {
@Autowired
private RedisTemplate redisTemplate;
@Resource
private RocketMqMsgProducer rocketMqMsgProducer;
@Override
public RetObj login(LoginInfoRes res) {
@@ -116,4 +122,10 @@ public class UserServiceImpl implements UserService {
}
return RetObj.error("es中不存在该用户");
}
@Override
public RetObj sendMQMsg(String msg) {
rocketMqMsgProducer.sendMsg("user-topic", msg);
return RetObj.success();
}
}