mirror of
https://github.com/nagisa77/OpenIsle.git
synced 2026-02-18 21:10:57 +08:00
feat:Websocket服务拆到单独服务,主后台保持单工通信
This commit is contained in:
204
backend/src/main/java/com/openisle/config/RabbitMQConfig.java
Normal file
204
backend/src/main/java/com/openisle/config/RabbitMQConfig.java
Normal file
@@ -0,0 +1,204 @@
|
||||
package com.openisle.config;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class RabbitMQConfig {
|
||||
|
||||
public static final String EXCHANGE_NAME = "openisle-exchange";
|
||||
// 保持向后兼容的常量
|
||||
public static final String QUEUE_NAME = "notifications-queue";
|
||||
public static final String ROUTING_KEY = "notifications.routingkey";
|
||||
|
||||
// 硬编码为16以匹配ShardingStrategy中的十六进制分片逻辑
|
||||
private final int queueCount = 16;
|
||||
|
||||
@Value("${rabbitmq.queue.durable}")
|
||||
private boolean queueDurable;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
System.out.println("RabbitMQ配置初始化: 队列数量=" + queueCount + ", 持久化=" + queueDurable);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TopicExchange exchange() {
|
||||
return new TopicExchange(EXCHANGE_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建所有分片队列, 使用十六进制后缀 (0-f)
|
||||
*/
|
||||
@Bean
|
||||
public List<Queue> shardedQueues() {
|
||||
System.out.println("开始创建分片队列 Bean...");
|
||||
|
||||
List<Queue> queues = new ArrayList<>();
|
||||
for (int i = 0; i < queueCount; i++) {
|
||||
String shardKey = Integer.toHexString(i);
|
||||
String queueName = "notifications-queue-" + shardKey;
|
||||
Queue queue = new Queue(queueName, queueDurable);
|
||||
queues.add(queue);
|
||||
}
|
||||
|
||||
System.out.println("分片队列 Bean 创建完成,总数: " + queues.size());
|
||||
return queues;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建所有分片绑定, 使用十六进制路由键 (notifications.shard.0 - notifications.shard.f)
|
||||
*/
|
||||
@Bean
|
||||
public List<Binding> shardedBindings(TopicExchange exchange, @Qualifier("shardedQueues") List<Queue> shardedQueues) {
|
||||
System.out.println("开始创建分片绑定 Bean...");
|
||||
List<Binding> bindings = new ArrayList<>();
|
||||
if (shardedQueues != null) {
|
||||
for (Queue queue : shardedQueues) {
|
||||
String queueName = queue.getName();
|
||||
String shardKey = queueName.substring("notifications-queue-".length());
|
||||
String routingKey = "notifications.shard." + shardKey;
|
||||
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
|
||||
bindings.add(binding);
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("分片绑定 Bean 创建完成,总数: " + bindings.size());
|
||||
return bindings;
|
||||
}
|
||||
|
||||
/**
|
||||
* 保持向后兼容的单队列配置(可选)
|
||||
*/
|
||||
@Bean
|
||||
public Queue legacyQueue() {
|
||||
return new Queue(QUEUE_NAME, queueDurable);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保持向后兼容的单队列绑定(可选)
|
||||
*/
|
||||
@Bean
|
||||
public Binding legacyBinding(Queue legacyQueue, TopicExchange exchange) {
|
||||
return BindingBuilder.bind(legacyQueue).to(exchange).with(ROUTING_KEY);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Jackson2JsonMessageConverter messageConverter() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());
|
||||
objectMapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
|
||||
return new Jackson2JsonMessageConverter(objectMapper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
||||
return new RabbitAdmin(connectionFactory);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
|
||||
RabbitTemplate template = new RabbitTemplate(connectionFactory);
|
||||
template.setMessageConverter(messageConverter());
|
||||
return template;
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用 CommandLineRunner 确保在应用完全启动后声明队列到 RabbitMQ
|
||||
* 这样可以确保 RabbitAdmin 和所有 Bean 都已正确初始化
|
||||
*/
|
||||
@Bean
|
||||
@DependsOn({"rabbitAdmin", "shardedQueues", "exchange"})
|
||||
public CommandLineRunner queueDeclarationRunner(RabbitAdmin rabbitAdmin,
|
||||
@Qualifier("shardedQueues") List<Queue> shardedQueues,
|
||||
TopicExchange exchange,
|
||||
Queue legacyQueue,
|
||||
@Qualifier("shardedBindings") List<Binding> shardedBindings,
|
||||
Binding legacyBinding) {
|
||||
return args -> {
|
||||
System.out.println("=== 开始主动声明 RabbitMQ 组件 ===");
|
||||
|
||||
try {
|
||||
// 声明交换
|
||||
rabbitAdmin.declareExchange(exchange);
|
||||
|
||||
// 声明分片队列 - 检查存在性
|
||||
System.out.println("开始检查并声明 " + shardedQueues.size() + " 个分片队列...");
|
||||
int successCount = 0;
|
||||
int skippedCount = 0;
|
||||
|
||||
for (Queue queue : shardedQueues) {
|
||||
String queueName = queue.getName();
|
||||
try {
|
||||
// 使用 declareQueue 的返回值判断队列是否已存在
|
||||
// 如果队列已存在且配置匹配,declareQueue 会返回现有队列信息
|
||||
// 如果不匹配或不存在,会创建新队列
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
successCount++;
|
||||
} catch (org.springframework.amqp.AmqpIOException e) {
|
||||
if (e.getMessage().contains("PRECONDITION_FAILED") && e.getMessage().contains("durable")) {
|
||||
skippedCount++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("队列声明失败: " + queueName + ", 错误: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
System.out.println("分片队列处理完成: 成功 " + successCount + ", 跳过 " + skippedCount + ", 总数 " + shardedQueues.size());
|
||||
|
||||
// 声明分片绑定
|
||||
System.out.println("开始声明 " + shardedBindings.size() + " 个分片绑定...");
|
||||
int bindingSuccessCount = 0;
|
||||
for (Binding binding : shardedBindings) {
|
||||
try {
|
||||
rabbitAdmin.declareBinding(binding);
|
||||
bindingSuccessCount++;
|
||||
} catch (Exception e) {
|
||||
System.err.println("绑定声明失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
System.out.println("分片绑定声明完成: 成功 " + bindingSuccessCount + "/" + shardedBindings.size());
|
||||
|
||||
// 声明遗留队列和绑定 - 检查存在性
|
||||
try {
|
||||
rabbitAdmin.declareQueue(legacyQueue);
|
||||
rabbitAdmin.declareBinding(legacyBinding);
|
||||
System.out.println("遗留队列和绑定就绪: " + QUEUE_NAME + " (已存在或新创建)");
|
||||
} catch (org.springframework.amqp.AmqpIOException e) {
|
||||
if (e.getMessage().contains("PRECONDITION_FAILED") && e.getMessage().contains("durable")) {
|
||||
System.out.println("遗留队列已存在但 durable 设置不匹配: " + QUEUE_NAME + ", 保持现有队列");
|
||||
} else {
|
||||
System.err.println("遗留队列声明失败: " + QUEUE_NAME + ", 错误: " + e.getMessage());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("遗留队列声明失败: " + QUEUE_NAME + ", 错误: " + e.getMessage());
|
||||
}
|
||||
|
||||
System.out.println("=== RabbitMQ 组件声明完成 ===");
|
||||
System.out.println("请检查 RabbitMQ 管理界面确认队列已正确创建");
|
||||
|
||||
} catch (Exception e) {
|
||||
System.err.println("RabbitMQ 组件声明过程中发生严重错误:");
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -74,10 +74,14 @@ public class SecurityConfig {
|
||||
CorsConfiguration cfg = new CorsConfiguration();
|
||||
cfg.setAllowedOrigins(List.of(
|
||||
"http://127.0.0.1:8080",
|
||||
"http://127.0.0.1:8081",
|
||||
"http://127.0.0.1:8082",
|
||||
"http://127.0.0.1:3000",
|
||||
"http://127.0.0.1:3001",
|
||||
"http://127.0.0.1",
|
||||
"http://localhost:8080",
|
||||
"http://localhost:8081",
|
||||
"http://localhost:8082",
|
||||
"http://localhost:3000",
|
||||
"http://localhost:3001",
|
||||
"http://localhost",
|
||||
|
||||
14
backend/src/main/java/com/openisle/config/ShardInfo.java
Normal file
14
backend/src/main/java/com/openisle/config/ShardInfo.java
Normal file
@@ -0,0 +1,14 @@
|
||||
package com.openisle.config;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class ShardInfo {
|
||||
private int shardIndex;
|
||||
private String queueName;
|
||||
private String routingKey;
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
package com.openisle.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class ShardingStrategy {
|
||||
|
||||
// 固定为16以匹配RabbitMQConfig中的十六进制分片逻辑
|
||||
private static final int QUEUE_COUNT = 16;
|
||||
|
||||
// 分片分布统计
|
||||
private final Map<Integer, AtomicLong> shardCounts = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 根据用户名获取分片信息(基于哈希值首字符)
|
||||
*/
|
||||
public ShardInfo getShardInfo(String username) {
|
||||
if (username == null || username.isEmpty()) {
|
||||
// 空用户名默认分到第0个分片
|
||||
return getShardInfoByIndex(0);
|
||||
}
|
||||
|
||||
// 计算用户名的哈希值并转为十六进制字符串
|
||||
String hash = Integer.toHexString(Math.abs(username.hashCode()));
|
||||
|
||||
// 取哈希值的第一个字符 (0-9, a-f)
|
||||
char firstChar = hash.charAt(0);
|
||||
|
||||
// 十六进制字符映射到队列
|
||||
int shard = getShardFromHexChar(firstChar);
|
||||
recordShardUsage(shard);
|
||||
|
||||
log.debug("Username '{}' -> hash '{}' -> firstChar '{}' -> shard {}",
|
||||
username, hash, firstChar, shard);
|
||||
|
||||
return getShardInfoByIndex(shard);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将十六进制字符映射到分片索引
|
||||
*/
|
||||
private int getShardFromHexChar(char hexChar) {
|
||||
int charValue;
|
||||
if (hexChar >= '0' && hexChar <= '9') {
|
||||
charValue = hexChar - '0'; // 0-9
|
||||
} else if (hexChar >= 'a' && hexChar <= 'f') {
|
||||
charValue = hexChar - 'a' + 10; // 10-15
|
||||
} else {
|
||||
// 异常情况,默认为0
|
||||
charValue = 0;
|
||||
}
|
||||
|
||||
// 映射到队列数量范围内
|
||||
return charValue % QUEUE_COUNT;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据分片索引获取分片信息
|
||||
*/
|
||||
private ShardInfo getShardInfoByIndex(int shard) {
|
||||
String shardKey = Integer.toHexString(shard);
|
||||
return new ShardInfo(
|
||||
shard,
|
||||
"notifications-queue-" + shardKey,
|
||||
"notifications.shard." + shardKey
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录分片使用统计
|
||||
*/
|
||||
private void recordShardUsage(int shard) {
|
||||
shardCounts.computeIfAbsent(shard, k -> new AtomicLong(0)).incrementAndGet();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,110 +0,0 @@
|
||||
package com.openisle.config;
|
||||
|
||||
import com.openisle.service.JwtService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
import org.springframework.messaging.simp.stomp.StompCommand;
|
||||
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
|
||||
import org.springframework.messaging.support.ChannelInterceptor;
|
||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
||||
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.security.core.userdetails.UserDetailsService;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
|
||||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
|
||||
|
||||
@Configuration
|
||||
@EnableWebSocketMessageBroker
|
||||
@RequiredArgsConstructor
|
||||
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
|
||||
|
||||
private final JwtService jwtService;
|
||||
private final UserDetailsService userDetailsService;
|
||||
@Value("${app.website-url}")
|
||||
private String websiteUrl;
|
||||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry config) {
|
||||
// Enable a simple memory-based message broker to carry the messages back to the client on destinations prefixed with "/topic" and "/queue"
|
||||
config.enableSimpleBroker("/topic", "/queue");
|
||||
// Set user destination prefix for personal messages
|
||||
config.setUserDestinationPrefix("/user");
|
||||
// Designates the "/app" prefix for messages that are bound for @MessageMapping-annotated methods.
|
||||
config.setApplicationDestinationPrefixes("/app");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerStompEndpoints(StompEndpointRegistry registry) {
|
||||
// 1) 原生 WebSocket(不带 SockJS)
|
||||
registry.addEndpoint("/api/ws")
|
||||
.setAllowedOriginPatterns(
|
||||
"https://staging.open-isle.com",
|
||||
"https://www.staging.open-isle.com",
|
||||
websiteUrl,
|
||||
websiteUrl.replace("://www.", "://"),
|
||||
"http://localhost:*",
|
||||
"http://127.0.0.1:*",
|
||||
"http://192.168.7.98:*",
|
||||
"http://30.211.97.238:*"
|
||||
);
|
||||
|
||||
// 2) SockJS 回退:单独路径
|
||||
registry.addEndpoint("/api/sockjs")
|
||||
.setAllowedOriginPatterns(
|
||||
"https://staging.open-isle.com",
|
||||
"https://www.staging.open-isle.com",
|
||||
websiteUrl,
|
||||
websiteUrl.replace("://www.", "://"),
|
||||
"http://localhost:*",
|
||||
"http://127.0.0.1:*",
|
||||
"http://192.168.7.98:*",
|
||||
"http://30.211.97.238:*"
|
||||
)
|
||||
.withSockJS()
|
||||
.setWebSocketEnabled(true)
|
||||
.setSessionCookieNeeded(false);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
registration.interceptors(new ChannelInterceptor() {
|
||||
@Override
|
||||
public Message<?> preSend(Message<?> message, MessageChannel channel) {
|
||||
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
||||
|
||||
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
|
||||
System.out.println("WebSocket CONNECT command received");
|
||||
String authHeader = accessor.getFirstNativeHeader("Authorization");
|
||||
System.out.println("Authorization header: " + (authHeader != null ? "present" : "missing"));
|
||||
|
||||
if (authHeader != null && authHeader.startsWith("Bearer ")) {
|
||||
String token = authHeader.substring(7);
|
||||
try {
|
||||
String username = jwtService.validateAndGetSubject(token);
|
||||
System.out.println("JWT validated for user: " + username);
|
||||
var userDetails = userDetailsService.loadUserByUsername(username);
|
||||
Authentication auth = new UsernamePasswordAuthenticationToken(
|
||||
userDetails, null, userDetails.getAuthorities());
|
||||
accessor.setUser(auth);
|
||||
System.out.println("WebSocket user set: " + username);
|
||||
} catch (Exception e) {
|
||||
System.err.println("JWT validation failed: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
} else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
|
||||
System.out.println("WebSocket SUBSCRIBE to: " + accessor.getDestination());
|
||||
System.out.println("WebSocket user during subscribe: " + (accessor.getUser() != null ? accessor.getUser().getName() : "null"));
|
||||
}
|
||||
return message;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.openisle.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MessageNotificationPayload implements Serializable {
|
||||
private String targetUsername;
|
||||
private Object payload;
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.openisle.model;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonBackReference;
|
||||
import jakarta.persistence.*;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -20,6 +21,7 @@ public class Message {
|
||||
|
||||
@ManyToOne(optional = false, fetch = FetchType.LAZY)
|
||||
@JoinColumn(name = "conversation_id")
|
||||
@JsonBackReference
|
||||
private MessageConversation conversation;
|
||||
|
||||
@ManyToOne(optional = false, fetch = FetchType.LAZY)
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.openisle.model;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonBackReference;
|
||||
import com.fasterxml.jackson.annotation.JsonManagedReference;
|
||||
import jakarta.persistence.*;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -41,8 +43,10 @@ public class MessageConversation {
|
||||
private Message lastMessage;
|
||||
|
||||
@OneToMany(mappedBy = "conversation", cascade = CascadeType.ALL, orphanRemoval = true)
|
||||
@JsonBackReference
|
||||
private Set<MessageParticipant> participants = new HashSet<>();
|
||||
|
||||
@OneToMany(mappedBy = "conversation", cascade = CascadeType.ALL, orphanRemoval = true)
|
||||
@JsonBackReference
|
||||
private Set<Message> messages = new HashSet<>();
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.openisle.model;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonBackReference;
|
||||
import jakarta.persistence.*;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -19,6 +20,7 @@ public class MessageParticipant {
|
||||
|
||||
@ManyToOne(optional = false, fetch = FetchType.LAZY)
|
||||
@JoinColumn(name = "conversation_id")
|
||||
@JsonBackReference
|
||||
private MessageConversation conversation;
|
||||
|
||||
@ManyToOne(optional = false, fetch = FetchType.LAZY)
|
||||
|
||||
@@ -11,6 +11,9 @@ import java.util.List;
|
||||
|
||||
@Repository
|
||||
public interface MessageConversationRepository extends JpaRepository<MessageConversation, Long> {
|
||||
|
||||
@Query("SELECT c FROM MessageConversation c LEFT JOIN FETCH c.participants p LEFT JOIN FETCH p.user WHERE c.id = :id")
|
||||
java.util.Optional<MessageConversation> findByIdWithParticipantsAndUsers(@Param("id") Long id);
|
||||
@Query("SELECT c FROM MessageConversation c " +
|
||||
"WHERE c.channel = false AND size(c.participants) = 2 " +
|
||||
"AND EXISTS (SELECT 1 FROM c.participants p1 WHERE p1.user = :user1) " +
|
||||
|
||||
@@ -16,16 +16,18 @@ import com.openisle.dto.MessageDto;
|
||||
import com.openisle.dto.ReactionDto;
|
||||
import com.openisle.dto.UserSummaryDto;
|
||||
import com.openisle.mapper.ReactionMapper;
|
||||
import com.openisle.dto.MessageNotificationPayload;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@@ -37,7 +39,7 @@ public class MessageService {
|
||||
private final MessageConversationRepository conversationRepository;
|
||||
private final MessageParticipantRepository participantRepository;
|
||||
private final UserRepository userRepository;
|
||||
private final SimpMessagingTemplate messagingTemplate;
|
||||
private final NotificationProducer notificationProducer;
|
||||
private final ReactionRepository reactionRepository;
|
||||
private final ReactionMapper reactionMapper;
|
||||
|
||||
@@ -69,26 +71,41 @@ public class MessageService {
|
||||
conversationRepository.save(conversation);
|
||||
log.info("Conversation {} updated with last message ID {}", conversation.getId(), message.getId());
|
||||
|
||||
// Broadcast the new message to subscribed clients
|
||||
MessageDto messageDto = toDto(message);
|
||||
String conversationDestination = "/topic/conversation/" + conversation.getId();
|
||||
messagingTemplate.convertAndSend(conversationDestination, messageDto);
|
||||
log.info("Message {} broadcasted to destination: {}", message.getId(), conversationDestination);
|
||||
|
||||
// Also notify the recipient on their personal channel to update the conversation list
|
||||
String userDestination = "/topic/user/" + recipient.getId() + "/messages";
|
||||
messagingTemplate.convertAndSend(userDestination, messageDto);
|
||||
log.info("Message {} notification sent to destination: {}", message.getId(), userDestination);
|
||||
|
||||
// Notify recipient of new unread count
|
||||
long unreadCount = getUnreadMessageCount(recipientId);
|
||||
log.info("Calculating unread count for user {}: {}", recipientId, unreadCount);
|
||||
|
||||
// Send using username instead of user ID for WebSocket routing
|
||||
String recipientUsername = recipient.getUsername();
|
||||
messagingTemplate.convertAndSendToUser(recipientUsername, "/queue/unread-count", unreadCount);
|
||||
log.info("Sent unread count {} to user {} (username: {}) via WebSocket destination: /user/{}/queue/unread-count",
|
||||
unreadCount, recipientId, recipientUsername, recipientUsername);
|
||||
try {
|
||||
MessageDto messageDto = toDto(message)
|
||||
|
||||
long unreadCount = getUnreadMessageCount(recipientId);
|
||||
|
||||
// 创建包含对话和参与者信息的完整payload
|
||||
Map<String, Object> conversationInfo = new HashMap<>();
|
||||
conversationInfo.put("id", conversation.getId());
|
||||
conversationInfo.put("participants", conversation.getParticipants().stream()
|
||||
.map(p -> {
|
||||
Map<String, Object> participantInfo = new HashMap<>();
|
||||
participantInfo.put("userId", p.getUser().getId());
|
||||
participantInfo.put("username", p.getUser().getUsername());
|
||||
return participantInfo;
|
||||
}).collect(Collectors.toList()));
|
||||
|
||||
Map<String, Object> combinedPayload = new HashMap<>();
|
||||
combinedPayload.put("message", messageDto);
|
||||
combinedPayload.put("unreadCount", unreadCount);
|
||||
combinedPayload.put("conversation", conversationInfo);
|
||||
combinedPayload.put("senderId", senderId);
|
||||
if (notificationProducer != null) {
|
||||
log.info("NotificationProducer is available");
|
||||
} else {
|
||||
log.info("ERROR: NotificationProducer is NULL!");
|
||||
return message;
|
||||
}
|
||||
log.info("Recipient username: {}", recipient.getUsername());
|
||||
|
||||
notificationProducer.sendNotification(new MessageNotificationPayload(recipient.getUsername(), combinedPayload));
|
||||
log.info("=== Notification call completed ===");
|
||||
} catch (Exception e) {
|
||||
log.error("=== Error in notification process ===", e);
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
@@ -97,7 +114,7 @@ public class MessageService {
|
||||
public Message sendMessageToConversation(Long senderId, Long conversationId, String content, Long replyToId) {
|
||||
User sender = userRepository.findById(senderId)
|
||||
.orElseThrow(() -> new IllegalArgumentException("Sender not found"));
|
||||
MessageConversation conversation = conversationRepository.findById(conversationId)
|
||||
MessageConversation conversation = conversationRepository.findByIdWithParticipantsAndUsers(conversationId)
|
||||
.orElseThrow(() -> new IllegalArgumentException("Conversation not found"));
|
||||
|
||||
// Join the conversation if not already a participant (useful for channels)
|
||||
@@ -125,20 +142,33 @@ public class MessageService {
|
||||
|
||||
MessageDto messageDto = toDto(message);
|
||||
String conversationDestination = "/topic/conversation/" + conversation.getId();
|
||||
messagingTemplate.convertAndSend(conversationDestination, messageDto);
|
||||
|
||||
// Notify all participants except sender for updates
|
||||
for (MessageParticipant participant : conversation.getParticipants()) {
|
||||
if (participant.getUser().getId().equals(senderId)) continue;
|
||||
String userDestination = "/topic/user/" + participant.getUser().getId() + "/messages";
|
||||
messagingTemplate.convertAndSend(userDestination, messageDto);
|
||||
|
||||
|
||||
long unreadCount = getUnreadMessageCount(participant.getUser().getId());
|
||||
String username = participant.getUser().getUsername();
|
||||
messagingTemplate.convertAndSendToUser(username, "/queue/unread-count", unreadCount);
|
||||
|
||||
long channelUnread = getUnreadChannelCount(participant.getUser().getId());
|
||||
messagingTemplate.convertAndSendToUser(username, "/queue/channel-unread", channelUnread);
|
||||
|
||||
Map<String, Object> combinedPayload = new HashMap<>();
|
||||
combinedPayload.put("message", messageDto);
|
||||
|
||||
Map<String, Object> conversationInfo = new HashMap<>();
|
||||
conversationInfo.put("id", conversation.getId());
|
||||
conversationInfo.put("participants", conversation.getParticipants().stream()
|
||||
.filter(item -> participant.getUser().getId().equals(item.getUser().getId()))
|
||||
.map(p -> {
|
||||
Map<String, Object> participantInfo = new HashMap<>();
|
||||
participantInfo.put("userId", p.getUser().getId());
|
||||
participantInfo.put("username", p.getUser().getUsername());
|
||||
return participantInfo;
|
||||
}).collect(Collectors.toList()));
|
||||
|
||||
combinedPayload.put("conversation", conversationInfo);
|
||||
combinedPayload.put("senderId", senderId);
|
||||
combinedPayload.put("unreadCount", unreadCount);
|
||||
combinedPayload.put("channelUnread", channelUnread);
|
||||
|
||||
notificationProducer.sendNotification(new MessageNotificationPayload(participant.getUser().getUsername(), combinedPayload));
|
||||
}
|
||||
|
||||
return message;
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.openisle.service;
|
||||
|
||||
import com.openisle.config.RabbitMQConfig;
|
||||
import com.openisle.config.ShardInfo;
|
||||
import com.openisle.config.ShardingStrategy;
|
||||
import com.openisle.dto.MessageNotificationPayload;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class NotificationProducer {
|
||||
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
private final ShardingStrategy shardingStrategy;
|
||||
|
||||
@Value("${rabbitmq.sharding.enabled}")
|
||||
private boolean shardingEnabled;
|
||||
|
||||
public void sendNotification(MessageNotificationPayload payload) {
|
||||
String targetUsername = payload.getTargetUsername();
|
||||
|
||||
try {
|
||||
if (shardingEnabled) {
|
||||
// 使用分片策略发送消息
|
||||
sendShardedNotification(payload, targetUsername);
|
||||
} else {
|
||||
// 使用原始单队列方式发送(向后兼容)
|
||||
sendLegacyNotification(payload);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to send message to RabbitMQ for user: {}", targetUsername, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用分片策略发送消息
|
||||
*/
|
||||
private void sendShardedNotification(MessageNotificationPayload payload, String targetUsername) {
|
||||
ShardInfo shardInfo = shardingStrategy.getShardInfo(targetUsername);
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitMQConfig.EXCHANGE_NAME,
|
||||
shardInfo.getRoutingKey(),
|
||||
payload
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用原始单队列方式发送消息(向后兼容)
|
||||
*/
|
||||
private void sendLegacyNotification(MessageNotificationPayload payload) {
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitMQConfig.EXCHANGE_NAME,
|
||||
RabbitMQConfig.ROUTING_KEY,
|
||||
payload
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user