多个客户端在互相发送和接收消息的时候,通常会使用以下两种方式来传递消息。第一种时消息推送,也就是由消息发送者来确保所有接受者已经成功接收到了消息,Redis内置了PUBLISH和SUBSCRIBE命令可以实现;第二种时消息拉取,这种方法要求接收者自己去获取存储在某种mailbox里的消息。尽管消息推送非常有用,但是客户端因为某些原因而没办法一直保持在线的时候,采用这一消息传递方法的程序就会出现各种各样的问题。本章节将尝试编写不同的消息拉取方式,来替代PUBLISH和SUBSCRIBE。
假设现在打算开发一个移动通信程序,这个应用通过连接服务器来发送和接受雷系短信或彩信的消息,基本上就是一个文字短信和图片彩信的替代品。每条消息都只会被发送至一个客户端,这一点极大地简化了我们要解决的问题。可以为每个移动客户端使用一个列表结构,发送者会把消息放到接受者的列表中,而接受者客户端则通过发送请求来获取最新的消息。数据格式如下:
该示例的代码实现比较简单,之前也已经学过如何对列表进行推入和弹出操作,这里就不再给出代码实现了。
单个接受者的消息传递已经满足不了需求,现在要实现一个群组聊天功能,和之前一样,因为应用程序的客户端可能会载人和时候进行连接或者断开连接,所以还是不能使用内置的PUBLISH和SUBSCRIBE。
每个新创建的群组都会有一些初始用户,每个用户都可以按照自己的意愿来参加或者离开群组。群组使用有序集合来记录参加群组的用户,其中有序集合的成员为用户的名字,分值时用户在群组内接收到的最大消息ID。用户也会使用有序集合来记录自己参加的所有群组,其中有序集合的成员为群组ID,分值是用户在群组内接收到的最大消息ID。数据格式如下:
以上数据例子表示jason和jeff都参加了001群组,其中用户jason看了6条群组消息中的5条。
publicStringcreateChat(String sender, Set<String> recipients, String message){
//通过全局计数器来获取一个新的群组ID
Long chatId = stringRedisTemplate.opsForValue().increment("ids:chat:", 1l);
//将发送者也添加到群组成员中
recipients.add(sender);
stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
operations.multi();
recipients.forEach(recipient -> {
//将用户添加到群组中,并将这些用户在群组中最大已读消息ID初始化为0
stringRedisTemplate.opsForZSet().add("chat:" + chatId, recipient, 0);
//将群组ID添加到用户已参加群组的有序集合中
stringRedisTemplate.opsForZSet().add("member:" + recipient, String.valueOf(chatId),0);
});
return operations.exec();
}
});
//发送一条初始化消息
return "";
}
publicStringsendMessage(String chatId, String sender, String message){
// 使用锁来消除竞争条件,保证消息的读取和插入的顺序一致
String identifier = timeoutLockService.acquireLockWithTimeout("chat:" + chatId);
if (identifier == null){
throw new RuntimeException("Couldn't get the lock");
}
try{
//获取消息ID
Long messageId = stringRedisTemplate.opsForValue().increment("ids:message:" + chatId, 1l);
//将消息添加到消息有序集合中
JSONObject values = new JSONObject();
values.put("id", messageId);
values.put("ts", System.currentTimeMillis());
values.put("sender", sender);
values.put("message", message);
stringRedisTemplate.opsForZSet().add("msgs:" + chatId, values.toJSONString(), messageId);
}finally {
firstLockService.releaseLock("chat:" + chatId, identifier);
}
return chatId;
}
publicvoidfetchPendingMessages(String recipient){
// 获取组员的群组ID以及在各组中目前收到的消息的最大ID
Set<ZSetOperations.TypedTuple<String>> memberSet =
stringRedisTemplate.opsForZSet().rangeWithScores("member:" + recipient, 0, -1);
// 获取各聊天组未读消息(分值大于上面获取的最大消息ID)
List<Object> results = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
operations.multi();
memberSet.forEach(member -> {
String chatId = member.getValue();
double messageId = member.getScore();
operations.opsForZSet().rangeByScore("msgs:" + chatId, ++messageId, Double.MAX_VALUE);
});
return operations.exec();
}
});
//遍历未读消息
stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
operations.multi();
int i = 0;
for(ZSetOperations.TypedTuple<String> member : memberSet){
Set<String> messages = (Set<String>) results.get(i++);
System.out.println("聊天组:" + member.getValue() + ",有如下未读消息");
messages.forEach(message ->
System.out.println(JSONObject.parseObject(message).getString("message")));
//修改群组成员读取的最大消息ID
operations.opsForZSet().incrementScore(
"member:" + recipient,
member.getValue(),
messages.size());
//修改群组有序集合中成员读取的最大消息ID
operations.opsForZSet().incrementScore(
"chat:" + member.getValue(),
recipient,
messages.size());
}
return operations.exec();
}
});
}