diff --git a/backend/open-isle.env.example b/backend/open-isle.env.example index a8cba1126..ab495f8e4 100644 --- a/backend/open-isle.env.example +++ b/backend/open-isle.env.example @@ -40,4 +40,10 @@ OPENAI_API_KEY=<你的openai-api-key> WEBPUSH_PUBLIC_KEY=<你的webpush-public-key> WEBPUSH_PRIVATE_KEY=<你的webpush-private-key> +# === RabbitMQ === +RABBITMQ_HOST=<你的rabbitmq_host> +RABBITMQ_PORT=<你的rabbitmq_port> +RABBITMQ_USERNAME=<你的rabbitmq_username> +RABBITMQ_PASSWORD=<你的rabbitmq_password> + # LOG_LEVEL=DEBUG diff --git a/backend/pom.xml b/backend/pom.xml index dd3bcf62c..f162908bc 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -27,8 +27,8 @@ spring-boot-starter-web - org.springframework.boot - spring-boot-starter-websocket + org.springframework.boot + spring-boot-starter-amqp org.slf4j diff --git a/backend/src/main/java/com/openisle/config/RabbitMQConfig.java b/backend/src/main/java/com/openisle/config/RabbitMQConfig.java new file mode 100644 index 000000000..996184ac4 --- /dev/null +++ b/backend/src/main/java/com/openisle/config/RabbitMQConfig.java @@ -0,0 +1,204 @@ +package com.openisle.config; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.annotation.DependsOn; + +import jakarta.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; + +@Configuration +@RequiredArgsConstructor +public class RabbitMQConfig { + + public static final String EXCHANGE_NAME = "openisle-exchange"; + // 保持向后兼容的常量 + public static final String QUEUE_NAME = "notifications-queue"; + public static final String ROUTING_KEY = "notifications.routingkey"; + + // 硬编码为16以匹配ShardingStrategy中的十六进制分片逻辑 + private final int queueCount = 16; + + @Value("${rabbitmq.queue.durable}") + private boolean queueDurable; + + @PostConstruct + public void init() { + System.out.println("RabbitMQ配置初始化: 队列数量=" + queueCount + ", 持久化=" + queueDurable); + } + + @Bean + public TopicExchange exchange() { + return new TopicExchange(EXCHANGE_NAME); + } + + /** + * 创建所有分片队列, 使用十六进制后缀 (0-f) + */ + @Bean + public List shardedQueues() { + System.out.println("开始创建分片队列 Bean..."); + + List queues = new ArrayList<>(); + for (int i = 0; i < queueCount; i++) { + String shardKey = Integer.toHexString(i); + String queueName = "notifications-queue-" + shardKey; + Queue queue = new Queue(queueName, queueDurable); + queues.add(queue); + } + + System.out.println("分片队列 Bean 创建完成,总数: " + queues.size()); + return queues; + } + + /** + * 创建所有分片绑定, 使用十六进制路由键 (notifications.shard.0 - notifications.shard.f) + */ + @Bean + public List shardedBindings(TopicExchange exchange, @Qualifier("shardedQueues") List shardedQueues) { + System.out.println("开始创建分片绑定 Bean..."); + List bindings = new ArrayList<>(); + if (shardedQueues != null) { + for (Queue queue : shardedQueues) { + String queueName = queue.getName(); + String shardKey = queueName.substring("notifications-queue-".length()); + String routingKey = "notifications.shard." + shardKey; + Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey); + bindings.add(binding); + } + } + + System.out.println("分片绑定 Bean 创建完成,总数: " + bindings.size()); + return bindings; + } + + /** + * 保持向后兼容的单队列配置(可选) + */ + @Bean + public Queue legacyQueue() { + return new Queue(QUEUE_NAME, queueDurable); + } + + /** + * 保持向后兼容的单队列绑定(可选) + */ + @Bean + public Binding legacyBinding(Queue legacyQueue, TopicExchange exchange) { + return BindingBuilder.bind(legacyQueue).to(exchange).with(ROUTING_KEY); + } + + @Bean + public Jackson2JsonMessageConverter messageConverter() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule()); + objectMapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + return new Jackson2JsonMessageConverter(objectMapper); + } + + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + return new RabbitAdmin(connectionFactory); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + + /** + * 使用 CommandLineRunner 确保在应用完全启动后声明队列到 RabbitMQ + * 这样可以确保 RabbitAdmin 和所有 Bean 都已正确初始化 + */ + @Bean + @DependsOn({"rabbitAdmin", "shardedQueues", "exchange"}) + public CommandLineRunner queueDeclarationRunner(RabbitAdmin rabbitAdmin, + @Qualifier("shardedQueues") List shardedQueues, + TopicExchange exchange, + Queue legacyQueue, + @Qualifier("shardedBindings") List shardedBindings, + Binding legacyBinding) { + return args -> { + System.out.println("=== 开始主动声明 RabbitMQ 组件 ==="); + + try { + // 声明交换 + rabbitAdmin.declareExchange(exchange); + + // 声明分片队列 - 检查存在性 + System.out.println("开始检查并声明 " + shardedQueues.size() + " 个分片队列..."); + int successCount = 0; + int skippedCount = 0; + + for (Queue queue : shardedQueues) { + String queueName = queue.getName(); + try { + // 使用 declareQueue 的返回值判断队列是否已存在 + // 如果队列已存在且配置匹配,declareQueue 会返回现有队列信息 + // 如果不匹配或不存在,会创建新队列 + rabbitAdmin.declareQueue(queue); + successCount++; + } catch (org.springframework.amqp.AmqpIOException e) { + if (e.getMessage().contains("PRECONDITION_FAILED") && e.getMessage().contains("durable")) { + skippedCount++; + } + } catch (Exception e) { + System.err.println("队列声明失败: " + queueName + ", 错误: " + e.getMessage()); + } + } + System.out.println("分片队列处理完成: 成功 " + successCount + ", 跳过 " + skippedCount + ", 总数 " + shardedQueues.size()); + + // 声明分片绑定 + System.out.println("开始声明 " + shardedBindings.size() + " 个分片绑定..."); + int bindingSuccessCount = 0; + for (Binding binding : shardedBindings) { + try { + rabbitAdmin.declareBinding(binding); + bindingSuccessCount++; + } catch (Exception e) { + System.err.println("绑定声明失败: " + e.getMessage()); + } + } + System.out.println("分片绑定声明完成: 成功 " + bindingSuccessCount + "/" + shardedBindings.size()); + + // 声明遗留队列和绑定 - 检查存在性 + try { + rabbitAdmin.declareQueue(legacyQueue); + rabbitAdmin.declareBinding(legacyBinding); + System.out.println("遗留队列和绑定就绪: " + QUEUE_NAME + " (已存在或新创建)"); + } catch (org.springframework.amqp.AmqpIOException e) { + if (e.getMessage().contains("PRECONDITION_FAILED") && e.getMessage().contains("durable")) { + System.out.println("遗留队列已存在但 durable 设置不匹配: " + QUEUE_NAME + ", 保持现有队列"); + } else { + System.err.println("遗留队列声明失败: " + QUEUE_NAME + ", 错误: " + e.getMessage()); + } + } catch (Exception e) { + System.err.println("遗留队列声明失败: " + QUEUE_NAME + ", 错误: " + e.getMessage()); + } + + System.out.println("=== RabbitMQ 组件声明完成 ==="); + System.out.println("请检查 RabbitMQ 管理界面确认队列已正确创建"); + + } catch (Exception e) { + System.err.println("RabbitMQ 组件声明过程中发生严重错误:"); + e.printStackTrace(); + } + }; + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/openisle/config/SecurityConfig.java b/backend/src/main/java/com/openisle/config/SecurityConfig.java index a3381d934..72421850c 100644 --- a/backend/src/main/java/com/openisle/config/SecurityConfig.java +++ b/backend/src/main/java/com/openisle/config/SecurityConfig.java @@ -74,10 +74,14 @@ public class SecurityConfig { CorsConfiguration cfg = new CorsConfiguration(); cfg.setAllowedOrigins(List.of( "http://127.0.0.1:8080", + "http://127.0.0.1:8081", + "http://127.0.0.1:8082", "http://127.0.0.1:3000", "http://127.0.0.1:3001", "http://127.0.0.1", "http://localhost:8080", + "http://localhost:8081", + "http://localhost:8082", "http://localhost:3000", "http://localhost:3001", "http://localhost", diff --git a/backend/src/main/java/com/openisle/config/ShardInfo.java b/backend/src/main/java/com/openisle/config/ShardInfo.java new file mode 100644 index 000000000..feae640ff --- /dev/null +++ b/backend/src/main/java/com/openisle/config/ShardInfo.java @@ -0,0 +1,14 @@ +package com.openisle.config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ShardInfo { + private int shardIndex; + private String queueName; + private String routingKey; +} \ No newline at end of file diff --git a/backend/src/main/java/com/openisle/config/ShardingStrategy.java b/backend/src/main/java/com/openisle/config/ShardingStrategy.java new file mode 100644 index 000000000..a5be64640 --- /dev/null +++ b/backend/src/main/java/com/openisle/config/ShardingStrategy.java @@ -0,0 +1,84 @@ +package com.openisle.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.Map; +import java.util.stream.Collectors; + +@Component +@Slf4j +public class ShardingStrategy { + + // 固定为16以匹配RabbitMQConfig中的十六进制分片逻辑 + private static final int QUEUE_COUNT = 16; + + // 分片分布统计 + private final Map shardCounts = new ConcurrentHashMap<>(); + + /** + * 根据用户名获取分片信息(基于哈希值首字符) + */ + public ShardInfo getShardInfo(String username) { + if (username == null || username.isEmpty()) { + // 空用户名默认分到第0个分片 + return getShardInfoByIndex(0); + } + + // 计算用户名的哈希值并转为十六进制字符串 + String hash = Integer.toHexString(Math.abs(username.hashCode())); + + // 取哈希值的第一个字符 (0-9, a-f) + char firstChar = hash.charAt(0); + + // 十六进制字符映射到队列 + int shard = getShardFromHexChar(firstChar); + recordShardUsage(shard); + + log.debug("Username '{}' -> hash '{}' -> firstChar '{}' -> shard {}", + username, hash, firstChar, shard); + + return getShardInfoByIndex(shard); + } + + /** + * 将十六进制字符映射到分片索引 + */ + private int getShardFromHexChar(char hexChar) { + int charValue; + if (hexChar >= '0' && hexChar <= '9') { + charValue = hexChar - '0'; // 0-9 + } else if (hexChar >= 'a' && hexChar <= 'f') { + charValue = hexChar - 'a' + 10; // 10-15 + } else { + // 异常情况,默认为0 + charValue = 0; + } + + // 映射到队列数量范围内 + return charValue % QUEUE_COUNT; + } + + /** + * 根据分片索引获取分片信息 + */ + private ShardInfo getShardInfoByIndex(int shard) { + String shardKey = Integer.toHexString(shard); + return new ShardInfo( + shard, + "notifications-queue-" + shardKey, + "notifications.shard." + shardKey + ); + } + + /** + * 记录分片使用统计 + */ + private void recordShardUsage(int shard) { + shardCounts.computeIfAbsent(shard, k -> new AtomicLong(0)).incrementAndGet(); + } + +} \ No newline at end of file diff --git a/backend/src/main/java/com/openisle/config/WebSocketConfig.java b/backend/src/main/java/com/openisle/config/WebSocketConfig.java deleted file mode 100644 index f3576335b..000000000 --- a/backend/src/main/java/com/openisle/config/WebSocketConfig.java +++ /dev/null @@ -1,110 +0,0 @@ -package com.openisle.config; - -import com.openisle.service.JwtService; -import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.simp.config.ChannelRegistration; -import org.springframework.messaging.simp.config.MessageBrokerRegistry; -import org.springframework.messaging.simp.stomp.StompCommand; -import org.springframework.messaging.simp.stomp.StompHeaderAccessor; -import org.springframework.messaging.support.ChannelInterceptor; -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.security.authentication.UsernamePasswordAuthenticationToken; -import org.springframework.security.core.Authentication; -import org.springframework.security.core.userdetails.UserDetailsService; -import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; -import org.springframework.web.socket.config.annotation.StompEndpointRegistry; -import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; - -@Configuration -@EnableWebSocketMessageBroker -@RequiredArgsConstructor -public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { - - private final JwtService jwtService; - private final UserDetailsService userDetailsService; - @Value("${app.website-url}") - private String websiteUrl; - - @Override - public void configureMessageBroker(MessageBrokerRegistry config) { - // Enable a simple memory-based message broker to carry the messages back to the client on destinations prefixed with "/topic" and "/queue" - config.enableSimpleBroker("/topic", "/queue"); - // Set user destination prefix for personal messages - config.setUserDestinationPrefix("/user"); - // Designates the "/app" prefix for messages that are bound for @MessageMapping-annotated methods. - config.setApplicationDestinationPrefixes("/app"); - } - - @Override - public void registerStompEndpoints(StompEndpointRegistry registry) { - // 1) 原生 WebSocket(不带 SockJS) - registry.addEndpoint("/api/ws") - .setAllowedOriginPatterns( - "https://staging.open-isle.com", - "https://www.staging.open-isle.com", - websiteUrl, - websiteUrl.replace("://www.", "://"), - "http://localhost:*", - "http://127.0.0.1:*", - "http://192.168.7.98:*", - "http://30.211.97.238:*" - ); - - // 2) SockJS 回退:单独路径 - registry.addEndpoint("/api/sockjs") - .setAllowedOriginPatterns( - "https://staging.open-isle.com", - "https://www.staging.open-isle.com", - websiteUrl, - websiteUrl.replace("://www.", "://"), - "http://localhost:*", - "http://127.0.0.1:*", - "http://192.168.7.98:*", - "http://30.211.97.238:*" - ) - .withSockJS() - .setWebSocketEnabled(true) - .setSessionCookieNeeded(false); - } - - - - @Override - public void configureClientInboundChannel(ChannelRegistration registration) { - registration.interceptors(new ChannelInterceptor() { - @Override - public Message preSend(Message message, MessageChannel channel) { - StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); - - if (StompCommand.CONNECT.equals(accessor.getCommand())) { - System.out.println("WebSocket CONNECT command received"); - String authHeader = accessor.getFirstNativeHeader("Authorization"); - System.out.println("Authorization header: " + (authHeader != null ? "present" : "missing")); - - if (authHeader != null && authHeader.startsWith("Bearer ")) { - String token = authHeader.substring(7); - try { - String username = jwtService.validateAndGetSubject(token); - System.out.println("JWT validated for user: " + username); - var userDetails = userDetailsService.loadUserByUsername(username); - Authentication auth = new UsernamePasswordAuthenticationToken( - userDetails, null, userDetails.getAuthorities()); - accessor.setUser(auth); - System.out.println("WebSocket user set: " + username); - } catch (Exception e) { - System.err.println("JWT validation failed: " + e.getMessage()); - } - } - } else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) { - System.out.println("WebSocket SUBSCRIBE to: " + accessor.getDestination()); - System.out.println("WebSocket user during subscribe: " + (accessor.getUser() != null ? accessor.getUser().getName() : "null")); - } - return message; - } - }); - } -} \ No newline at end of file diff --git a/backend/src/main/java/com/openisle/dto/MessageNotificationPayload.java b/backend/src/main/java/com/openisle/dto/MessageNotificationPayload.java new file mode 100644 index 000000000..72a4a143b --- /dev/null +++ b/backend/src/main/java/com/openisle/dto/MessageNotificationPayload.java @@ -0,0 +1,15 @@ +package com.openisle.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class MessageNotificationPayload implements Serializable { + private String targetUsername; + private Object payload; +} \ No newline at end of file diff --git a/backend/src/main/java/com/openisle/model/Message.java b/backend/src/main/java/com/openisle/model/Message.java index 72edb3e2b..314c8f2ca 100644 --- a/backend/src/main/java/com/openisle/model/Message.java +++ b/backend/src/main/java/com/openisle/model/Message.java @@ -1,5 +1,6 @@ package com.openisle.model; +import com.fasterxml.jackson.annotation.JsonBackReference; import jakarta.persistence.*; import lombok.Getter; import lombok.NoArgsConstructor; @@ -20,6 +21,7 @@ public class Message { @ManyToOne(optional = false, fetch = FetchType.LAZY) @JoinColumn(name = "conversation_id") + @JsonBackReference private MessageConversation conversation; @ManyToOne(optional = false, fetch = FetchType.LAZY) diff --git a/backend/src/main/java/com/openisle/model/MessageConversation.java b/backend/src/main/java/com/openisle/model/MessageConversation.java index dfcda4e0c..638cfd39a 100644 --- a/backend/src/main/java/com/openisle/model/MessageConversation.java +++ b/backend/src/main/java/com/openisle/model/MessageConversation.java @@ -1,5 +1,7 @@ package com.openisle.model; +import com.fasterxml.jackson.annotation.JsonBackReference; +import com.fasterxml.jackson.annotation.JsonManagedReference; import jakarta.persistence.*; import lombok.Getter; import lombok.NoArgsConstructor; @@ -41,8 +43,10 @@ public class MessageConversation { private Message lastMessage; @OneToMany(mappedBy = "conversation", cascade = CascadeType.ALL, orphanRemoval = true) + @JsonBackReference private Set participants = new HashSet<>(); @OneToMany(mappedBy = "conversation", cascade = CascadeType.ALL, orphanRemoval = true) + @JsonBackReference private Set messages = new HashSet<>(); } \ No newline at end of file diff --git a/backend/src/main/java/com/openisle/model/MessageParticipant.java b/backend/src/main/java/com/openisle/model/MessageParticipant.java index d69901c8f..bb1805819 100644 --- a/backend/src/main/java/com/openisle/model/MessageParticipant.java +++ b/backend/src/main/java/com/openisle/model/MessageParticipant.java @@ -1,5 +1,6 @@ package com.openisle.model; +import com.fasterxml.jackson.annotation.JsonBackReference; import jakarta.persistence.*; import lombok.Getter; import lombok.NoArgsConstructor; @@ -19,6 +20,7 @@ public class MessageParticipant { @ManyToOne(optional = false, fetch = FetchType.LAZY) @JoinColumn(name = "conversation_id") + @JsonBackReference private MessageConversation conversation; @ManyToOne(optional = false, fetch = FetchType.LAZY) diff --git a/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java b/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java index d260c4f38..1c858af2f 100644 --- a/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java +++ b/backend/src/main/java/com/openisle/repository/MessageConversationRepository.java @@ -11,6 +11,9 @@ import java.util.List; @Repository public interface MessageConversationRepository extends JpaRepository { + + @Query("SELECT c FROM MessageConversation c LEFT JOIN FETCH c.participants p LEFT JOIN FETCH p.user WHERE c.id = :id") + java.util.Optional findByIdWithParticipantsAndUsers(@Param("id") Long id); @Query("SELECT c FROM MessageConversation c " + "WHERE c.channel = false AND size(c.participants) = 2 " + "AND EXISTS (SELECT 1 FROM c.participants p1 WHERE p1.user = :user1) " + diff --git a/backend/src/main/java/com/openisle/service/MessageService.java b/backend/src/main/java/com/openisle/service/MessageService.java index 9993cd8f3..2f451e46b 100644 --- a/backend/src/main/java/com/openisle/service/MessageService.java +++ b/backend/src/main/java/com/openisle/service/MessageService.java @@ -16,16 +16,18 @@ import com.openisle.dto.MessageDto; import com.openisle.dto.ReactionDto; import com.openisle.dto.UserSummaryDto; import com.openisle.mapper.ReactionMapper; +import com.openisle.dto.MessageNotificationPayload; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; -import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @Service @@ -37,7 +39,7 @@ public class MessageService { private final MessageConversationRepository conversationRepository; private final MessageParticipantRepository participantRepository; private final UserRepository userRepository; - private final SimpMessagingTemplate messagingTemplate; + private final NotificationProducer notificationProducer; private final ReactionRepository reactionRepository; private final ReactionMapper reactionMapper; @@ -69,26 +71,41 @@ public class MessageService { conversationRepository.save(conversation); log.info("Conversation {} updated with last message ID {}", conversation.getId(), message.getId()); - // Broadcast the new message to subscribed clients - MessageDto messageDto = toDto(message); - String conversationDestination = "/topic/conversation/" + conversation.getId(); - messagingTemplate.convertAndSend(conversationDestination, messageDto); - log.info("Message {} broadcasted to destination: {}", message.getId(), conversationDestination); - - // Also notify the recipient on their personal channel to update the conversation list - String userDestination = "/topic/user/" + recipient.getId() + "/messages"; - messagingTemplate.convertAndSend(userDestination, messageDto); - log.info("Message {} notification sent to destination: {}", message.getId(), userDestination); - - // Notify recipient of new unread count - long unreadCount = getUnreadMessageCount(recipientId); - log.info("Calculating unread count for user {}: {}", recipientId, unreadCount); - // Send using username instead of user ID for WebSocket routing - String recipientUsername = recipient.getUsername(); - messagingTemplate.convertAndSendToUser(recipientUsername, "/queue/unread-count", unreadCount); - log.info("Sent unread count {} to user {} (username: {}) via WebSocket destination: /user/{}/queue/unread-count", - unreadCount, recipientId, recipientUsername, recipientUsername); + try { + MessageDto messageDto = toDto(message); + + long unreadCount = getUnreadMessageCount(recipientId); + + // 创建包含对话和参与者信息的完整payload + Map conversationInfo = new HashMap<>(); + conversationInfo.put("id", conversation.getId()); + conversationInfo.put("participants", conversation.getParticipants().stream() + .map(p -> { + Map participantInfo = new HashMap<>(); + participantInfo.put("userId", p.getUser().getId()); + participantInfo.put("username", p.getUser().getUsername()); + return participantInfo; + }).collect(Collectors.toList())); + + Map combinedPayload = new HashMap<>(); + combinedPayload.put("message", messageDto); + combinedPayload.put("unreadCount", unreadCount); + combinedPayload.put("conversation", conversationInfo); + combinedPayload.put("senderId", senderId); + if (notificationProducer != null) { + log.info("NotificationProducer is available"); + } else { + log.info("ERROR: NotificationProducer is NULL!"); + return message; + } + log.info("Recipient username: {}", recipient.getUsername()); + + notificationProducer.sendNotification(new MessageNotificationPayload(recipient.getUsername(), combinedPayload)); + log.info("=== Notification call completed ==="); + } catch (Exception e) { + log.error("=== Error in notification process ===", e); + } return message; } @@ -97,7 +114,7 @@ public class MessageService { public Message sendMessageToConversation(Long senderId, Long conversationId, String content, Long replyToId) { User sender = userRepository.findById(senderId) .orElseThrow(() -> new IllegalArgumentException("Sender not found")); - MessageConversation conversation = conversationRepository.findById(conversationId) + MessageConversation conversation = conversationRepository.findByIdWithParticipantsAndUsers(conversationId) .orElseThrow(() -> new IllegalArgumentException("Conversation not found")); // Join the conversation if not already a participant (useful for channels) @@ -124,22 +141,30 @@ public class MessageService { conversationRepository.save(conversation); MessageDto messageDto = toDto(message); - String conversationDestination = "/topic/conversation/" + conversation.getId(); - messagingTemplate.convertAndSend(conversationDestination, messageDto); - // Notify all participants except sender for updates - for (MessageParticipant participant : conversation.getParticipants()) { - if (participant.getUser().getId().equals(senderId)) continue; - String userDestination = "/topic/user/" + participant.getUser().getId() + "/messages"; - messagingTemplate.convertAndSend(userDestination, messageDto); + // Build participant payloads once to avoid duplicate broadcasts + java.util.List> participantInfos = conversation.getParticipants().stream() + .filter(p -> !p.getUser().getId().equals(senderId)) + .map(p -> { + Map info = new HashMap<>(); + info.put("userId", p.getUser().getId()); + info.put("username", p.getUser().getUsername()); + info.put("unreadCount", getUnreadMessageCount(p.getUser().getId())); + info.put("channelUnread", getUnreadChannelCount(p.getUser().getId())); + return info; + }).collect(Collectors.toList()); - long unreadCount = getUnreadMessageCount(participant.getUser().getId()); - String username = participant.getUser().getUsername(); - messagingTemplate.convertAndSendToUser(username, "/queue/unread-count", unreadCount); + Map conversationInfo = new HashMap<>(); + conversationInfo.put("id", conversation.getId()); + conversationInfo.put("participants", participantInfos); - long channelUnread = getUnreadChannelCount(participant.getUser().getId()); - messagingTemplate.convertAndSendToUser(username, "/queue/channel-unread", channelUnread); - } + Map combinedPayload = new HashMap<>(); + combinedPayload.put("message", messageDto); + combinedPayload.put("conversation", conversationInfo); + combinedPayload.put("senderId", senderId); + + // Use sender's username for sharding; only one notification is needed + notificationProducer.sendNotification(new MessageNotificationPayload(sender.getUsername(), combinedPayload)); return message; } diff --git a/backend/src/main/java/com/openisle/service/NotificationProducer.java b/backend/src/main/java/com/openisle/service/NotificationProducer.java new file mode 100644 index 000000000..f64f1a6f9 --- /dev/null +++ b/backend/src/main/java/com/openisle/service/NotificationProducer.java @@ -0,0 +1,63 @@ +package com.openisle.service; + +import com.openisle.config.RabbitMQConfig; +import com.openisle.config.ShardInfo; +import com.openisle.config.ShardingStrategy; +import com.openisle.dto.MessageNotificationPayload; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +@Slf4j +public class NotificationProducer { + + private final RabbitTemplate rabbitTemplate; + private final ShardingStrategy shardingStrategy; + + @Value("${rabbitmq.sharding.enabled}") + private boolean shardingEnabled; + + public void sendNotification(MessageNotificationPayload payload) { + String targetUsername = payload.getTargetUsername(); + + try { + if (shardingEnabled) { + // 使用分片策略发送消息 + sendShardedNotification(payload, targetUsername); + } else { + // 使用原始单队列方式发送(向后兼容) + sendLegacyNotification(payload); + } + } catch (Exception e) { + log.error("Failed to send message to RabbitMQ for user: {}", targetUsername, e); + throw e; + } + } + + /** + * 使用分片策略发送消息 + */ + private void sendShardedNotification(MessageNotificationPayload payload, String targetUsername) { + ShardInfo shardInfo = shardingStrategy.getShardInfo(targetUsername); + rabbitTemplate.convertAndSend( + RabbitMQConfig.EXCHANGE_NAME, + shardInfo.getRoutingKey(), + payload + ); + } + + /** + * 使用原始单队列方式发送消息(向后兼容) + */ + private void sendLegacyNotification(MessageNotificationPayload payload) { + rabbitTemplate.convertAndSend( + RabbitMQConfig.EXCHANGE_NAME, + RabbitMQConfig.ROUTING_KEY, + payload + ); + } +} \ No newline at end of file diff --git a/backend/src/main/resources/application.properties b/backend/src/main/resources/application.properties index a245b0524..b0f9a0a79 100644 --- a/backend/src/main/resources/application.properties +++ b/backend/src/main/resources/application.properties @@ -87,6 +87,16 @@ app.website-url=${WEBSITE_URL:https://www.open-isle.com} app.webpush.public-key=${WEBPUSH_PUBLIC_KEY:} app.webpush.private-key=${WEBPUSH_PRIVATE_KEY:} +# RabbitMQ Configuration +spring.rabbitmq.host=${RABBITMQ_HOST:localhost} +spring.rabbitmq.port=${RABBITMQ_PORT:5672} +spring.rabbitmq.username=${RABBITMQ_USERNAME:guest} +spring.rabbitmq.password=${RABBITMQ_PASSWORD:guest} + +# RabbitMQ 队列配置 - 修改为非持久化以匹配现有队列 +rabbitmq.queue.durable=true +rabbitmq.sharding.enabled=true + # springdoc-openapi-starter-webmvc-api # see https://springdoc.org/#springdoc-openapi-core-properties springdoc.api-docs.path=/v3/api-docs diff --git a/frontend_nuxt/.env.dev.example b/frontend_nuxt/.env.dev.example index 3a38feb58..4f6562719 100644 --- a/frontend_nuxt/.env.dev.example +++ b/frontend_nuxt/.env.dev.example @@ -1,5 +1,6 @@ ; 本地部署后端 NUXT_PUBLIC_API_BASE_URL=https://127.0.0.1:8081 +NUXT_PUBLIC_WEBSOCKET_URL=https://127.0.0.1:8082 NUXT_PUBLIC_WEBSITE_BASE_URL=http://localhost:3000 NUXT_PUBLIC_GOOGLE_CLIENT_ID=777830451304-nt8afkkap18gui4f9entcha99unal744.apps.googleusercontent.com diff --git a/frontend_nuxt/.env.example b/frontend_nuxt/.env.example index a57f4362a..dadb36387 100644 --- a/frontend_nuxt/.env.example +++ b/frontend_nuxt/.env.example @@ -5,6 +5,9 @@ ; 生产环境后端 NUXT_PUBLIC_API_BASE_URL=https://open-isle.com +; 生产环境ws后端 +NUXT_PUBLIC_WEBSOCKET_URL=https://open-isle.com/websocket + ; 预发环境 ; NUXT_PUBLIC_WEBSITE_BASE_URL=https://staging.open-isle.com ; 正式环境/生产环境 diff --git a/frontend_nuxt/.env.production.example b/frontend_nuxt/.env.production.example index 273a2dff6..9a4b60b7b 100644 --- a/frontend_nuxt/.env.production.example +++ b/frontend_nuxt/.env.production.example @@ -3,6 +3,8 @@ NUXT_PUBLIC_API_BASE_URL=https://open-isle.com ; 正式环境/生产环境 NUXT_PUBLIC_WEBSITE_BASE_URL=https://open-isle.com +; 生产环境ws后端 +NUXT_PUBLIC_WEBSOCKET_URL=https://open-isle.com/websocket NUXT_PUBLIC_GOOGLE_CLIENT_ID=777830451304-nt8afkkap18gui4f9entcha99unal744.apps.googleusercontent.com NUXT_PUBLIC_GITHUB_CLIENT_ID=Ov23liVkO1NPAX5JyWxJ diff --git a/frontend_nuxt/.env.staging.example b/frontend_nuxt/.env.staging.example index 858d1e46b..b020df6fb 100644 --- a/frontend_nuxt/.env.staging.example +++ b/frontend_nuxt/.env.staging.example @@ -1,5 +1,12 @@ +; 本地部署后端 +; NUXT_PUBLIC_API_BASE_URL=https://127.0.0.1:8081 + ; 预发环境后端 NUXT_PUBLIC_API_BASE_URL=https://staging.open-isle.com + +; 预发环境ws后端 +NUXT_PUBLIC_WEBSOCKET_URL=https://staging.open-isle.com/websocket + ; 预发环境 NUXT_PUBLIC_WEBSITE_BASE_URL=https://staging.open-isle.com diff --git a/frontend_nuxt/composables/useChannelsUnreadCount.js b/frontend_nuxt/composables/useChannelsUnreadCount.js index 6db3d0c64..e684a0c53 100644 --- a/frontend_nuxt/composables/useChannelsUnreadCount.js +++ b/frontend_nuxt/composables/useChannelsUnreadCount.js @@ -3,82 +3,73 @@ import { useWebSocket } from './useWebSocket' import { getToken } from '~/utils/auth' const count = ref(0) -let isInitialized = false -let wsSubscription = null +let isInitialized = false; export function useChannelsUnreadCount() { - const config = useRuntimeConfig() - const API_BASE_URL = config.public.apiBaseUrl - const { subscribe, isConnected, connect } = useWebSocket() + const config = useRuntimeConfig(); + const API_BASE_URL = config.public.apiBaseUrl; + const { subscribe, isConnected, connect } = useWebSocket(); const fetchChannelUnread = async () => { - const token = getToken() + const token = getToken(); if (!token) { - count.value = 0 - return + count.value = 0; + return; } try { const response = await fetch(`${API_BASE_URL}/api/channels/unread-count`, { headers: { Authorization: `Bearer ${token}` }, - }) + }); if (response.ok) { - const data = await response.json() - count.value = data + const data = await response.json(); + count.value = data; } } catch (e) { - console.error('Failed to fetch channel unread count:', e) + console.error('Failed to fetch channel unread count:', e); } - } + }; + + const setupWebSocketListener = () => { + const destination = '/user/queue/channel-unread'; + + subscribe(destination, (message) => { + const unread = parseInt(message.body, 10); + if (!isNaN(unread)) { + count.value = unread; + } + }).then(subscription => { + if (subscription) { + console.log('频道未读消息订阅成功'); + } + }); + }; const initialize = () => { - const token = getToken() + const token = getToken(); if (!token) { - count.value = 0 - return + count.value = 0; + return; } - fetchChannelUnread() - if (!isConnected.value) { - connect(token) - } - setupWebSocketListener() - } - const setupWebSocketListener = () => { - if (!wsSubscription) { - watch( - isConnected, - (newValue) => { - if (newValue && !wsSubscription) { - wsSubscription = subscribe('/user/queue/channel-unread', (message) => { - const unread = parseInt(message.body, 10) - if (!isNaN(unread)) { - count.value = unread - } - }) - } - }, - { immediate: true }, - ) + if (!isConnected.value) { + connect(token); } - } + + fetchChannelUnread(); + setupWebSocketListener(); + }; const setFromList = (channels) => { - count.value = Array.isArray(channels) ? channels.filter((c) => c.unreadCount > 0).length : 0 - } + count.value = Array.isArray(channels) ? channels.filter((c) => c.unreadCount > 0).length : 0; + }; - const hasUnread = computed(() => count.value > 0) + const hasUnread = computed(() => count.value > 0); - const token = getToken() - if (token) { - if (!isInitialized) { - isInitialized = true - initialize() - } else { - fetchChannelUnread() - if (!isConnected.value) { - connect(token) - } - setupWebSocketListener() + if (!isInitialized) { + const token = getToken(); + if (token) { + isInitialized = true; + initialize(); } } @@ -88,5 +79,5 @@ export function useChannelsUnreadCount() { fetchChannelUnread, initialize, setFromList, - } + }; } diff --git a/frontend_nuxt/composables/useUnreadCount.js b/frontend_nuxt/composables/useUnreadCount.js index 381b67d6a..677083aea 100644 --- a/frontend_nuxt/composables/useUnreadCount.js +++ b/frontend_nuxt/composables/useUnreadCount.js @@ -4,7 +4,6 @@ import { getToken } from '~/utils/auth'; const count = ref(0); let isInitialized = false; -let wsSubscription = null; export function useUnreadCount() { const config = useRuntimeConfig(); @@ -30,64 +29,48 @@ export function useUnreadCount() { } }; - const initialize = async () => { + const setupWebSocketListener = () => { + console.log('设置未读消息订阅...'); + const destination = '/user/queue/unread-count'; + + subscribe(destination, (message) => { + const unreadCount = parseInt(message.body, 10); + if (!isNaN(unreadCount)) { + count.value = unreadCount; + } + }).then(subscription => { + if (subscription) { + console.log('未读消息订阅成功'); + } + }); + }; + + const initialize = () => { const token = getToken(); if (!token) { count.value = 0; return; } - // 总是获取最新的未读数量 - fetchUnreadCount(); - - // 确保WebSocket连接 if (!isConnected.value) { connect(token); } - // 设置WebSocket监听 - await setupWebSocketListener(); + fetchUnreadCount(); + setupWebSocketListener(); }; - const setupWebSocketListener = async () => { - // 只有在还没有订阅的情况下才设置监听 - if (!wsSubscription) { - - watch(isConnected, (newValue) => { - if (newValue && !wsSubscription) { - const destination = `/user/queue/unread-count`; - wsSubscription = subscribe(destination, (message) => { - const unreadCount = parseInt(message.body, 10); - if (!isNaN(unreadCount)) { - count.value = unreadCount; - } - }); - } - }, { immediate: true }); - } - }; - - // 自动初始化逻辑 - 确保每次调用都能获取到未读数量并设置监听 - const token = getToken(); - if (token) { - if (!isInitialized) { + if (!isInitialized) { + const token = getToken(); + if (token) { isInitialized = true; - initialize(); // 完整初始化,包括WebSocket监听 - } else { - // 即使已经初始化,也要确保获取最新的未读数量并确保WebSocket监听存在 - fetchUnreadCount(); - - // 确保WebSocket连接和监听都存在 - if (!isConnected.value) { - connect(token); - } - setupWebSocketListener(); + initialize(); } } return { count, fetchUnreadCount, - initialize, + initialize, }; } \ No newline at end of file diff --git a/frontend_nuxt/composables/useWebSocket.js b/frontend_nuxt/composables/useWebSocket.js index c07d3a4ea..18a95b15f 100644 --- a/frontend_nuxt/composables/useWebSocket.js +++ b/frontend_nuxt/composables/useWebSocket.js @@ -1,86 +1,182 @@ -import { ref } from 'vue' +import { ref, readonly, watch } from 'vue' import { Client } from '@stomp/stompjs' import SockJS from 'sockjs-client/dist/sockjs.min.js' import { useRuntimeConfig } from '#app' const client = ref(null) const isConnected = ref(false) +const activeSubscriptions = ref(new Map()) +// Store callbacks to allow for re-subscription after reconnect +const resubscribeCallbacks = new Map() + +// Helper for unified subscription logging +const logSubscriptionActivity = (action, destination, subscriptionId = 'N/A') => { + console.log( + `[SUB_MAN] ${action} | Dest: ${destination} | SubID: ${subscriptionId} | Active: ${activeSubscriptions.value.size}` + ) +} const connect = (token) => { - if (isConnected.value) { + if (isConnected.value || (client.value && client.value.active)) { return } - const config = useRuntimeConfig() - const API_BASE_URL = config.public.apiBaseUrl - const socketUrl = `${API_BASE_URL}/api/sockjs` - const socket = new SockJS(socketUrl) + const config = useRuntimeConfig() + const WEBSOCKET_URL = config.public.websocketUrl + const socketUrl = `${WEBSOCKET_URL}/api/sockjs` + const stompClient = new Client({ - webSocketFactory: () => socket, + webSocketFactory: () => new SockJS(socketUrl), connectHeaders: { Authorization: `Bearer ${token}`, }, - debug: function (str) {}, - reconnectDelay: 5000, + debug: function (str) { + + }, + reconnectDelay: 10000, heartbeatIncoming: 4000, heartbeatOutgoing: 4000, }) stompClient.onConnect = (frame) => { isConnected.value = true + resubscribeCallbacks.forEach((callback, destination) => { + doSubscribe(destination, callback) + }) } stompClient.onStompError = (frame) => { - console.error('WebSocket STOMP error:', frame) + console.error('Full frame:', frame) + } + + stompClient.onWebSocketError = (event) => { + + } + + stompClient.onWebSocketClose = (event) => { + isConnected.value = false; + activeSubscriptions.value.clear(); + logSubscriptionActivity('Cleared all subscriptions due to WebSocket close', 'N/A'); + }; + + stompClient.onDisconnect = (frame) => { + isConnected.value = false } stompClient.activate() client.value = stompClient } +const unsubscribe = (destination) => { + if (!destination) { + return false + } + const subscription = activeSubscriptions.value.get(destination) + if (subscription) { + try { + subscription.unsubscribe() + logSubscriptionActivity('Unsubscribed', destination, subscription.id) + } catch (e) { + console.error(`Error during unsubscribe for ${destination}:`, e) + } finally { + activeSubscriptions.value.delete(destination) + resubscribeCallbacks.delete(destination) + } + return true + } else { + return false + } +} + +const unsubscribeAll = () => { + logSubscriptionActivity('Unsubscribing from ALL', `Total: ${activeSubscriptions.value.size}`) + const destinations = [...activeSubscriptions.value.keys()] + destinations.forEach(dest => { + unsubscribe(dest) + }) +} + const disconnect = () => { + unsubscribeAll() if (client.value) { - isConnected.value = false - client.value.deactivate() + try { + client.value.deactivate() + } catch (e) { + console.error('Error during client deactivation:', e) + } client.value = null + isConnected.value = false + } +} + +const doSubscribe = (destination, callback) => { + try { + if (!client.value || !client.value.connected) { + return null + } + + if (activeSubscriptions.value.has(destination)) { + unsubscribe(destination) + } + + const subscription = client.value.subscribe(destination, (message) => { + callback(message) + }) + + if (subscription) { + activeSubscriptions.value.set(destination, subscription) + resubscribeCallbacks.set(destination, callback) // Store for re-subscription + logSubscriptionActivity('Subscribed', destination, subscription.id) + return subscription + } else { + return null + } + } catch (error) { + console.error(`Exception during subscription to ${destination}:`, error) + return null } } const subscribe = (destination, callback) => { - if (!isConnected.value || !client.value || !client.value.connected) { - return null + if (!destination) { + return Promise.resolve(null) } - try { - const subscription = client.value.subscribe(destination, (message) => { - try { - if ( - destination.includes('/queue/unread-count') || - destination.includes('/queue/channel-unread') - ) { - callback(message) - } else { - const parsedMessage = JSON.parse(message.body) - callback(parsedMessage) + return new Promise((resolve) => { + if (client.value && client.value.connected) { + const sub = doSubscribe(destination, callback) + resolve(sub) + } else { + const unwatch = watch(isConnected, (newVal) => { + if (newVal) { + setTimeout(() => { + const sub = doSubscribe(destination, callback) + unwatch() + resolve(sub) + }, 100) } - } catch (error) { - callback(message) - } - }) - - return subscription - } catch (error) { - return null - } + }, { immediate: false }) + + setTimeout(() => { + unwatch() + if (!isConnected.value) { + resolve(null) + } + }, 15000) + } + }) } export function useWebSocket() { return { - client, + client: readonly(client), isConnected, connect, disconnect, subscribe, + unsubscribe, + unsubscribeAll, + activeSubscriptions: readonly(activeSubscriptions), } } diff --git a/frontend_nuxt/nuxt.config.ts b/frontend_nuxt/nuxt.config.ts index 6084395b5..ec01e7cb4 100644 --- a/frontend_nuxt/nuxt.config.ts +++ b/frontend_nuxt/nuxt.config.ts @@ -6,6 +6,7 @@ export default defineNuxtConfig({ runtimeConfig: { public: { apiBaseUrl: process.env.NUXT_PUBLIC_API_BASE_URL || '', + websocketUrl: process.env.NUXT_PUBLIC_WEBSOCKET_URL || '', websiteBaseUrl: process.env.NUXT_PUBLIC_WEBSITE_BASE_URL || '', googleClientId: process.env.NUXT_PUBLIC_GOOGLE_CLIENT_ID || '', githubClientId: process.env.NUXT_PUBLIC_GITHUB_CLIENT_ID || '', diff --git a/frontend_nuxt/pages/message-box/[id].vue b/frontend_nuxt/pages/message-box/[id].vue index 546add168..a6a8f5d14 100644 --- a/frontend_nuxt/pages/message-box/[id].vue +++ b/frontend_nuxt/pages/message-box/[id].vue @@ -100,10 +100,9 @@ import BasePlaceholder from '~/components/BasePlaceholder.vue' const config = useRuntimeConfig() const route = useRoute() const API_BASE_URL = config.public.apiBaseUrl -const { connect, disconnect, subscribe, isConnected } = useWebSocket() +const { connect, subscribe, unsubscribe, isConnected } = useWebSocket() const { fetchUnreadCount: refreshGlobalUnreadCount } = useUnreadCount() const { fetchChannelUnread: refreshChannelUnread } = useChannelsUnreadCount() -let subscription = null const messages = ref([]) const participants = ref([]) @@ -338,8 +337,12 @@ onMounted(async () => { // 初次进入频道时,平滑滚动到底部 scrollToBottomSmooth() const token = getToken() - if (token && !isConnected.value) { - connect(token) + if (token) { + if (isConnected.value) { + subscribeToConversation() + } else { + connect(token) + } } } else { toast.error('请先登录') @@ -347,26 +350,39 @@ onMounted(async () => { } }) +const subscribeToConversation = () => { + if (!currentUser.value) return; + const destination = `/topic/conversation/${conversationId}` + + subscribe(destination, async (message) => { + try { + const parsedMessage = JSON.parse(message.body) + + if (parsedMessage.sender && parsedMessage.sender.id === currentUser.value.id) { + return + } + + messages.value.push({ + ...parsedMessage, + src: parsedMessage.sender.avatar, + iconClick: () => openUser(parsedMessage.sender.id), + }) + + await markConversationAsRead() + await nextTick() + + if (isUserNearBottom.value) { + scrollToBottomSmooth() + } + } catch (e) { + console.error("Failed to parse websocket message", e) + } + }) +} + watch(isConnected, (newValue) => { if (newValue) { - setTimeout(() => { - subscription = subscribe(`/topic/conversation/${conversationId}`, async (message) => { - // 避免重复显示当前用户发送的消息 - if (message.sender.id !== currentUser.value.id) { - messages.value.push({ - ...message, - src: message.sender.avatar, - iconClick: () => { - openUser(message.sender.id) - }, - }) - // 收到消息后只标记已读,不强制滚动(符合“非发送不拉底”) - markConversationAsRead() - await nextTick() - updateNearBottom() - } - }) - }, 500) + subscribeToConversation() } }) @@ -378,7 +394,12 @@ onActivated(async () => { await nextTick() scrollToBottomSmooth() updateNearBottom() - if (!isConnected.value) { + + if (isConnected.value) { + // 如果已连接,重新订阅 + subscribeToConversation() + } else { + // 如果未连接,则发起连接 const token = getToken() if (token) connect(token) } @@ -386,22 +407,17 @@ onActivated(async () => { }) onDeactivated(() => { - if (subscription) { - subscription.unsubscribe() - subscription = null - } - disconnect() + const destination = `/topic/conversation/${conversationId}` + unsubscribe(destination) }) onUnmounted(() => { - if (subscription) { - subscription.unsubscribe() - subscription = null - } + const destination = `/topic/conversation/${conversationId}` + unsubscribe(destination) + if (messagesListEl.value) { messagesListEl.value.removeEventListener('scroll', updateNearBottom) } - disconnect() }) function minimize() { diff --git a/frontend_nuxt/pages/message-box/index.vue b/frontend_nuxt/pages/message-box/index.vue index 4c52f9d86..735a5597b 100644 --- a/frontend_nuxt/pages/message-box/index.vue +++ b/frontend_nuxt/pages/message-box/index.vue @@ -118,7 +118,7 @@