jedis源码分析


类图

创建redisClient BinaryJedisCluster

我们可以从jedis给出官方的redis-clusterdemo上可以看到通过构造一个BinaryJedisCluster,这个类就是jedis给我们提供的一个与redis-cluster集群交互的一个基本接口类,有了这个类我们就能执行各种查询写入操作了。

Set<HostAndPort> set = new HashSet<>();
HostAndPort hostAndPort = new HostAndPort("127.0.0.1", 7000);
set.add(hostAndPort);
BinaryJedisCluster client = new BinaryJedisCluster(set);
  • 我们从BinaryJedisCluster的构造函数一路debug下去,可以看到这个类有一个JedisClusterConnectionHandler成员变量,在该类的构造函数当中,会去给这个成员变量赋值。进而会调用JedisSlotBasedConnectionHandler类的构造方法,又会调用到父类的构造方法。
  • JedisClusterConnectionHandler类中有一个JedisClusterInfoCache成员变量,在该类的构造函数中,会给JedisClusterInfoCache初始化。JedisSlotBasedConnectionHandler这个类是该类的子类,实现了抽象父类的getConnection,getConnectionFromSlot(int)两个方法。
  • JedisClusterInfoCache这个类缓存了redis集群的集群分片信息,我们下面来看下。
abstract class JedisClusterConnectionHandler{
    protected final JedisClusterInfoCache cache;
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      try {
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }
}

public class JedisClusterInfoCache {
    private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
    private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
    
    public void discoverClusterNodesAndSlots(Jedis jedis) {
        w.lock();
        try {
          reset();
          List<Object> slots = jedis.clusterSlots();
    
          for (Object slotInfoObj : slots) {
            List<Object> slotInfo = (List<Object>) slotInfoObj;
            if (slotInfo.size() <= MASTER_NODE_INDEX) {
              continue;
            }
            List<Integer> slotNums = getAssignedSlotArray(slotInfo);
            // hostInfos
            int size = slotInfo.size();
            for (int i = MASTER_NODE_INDEX; i < size; i++) {
              List<Object> hostInfos = (List<Object>) slotInfo.get(i);
              if (hostInfos.size() <= 0) {
                continue;
              }
              HostAndPort targetNode = generateHostAndPort(hostInfos);
              setupNodeIfNotExist(targetNode);
              if (i == MASTER_NODE_INDEX) {
                assignSlotsToNode(slotNums, targetNode);
              }
            }
          }
        } finally {
          w.unlock();
        }
    }
}

可以看到,JedisClusterInfoCache的discoverClusterNodesAndSlots方法通过任意一个jedis实例然后调用jedis.clusterSlots()拿到redis集群分配的slot信息,然后把slot信息以及hostinfo信息缓存到两个对应的JedisPool map里面,这样启动阶段基本完成了。

通过BinaryJedisCluster与redis-cluster交互

我们通过以下代码入手

BinaryJedisCluster client = new BinaryJedisCluster(set);
client.get("key".getBytes());

public class BinaryJedisCluster{
    @Override
  public byte[] get(final byte[] key) {
    return new JedisClusterCommand<byte[]>(connectionHandler, maxAttempts) {
      @Override
      public byte[] execute(Jedis connection) {
        return connection.get(key);
      }
    }.runBinary(key);
  }
}

上述通过一个简单的get操作,最终会调用到BinaryJedisCluster.runBinary(key)方法。

public abstract class JedisClusterCommand<T> {
    private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    if (attempts <= 0) {
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {
        // TODO: Pipeline asking with the original command to make it
        // faster....
        connection = askConnection.get();
        connection.asking();

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
      }

      return execute(connection);

    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      if (attempts <= 1) {
        //We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
        //or we can stuck with cluster state without one node in opposite case.
        //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
        //TODO make tracking of successful/unsuccessful operations for node - do renewing only
        //if there were no successful responses from this node last few seconds
        this.connectionHandler.renewSlotCache();

        //no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation
        throw jce;
      }

      return runWithRetries(key, attempts - 1, tryRandomNode, asking);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache
        // recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }
      // release current connection before recursion or renewing
      releaseConnection(connection);
      connection = null;

      if (jre instanceof JedisAskDataException) {
        asking = true;
        askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {
      } else {
        throw new JedisClusterException(jre);
      }
      return runWithRetries(key, attempts - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }
}
  • 正常情况:上述代码connection=connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));这个是根据key来计算对应的slot,然后根据启动阶段JedisClusterInfoCache的private final Map<Integer, JedisPool> slots = new HashMap<Integer,JedisPool>()属性来找到该slot对应的JedisPool,然后从pool中获取一个jedis connection,最终执行connection.get(key);
  • 异常情况1:JedisNoReachableClusterNodeException,显然都没有可以访问的节点了,直接向上层抛出,不做处理。
  • 异常情况2:JedisConnectionException会进行重试,如果当前已经是最后一次重试,则this.connectionHandler.renewSlotCache()更新slotcache。
  • 异常情况3:JedisRedirectionException和异常情况2差不多基本也是更新slot,不同是此时jedis是通的,所以可以直接传进renewSlotCache(jedis),避免renewSlotCache方法内部多余的轮询获取jedis的操作。然后重试。

文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
JedisPool连接池相关配置 JedisPool连接池相关配置
最近有些其他业务部门的同学在线上环境redis有出现以下错误Unexpected end of stream,这个错误大致是因为,redis服务器端已经关闭了客户端的连接,而客户端不知道依然拿着原来的连接去访问redis服务器,结果就会报出
2017-07-08
下一篇 
spring容器启动@Value属性无法注入 spring容器启动@Value属性无法注入
问题一个同事基于Annotation配置了一段代码,结果有一个Configuration类的两个@Value标注的属性值没有注入进来,代码如下: @Configuration @PropertySource("mysql.prop
2017-04-16
  目录