diff --git a/backend/open-isle.env.example b/backend/open-isle.env.example
index 8e2d0b4b4..a3516e2b5 100644
--- a/backend/open-isle.env.example
+++ b/backend/open-isle.env.example
@@ -37,4 +37,10 @@ OPENAI_API_KEY=<你的openai-api-key>
WEBPUSH_PUBLIC_KEY=<你的webpush-public-key>
WEBPUSH_PRIVATE_KEY=<你的webpush-private-key>
+# === RabbitMQ ===
+RABBITMQ_HOST=<你的rabbitmq_host>
+RABBITMQ_PORT=<你的rabbitmq_port>
+RABBITMQ_USERNAME=<你的rabbitmq_username>
+RABBITMQ_PASSWORD=<你的rabbitmq_password>
+
# LOG_LEVEL=DEBUG
diff --git a/backend/pom.xml b/backend/pom.xml
index 4193590b6..ab71a3266 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -27,8 +27,8 @@
spring-boot-starter-web
- org.springframework.boot
- spring-boot-starter-websocket
+ org.springframework.boot
+ spring-boot-starter-amqp
org.slf4j
diff --git a/backend/src/main/java/com/openisle/config/RabbitMQConfig.java b/backend/src/main/java/com/openisle/config/RabbitMQConfig.java
new file mode 100644
index 000000000..996184ac4
--- /dev/null
+++ b/backend/src/main/java/com/openisle/config/RabbitMQConfig.java
@@ -0,0 +1,204 @@
+package com.openisle.config;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.annotation.DependsOn;
+
+import jakarta.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+
+@Configuration
+@RequiredArgsConstructor
+public class RabbitMQConfig {
+
+ public static final String EXCHANGE_NAME = "openisle-exchange";
+ // 保持向后兼容的常量
+ public static final String QUEUE_NAME = "notifications-queue";
+ public static final String ROUTING_KEY = "notifications.routingkey";
+
+ // 硬编码为16以匹配ShardingStrategy中的十六进制分片逻辑
+ private final int queueCount = 16;
+
+ @Value("${rabbitmq.queue.durable}")
+ private boolean queueDurable;
+
+ @PostConstruct
+ public void init() {
+ System.out.println("RabbitMQ配置初始化: 队列数量=" + queueCount + ", 持久化=" + queueDurable);
+ }
+
+ @Bean
+ public TopicExchange exchange() {
+ return new TopicExchange(EXCHANGE_NAME);
+ }
+
+ /**
+ * 创建所有分片队列, 使用十六进制后缀 (0-f)
+ */
+ @Bean
+ public List shardedQueues() {
+ System.out.println("开始创建分片队列 Bean...");
+
+ List queues = new ArrayList<>();
+ for (int i = 0; i < queueCount; i++) {
+ String shardKey = Integer.toHexString(i);
+ String queueName = "notifications-queue-" + shardKey;
+ Queue queue = new Queue(queueName, queueDurable);
+ queues.add(queue);
+ }
+
+ System.out.println("分片队列 Bean 创建完成,总数: " + queues.size());
+ return queues;
+ }
+
+ /**
+ * 创建所有分片绑定, 使用十六进制路由键 (notifications.shard.0 - notifications.shard.f)
+ */
+ @Bean
+ public List shardedBindings(TopicExchange exchange, @Qualifier("shardedQueues") List shardedQueues) {
+ System.out.println("开始创建分片绑定 Bean...");
+ List bindings = new ArrayList<>();
+ if (shardedQueues != null) {
+ for (Queue queue : shardedQueues) {
+ String queueName = queue.getName();
+ String shardKey = queueName.substring("notifications-queue-".length());
+ String routingKey = "notifications.shard." + shardKey;
+ Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
+ bindings.add(binding);
+ }
+ }
+
+ System.out.println("分片绑定 Bean 创建完成,总数: " + bindings.size());
+ return bindings;
+ }
+
+ /**
+ * 保持向后兼容的单队列配置(可选)
+ */
+ @Bean
+ public Queue legacyQueue() {
+ return new Queue(QUEUE_NAME, queueDurable);
+ }
+
+ /**
+ * 保持向后兼容的单队列绑定(可选)
+ */
+ @Bean
+ public Binding legacyBinding(Queue legacyQueue, TopicExchange exchange) {
+ return BindingBuilder.bind(legacyQueue).to(exchange).with(ROUTING_KEY);
+ }
+
+ @Bean
+ public Jackson2JsonMessageConverter messageConverter() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());
+ objectMapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ return new Jackson2JsonMessageConverter(objectMapper);
+ }
+
+ @Bean
+ public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
+ return new RabbitAdmin(connectionFactory);
+ }
+
+ @Bean
+ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
+ RabbitTemplate template = new RabbitTemplate(connectionFactory);
+ template.setMessageConverter(messageConverter());
+ return template;
+ }
+
+ /**
+ * 使用 CommandLineRunner 确保在应用完全启动后声明队列到 RabbitMQ
+ * 这样可以确保 RabbitAdmin 和所有 Bean 都已正确初始化
+ */
+ @Bean
+ @DependsOn({"rabbitAdmin", "shardedQueues", "exchange"})
+ public CommandLineRunner queueDeclarationRunner(RabbitAdmin rabbitAdmin,
+ @Qualifier("shardedQueues") List shardedQueues,
+ TopicExchange exchange,
+ Queue legacyQueue,
+ @Qualifier("shardedBindings") List shardedBindings,
+ Binding legacyBinding) {
+ return args -> {
+ System.out.println("=== 开始主动声明 RabbitMQ 组件 ===");
+
+ try {
+ // 声明交换
+ rabbitAdmin.declareExchange(exchange);
+
+ // 声明分片队列 - 检查存在性
+ System.out.println("开始检查并声明 " + shardedQueues.size() + " 个分片队列...");
+ int successCount = 0;
+ int skippedCount = 0;
+
+ for (Queue queue : shardedQueues) {
+ String queueName = queue.getName();
+ try {
+ // 使用 declareQueue 的返回值判断队列是否已存在
+ // 如果队列已存在且配置匹配,declareQueue 会返回现有队列信息
+ // 如果不匹配或不存在,会创建新队列
+ rabbitAdmin.declareQueue(queue);
+ successCount++;
+ } catch (org.springframework.amqp.AmqpIOException e) {
+ if (e.getMessage().contains("PRECONDITION_FAILED") && e.getMessage().contains("durable")) {
+ skippedCount++;
+ }
+ } catch (Exception e) {
+ System.err.println("队列声明失败: " + queueName + ", 错误: " + e.getMessage());
+ }
+ }
+ System.out.println("分片队列处理完成: 成功 " + successCount + ", 跳过 " + skippedCount + ", 总数 " + shardedQueues.size());
+
+ // 声明分片绑定
+ System.out.println("开始声明 " + shardedBindings.size() + " 个分片绑定...");
+ int bindingSuccessCount = 0;
+ for (Binding binding : shardedBindings) {
+ try {
+ rabbitAdmin.declareBinding(binding);
+ bindingSuccessCount++;
+ } catch (Exception e) {
+ System.err.println("绑定声明失败: " + e.getMessage());
+ }
+ }
+ System.out.println("分片绑定声明完成: 成功 " + bindingSuccessCount + "/" + shardedBindings.size());
+
+ // 声明遗留队列和绑定 - 检查存在性
+ try {
+ rabbitAdmin.declareQueue(legacyQueue);
+ rabbitAdmin.declareBinding(legacyBinding);
+ System.out.println("遗留队列和绑定就绪: " + QUEUE_NAME + " (已存在或新创建)");
+ } catch (org.springframework.amqp.AmqpIOException e) {
+ if (e.getMessage().contains("PRECONDITION_FAILED") && e.getMessage().contains("durable")) {
+ System.out.println("遗留队列已存在但 durable 设置不匹配: " + QUEUE_NAME + ", 保持现有队列");
+ } else {
+ System.err.println("遗留队列声明失败: " + QUEUE_NAME + ", 错误: " + e.getMessage());
+ }
+ } catch (Exception e) {
+ System.err.println("遗留队列声明失败: " + QUEUE_NAME + ", 错误: " + e.getMessage());
+ }
+
+ System.out.println("=== RabbitMQ 组件声明完成 ===");
+ System.out.println("请检查 RabbitMQ 管理界面确认队列已正确创建");
+
+ } catch (Exception e) {
+ System.err.println("RabbitMQ 组件声明过程中发生严重错误:");
+ e.printStackTrace();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/backend/src/main/java/com/openisle/config/SecurityConfig.java b/backend/src/main/java/com/openisle/config/SecurityConfig.java
index bb6081aac..a864b6ba8 100644
--- a/backend/src/main/java/com/openisle/config/SecurityConfig.java
+++ b/backend/src/main/java/com/openisle/config/SecurityConfig.java
@@ -74,10 +74,14 @@ public class SecurityConfig {
CorsConfiguration cfg = new CorsConfiguration();
cfg.setAllowedOrigins(List.of(
"http://127.0.0.1:8080",
+ "http://127.0.0.1:8081",
+ "http://127.0.0.1:8082",
"http://127.0.0.1:3000",
"http://127.0.0.1:3001",
"http://127.0.0.1",
"http://localhost:8080",
+ "http://localhost:8081",
+ "http://localhost:8082",
"http://localhost:3000",
"http://localhost:3001",
"http://localhost",
diff --git a/backend/src/main/java/com/openisle/config/ShardInfo.java b/backend/src/main/java/com/openisle/config/ShardInfo.java
new file mode 100644
index 000000000..feae640ff
--- /dev/null
+++ b/backend/src/main/java/com/openisle/config/ShardInfo.java
@@ -0,0 +1,14 @@
+package com.openisle.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ShardInfo {
+ private int shardIndex;
+ private String queueName;
+ private String routingKey;
+}
\ No newline at end of file
diff --git a/backend/src/main/java/com/openisle/config/ShardingStrategy.java b/backend/src/main/java/com/openisle/config/ShardingStrategy.java
new file mode 100644
index 000000000..a5be64640
--- /dev/null
+++ b/backend/src/main/java/com/openisle/config/ShardingStrategy.java
@@ -0,0 +1,84 @@
+package com.openisle.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Component
+@Slf4j
+public class ShardingStrategy {
+
+ // 固定为16以匹配RabbitMQConfig中的十六进制分片逻辑
+ private static final int QUEUE_COUNT = 16;
+
+ // 分片分布统计
+ private final Map shardCounts = new ConcurrentHashMap<>();
+
+ /**
+ * 根据用户名获取分片信息(基于哈希值首字符)
+ */
+ public ShardInfo getShardInfo(String username) {
+ if (username == null || username.isEmpty()) {
+ // 空用户名默认分到第0个分片
+ return getShardInfoByIndex(0);
+ }
+
+ // 计算用户名的哈希值并转为十六进制字符串
+ String hash = Integer.toHexString(Math.abs(username.hashCode()));
+
+ // 取哈希值的第一个字符 (0-9, a-f)
+ char firstChar = hash.charAt(0);
+
+ // 十六进制字符映射到队列
+ int shard = getShardFromHexChar(firstChar);
+ recordShardUsage(shard);
+
+ log.debug("Username '{}' -> hash '{}' -> firstChar '{}' -> shard {}",
+ username, hash, firstChar, shard);
+
+ return getShardInfoByIndex(shard);
+ }
+
+ /**
+ * 将十六进制字符映射到分片索引
+ */
+ private int getShardFromHexChar(char hexChar) {
+ int charValue;
+ if (hexChar >= '0' && hexChar <= '9') {
+ charValue = hexChar - '0'; // 0-9
+ } else if (hexChar >= 'a' && hexChar <= 'f') {
+ charValue = hexChar - 'a' + 10; // 10-15
+ } else {
+ // 异常情况,默认为0
+ charValue = 0;
+ }
+
+ // 映射到队列数量范围内
+ return charValue % QUEUE_COUNT;
+ }
+
+ /**
+ * 根据分片索引获取分片信息
+ */
+ private ShardInfo getShardInfoByIndex(int shard) {
+ String shardKey = Integer.toHexString(shard);
+ return new ShardInfo(
+ shard,
+ "notifications-queue-" + shardKey,
+ "notifications.shard." + shardKey
+ );
+ }
+
+ /**
+ * 记录分片使用统计
+ */
+ private void recordShardUsage(int shard) {
+ shardCounts.computeIfAbsent(shard, k -> new AtomicLong(0)).incrementAndGet();
+ }
+
+}
\ No newline at end of file
diff --git a/backend/src/main/java/com/openisle/config/WebSocketConfig.java b/backend/src/main/java/com/openisle/config/WebSocketConfig.java
deleted file mode 100644
index f3576335b..000000000
--- a/backend/src/main/java/com/openisle/config/WebSocketConfig.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.openisle.config;
-
-import com.openisle.service.JwtService;
-import lombok.RequiredArgsConstructor;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.simp.config.ChannelRegistration;
-import org.springframework.messaging.simp.config.MessageBrokerRegistry;
-import org.springframework.messaging.simp.stomp.StompCommand;
-import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
-import org.springframework.messaging.support.ChannelInterceptor;
-import org.springframework.messaging.support.MessageHeaderAccessor;
-import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
-import org.springframework.security.core.Authentication;
-import org.springframework.security.core.userdetails.UserDetailsService;
-import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
-import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
-import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
-
-@Configuration
-@EnableWebSocketMessageBroker
-@RequiredArgsConstructor
-public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
-
- private final JwtService jwtService;
- private final UserDetailsService userDetailsService;
- @Value("${app.website-url}")
- private String websiteUrl;
-
- @Override
- public void configureMessageBroker(MessageBrokerRegistry config) {
- // Enable a simple memory-based message broker to carry the messages back to the client on destinations prefixed with "/topic" and "/queue"
- config.enableSimpleBroker("/topic", "/queue");
- // Set user destination prefix for personal messages
- config.setUserDestinationPrefix("/user");
- // Designates the "/app" prefix for messages that are bound for @MessageMapping-annotated methods.
- config.setApplicationDestinationPrefixes("/app");
- }
-
- @Override
- public void registerStompEndpoints(StompEndpointRegistry registry) {
- // 1) 原生 WebSocket(不带 SockJS)
- registry.addEndpoint("/api/ws")
- .setAllowedOriginPatterns(
- "https://staging.open-isle.com",
- "https://www.staging.open-isle.com",
- websiteUrl,
- websiteUrl.replace("://www.", "://"),
- "http://localhost:*",
- "http://127.0.0.1:*",
- "http://192.168.7.98:*",
- "http://30.211.97.238:*"
- );
-
- // 2) SockJS 回退:单独路径
- registry.addEndpoint("/api/sockjs")
- .setAllowedOriginPatterns(
- "https://staging.open-isle.com",
- "https://www.staging.open-isle.com",
- websiteUrl,
- websiteUrl.replace("://www.", "://"),
- "http://localhost:*",
- "http://127.0.0.1:*",
- "http://192.168.7.98:*",
- "http://30.211.97.238:*"
- )
- .withSockJS()
- .setWebSocketEnabled(true)
- .setSessionCookieNeeded(false);
- }
-
-
-
- @Override
- public void configureClientInboundChannel(ChannelRegistration registration) {
- registration.interceptors(new ChannelInterceptor() {
- @Override
- public Message> preSend(Message> message, MessageChannel channel) {
- StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
-
- if (StompCommand.CONNECT.equals(accessor.getCommand())) {
- System.out.println("WebSocket CONNECT command received");
- String authHeader = accessor.getFirstNativeHeader("Authorization");
- System.out.println("Authorization header: " + (authHeader != null ? "present" : "missing"));
-
- if (authHeader != null && authHeader.startsWith("Bearer ")) {
- String token = authHeader.substring(7);
- try {
- String username = jwtService.validateAndGetSubject(token);
- System.out.println("JWT validated for user: " + username);
- var userDetails = userDetailsService.loadUserByUsername(username);
- Authentication auth = new UsernamePasswordAuthenticationToken(
- userDetails, null, userDetails.getAuthorities());
- accessor.setUser(auth);
- System.out.println("WebSocket user set: " + username);
- } catch (Exception e) {
- System.err.println("JWT validation failed: " + e.getMessage());
- }
- }
- } else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
- System.out.println("WebSocket SUBSCRIBE to: " + accessor.getDestination());
- System.out.println("WebSocket user during subscribe: " + (accessor.getUser() != null ? accessor.getUser().getName() : "null"));
- }
- return message;
- }
- });
- }
-}
\ No newline at end of file
diff --git a/backend/src/main/java/com/openisle/dto/MessageNotificationPayload.java b/backend/src/main/java/com/openisle/dto/MessageNotificationPayload.java
new file mode 100644
index 000000000..72a4a143b
--- /dev/null
+++ b/backend/src/main/java/com/openisle/dto/MessageNotificationPayload.java
@@ -0,0 +1,15 @@
+package com.openisle.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class MessageNotificationPayload implements Serializable {
+ private String targetUsername;
+ private Object payload;
+}
\ No newline at end of file
diff --git a/backend/src/main/java/com/openisle/model/Message.java b/backend/src/main/java/com/openisle/model/Message.java
index 72edb3e2b..314c8f2ca 100644
--- a/backend/src/main/java/com/openisle/model/Message.java
+++ b/backend/src/main/java/com/openisle/model/Message.java
@@ -1,5 +1,6 @@
package com.openisle.model;
+import com.fasterxml.jackson.annotation.JsonBackReference;
import jakarta.persistence.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -20,6 +21,7 @@ public class Message {
@ManyToOne(optional = false, fetch = FetchType.LAZY)
@JoinColumn(name = "conversation_id")
+ @JsonBackReference
private MessageConversation conversation;
@ManyToOne(optional = false, fetch = FetchType.LAZY)
diff --git a/backend/src/main/java/com/openisle/model/MessageConversation.java b/backend/src/main/java/com/openisle/model/MessageConversation.java
index dfcda4e0c..638cfd39a 100644
--- a/backend/src/main/java/com/openisle/model/MessageConversation.java
+++ b/backend/src/main/java/com/openisle/model/MessageConversation.java
@@ -1,5 +1,7 @@
package com.openisle.model;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
import jakarta.persistence.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -41,8 +43,10 @@ public class MessageConversation {
private Message lastMessage;
@OneToMany(mappedBy = "conversation", cascade = CascadeType.ALL, orphanRemoval = true)
+ @JsonBackReference
private Set participants = new HashSet<>();
@OneToMany(mappedBy = "conversation", cascade = CascadeType.ALL, orphanRemoval = true)
+ @JsonBackReference
private Set messages = new HashSet<>();
}
\ No newline at end of file
diff --git a/backend/src/main/java/com/openisle/model/MessageParticipant.java b/backend/src/main/java/com/openisle/model/MessageParticipant.java
index d69901c8f..bb1805819 100644
--- a/backend/src/main/java/com/openisle/model/MessageParticipant.java
+++ b/backend/src/main/java/com/openisle/model/MessageParticipant.java
@@ -1,5 +1,6 @@
package com.openisle.model;
+import com.fasterxml.jackson.annotation.JsonBackReference;
import jakarta.persistence.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -19,6 +20,7 @@ public class MessageParticipant {
@ManyToOne(optional = false, fetch = FetchType.LAZY)
@JoinColumn(name = "conversation_id")
+ @JsonBackReference
private MessageConversation conversation;
@ManyToOne(optional = false, fetch = FetchType.LAZY)
diff --git a/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java b/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java
index d260c4f38..1c858af2f 100644
--- a/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java
+++ b/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java
@@ -11,6 +11,9 @@ import java.util.List;
@Repository
public interface MessageConversationRepository extends JpaRepository {
+
+ @Query("SELECT c FROM MessageConversation c LEFT JOIN FETCH c.participants p LEFT JOIN FETCH p.user WHERE c.id = :id")
+ java.util.Optional findByIdWithParticipantsAndUsers(@Param("id") Long id);
@Query("SELECT c FROM MessageConversation c " +
"WHERE c.channel = false AND size(c.participants) = 2 " +
"AND EXISTS (SELECT 1 FROM c.participants p1 WHERE p1.user = :user1) " +
diff --git a/backend/src/main/java/com/openisle/service/MessageService.java b/backend/src/main/java/com/openisle/service/MessageService.java
index 9993cd8f3..463a3b35d 100644
--- a/backend/src/main/java/com/openisle/service/MessageService.java
+++ b/backend/src/main/java/com/openisle/service/MessageService.java
@@ -16,16 +16,18 @@ import com.openisle.dto.MessageDto;
import com.openisle.dto.ReactionDto;
import com.openisle.dto.UserSummaryDto;
import com.openisle.mapper.ReactionMapper;
+import com.openisle.dto.MessageNotificationPayload;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
-import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
@Service
@@ -37,7 +39,7 @@ public class MessageService {
private final MessageConversationRepository conversationRepository;
private final MessageParticipantRepository participantRepository;
private final UserRepository userRepository;
- private final SimpMessagingTemplate messagingTemplate;
+ private final NotificationProducer notificationProducer;
private final ReactionRepository reactionRepository;
private final ReactionMapper reactionMapper;
@@ -69,26 +71,41 @@ public class MessageService {
conversationRepository.save(conversation);
log.info("Conversation {} updated with last message ID {}", conversation.getId(), message.getId());
- // Broadcast the new message to subscribed clients
- MessageDto messageDto = toDto(message);
- String conversationDestination = "/topic/conversation/" + conversation.getId();
- messagingTemplate.convertAndSend(conversationDestination, messageDto);
- log.info("Message {} broadcasted to destination: {}", message.getId(), conversationDestination);
-
- // Also notify the recipient on their personal channel to update the conversation list
- String userDestination = "/topic/user/" + recipient.getId() + "/messages";
- messagingTemplate.convertAndSend(userDestination, messageDto);
- log.info("Message {} notification sent to destination: {}", message.getId(), userDestination);
-
- // Notify recipient of new unread count
- long unreadCount = getUnreadMessageCount(recipientId);
- log.info("Calculating unread count for user {}: {}", recipientId, unreadCount);
- // Send using username instead of user ID for WebSocket routing
- String recipientUsername = recipient.getUsername();
- messagingTemplate.convertAndSendToUser(recipientUsername, "/queue/unread-count", unreadCount);
- log.info("Sent unread count {} to user {} (username: {}) via WebSocket destination: /user/{}/queue/unread-count",
- unreadCount, recipientId, recipientUsername, recipientUsername);
+ try {
+ MessageDto messageDto = toDto(message)
+
+ long unreadCount = getUnreadMessageCount(recipientId);
+
+ // 创建包含对话和参与者信息的完整payload
+ Map conversationInfo = new HashMap<>();
+ conversationInfo.put("id", conversation.getId());
+ conversationInfo.put("participants", conversation.getParticipants().stream()
+ .map(p -> {
+ Map participantInfo = new HashMap<>();
+ participantInfo.put("userId", p.getUser().getId());
+ participantInfo.put("username", p.getUser().getUsername());
+ return participantInfo;
+ }).collect(Collectors.toList()));
+
+ Map combinedPayload = new HashMap<>();
+ combinedPayload.put("message", messageDto);
+ combinedPayload.put("unreadCount", unreadCount);
+ combinedPayload.put("conversation", conversationInfo);
+ combinedPayload.put("senderId", senderId);
+ if (notificationProducer != null) {
+ log.info("NotificationProducer is available");
+ } else {
+ log.info("ERROR: NotificationProducer is NULL!");
+ return message;
+ }
+ log.info("Recipient username: {}", recipient.getUsername());
+
+ notificationProducer.sendNotification(new MessageNotificationPayload(recipient.getUsername(), combinedPayload));
+ log.info("=== Notification call completed ===");
+ } catch (Exception e) {
+ log.error("=== Error in notification process ===", e);
+ }
return message;
}
@@ -97,7 +114,7 @@ public class MessageService {
public Message sendMessageToConversation(Long senderId, Long conversationId, String content, Long replyToId) {
User sender = userRepository.findById(senderId)
.orElseThrow(() -> new IllegalArgumentException("Sender not found"));
- MessageConversation conversation = conversationRepository.findById(conversationId)
+ MessageConversation conversation = conversationRepository.findByIdWithParticipantsAndUsers(conversationId)
.orElseThrow(() -> new IllegalArgumentException("Conversation not found"));
// Join the conversation if not already a participant (useful for channels)
@@ -125,20 +142,33 @@ public class MessageService {
MessageDto messageDto = toDto(message);
String conversationDestination = "/topic/conversation/" + conversation.getId();
- messagingTemplate.convertAndSend(conversationDestination, messageDto);
- // Notify all participants except sender for updates
for (MessageParticipant participant : conversation.getParticipants()) {
if (participant.getUser().getId().equals(senderId)) continue;
- String userDestination = "/topic/user/" + participant.getUser().getId() + "/messages";
- messagingTemplate.convertAndSend(userDestination, messageDto);
-
+
long unreadCount = getUnreadMessageCount(participant.getUser().getId());
- String username = participant.getUser().getUsername();
- messagingTemplate.convertAndSendToUser(username, "/queue/unread-count", unreadCount);
-
long channelUnread = getUnreadChannelCount(participant.getUser().getId());
- messagingTemplate.convertAndSendToUser(username, "/queue/channel-unread", channelUnread);
+
+ Map combinedPayload = new HashMap<>();
+ combinedPayload.put("message", messageDto);
+
+ Map conversationInfo = new HashMap<>();
+ conversationInfo.put("id", conversation.getId());
+ conversationInfo.put("participants", conversation.getParticipants().stream()
+ .filter(item -> participant.getUser().getId().equals(item.getUser().getId()))
+ .map(p -> {
+ Map participantInfo = new HashMap<>();
+ participantInfo.put("userId", p.getUser().getId());
+ participantInfo.put("username", p.getUser().getUsername());
+ return participantInfo;
+ }).collect(Collectors.toList()));
+
+ combinedPayload.put("conversation", conversationInfo);
+ combinedPayload.put("senderId", senderId);
+ combinedPayload.put("unreadCount", unreadCount);
+ combinedPayload.put("channelUnread", channelUnread);
+
+ notificationProducer.sendNotification(new MessageNotificationPayload(participant.getUser().getUsername(), combinedPayload));
}
return message;
diff --git a/backend/src/main/java/com/openisle/service/NotificationProducer.java b/backend/src/main/java/com/openisle/service/NotificationProducer.java
new file mode 100644
index 000000000..f64f1a6f9
--- /dev/null
+++ b/backend/src/main/java/com/openisle/service/NotificationProducer.java
@@ -0,0 +1,63 @@
+package com.openisle.service;
+
+import com.openisle.config.RabbitMQConfig;
+import com.openisle.config.ShardInfo;
+import com.openisle.config.ShardingStrategy;
+import com.openisle.dto.MessageNotificationPayload;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class NotificationProducer {
+
+ private final RabbitTemplate rabbitTemplate;
+ private final ShardingStrategy shardingStrategy;
+
+ @Value("${rabbitmq.sharding.enabled}")
+ private boolean shardingEnabled;
+
+ public void sendNotification(MessageNotificationPayload payload) {
+ String targetUsername = payload.getTargetUsername();
+
+ try {
+ if (shardingEnabled) {
+ // 使用分片策略发送消息
+ sendShardedNotification(payload, targetUsername);
+ } else {
+ // 使用原始单队列方式发送(向后兼容)
+ sendLegacyNotification(payload);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send message to RabbitMQ for user: {}", targetUsername, e);
+ throw e;
+ }
+ }
+
+ /**
+ * 使用分片策略发送消息
+ */
+ private void sendShardedNotification(MessageNotificationPayload payload, String targetUsername) {
+ ShardInfo shardInfo = shardingStrategy.getShardInfo(targetUsername);
+ rabbitTemplate.convertAndSend(
+ RabbitMQConfig.EXCHANGE_NAME,
+ shardInfo.getRoutingKey(),
+ payload
+ );
+ }
+
+ /**
+ * 使用原始单队列方式发送消息(向后兼容)
+ */
+ private void sendLegacyNotification(MessageNotificationPayload payload) {
+ rabbitTemplate.convertAndSend(
+ RabbitMQConfig.EXCHANGE_NAME,
+ RabbitMQConfig.ROUTING_KEY,
+ payload
+ );
+ }
+}
\ No newline at end of file
diff --git a/backend/src/main/resources/application.properties b/backend/src/main/resources/application.properties
index 9e81189ed..62692b596 100644
--- a/backend/src/main/resources/application.properties
+++ b/backend/src/main/resources/application.properties
@@ -83,3 +83,13 @@ app.website-url=${WEBSITE_URL:https://www.open-isle.com}
# Web push configuration
app.webpush.public-key=${WEBPUSH_PUBLIC_KEY:}
app.webpush.private-key=${WEBPUSH_PRIVATE_KEY:}
+
+# RabbitMQ Configuration
+spring.rabbitmq.host=${RABBITMQ_HOST:localhost}
+spring.rabbitmq.port=${RABBITMQ_PORT:5672}
+spring.rabbitmq.username=${RABBITMQ_USERNAME:guest}
+spring.rabbitmq.password=${RABBITMQ_PASSWORD:guest}
+
+# RabbitMQ 队列配置 - 修改为非持久化以匹配现有队列
+rabbitmq.queue.durable=true
+rabbitmq.sharding.enabled=true
diff --git a/frontend_nuxt/.env.example b/frontend_nuxt/.env.example
index a57f4362a..000c9e9cd 100644
--- a/frontend_nuxt/.env.example
+++ b/frontend_nuxt/.env.example
@@ -1,5 +1,6 @@
; 本地部署后端
; NUXT_PUBLIC_API_BASE_URL=https://127.0.0.1:8081
+; NUXT_PUBLIC_WEBSOCKET_URL=https://127.0.0.1:8082
; 预发环境后端
; NUXT_PUBLIC_API_BASE_URL=https://staging.open-isle.com
; 生产环境后端
diff --git a/frontend_nuxt/.env.staging.example b/frontend_nuxt/.env.staging.example
index 3a2a26b41..726f59e44 100644
--- a/frontend_nuxt/.env.staging.example
+++ b/frontend_nuxt/.env.staging.example
@@ -1,5 +1,6 @@
; 本地部署后端
; NUXT_PUBLIC_API_BASE_URL=https://127.0.0.1:8081
+; NUXT_PUBLIC_WEBSOCKET_URL=https://127.0.0.1:8082
; 预发环境后端
NUXT_PUBLIC_API_BASE_URL=https://staging.open-isle.com
; 生产环境后端
diff --git a/frontend_nuxt/composables/useChannelsUnreadCount.js b/frontend_nuxt/composables/useChannelsUnreadCount.js
index 6db3d0c64..e684a0c53 100644
--- a/frontend_nuxt/composables/useChannelsUnreadCount.js
+++ b/frontend_nuxt/composables/useChannelsUnreadCount.js
@@ -3,82 +3,73 @@ import { useWebSocket } from './useWebSocket'
import { getToken } from '~/utils/auth'
const count = ref(0)
-let isInitialized = false
-let wsSubscription = null
+let isInitialized = false;
export function useChannelsUnreadCount() {
- const config = useRuntimeConfig()
- const API_BASE_URL = config.public.apiBaseUrl
- const { subscribe, isConnected, connect } = useWebSocket()
+ const config = useRuntimeConfig();
+ const API_BASE_URL = config.public.apiBaseUrl;
+ const { subscribe, isConnected, connect } = useWebSocket();
const fetchChannelUnread = async () => {
- const token = getToken()
+ const token = getToken();
if (!token) {
- count.value = 0
- return
+ count.value = 0;
+ return;
}
try {
const response = await fetch(`${API_BASE_URL}/api/channels/unread-count`, {
headers: { Authorization: `Bearer ${token}` },
- })
+ });
if (response.ok) {
- const data = await response.json()
- count.value = data
+ const data = await response.json();
+ count.value = data;
}
} catch (e) {
- console.error('Failed to fetch channel unread count:', e)
+ console.error('Failed to fetch channel unread count:', e);
}
- }
+ };
+
+ const setupWebSocketListener = () => {
+ const destination = '/user/queue/channel-unread';
+
+ subscribe(destination, (message) => {
+ const unread = parseInt(message.body, 10);
+ if (!isNaN(unread)) {
+ count.value = unread;
+ }
+ }).then(subscription => {
+ if (subscription) {
+ console.log('频道未读消息订阅成功');
+ }
+ });
+ };
const initialize = () => {
- const token = getToken()
+ const token = getToken();
if (!token) {
- count.value = 0
- return
+ count.value = 0;
+ return;
}
- fetchChannelUnread()
- if (!isConnected.value) {
- connect(token)
- }
- setupWebSocketListener()
- }
- const setupWebSocketListener = () => {
- if (!wsSubscription) {
- watch(
- isConnected,
- (newValue) => {
- if (newValue && !wsSubscription) {
- wsSubscription = subscribe('/user/queue/channel-unread', (message) => {
- const unread = parseInt(message.body, 10)
- if (!isNaN(unread)) {
- count.value = unread
- }
- })
- }
- },
- { immediate: true },
- )
+ if (!isConnected.value) {
+ connect(token);
}
- }
+
+ fetchChannelUnread();
+ setupWebSocketListener();
+ };
const setFromList = (channels) => {
- count.value = Array.isArray(channels) ? channels.filter((c) => c.unreadCount > 0).length : 0
- }
+ count.value = Array.isArray(channels) ? channels.filter((c) => c.unreadCount > 0).length : 0;
+ };
- const hasUnread = computed(() => count.value > 0)
+ const hasUnread = computed(() => count.value > 0);
- const token = getToken()
- if (token) {
- if (!isInitialized) {
- isInitialized = true
- initialize()
- } else {
- fetchChannelUnread()
- if (!isConnected.value) {
- connect(token)
- }
- setupWebSocketListener()
+ if (!isInitialized) {
+ const token = getToken();
+ if (token) {
+ isInitialized = true;
+ initialize();
}
}
@@ -88,5 +79,5 @@ export function useChannelsUnreadCount() {
fetchChannelUnread,
initialize,
setFromList,
- }
+ };
}
diff --git a/frontend_nuxt/composables/useUnreadCount.js b/frontend_nuxt/composables/useUnreadCount.js
index 381b67d6a..677083aea 100644
--- a/frontend_nuxt/composables/useUnreadCount.js
+++ b/frontend_nuxt/composables/useUnreadCount.js
@@ -4,7 +4,6 @@ import { getToken } from '~/utils/auth';
const count = ref(0);
let isInitialized = false;
-let wsSubscription = null;
export function useUnreadCount() {
const config = useRuntimeConfig();
@@ -30,64 +29,48 @@ export function useUnreadCount() {
}
};
- const initialize = async () => {
+ const setupWebSocketListener = () => {
+ console.log('设置未读消息订阅...');
+ const destination = '/user/queue/unread-count';
+
+ subscribe(destination, (message) => {
+ const unreadCount = parseInt(message.body, 10);
+ if (!isNaN(unreadCount)) {
+ count.value = unreadCount;
+ }
+ }).then(subscription => {
+ if (subscription) {
+ console.log('未读消息订阅成功');
+ }
+ });
+ };
+
+ const initialize = () => {
const token = getToken();
if (!token) {
count.value = 0;
return;
}
- // 总是获取最新的未读数量
- fetchUnreadCount();
-
- // 确保WebSocket连接
if (!isConnected.value) {
connect(token);
}
- // 设置WebSocket监听
- await setupWebSocketListener();
+ fetchUnreadCount();
+ setupWebSocketListener();
};
- const setupWebSocketListener = async () => {
- // 只有在还没有订阅的情况下才设置监听
- if (!wsSubscription) {
-
- watch(isConnected, (newValue) => {
- if (newValue && !wsSubscription) {
- const destination = `/user/queue/unread-count`;
- wsSubscription = subscribe(destination, (message) => {
- const unreadCount = parseInt(message.body, 10);
- if (!isNaN(unreadCount)) {
- count.value = unreadCount;
- }
- });
- }
- }, { immediate: true });
- }
- };
-
- // 自动初始化逻辑 - 确保每次调用都能获取到未读数量并设置监听
- const token = getToken();
- if (token) {
- if (!isInitialized) {
+ if (!isInitialized) {
+ const token = getToken();
+ if (token) {
isInitialized = true;
- initialize(); // 完整初始化,包括WebSocket监听
- } else {
- // 即使已经初始化,也要确保获取最新的未读数量并确保WebSocket监听存在
- fetchUnreadCount();
-
- // 确保WebSocket连接和监听都存在
- if (!isConnected.value) {
- connect(token);
- }
- setupWebSocketListener();
+ initialize();
}
}
return {
count,
fetchUnreadCount,
- initialize,
+ initialize,
};
}
\ No newline at end of file
diff --git a/frontend_nuxt/composables/useWebSocket.js b/frontend_nuxt/composables/useWebSocket.js
index c07d3a4ea..18a95b15f 100644
--- a/frontend_nuxt/composables/useWebSocket.js
+++ b/frontend_nuxt/composables/useWebSocket.js
@@ -1,86 +1,182 @@
-import { ref } from 'vue'
+import { ref, readonly, watch } from 'vue'
import { Client } from '@stomp/stompjs'
import SockJS from 'sockjs-client/dist/sockjs.min.js'
import { useRuntimeConfig } from '#app'
const client = ref(null)
const isConnected = ref(false)
+const activeSubscriptions = ref(new Map())
+// Store callbacks to allow for re-subscription after reconnect
+const resubscribeCallbacks = new Map()
+
+// Helper for unified subscription logging
+const logSubscriptionActivity = (action, destination, subscriptionId = 'N/A') => {
+ console.log(
+ `[SUB_MAN] ${action} | Dest: ${destination} | SubID: ${subscriptionId} | Active: ${activeSubscriptions.value.size}`
+ )
+}
const connect = (token) => {
- if (isConnected.value) {
+ if (isConnected.value || (client.value && client.value.active)) {
return
}
- const config = useRuntimeConfig()
- const API_BASE_URL = config.public.apiBaseUrl
- const socketUrl = `${API_BASE_URL}/api/sockjs`
- const socket = new SockJS(socketUrl)
+ const config = useRuntimeConfig()
+ const WEBSOCKET_URL = config.public.websocketUrl
+ const socketUrl = `${WEBSOCKET_URL}/api/sockjs`
+
const stompClient = new Client({
- webSocketFactory: () => socket,
+ webSocketFactory: () => new SockJS(socketUrl),
connectHeaders: {
Authorization: `Bearer ${token}`,
},
- debug: function (str) {},
- reconnectDelay: 5000,
+ debug: function (str) {
+
+ },
+ reconnectDelay: 10000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
})
stompClient.onConnect = (frame) => {
isConnected.value = true
+ resubscribeCallbacks.forEach((callback, destination) => {
+ doSubscribe(destination, callback)
+ })
}
stompClient.onStompError = (frame) => {
- console.error('WebSocket STOMP error:', frame)
+ console.error('Full frame:', frame)
+ }
+
+ stompClient.onWebSocketError = (event) => {
+
+ }
+
+ stompClient.onWebSocketClose = (event) => {
+ isConnected.value = false;
+ activeSubscriptions.value.clear();
+ logSubscriptionActivity('Cleared all subscriptions due to WebSocket close', 'N/A');
+ };
+
+ stompClient.onDisconnect = (frame) => {
+ isConnected.value = false
}
stompClient.activate()
client.value = stompClient
}
+const unsubscribe = (destination) => {
+ if (!destination) {
+ return false
+ }
+ const subscription = activeSubscriptions.value.get(destination)
+ if (subscription) {
+ try {
+ subscription.unsubscribe()
+ logSubscriptionActivity('Unsubscribed', destination, subscription.id)
+ } catch (e) {
+ console.error(`Error during unsubscribe for ${destination}:`, e)
+ } finally {
+ activeSubscriptions.value.delete(destination)
+ resubscribeCallbacks.delete(destination)
+ }
+ return true
+ } else {
+ return false
+ }
+}
+
+const unsubscribeAll = () => {
+ logSubscriptionActivity('Unsubscribing from ALL', `Total: ${activeSubscriptions.value.size}`)
+ const destinations = [...activeSubscriptions.value.keys()]
+ destinations.forEach(dest => {
+ unsubscribe(dest)
+ })
+}
+
const disconnect = () => {
+ unsubscribeAll()
if (client.value) {
- isConnected.value = false
- client.value.deactivate()
+ try {
+ client.value.deactivate()
+ } catch (e) {
+ console.error('Error during client deactivation:', e)
+ }
client.value = null
+ isConnected.value = false
+ }
+}
+
+const doSubscribe = (destination, callback) => {
+ try {
+ if (!client.value || !client.value.connected) {
+ return null
+ }
+
+ if (activeSubscriptions.value.has(destination)) {
+ unsubscribe(destination)
+ }
+
+ const subscription = client.value.subscribe(destination, (message) => {
+ callback(message)
+ })
+
+ if (subscription) {
+ activeSubscriptions.value.set(destination, subscription)
+ resubscribeCallbacks.set(destination, callback) // Store for re-subscription
+ logSubscriptionActivity('Subscribed', destination, subscription.id)
+ return subscription
+ } else {
+ return null
+ }
+ } catch (error) {
+ console.error(`Exception during subscription to ${destination}:`, error)
+ return null
}
}
const subscribe = (destination, callback) => {
- if (!isConnected.value || !client.value || !client.value.connected) {
- return null
+ if (!destination) {
+ return Promise.resolve(null)
}
- try {
- const subscription = client.value.subscribe(destination, (message) => {
- try {
- if (
- destination.includes('/queue/unread-count') ||
- destination.includes('/queue/channel-unread')
- ) {
- callback(message)
- } else {
- const parsedMessage = JSON.parse(message.body)
- callback(parsedMessage)
+ return new Promise((resolve) => {
+ if (client.value && client.value.connected) {
+ const sub = doSubscribe(destination, callback)
+ resolve(sub)
+ } else {
+ const unwatch = watch(isConnected, (newVal) => {
+ if (newVal) {
+ setTimeout(() => {
+ const sub = doSubscribe(destination, callback)
+ unwatch()
+ resolve(sub)
+ }, 100)
}
- } catch (error) {
- callback(message)
- }
- })
-
- return subscription
- } catch (error) {
- return null
- }
+ }, { immediate: false })
+
+ setTimeout(() => {
+ unwatch()
+ if (!isConnected.value) {
+ resolve(null)
+ }
+ }, 15000)
+ }
+ })
}
export function useWebSocket() {
return {
- client,
+ client: readonly(client),
isConnected,
connect,
disconnect,
subscribe,
+ unsubscribe,
+ unsubscribeAll,
+ activeSubscriptions: readonly(activeSubscriptions),
}
}
diff --git a/frontend_nuxt/nuxt.config.ts b/frontend_nuxt/nuxt.config.ts
index 6084395b5..ec01e7cb4 100644
--- a/frontend_nuxt/nuxt.config.ts
+++ b/frontend_nuxt/nuxt.config.ts
@@ -6,6 +6,7 @@ export default defineNuxtConfig({
runtimeConfig: {
public: {
apiBaseUrl: process.env.NUXT_PUBLIC_API_BASE_URL || '',
+ websocketUrl: process.env.NUXT_PUBLIC_WEBSOCKET_URL || '',
websiteBaseUrl: process.env.NUXT_PUBLIC_WEBSITE_BASE_URL || '',
googleClientId: process.env.NUXT_PUBLIC_GOOGLE_CLIENT_ID || '',
githubClientId: process.env.NUXT_PUBLIC_GITHUB_CLIENT_ID || '',
diff --git a/frontend_nuxt/pages/message-box/[id].vue b/frontend_nuxt/pages/message-box/[id].vue
index 546add168..a6a8f5d14 100644
--- a/frontend_nuxt/pages/message-box/[id].vue
+++ b/frontend_nuxt/pages/message-box/[id].vue
@@ -100,10 +100,9 @@ import BasePlaceholder from '~/components/BasePlaceholder.vue'
const config = useRuntimeConfig()
const route = useRoute()
const API_BASE_URL = config.public.apiBaseUrl
-const { connect, disconnect, subscribe, isConnected } = useWebSocket()
+const { connect, subscribe, unsubscribe, isConnected } = useWebSocket()
const { fetchUnreadCount: refreshGlobalUnreadCount } = useUnreadCount()
const { fetchChannelUnread: refreshChannelUnread } = useChannelsUnreadCount()
-let subscription = null
const messages = ref([])
const participants = ref([])
@@ -338,8 +337,12 @@ onMounted(async () => {
// 初次进入频道时,平滑滚动到底部
scrollToBottomSmooth()
const token = getToken()
- if (token && !isConnected.value) {
- connect(token)
+ if (token) {
+ if (isConnected.value) {
+ subscribeToConversation()
+ } else {
+ connect(token)
+ }
}
} else {
toast.error('请先登录')
@@ -347,26 +350,39 @@ onMounted(async () => {
}
})
+const subscribeToConversation = () => {
+ if (!currentUser.value) return;
+ const destination = `/topic/conversation/${conversationId}`
+
+ subscribe(destination, async (message) => {
+ try {
+ const parsedMessage = JSON.parse(message.body)
+
+ if (parsedMessage.sender && parsedMessage.sender.id === currentUser.value.id) {
+ return
+ }
+
+ messages.value.push({
+ ...parsedMessage,
+ src: parsedMessage.sender.avatar,
+ iconClick: () => openUser(parsedMessage.sender.id),
+ })
+
+ await markConversationAsRead()
+ await nextTick()
+
+ if (isUserNearBottom.value) {
+ scrollToBottomSmooth()
+ }
+ } catch (e) {
+ console.error("Failed to parse websocket message", e)
+ }
+ })
+}
+
watch(isConnected, (newValue) => {
if (newValue) {
- setTimeout(() => {
- subscription = subscribe(`/topic/conversation/${conversationId}`, async (message) => {
- // 避免重复显示当前用户发送的消息
- if (message.sender.id !== currentUser.value.id) {
- messages.value.push({
- ...message,
- src: message.sender.avatar,
- iconClick: () => {
- openUser(message.sender.id)
- },
- })
- // 收到消息后只标记已读,不强制滚动(符合“非发送不拉底”)
- markConversationAsRead()
- await nextTick()
- updateNearBottom()
- }
- })
- }, 500)
+ subscribeToConversation()
}
})
@@ -378,7 +394,12 @@ onActivated(async () => {
await nextTick()
scrollToBottomSmooth()
updateNearBottom()
- if (!isConnected.value) {
+
+ if (isConnected.value) {
+ // 如果已连接,重新订阅
+ subscribeToConversation()
+ } else {
+ // 如果未连接,则发起连接
const token = getToken()
if (token) connect(token)
}
@@ -386,22 +407,17 @@ onActivated(async () => {
})
onDeactivated(() => {
- if (subscription) {
- subscription.unsubscribe()
- subscription = null
- }
- disconnect()
+ const destination = `/topic/conversation/${conversationId}`
+ unsubscribe(destination)
})
onUnmounted(() => {
- if (subscription) {
- subscription.unsubscribe()
- subscription = null
- }
+ const destination = `/topic/conversation/${conversationId}`
+ unsubscribe(destination)
+
if (messagesListEl.value) {
messagesListEl.value.removeEventListener('scroll', updateNearBottom)
}
- disconnect()
})
function minimize() {
diff --git a/frontend_nuxt/pages/message-box/index.vue b/frontend_nuxt/pages/message-box/index.vue
index 4c52f9d86..735a5597b 100644
--- a/frontend_nuxt/pages/message-box/index.vue
+++ b/frontend_nuxt/pages/message-box/index.vue
@@ -118,7 +118,7 @@