Fix duplicate WebSocket broadcasts

This commit is contained in:
Tim
2025-09-04 13:50:05 +08:00
parent 70a83cbe06
commit 47a72dc9b0
2 changed files with 27 additions and 31 deletions

View File

@@ -141,35 +141,30 @@ public class MessageService {
conversationRepository.save(conversation); conversationRepository.save(conversation);
MessageDto messageDto = toDto(message); MessageDto messageDto = toDto(message);
String conversationDestination = "/topic/conversation/" + conversation.getId();
for (MessageParticipant participant : conversation.getParticipants()) { // Build participant payloads once to avoid duplicate broadcasts
if (participant.getUser().getId().equals(senderId)) continue; java.util.List<Map<String, Object>> participantInfos = conversation.getParticipants().stream()
.filter(p -> !p.getUser().getId().equals(senderId))
long unreadCount = getUnreadMessageCount(participant.getUser().getId()); .map(p -> {
long channelUnread = getUnreadChannelCount(participant.getUser().getId()); Map<String, Object> info = new HashMap<>();
info.put("userId", p.getUser().getId());
Map<String, Object> combinedPayload = new HashMap<>(); info.put("username", p.getUser().getUsername());
combinedPayload.put("message", messageDto); info.put("unreadCount", getUnreadMessageCount(p.getUser().getId()));
info.put("channelUnread", getUnreadChannelCount(p.getUser().getId()));
return info;
}).collect(Collectors.toList());
Map<String, Object> conversationInfo = new HashMap<>(); Map<String, Object> conversationInfo = new HashMap<>();
conversationInfo.put("id", conversation.getId()); conversationInfo.put("id", conversation.getId());
conversationInfo.put("participants", conversation.getParticipants().stream() conversationInfo.put("participants", participantInfos);
.filter(item -> participant.getUser().getId().equals(item.getUser().getId()))
.map(p -> {
Map<String, Object> participantInfo = new HashMap<>();
participantInfo.put("userId", p.getUser().getId());
participantInfo.put("username", p.getUser().getUsername());
return participantInfo;
}).collect(Collectors.toList()));
Map<String, Object> combinedPayload = new HashMap<>();
combinedPayload.put("message", messageDto);
combinedPayload.put("conversation", conversationInfo); combinedPayload.put("conversation", conversationInfo);
combinedPayload.put("senderId", senderId); combinedPayload.put("senderId", senderId);
combinedPayload.put("unreadCount", unreadCount);
combinedPayload.put("channelUnread", channelUnread);
notificationProducer.sendNotification(new MessageNotificationPayload(participant.getUser().getUsername(), combinedPayload)); // Use sender's username for sharding; only one notification is needed
} notificationProducer.sendNotification(new MessageNotificationPayload(sender.getUsername(), combinedPayload));
return message; return message;
} }

View File

@@ -79,15 +79,16 @@ public class NotificationListener {
messagingTemplate.convertAndSend(userDestination, messageObj); messagingTemplate.convertAndSend(userDestination, messageObj);
log.info("Message notification sent to destination: {}", userDestination); log.info("Message notification sent to destination: {}", userDestination);
// 发送未读数量 // 优先从 participant 中获取未读信息,兼容旧格式
if (payloadMap.containsKey("unreadCount")) { Object unreadCount = participant.getOrDefault("unreadCount", payloadMap.get("unreadCount"));
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/unread-count", payloadMap.get("unreadCount")); if (unreadCount != null) {
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/unread-count", unreadCount);
log.info("Sent unread count to user {} via /user/{}/queue/unread-count", participantUsername, participantUsername); log.info("Sent unread count to user {} via /user/{}/queue/unread-count", participantUsername, participantUsername);
} }
// 发送频道未读数量(如果有) Object channelUnread = participant.getOrDefault("channelUnread", payloadMap.get("channelUnread"));
if (payloadMap.containsKey("channelUnread")) { if (channelUnread != null) {
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/channel-unread", payloadMap.get("channelUnread")); messagingTemplate.convertAndSendToUser(participantUsername, "/queue/channel-unread", channelUnread);
log.info("Sent channel-unread to {}", participantUsername); log.info("Sent channel-unread to {}", participantUsername);
} }
} }