@@ -1,5 +1,6 @@
package com.lld.im.service.message.service ;
import cn.hutool.core.collection.CollUtil ;
import com.alibaba.fastjson.JSONObject ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper ;
@@ -40,10 +41,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations ;
import org.springframework.stereotype.Service ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.* ;
import java.util.stream.Collectors ;
/**
@@ -163,13 +161,9 @@ public class MessageSyncService {
return ResponseVO . successResponse ( resp ) ;
}
public List < Object > listSession ( SyncReq req ) {
List < OfflineMessageContent > listOffline Message = listOffline Message ( req ) ;
List < ImMessageHistoryEntity > listReadMessage = listReadMessage ( req ) ;
Map < String , Long > collect = listReadMessage . stream ( ) . collect ( Collectors . toMap ( ImMessageHistoryEntity : : getOwnerId , ImMessageHistoryEntity : : getMessageKey ) ) ;
return null ;
public List < ImMessageHistoryVo > listSession ( SyncReq req ) {
List < ImMessageHistoryVo > listRead Message = listRead Message ( req ) ;
return listReadMessage ;
}
public List < ImMessageHistoryVo > chatHistory ( ChatHistoryReq chatHistoryReq ) {
@@ -208,73 +202,76 @@ public class MessageSyncService {
}
/**
* 获取离线消息
* @param req
* @return
*/
public List < OfflineMessageContent > listOfflineMessage ( SyncReq req ) {
String key = req . getAppId ( ) + " : " + Constants . RedisConstants . OfflineMessage + " : " + req . getOperater ( ) ;
//获取最大的seq
Long maxSeq = 0L ;
ZSetOperations zSetOperations = redisTemplate . opsForZSet ( ) ;
Set set = zSetOperations . reverseRangeWithScores ( key , 0 , 0 ) ;
if ( ! CollectionUtils . isEmpty ( set ) ) {
List list = new ArrayList ( set ) ;
DefaultTypedTuple o = ( DefaultTypedTuple ) list . get ( 0 ) ;
maxSeq = o . getScore ( ) . longValue ( ) ;
}
List < OfflineMessageContent > respList = new ArrayList < > ( ) ;
Set < ZSetOperations . TypedTuple > querySet = zSetOperations . rangeByScoreWithScores ( key ,
req . getLastSequence ( ) , maxSeq , 0 , - 1 ) ;
for ( ZSetOperations . TypedTuple < String > typedTuple : querySet ) {
String value = typedTuple . getValue ( ) ;
OfflineMessageContent offlineMessageContent = JSONObject . parseObject ( value , OfflineMessageContent . class ) ;
respList . add ( offlineMessageContent ) ;
}
//
// List<OfflineMessageContent> imMessageBodyIds = new ArrayList<>();
// Map<String, OfflineMessageContent> toIdMap = respList.stream().collect(Collectors.toMap(OfflineMessageContent::getToId, x -> x));
// Map<String, Optional<OfflineMessageContent>> toIdMap2 = respList.stream().
// collect(Collectors.groupingBy(x -> x.getFromId() + "-"+ x.getToId(),Collectors.maxBy(Comparator.comparingLong(OfflineMessageContent::getMessageKey))));
// for (OfflineMessageContent offlineMessageContent : respList) {
// OfflineMessageContent messageKey1 = toIdMap.get(offlineMessageContent.getToId());
// if (messageKey1 != null){
// imMessageBodyIds.add(messageKey1.getMessageKey() > offlineMessageContent.getMessageKey() ? messageKey1 : offlineMessageContent);
// }
// }
return respList ;
}
/**
* 获取已读消息
* @param req
* @return
*/
public List < ImMessageHistoryEntity > listReadMessage ( SyncReq req ) {
List < ImMessageHistoryEntity > imMessageHistoryEntities = imMessageHistoryMapper . selectMessageByOwnId ( req . getOperater ( ) ) ;
List < Long > imMessageBodyIds = new ArrayList < > ( ) ;
public List < ImMessageHistoryVo > listReadMessage ( SyncReq req ) {
List < ImMessageHistoryVo > sessionVos = new ArrayList < > ( ) ;
Map < String , Long > fromIdMap = imMessageHistoryEntities . stream ( )
List < ImMessageHistoryEntity > imMessageHistoryEntities = imMessageHistoryMapper . selectMessageByOwnId ( req . getOperater ( ) ) ;
// map fromId去除本人
Map < String , ImMessageHistoryEntity > fromIdMap = imMessageHistoryEntities . stream ( )
. filter ( x - > ! StringUtils . equals ( x . getFromId ( ) , req . getOperater ( ) ) )
. collect ( Collectors . toMap ( ImMessageHistoryEntity : : getFromId , ImMessageHistoryEntity : : getMessageKey ) ) ;
Map < String , Long > toIdMap = imMessageHistoryEntities . stream ( )
. collect ( Collectors . toMap ( ImMessageHistoryEntity : : getFromId , x - > x ) ) ;
// map toId去除本人
Map < String , ImMessageHistoryEntity > toIdMap = imMessageHistoryEntities . stream ( )
. filter ( x - > ! StringUtils . equals ( x . getToId ( ) , req . getOperater ( ) ) )
. collect ( Collectors . toMap ( ImMessageHistoryEntity : : getToId , ImMessageHistoryEntity : : getMessageKey ) ) ;
. collect ( Collectors . toMap ( ImMessageHistoryEntity : : getToId , x - > x ) ) ;
// 合并map到toId
fromIdMap . forEach ( ( key , value ) - > {
toIdMap . merge ( key , value , ( x , y ) - > {
if ( value > y ) return value ;
if ( value . getMessageKey ( ) > y . getMessageKey ( ) ) return value ;
return y ;
} ) ;
} ) ;
List < ImMessageBodyEntity > imMessageBodyEntities = imMessageBodyMapper . selectList ( new LambdaQueryWrapper < ImMessageBodyEntity > ( ) . in ( ImMessageBodyEntity : : getMessageKey , imMessageBodyIds ) ) ;
return null ;
Collection < Long > messageKeys = toIdMap . values ( ) . stream ( ) . map ( ImMessageHistoryEntity : : getMessageKey ) . collect ( Collectors . toList ( ) ) ;
if ( CollUtil . isEmpty ( messageKeys ) ) {
return sessionVos ;
}
List < ImMessageBodyEntity > imMessageBodyEntities = imMessageBodyMapper . selectList (
new LambdaQueryWrapper < ImMessageBodyEntity > ( ) . in ( ImMessageBodyEntity : : getMessageKey , messageKeys ) ) ;
Map < Long , String > collect = imMessageBodyEntities . stream ( )
. collect ( Collectors . toMap ( ImMessageBodyEntity : : getMessageKey , ImMessageBodyEntity : : getMessageBody ) ) ;
// 设置消息体
List < ImMessageHistoryEntity > collect1 = toIdMap . values ( ) . stream ( ) . collect ( Collectors . toList ( ) ) ;
for ( ImMessageHistoryEntity imMessageHistoryEntity : collect1 ) {
ImMessageHistoryEntity imMessageHistoryEntity1 = imMessageHistoryMapper . selectOne ( new LambdaQueryWrapper < ImMessageHistoryEntity > ( )
. eq ( ImMessageHistoryEntity : : getMessageKey , imMessageHistoryEntity . getMessageKey ( ) )
. eq ( ImMessageHistoryEntity : : getOwnerId , req . getOperater ( ) ) ) ;
ImMessageHistoryVo imMessageHistoryVo = BeanCopyUtils . copy ( imMessageHistoryEntity1 , ImMessageHistoryVo . class ) ;
String s = collect . get ( imMessageHistoryEntity . getMessageKey ( ) ) ;
imMessageHistoryVo . setMessageBody ( s ) ;
sessionVos . add ( imMessageHistoryVo ) ;
}
// 设置未读数量( 未读消息都是fromId为别人的)
String key = req . getAppId ( ) + " : " + Constants . RedisConstants . OfflineMessage + " : " + req . getOperater ( ) ;
ZSetOperations zSetOperations = redisTemplate . opsForZSet ( ) ;
Set < String > querySet = zSetOperations . range ( key , 0 , - 1 ) ;
for ( ImMessageHistoryVo sessionVo : sessionVos ) {
for ( String value : querySet ) {
OfflineMessageContent offlineMessageContent = JSONObject . parseObject ( value , OfflineMessageContent . class ) ;
if ( ( StringUtils . equals ( sessionVo . getFromId ( ) , offlineMessageContent . getFromId ( ) ) & &
StringUtils . equals ( sessionVo . getToId ( ) , offlineMessageContent . getToId ( ) ) ) | |
( StringUtils . equals ( sessionVo . getFromId ( ) , offlineMessageContent . getToId ( ) ) & &
StringUtils . equals ( sessionVo . getToId ( ) , offlineMessageContent . getFromId ( ) ) ) ) {
sessionVo . setNum ( sessionVo . getNum ( ) + 1 ) ;
}
}
}
sessionVos . sort ( new Comparator < ImMessageHistoryVo > ( ) {
@Override
public int compare ( ImMessageHistoryVo o1 , ImMessageHistoryVo o2 ) {
return o1 . getCreateTime ( ) . compareTo ( o2 . getCreateTime ( ) ) ;
}
} ) ;
return sessionVos ;
}
//修改历史消息的状态