背景

为什么需要一致性hash算法。假设有一种场景:我们有3台服务器,分别为node0、node1、node2,有3000个缓存存在这三台服务器上,通常使用哈希取模的方法:hash(key)%N,运算后映射到对应集群中的节点。然后进行存取。功能上实现没有问题。但是这种方案在集群扩容和收缩的时候有一定的局限性:当服务器数量N发生变化时hash(key)%N也发生变化,导致整个集群所有的缓存数据需要重新计算调整,可能导致缓存大面积失效造成缓存雪崩。为了解决这个问题诞生了一致性哈希算法。

简介

  1. 解决问题

    解决分布式集群中动态伸缩的问题,降低节点上下线的过程中带来的数据迁移成本。

  2. 应用场景

    • RPC框架Dubbo用来选择服务提供者

    • 分布式关系数据库分库分表:数据与节点的映射关系

    • LVS负载均衡调度器

原理

本质上也是一种取模算法,只不过之前介绍的取模算法是按照节点数量取模,而一致性hash算法是对固定的2^32取模,这使得一致性hash算法具备良好的单调性:因为运算和节点数量无关,所以不管集群中有多少个节点,只要key固定,那所请求的服务器节点也是童颜固定的。原理如下:

  1. 一致性hash算法将整个哈希值空间映射成一个虚拟的圆环,整个哈希空间的取值范围为0~2^32-1

  2. 计算各服务器节点的哈希值,将其映射到圆环上

  3. 将请求的key值计算哈希值

  4. 将key对应的哈希值映射到哈希环上,同时沿顺时针方向查找,遇到的第一个节点就是所对应的处理请求的节点

  5. 新增或删除一个节点,受影响的数据仅仅是新添加或删除的服务器到其环空间中前一个节点(也就是逆时针方向遇到的第一台服务器)之间的数据,其他都不会受到到影响

综上所述,一致性哈希算法对于节点的增减都只需要把重定位环空间中的一笑部分数据,具有较好的容错性和可扩展性。

数据倾斜与虚拟节点

  1. 什么是数据倾斜

    哈希计算有着随机性,可能会导致数据倾斜,比如三个节点的hash值可能离得很近,间隔很小,这导致按顺时针寻找节点时,第一个节点被找到的概率更大,上面的数据更多。即使三个节点在hash环上分布均匀,如果其中1个节点下线,会导致另一个节点上的所有数据全部映射到另一个节点,导致数据倾斜。这导致某个节点数据和请求压力过大可能导致崩溃,进而引起集群的崩溃。

  2. 如何解决数据倾斜

    为了解决数据倾斜的问题,一致性hash算法引入了虚拟节点机制,即对每一个真实节点映射多个虚拟节点,并将这些虚拟节点计算哈希值并映射到哈希环上,当请求到某个虚拟节点后,再将其映射到真实节点,虚拟节点越多,哈希环上的节点就越多,数据分布就越均匀,从而避免了数据倾斜的问题。

代码实现

节点

public class Node {
    public static final int VIRTUAL_NODE_NUM = 100;
    private final Map<Object, Object> cache = new ConcurrentHashMap<>();
    @Getter
    private String ip;
    @Getter
    private final List<Integer> virtualNodeHash = new ArrayList<>();

    public Node(String ip) {
        this.ip = ip;
        init();
    }

    private void init() {
        for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
            String key = (ip + "#" + i);
            int hashCode = HashUtils.hashcode(key);
            virtualNodeHash.add(hashCode);
        }
    }

    public Object get(Object key) {
        return cache.get(key);
    }

    public void put(Object key, Object value) {
        cache.put(key, value);
    }

    public void evict(Object key) {
        cache.remove(key);
    }

    @Override
    public String toString() {
        return "Node{" +
                "ip='" + ip + '\'' +
                ", cache=" + cache.size() +
                '}';
    }
}

一致性hash算法

public class ConsistentHash {
    private List<Node> nodes = new ArrayList<>();
    private TreeMap<Integer, Node> nodeMap = new TreeMap<>();

    public void addNode(String ip) {
        Node node = new Node(ip);
        nodes.add(node);
        List<Integer> virtualNodeHash = node.getVirtualNodeHash();
        virtualNodeHash.forEach(hash -> nodeMap.put(hash, node));
    }

    public void removeNode(String ip) {
        Node node = new Node(ip);
        nodes.remove(node);
        List<Integer> virtualNodeHash = node.getVirtualNodeHash();
        virtualNodeHash.forEach(hash -> nodeMap.remove(hash));
    }

    public Object get(Object key) {
        Node node = findNode(key);
        return node.get(key);
    }

    public void put(Object key, Object value) {
        Node node = findNode(key);
        node.put(key, value);
    }

    public void evict(Object key) {
        Node node = findNode(key);
        node.evict(key);
    }

    private Node findNode(Object key) {
        int hashCode = HashUtils.hashcode(key);
        Map.Entry<Integer, Node> entry = nodeMap.ceilingEntry(hashCode);
        if (entry == null) {
            entry = nodeMap.firstEntry();
        }
        return entry.getValue();
    }
}

hash工具类

public static int hashcode(Object obj) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        String str = obj.toString();
        for (int i = 0; i < str.length(); i++)
            hash = (hash ^ str.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;

        if (hash < 0)
            hash = Math.abs(hash);
        //System.out.println("hash computer:" + hash);
        return hash;
    }
}

示例

public class Demo {
    public static final int NODE_SIZE = 10;
    public static final int STRING_COUNT = 100 * 100;
    private static final ConsistentHash consistentHash = new ConsistentHash();
    private static final List<String> sList = new ArrayList<>();

    public static void main(String[] args) {
        Random random = new Random();
        // 增加节点
        for (int i = 0; i < NODE_SIZE; i++) {
            String ip = "10.0.0." + i;
            consistentHash.addNode(ip);
        }

        // 生成需要缓存的数据;
        for (int i = 0; i < STRING_COUNT; i++) {
            sList.add(UUID.randomUUID().toString().substring(0, 8));
        }

        // 将数据放入到缓存中。
        for (String s : sList) {
            consistentHash.put(s, s);
        }

        for (int i = 0; i < 10; i++) {
            int index = random.nextInt(STRING_COUNT);
            String key = sList.get(index);
            String cache = (String) consistentHash.get(key);
            System.out.println("Random:" + index + ",key:" + key + ",consistentHash get value:" + cache + ",value is:" + key.equals(cache));
        }

        // 输出节点及数据分布情况
        for (Node node : consistentHash.getNodes()) {
            System.out.println(node);
        }

        // 新增一个数据节点
        consistentHash.addNode("10.2.1.110");
        for (int i = 0; i < 10; i++) {
            int index = random.nextInt(STRING_COUNT);
            String key = sList.get(index);
            String cache = (String) consistentHash.get(key);
            System.out.println("Random:" + index + ",key:" + key + ",consistentHash get value:" + cache + ",value is:" + key.equals(cache));
        }

        // 输出节点及数据分布情况
        for (Node node : consistentHash.getNodes()) {
            System.out.println(node);
        }
    }
}