Merge pull request #878 from nagisa77/codex/fix-duplicate-message-forwarding-issue

Fix duplicate WebSocket broadcasts
This commit is contained in:
Tim
2025-09-04 13:50:24 +08:00
committed by GitHub
2 changed files with 27 additions and 31 deletions

View File

@@ -141,35 +141,30 @@ public class MessageService {
conversationRepository.save(conversation);
MessageDto messageDto = toDto(message);
String conversationDestination = "/topic/conversation/" + conversation.getId();
for (MessageParticipant participant : conversation.getParticipants()) {
if (participant.getUser().getId().equals(senderId)) continue;
long unreadCount = getUnreadMessageCount(participant.getUser().getId());
long channelUnread = getUnreadChannelCount(participant.getUser().getId());
// Build participant payloads once to avoid duplicate broadcasts
java.util.List<Map<String, Object>> participantInfos = conversation.getParticipants().stream()
.filter(p -> !p.getUser().getId().equals(senderId))
.map(p -> {
Map<String, Object> info = new HashMap<>();
info.put("userId", p.getUser().getId());
info.put("username", p.getUser().getUsername());
info.put("unreadCount", getUnreadMessageCount(p.getUser().getId()));
info.put("channelUnread", getUnreadChannelCount(p.getUser().getId()));
return info;
}).collect(Collectors.toList());
Map<String, Object> combinedPayload = new HashMap<>();
combinedPayload.put("message", messageDto);
Map<String, Object> conversationInfo = new HashMap<>();
conversationInfo.put("id", conversation.getId());
conversationInfo.put("participants", participantInfos);
Map<String, Object> conversationInfo = new HashMap<>();
conversationInfo.put("id", conversation.getId());
conversationInfo.put("participants", conversation.getParticipants().stream()
.filter(item -> participant.getUser().getId().equals(item.getUser().getId()))
.map(p -> {
Map<String, Object> participantInfo = new HashMap<>();
participantInfo.put("userId", p.getUser().getId());
participantInfo.put("username", p.getUser().getUsername());
return participantInfo;
}).collect(Collectors.toList()));
Map<String, Object> combinedPayload = new HashMap<>();
combinedPayload.put("message", messageDto);
combinedPayload.put("conversation", conversationInfo);
combinedPayload.put("senderId", senderId);
combinedPayload.put("conversation", conversationInfo);
combinedPayload.put("senderId", senderId);
combinedPayload.put("unreadCount", unreadCount);
combinedPayload.put("channelUnread", channelUnread);
notificationProducer.sendNotification(new MessageNotificationPayload(participant.getUser().getUsername(), combinedPayload));
}
// Use sender's username for sharding; only one notification is needed
notificationProducer.sendNotification(new MessageNotificationPayload(sender.getUsername(), combinedPayload));
return message;
}

View File

@@ -79,15 +79,16 @@ public class NotificationListener {
messagingTemplate.convertAndSend(userDestination, messageObj);
log.info("Message notification sent to destination: {}", userDestination);
// 发送未读数量
if (payloadMap.containsKey("unreadCount")) {
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/unread-count", payloadMap.get("unreadCount"));
// 优先从 participant 中获取未读信息,兼容旧格式
Object unreadCount = participant.getOrDefault("unreadCount", payloadMap.get("unreadCount"));
if (unreadCount != null) {
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/unread-count", unreadCount);
log.info("Sent unread count to user {} via /user/{}/queue/unread-count", participantUsername, participantUsername);
}
// 发送频道未读数量(如果有)
if (payloadMap.containsKey("channelUnread")) {
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/channel-unread", payloadMap.get("channelUnread"));
Object channelUnread = participant.getOrDefault("channelUnread", payloadMap.get("channelUnread"));
if (channelUnread != null) {
messagingTemplate.convertAndSendToUser(participantUsername, "/queue/channel-unread", channelUnread);
log.info("Sent channel-unread to {}", participantUsername);
}
}