当我们在做数据库分库分表或者是分布式缓存时,不可避免的都会遇到一个问题:
如何将数据均匀的分散到各个节点中,并且尽量的在加减节点时能使受影响的数据最少

1. Hash 取模

随机放置就不说了,会带来很多问题。通常最容易想到的方案就是 hash 取模了。

可以将传入的 Key 按照 index = hash(key) % N 这样来计算出需要存放的节点。其中 hash 函数是一个将字符串转换为正整数的哈希映射方法,N 就是节点的数量。

这样可以满足数据的均匀分配,但是这个算法的容错性和扩展性都较差。

比如增加或删除了一个节点时,所有的 Key 都需要重新计算,显然这样成本较高,为此需要一个算法满足分布均匀同时也要有良好的容错性和拓展性。

2. 经典一致 Hash 算法

2.1一致性Hash算法背景

一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分布式Cache中提出的,设计目标是为了解决因特网中的热点(Hot spot)问题,初衷和CARP十分类似。一致性哈希修正了CARP使用的简单哈希算法带来的问题,使得DHT可以在P2P环境中真正得到应用。

但现在一致性hash算法在分布式系统中也得到了广泛应用,研究过memcached缓存数据库的人都知道,memcached服务器端本身不提供分布式cache的一致性,而是由客户端来提供,具体在计算一致性hash时采用如下步骤:

  1. 首先求出memcached服务器(节点)的哈希值,并将其配置到0~232的圆(continuum)上。
  2. 然后采用同样的方法求出存储数据的键的哈希值,并映射到相同的圆上。
  3. 然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过232仍然找不到服务器,就会保存到第一台memcached服务器上。
    image-1649828710444

从上图的状态中添加一台memcached服务器。余数分布式算法由于保存键的服务器会发生巨大变化而影响缓存的命中率,但Consistent Hashing中,只有在园(continuum)上增加服务器的地点逆时针方向的第一台服务器上的键会受到影响,如下图所示:
image-1649828728551

2.2一致性Hash性质

考虑到分布式系统每个节点都有可能失效,并且新的节点很可能动态的增加进来,如何保证当系统的节点数目发生变化时仍然能够对外提供良好的服务,这是值得考虑的,尤其实在设计分布式缓存系统时,如果某台服务器失效,对于整个系统来说如果不采用合适的算法来保证一致性,那么缓存于系统中的所有数据都可能会失效(即由于系统节点数目变少,客户端在请求某一对象时需要重新计算其hash值(通常与系统中的节点数目有关),由于hash值已经改变,所以很可能找不到保存该对象的服务器节点),因此一致性hash就显得至关重要,良好的分布式cahce系统中的一致性hash算法应该满足以下几个方面:

平衡性(Balance)

平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件。

单调性(Monotonicity)

单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲区加入到系统中,那么哈希的结果应能够保证原有已分配的内容可以被映射到新的缓冲区中去,而不会被映射到旧的缓冲集合中的其他缓冲区。简单的哈希算法往往不能满足单调性的要求,如最简单的线性哈希:x = (ax + b) mod §,在上式中,P表示全部缓冲的大小。不难看出,当缓冲大小发生变化时(从P1到P2),原来所有的哈希结果均会发生变化,从而不满足单调性的要求。哈希结果的变化意味着当缓冲空间发生变化时,所有的映射关系需要在系统内全部更新。而在P2P系统内,缓冲的变化等价于Peer加入或退出系统,这一情况在P2P系统中会频繁发生,因此会带来极大计算和传输负荷。单调性就是要求哈希算法能够应对这种情况。

分散性(Spread)

在分布式环境中,终端有可能看不到所有的缓冲,而是只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应能够尽量避免不一致的情况发生,也就是尽量降低分散性。

负载(Load)

负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷。

平滑性(Smoothness)

平滑性是指缓存服务器的数目平滑改变和缓存对象的平滑改变是一致的。

2.3原理

一致性哈希将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为0-232-1(即哈希值是一个32位无符号整形),整个哈希空间环如下:
image-1649828873768

整个空间按顺时针方向组织。0和232-1在零点中方向重合。

下一步将各个服务器使用Hash进行一个哈希,具体可以选择服务器的ip或主机名作为关键字进行哈希,这样每台机器就能确定其在哈希环上的位置,这里假设将上文中四台服务器使用ip地址哈希后在环空间的位置如下:
image-1649828892362

接下来使用如下算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。

例如我们有Object A、Object B、Object C、Object D四个数据对象,经过哈希计算后,在环空间上的位置如下:
image-1649828913729

根据一致性哈希算法,数据A会被定为到Node A上,B被定为到Node B上,C被定为到Node C上,D被定为到Node D上。

下面分析一致性哈希算法的容错性和可扩展性。现假设Node C不幸宕机,可以看到此时对象A、B、D不会受到影响,只有C对象被重定位到Node D。一般的,在一致性哈希算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响。

下面考虑另外一种情况,如果在系统中增加一台服务器Node X,如下图所示:
image-1649828927294

此时对象Object A、B、D不受影响,只有对象C需要重定位到新的Node X 。一般的,在一致性哈希算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响。

综上所述,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。

另外,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。例如系统中只有两台服务器,其环分布如下:
image-1649828945582

此时必然造成大量数据集中到Node A上,而只有极少量会定位到Node B上。为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器ip或主机名的后面增加编号来实现。例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “Node A#1”、“Node A#2”、“Node A#3”、“Node B#1”、“Node B#2”、“Node B#3”的哈希值,于是形成六个虚拟节点:
image-1649828987361

同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到“Node A#1”、“Node A#2”、“Node A#3”三个虚拟节点的数据均定位到Node A上。这样就解决了服务节点少时数据倾斜的问题。在实际应用中,通常将虚拟节点数设置为32甚至更大,因此即使很少的服务节点也能做到相对均匀的数据分布。

3.无法解决缓存命中率及单一热点问题

一致性哈希解决的是某节点宕机后缓存失效的问题,只会导致相邻节点负载增加。但是因为宕机后需要重新从数据库读取,会导致此时缓存命中率下降及db压力增加。
也无法避免单一热点问题。某一数据被海量请求,不论怎么哈希,哈希环多大,数据只存在一个节点,早晚有被打垮的时候。

此时的解决策略是每个节点主备或主主集群。

4.数据库分表使用一致性哈希

如果数据量在一开始预计会有很大的提升,为了以后扩容(分表)时影响的数据最小化,可以使用一致性哈希算法进行分表处理。使用一致性哈希算法解决分数据存储问题时,需要将增加结点时的数据迁移考虑进去,而如果只是为了服务器的负载均衡就不需要了。

下面是一个实践的例子,有一定参考价值

3.1一致性Hash算法实现

一致性Hash算法的hash部分,采用了著名的ketama算法

public long hash(String key) {
    if (md5 == null) {
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("no md5 algorythm found");
        }
    }

    md5.reset();
    md5.update(key.getBytes());
    byte[] bKey = md5.digest();

    long res = ((long) (bKey[3] & 0xFF) << 24) | 
                     ((long) (bKey[2] & 0xFF) << 16) | 
                     ((long) (bKey[1] & 0xFF) << 8) | 
                     (long) (bKey[0] & 0xFF);
    return res & 0xffffffffL;
}

有了Hash的算法,接下来就要构造Hash环了。Hash环采用的SortedMap数据结构实现。

private final SortedMap<Long, T> circle = new TreeMap<Long, T>();

其中添加节点和移除节点部分,需要根据hash算法得到节点在环上的位置,具体代码如下:

/**
 * 添加虚拟节点
 * numberOfReplicas为虚拟节点的数量,初始化hash环的时候传入,我们使用300个虚拟节点
 * @param node
 */
public void add(T node) {
    for (int i = 0; i < numberOfReplicas; i++) {
        circle.put(hashFunction.hash(node.toString() + i), node);
    }
}

/**
 * 移除节点
 * @param node
 */
public void remove(T node) {
    for (int i = 0; i < numberOfReplicas; i++) {
        circle.remove(hashFunction.hash(node.toString() + i));
    }
}

而hash环中得到节点部分比较特殊,根据一致性hash算法的介绍,得到hash环中的节点,实际上是计算出的hash值顺时针找到的第一个节点。

 /**
 * 获得一个最近的顺时针节点
 * @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
 * @return
 */
public T get(Object key) {
    if (circle.isEmpty()) {
        return null;
    }
    long hash = hashFunction.hash((String) key);
    if (!circle.containsKey(hash)) {
        //返回此映射的部分视图,其键大于等于 hash
        SortedMap<Long, T> tailMap = circle.tailMap(hash); 
        hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
    }
    return circle.get(hash);
}

3.2单表拆分实践

上面完成了一致性hash算法的实现,包含了hash算法和hash环的实现。接下来就要处理具体业务中,如何使用这个hash环和算法了。

我们业务中,主要操作这张表的数据,也就是增删查。然后我们数据库拆分成了3个,所以需要增删查的操作基本一致,都是先通过一致性hash得到库,再通过一致性hash得到表。

获取数据库名的操作如下,获取到数据库后,根据数据库名到对应的连接池中获取连接。

/**
 * 根据试验信息id获取其所在库名
 * DatabaseType为我们数据的枚举
 * @return 数据库的名称
 **/
private String getDataBase(String experimentMessageId) {
    //获取数据源
    DatabaseType[] databasetype = DatabaseType.values();
    List<String> dataBaselist = new ArrayList<>();
    Map<String, DatabaseType> map = new HashMap<>();

    for (DatabaseType d:databasetype) {
        if (!d.equals(DatabaseType.KC)) {
            dataBaselist.add(d.toString());
            map.put(d.toString(), d);
        }
    }
    //获取数据源hash
    ConsistentHash<String> dataBaseCon = getConsistentHash(dataBaselist);

    //获取id所在数据源
    String dataBase = dataBaseCon.get(experimentMessageId);
    return dataBase;
}

获取表名的操作如下,获取到数据库后,在对应的数据库中找到需要的表,再从该表中查询数据。

/**
 * 根据试验信息id获取其试验数据所在表
 * @return
 **/
public String getTableName(String experimentMessageId) {
    String dataBase = getDataBase(experimentMessageId);
    //查询所有试验数据表
    List<String> tables = experimentDataEODao.queryTbaleNames(dataBase, tableName);
    ConsistentHash<String> consistentHash = getConsistentHash(tables);
    String tableName = consistentHash.get(experimentMessageId);
    return tableName;
}

剩下的增删改操作和平常一致,在此不多赘述。

3.3数据迁移实践

一致性hash势必涉及到数据迁移问题,我们采取的数据迁移方式为定时任务,针对每个数据库在每天夜里全量扫描一次。检查是否有数据量超过1000万的表,若存在这样的表,就把现有的表数量double。
数据迁移只会在同库之间迁移,不会涉及跨数据库的情况。
此方案为初步方案,后续会改进的更加智能,根据表的数量,增加不同数量的表。而不是简单的把表数量翻倍。
表创建后,将需要迁移的表数据逐个迁移。

在连接到数据源后,我们做了如下事情进行数据迁移

  1. 获取库中所有的表
 List<String> tables = getTables(connection, p, d.toString());
  1. 遍历表,检查表中数据是否超过边界线(我们为1000万)
 for (int i = 0; i < tables.size(); i++) {
    //查询表内数据量
    int num = countByTableName(connection, p, tables.get(i));
    //finalNum为边界值,此处为1000万
    if (num > finalNum) {
        ……
    }
    ……
}
  1. 根据所有的表计算现有的虚拟节点
ConsistentHash<String> consistentHashOld = getConsistentHash(tables);
  1. 把表加倍
List<String> tablesNew = deepCopy(tables); //注意一定要采用深复制
int tableSize = tablesNew.size();
for (int y = 0; y < tableSize; y++) {
    String tableNameNew = tableName + (tablesNew.size() + 1);
    //创建表
    createTable(connection, p, d.toString(), tableNameNew);
    tablesNew.add(tableNameNew);
    tableDelete.add(tableNameNew);
}
  1. 计算加倍后的虚拟节点
ConsistentHash<String> consistentHashNew = getConsistentHash(tablesNew);
  1. 数据迁移
for (int z = 0; z < tableSize; z++) {
    String tableNameOld = tablesNew.get(z);
    //查询试验信息id不重复的试验数据信息
    List<String> disData = selectExperimentIdDis(connection, p, tableNameOld);
    List<String> deleteList = new LinkedList<>();
    for (String experimentId : disData) {
        //如果数据hash计算 原所在表与新建表之后不一致,执行转移
        if (!consistentHashNew.get(experimentId).equals(consistentHashOld.get(experimentId))) {

            //新增到新表数据
            insertHash(connection, p, experimentId, consistentHashOld.get(experimentId),
            consistentHashNew.get(experimentId));

            //删除数据集合
            deleteList.add(experimentId);
            //删除旧表数据
            final int defaultDelNum = 1000;
            if (deleteList.size() == defaultDelNum) {
                deleteInbatch(connection, p, deleteList, tableNameOld);
                deleteList.clear();
            }
        }
    }

    //删除旧表数据
    if (deleteList.size() > 0) {
        deleteInbatch(connection, p, deleteList, tableNameOld);
    }
}

3.4总结

以上为我们所做的一致性hash实践,其中还存在很多问题,比如迁移过程单线程导致迁移较慢、自动扩容机制不智能、迁移过程中数据访问不稳定等情况。

一致性哈希算法原理
一致性Hash算法在数据库分表中的实践