Jedis 源码分析

2015/07/03 Java Jedis

##协议##

和Redis Server通信的协议规则都在redis.clients.jedis.Protocol这个类中,主要是通过对RedisInputStream和RedisOutputStream对读写操作来完成。

命令的发送都是通过redis.clients.jedis.Protocol的sendCommand来完成的,就是对RedisOutputStream写入字节流

java代码

 private void sendCommand(final RedisOutputStream os, final byte[] command,
        final byte[]... args) {
    try {
        os.write(ASTERISK_BYTE);
        os.writeIntCrLf(args.length + 1);
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(command.length);
        os.write(command);
        os.writeCrLf();

        for (final byte[] arg : args) {
            os.write(DOLLAR_BYTE);
            os.writeIntCrLf(arg.length);
            os.write(arg);
            os.writeCrLf();
        }
    } catch (IOException e) {
        throw new JedisConnectionException("protocol->sendCommand IOException");
    }
}

从这里可以看出redis的命令格式

[*号][消息元素个数]\r\n ( 消息元素个数 = 参数个数 + 1个命令)

[$号][命令字节个数]\r\n

[命令内容]\r\n

[$号][参数字节个数]\r\n

[参数内容]\r\n

[$号][参数字节个数]\r\n

[参数内容]\r\n

返回的数据是通过读取RedisInputStream 进行解析处理后得到的

java代码

private Object process(final RedisInputStream is) {
    try {
        byte b = is.readByte();
        if (b == MINUS_BYTE) {
            processError(is);
        } else if (b == ASTERISK_BYTE) {
            return processMultiBulkReply(is);
        } else if (b == COLON_BYTE) {
            return processInteger(is);
        } else if (b == DOLLAR_BYTE) {
            return processBulkReply(is);
        } else if (b == PLUS_BYTE) {
            return processStatusCodeReply(is);
        } else {
        	throw new JedisDataException("Unknown reply: " + (char) b);
        }
    } catch (IOException e) {
        throw new JedisConnectionException("process IOException ");
    }
    return null;
}

通过返回数据的第一个字节来判断返回的数据类型,调用不同的处理函数

[-号] 错误信息

[*号] 多个数据 结构和发送命令的结构一样

[:号] 一个整数

[$号] 一个数据 结构和发送命令的结构一样

[+号] 一个状态码

##连接##

和Redis Sever的Socket通信是由 redis.clients.jedis.Connection 实现的

Connection 中维护了一个底层Socket连接和自己的I/O Stream 还有Protocol I/O Stream是在Connection中Socket建立连接后获取并在使用时传给Protocol的 Connection还实现了各种返回消息由byte转为String的操作

java代码

private String host;
private int port = Protocol.DEFAULT_PORT;
private Socket socket;
private Protocol protocol = new Protocol();
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int pipelinedCommands = 0;
private int timeout = Protocol.DEFAULT_TIMEOUT;

java代码

 public void connect() {
    if (!isConnected()) {
        try {
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port), timeout);
            socket.setSoTimeout(timeout);
            outputStream = new RedisOutputStream(socket.getOutputStream());
            inputStream = new RedisInputStream(socket.getInputStream());
        } catch (IOException ex) {
            throw new JedisConnectionException(host+":"+ port+" connect IOException");
        }
    }
}

##原生客户端##

redis.clients.jedis.BinaryClient 继承 Connection, 封装了Redis的所有命令(http://redis.io/commands) 从名子可以看出 BinaryClient 是Redis客户端的二进制版本,参数都是byte[]的 BinaryClient 是通过Connection的sendCommand 调用Protocol的sendCommand 向Redis Server发送命令

java代码

 public void get(final byte[] key) {
    sendCommand(Command.GET, key);
}

sendCommand会调用Connection类的方法

java代码

 protected Connection sendCommand(final Command cmd, final byte[]... args) {
    connect();//建立连接
    protocol.sendCommand(outputStream, cmd, args);
    pipelinedCommands++;
    return this;
}

不是每次sendCommand都需要建立连接,而需要看BinaryClient是否已经建立连接

redis.clients.jedis.Client可以看成是BinaryClient 的高级版本,函数的参数都是String int long 这类的,并由redis.clients.util.SafeEncoder 转成byte后 再调用BinaryClient 对应的函数

这二个client只完成了发送命令的封装,并没有处理返回数据

java代码

 public void get(final String key) {
    get(SafeEncoder.encode(key));
}

##Jedis客户端##

我们平时用的基本都是由redis.clients.jedis.Jedis类封装的客户端 Jedis是通过对Client的调用,完成命令发送和返回数据这个完整过程的

以GET命令为例,其它命令类似 Jedis中的get函数如下

java代码

public String get(final String key) {
    checkIsInMulti();
    client.sendCommand(Protocol.Command.GET, key);
    return client.getBulkReply();
}

checkIsInMulti(); 是进行无事务检查 Jedis不能进行有事务的操作 带事务的连接要用redis.clients.jedis.Transaction类

client.sendCommand(Protocol.Command.GET, key); 调用Client发送命令

return client.getBulkReply(); 处理返回值

分析到这里 一个Jedis客户端的基本实现原理应该很清楚了

##连接池##

在实现项目中,要使用连接池来管理Jedis的生命周期,满足多线程的需求,并对资源合理使用。

jedis有两个连接池类型, 一个是管理 Jedis, 一个是管理ShardedJedis(jedis通过java实现的 多Redis实例的自动分片功能,后面会分析)

![JedisPool] (/assets/images/2015/jedis-pool.png)

他们都是Pool的不同实现

java代码

public abstract class Pool<T> {
private final GenericObjectPool internalPool;

public Pool(final GenericObjectPool.Config poolConfig,
        PoolableObjectFactory factory) {
    this.internalPool = new GenericObjectPool(factory, poolConfig);
}

@SuppressWarnings("unchecked")
public T getResource() {
    try {
        return (T) internalPool.borrowObject();
    } catch (Exception e) {
        throw new JedisConnectionException(
                "Could not get a resource from the pool", e);
    }
}
...
...

从代码中可以看出,Pool是通过 Apache Commons Pool 中的GenericObjectPool这个对象池来实现的 (Apache Commons Pool内容可参考[http://phil-xzh.iteye.com/blog/320983](http://phil-xzh.iteye.com/blog/320983) )

在JedisPool中,实现了一个符合 Apache Commons Pool 相应接口的JedisFactory,GenericObjectPool就是通过这个JedisFactory来产生Jedis实例的

其实JedisPoolConfig也是对Apache Commons Pool 中的Config进行的一个封装

当你在调用 getResource 获取Jedis时, 实际上是Pool内部的internalPool调用borrowObject()借给你了一个实例 而internalPool 这个 GenericObjectPool 又调用了 JedisFactory 的 makeObject() 来完成实例的生成 (在Pool中资源不够的时候)

java代码

public class JedisPool extends Pool<Jedis> {

public JedisPool(final GenericObjectPool.Config poolConfig,
        final String host) {
    this(poolConfig, host, Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT,
            null);
}

...
...
/**
 * PoolableObjectFactory custom impl.
 */
private static class JedisFactory extends BasePoolableObjectFactory {
    private final String host;
    private final int port;
    private final int timeout;
    private final String password;

    public JedisFactory(final String host, final int port,
            final int timeout, final String password) {
        super();
        this.host = host;
        this.port = port;
        this.timeout = (timeout > 0) ? timeout : -1;
        this.password = password;
    }

    public Object makeObject() throws Exception {
        final Jedis jedis;
        if (timeout > 0) {
            jedis = new Jedis(this.host, this.port, this.timeout);
        } else {
            jedis = new Jedis(this.host, this.port);
        }

        jedis.connect();
        if (null != this.password) {
            jedis.auth(this.password);
        }
        return jedis;
    }

ShardedJedis的Pool和jedis的Pool不同,JedisPool里的Factory是针对同一个host和port产生多个相同的Jedis实例,是为了提高性能,支持多线程。而ShardedJedisPool是针对很多分片创建实例

java代码

 /**
 * PoolableObjectFactory custom impl.
 */
private static class ShardedJedisFactory extends BasePoolableObjectFactory {
    private List<JedisShardInfo> shards;
    private Hashing algo;
    private Pattern keyTagPattern;

    public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo,
            Pattern keyTagPattern) {
        this.shards = shards;
        this.algo = algo;
        this.keyTagPattern = keyTagPattern;
    }

    public Object makeObject() throws Exception {
        ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
        return jedis;
    }

ShardedJedis包含很多分片,每一个分片可能都是一个Jedis实例。形象的理解就是ShardedJedis就是一个访问redis集群的代理客户端

##客户端自动分片##

先获取hash(key)后对应的 Jedis 再用这个Jedis进行操作

java代码

public String get(String key) {
    Jedis j = getShard(key);
    return j.get(key);
}

java代码

 public R getShard(String key) {
    return resources.get(getShardInfo(key));
}

java代码

 private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();

resources:key为ShardInfo,value为redis

初始化的操作:

java代码

 private void initialize(List<S> shards) {
    nodes = new TreeMap<Long, S>();

    for (int i = 0; i != shards.size(); ++i) {
        final S shardInfo = shards.get(i);
        if (shardInfo.getName() == null)
        	for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
        		nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
        	}
        else
        	for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
        		nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
        	}
        resources.put(shardInfo, shardInfo.createResource());
    }
}

通过Key获取对应的jedis时,先对key进行hash,和前面初始化节点环时,使用相同的算法 再从nodes这个虚拟的环中取出 大于等于 这个hash值的第一个节点(shardinfo),没有就取nodes中第一个节点(所谓的环 其实是逻辑上实现的)

java代码

public S getShardInfo(byte[] key) {
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.size() == 0) {
        return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
}

Search

    Table of Contents