Redis分了了 主从模式 和 集群模式 。
主从模式即使用一个Redis实例作为主机(Master),其余的实例作为备份机(Slave),Master支持写入和读取等各种操作,Slave支持读操作和与Master同步数据。主从模式的核心思想是读写分离,数据冗余存储和HA,Master节点出现问题,可以通过Redis Sentinel做到主从切换。
Sentinel 系统用于管理多个 Redis 服务器(instance), 该系统执行以下三个任务:
Redis主从模式虽然很强大,但是其单Master的架构,当遇到 单机内存 、 并发 、 流量 等瓶颈时便束手无策,Redis集群的出现就是为了解决主从模式所遇到的问题。在Redis Cluster面世之前,业界为了解决Redis这个问题,也出现了一些优秀的Redis集群解决方案,比如 Twemproxy 和 Codis ,如果大家感兴趣,可以去学习,本文不再比较各自的优劣。
摘抄自参考文档3,该文作者已经有了很好的总结。
分布式数据库首先要解决把 整个数据集 按照 分区规则 映射到 多个节点 的问题,每个节点负责 整体数据 的一个子集。
数据分布通常有 哈希分区 和 顺序分区 两种方式,对比如下:
| 分区方式 | 特点 | 相关产品 |
|---|---|---|
| 哈希分区 | 离散程度好,数据分布与业务无关,无法顺序访问 | Redis Cluster,Cassandra,Dynamo |
| 顺序分区 | 离散程度易倾斜,数据分布与业务相关,可以顺序访问 | BigTable,HBase,Hypertable |
由于 Redis Cluster 采用 哈希分区规则 ,这里重点讨论 哈希分区 。常见的哈希分区规则有几种:
节点取余分区:使用特定的数据,如 Redis的键或用户ID,再根据节点数量N使用公式: hash(key)% N
计算出 哈希值,用来决定数据 映射 到哪一个节点上。这种方式简单实用,常用语数据库 分库分表
,一般采用预分区的方式,提前按预估的数据量规划好分区数。缺点也很明显,当节点数量发生变化时,比如发生 扩容
或 缩容
时,数据节点的 映射关系
需要重新计算,会导致数据的重新迁移。
一致性哈希分区: 一致性哈希 可以很好的解决稳定性问题,可以将所有的 存储节点 排列在首尾相接的 Hash环 上,每个key在计算Hash后 顺时针 找到临接的存储节点存放。当有节点 加入 或 退出 时,仅影响该节点在hash环上顺时针相邻的后续节点。加入和删除节点,只影响哈希环中顺时针方向的相邻的节点,对其他节点无影响,但是还是会造成哈希环中部分数据无法命中。当使用少量节点时,节点变化将大范围影响哈希环中的数据映射, 不适合少量数据节点的分布式方案 。 普通的一致性哈希分区在增减节点时,需要增加一倍或减去一半节点,才能保证数据和负载的均衡 。
虚拟槽分区:虚拟槽分区巧妙的使用了哈希空间,使用分散度良好的哈希函数把所有数据映射到一个固定范围的整数集合中,整数定义为 槽(slot) 。 这个范围一般远远大于节点数 ,比如Redis Cluster的槽范围是0~16383。槽是集群内数据管理和迁移的基本单位。采用大范围槽的主要目的是为了方便数据拆分和集群扩展。每个节点会负责一定数量的槽。 由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽数量,都不会造成集群不可用的状态 。
Redis虚拟槽分区的特点:
Redis
集群相对 单机
在功能上存在一些限制,需要 开发人员
提前了解,在使用时做好规避。
key
批量操作
支持有限。
类似 mset
、 mget
操作,目前只支持对具有相同 slot
值的 key
执行 批量操作
。对于 映射为不同
slot
值的 key
由于执行 mget
、 mget
等操作可能存在于多个节点上,因此不被支持。
key
事务操作
支持有限。
只支持 多
key
在 同一节点上
的 事务操作
,当多个 key
分布在 不同
的节点上时 无法
使用事务功能。
key
作为 数据分区
的最小粒度
不能将一个 大的键值
对象如 hash
、 list
等映射到 不同的节点
。
单机下的 Redis
可以支持 16
个数据库( db0 ~ db15
), 集群模式
下只能使用 一个
数据库空间,即 db0
。
从节点只能复制 主节点 ,不支持 嵌套树状复制 结构。
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.util.*;
public class RedisCacheDelegate extends AbstractCache implements CacheManager, Cache {
private static Logger logger = LoggerFactory.getLogger(RedisCacheDelegate.class);
/**
* 集群节点
*/
private String clusterNodes;
/**
* 重试次数
*/
private int maxAttempts;
/**
* 超时时间,单位是秒
*/
private int timeout;
private JedisCluster jedisCluster;
private JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
private static RedisCacheDelegate redisCacheDelegate = null;
private JedisClusterInfoCache cache;
private final static String WARM_KEY = "warm_key";
private final static String WARM_VALUE = "value";
public static RedisCacheDelegate getInstant(CacheProperties cacheProperties) {
if (redisCacheDelegate == null) {
synchronized (RedisCacheDelegate.class) {
if (redisCacheDelegate == null) {
redisCacheDelegate = new RedisCacheDelegate(cacheProperties.getNodes(), cacheProperties.getTimeout(), cacheProperties.getMaxAttempts());
}
}
}
return redisCacheDelegate;
}
private RedisCacheDelegate(String clusterNodes, int timeout, int maxAttempts) {
this.clusterNodes = clusterNodes;
this.timeout = timeout;
this.maxAttempts = maxAttempts;
init();
}
private JedisPoolConfig getJedisPoolConfig() {
//连接最长等待时间,默认是-1
jedisPoolConfig.setMaxWaitMillis(200);
//连接池最大数量
jedisPoolConfig.setMaxTotal(50);
//最小闲置个数 闲置超过最小闲置个数但不超过最大闲置个数,则逐步清理闲置直到最小闲置个数
jedisPoolConfig.setMinIdle(10);
//最大闲置个数 闲置超过最大闲置个数则直接杀死超过部分
jedisPoolConfig.setMaxIdle(30);
//连接耗尽等待,等待最长{MaxWaitMillis}毫秒
jedisPoolConfig.setBlockWhenExhausted(true);
//是否开启jmx监控
jedisPoolConfig.setJmxEnabled(true);
//是否开启空闲资源监测
jedisPoolConfig.setTestWhileIdle(true);
//空闲资源的检测周期(单位为毫秒)
jedisPoolConfig.setMinEvictableIdleTimeMillis(60000);
//资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30000);
//做空闲资源检测时,每次的采样数,如果设置为-1,就是对所有连接做空闲监测
jedisPoolConfig.setNumTestsPerEvictionRun(-1);
return jedisPoolConfig;
}
@Override
public void init() {
String[] serverArray = clusterNodes.split(",");
Set<HostAndPort> nodes = new HashSet<>();
for (String ipPort : serverArray) {
String[] ipPortPair = ipPort.split(":");
nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
}
jedisCluster = new JedisCluster(nodes, timeout * 1000, maxAttempts, getJedisPoolConfig());
MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
warm();
}
/**
* warm the jedis pool
*/
@Override
public void warm() {
set(WARM_KEY, WARM_VALUE, 60);
for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {
ttl(WARM_KEY);
}
}
@Override
public void set(String key, String value) {
jedisCluster.set(key, value);
}
@Override
public void set(String key, String value, int expiredTime) {
jedisCluster.setex(key, expiredTime, value);
}
@Override
public void mSet(Map<String, String> data) {
if (data != null && data.size() > 0) {
data.forEach((key, value) -> jedisCluster.set(key, value));
}
}
@Override
public void mSetPipLine(Map<String, String> data) {
setPipLine(data, 0);
}
private void setPipLine(Map<String, String> data, int expiredTime) {
if (data.size() < 1) {
return;
}
//保存地址+端口和命令的映射
Map<JedisPool, Map<String, String>> jedisPoolMap = new HashMap<>();
JedisPool currentJedisPool = null;
for (String key : data.keySet()) {
//计算哈希槽
int crc = JedisClusterCRC16.getSlot(key);
//通过哈希槽获取节点的连接
currentJedisPool = cache.getSlotPool(crc);
if (jedisPoolMap.containsKey(currentJedisPool)) {
jedisPoolMap.get(currentJedisPool).put(key, data.get(key));
} else {
Map<String, String> inner = new HashMap<>();
inner.put(key, data.get(key));
jedisPoolMap.put(currentJedisPool, inner);
}
}
//保存结果
Map<String, String> map = null;
//执行
for (Map.Entry<JedisPool, Map<String, String>> entry : jedisPoolMap.entrySet()) {
try {
currentJedisPool = entry.getKey();
map = entry.getValue();
Jedis jedis = currentJedisPool.getResource();
//获取pipeline
Pipeline currentPipeline = jedis.pipelined();
// NX是不存在时才set, XX是存在时才set, EX是秒,PX是毫秒
if (expiredTime > 0) {
map.forEach((k, v) -> currentPipeline.setex(k, expiredTime, v));
} else {
map.forEach((k, v) -> currentPipeline.set(k, v));
}
//从pipeline中获取结果
currentPipeline.sync();
currentPipeline.close();
jedis.close();
} catch (Exception e) {
logger.error("setPipline error.", e);
}
}
}
@Override
public void mSet(Map<String, String> data, int expiredTime) {
if (data != null && data.size() > 0) {
data.forEach((key, value) -> jedisCluster.setex(key, expiredTime, value));
}
}
@Override
public void mSetPipLine(Map<String, String> data, int expiredTime) {
setPipLine(data, expiredTime);
}
@Override
public String get(String key) {
return jedisCluster.get(key);
}
@Override
public List<String> mGet(List<String> keys) {
if (keys.size() < 1) {
return null;
}
List<String> result = new ArrayList<>(keys.size());
for (String key : keys) {
result.add(jedisCluster.get(key));
}
return result;
}
@Override
public List<String> mGetPipLine(List<String> key) {
return getPipLine(key);
}
@Override
public long ttl(String key) {
return jedisCluster.ttl(key);
}
private List<String> getPipLine(List<String> keys) {
if (keys.size() < 1) {
return null;
}
List<String> result = new ArrayList<>(keys.size());
Map<String, String> resultMap = new HashMap<>(keys.size());
if (keys.size() == 1) {
result.add(jedisCluster.get(keys.get(0)));
return result;
}
//保存地址+端口和命令的映射
Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();
List<String> keyList = null;
JedisPool currentJedisPool = null;
Pipeline currentPipeline = null;
for (String key : keys) {
//cuteculate hash
int crc = JedisClusterCRC16.getSlot(key);
//通过哈希槽获取节点的连接
currentJedisPool = cache.getSlotPool(crc);
if (jedisPoolMap.containsKey(currentJedisPool)) {
jedisPoolMap.get(currentJedisPool).add(key);
} else {
keyList = new ArrayList<>();
keyList.add(key);
jedisPoolMap.put(currentJedisPool, keyList);
}
}
//保存结果
List<Object> res;
//执行
for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {
try {
currentJedisPool = entry.getKey();
keyList = entry.getValue();
//获取pipeline
Jedis jedis = currentJedisPool.getResource();
currentPipeline = jedis.pipelined();
for (String key : keyList) {
currentPipeline.get(key);
}
//从pipeline中获取结果
res = currentPipeline.syncAndReturnAll();
currentPipeline.close();
jedis.close();
for (int i = 0; i < keyList.size(); i++) {
resultMap.put(keyList.get(i), res.get(i) == null ? null : res.get(i).toString());
}
} catch (Exception e) {
logger.error("getPipLine error.", e);
}
}
//sort
for (String key : keys) {
result.add(resultMap.containsKey(key) ? resultMap.get(key) : null);
}
return result;
}
@Override
public void destroy() {
try {
jedisCluster.close();
} catch (Exception e) {
}
}
}
复制代码