netty简谈
netty是封装nio实现的一套JAVA高性能通信框架,它简化了网络通信编程
了解IO的发展史,大概可以看到是从最早的BIO到NIO,从阻塞到非阻塞的过程
如果使用BIO写通信框架,在通信中就会有大量的阻塞线程,产生巨大的消耗,如果消息消费漫长,服务的性能就会拉胯
如果使用NIO写通信框架,实际上性能是提高了,但是每次都要使用NIO写一套Buffer+Channel+Selector的代码,很麻烦
在这基础上,就出现了netty框架,简化逻辑编写
netty编程案例
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>...</version>
</dependency>
服务端架设
public class NettyServer {
public void server() {
new ServerBootstrap()
// 1. group-事件组:对thread和nio.selector的封装,在netty中用于选择IO组件
.group(new NioEventLoopGroup())
// 2. channel-通道:对nio.channels的封装
.channel(NioServerSocketChannel.class)
// 3. 添加处理链
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 4.1 使用一些预置的处理器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LoggingHandler());
// 4.2 自定义处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
// 4. 绑定端口
.bind(8080);
}
}
可以看到,主要有四个步骤:
设置group事件组:group是对thread和selector的封装,在netty中用于选择IO组件
设置channel通道:是对nio.channels的封装
添加处理链:这里可以添加一些预置的处理器(在
io.netty.handler
包下),也可以添加自定义的处理器绑定监听端口
客户端开发
public class NettyClient {
public void client() throws InterruptedException {
new Bootstrap()
// 1. 设置事件组
.group(new NioEventLoopGroup())
// 2. 设置channel
.channel(NioSocketChannel.class)
// 3. 设置处理链
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 4. 连接服务器
.connect("localhost", 8080)
// 5. 同步通信
.sync()
// 6. 代表连接对象
.channel()
// 7. 发送数据
.writeAndFlush("hello server");
}
}
客户端的逻辑与服务端类似,前三个是一样的,后面就是连接流程
通信测试
public class NettyTest {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
new NettyServer().server();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
new NettyClient().client();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
源码阅读
group - 事件组
grou是对nio.selector的封装,首先明确两个概念:
EventLoop:事件循环对象,是对Thread的封装,它的本质就是一个单线程,它内部封装了nio.selector,可以在一个线程内多多个io进行轮询
EventLoopGroup:事件组,是对EventLoop的封装,它本质一个ExecutorService,通过维护多个EventLoop实现多线程控制io,真正开发的过程中,一般使用EventLoopGroup,不使用EventLoop
从.group(new NioEventLoopGroup())
方法往下看
NioEventLoopGroup的构造函数
public NioEventLoopGroup() {
this(0);
}
首先是实现类的无参构造,传入了一个nthread=0
向上看,可以看到对nThread进行默认值判断
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// -- MultithreadEventLoopGroup --
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
在类的static方法中有对默认值的初始化
继续向上看,终于找到其构造函数
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args)
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
线程池和命名逻辑
首先构造一个线程池
nio实现了自己的线程池,具有以下特性:
newDefaultThreadFactory()
方法构造了一个DefaultThreadFactory工工厂
// io.netty.util.concurrent.MultithreadEventExecutorGroup
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
ThreadPerTaskExecutor重写了execute方法,可以看到每执行一个任务,都会执行
DefaultThreadFactory#newThread
方法创建一条新线程
// io.netty.util.concurrent.ThreadPerTaskExecutor
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
DefaultThreadFactory的命名逻辑是nioEventLoopGroup-m-nn,m是代表第几个group,nn是第几个eventLoop
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
可见命名的逻辑来源于这个构造函数,调用toPoolName()
方法,这里传进来的poolType是nioEventLoopGroup
EventExecutor的初始化
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
……
继续根据MultithreadEventExecutorGroup的构造方法,到了这里,实际上是在初始化EventExecutor数组
这里使用的是newChiled(Executor executor, Object... args)
这个方法构造EventLoop,可见NioEventLoopGroup中封装的实际上就是EventLoop
要注意的是,该方法是在NioEventLoopGroup中重写的
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
……
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
NioEventLoop
根据NioEventLoopGroup的构造函数逻辑,可以看到是初始化了一个EventExecutor[],并且按照线程数量,初始化一定量的NioEventLoop存在里面
NioEventLoop的特性包括:
封装了selector,具备nio的选择器能力
实现Thread,具备任务启动能力
核心成员变量
private Selector selector;
可见里面是封装了nio.selector的
构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
完成了对selector的初始化操作
register
封装了nio.channel的register逻辑
if (inEventLoop()) {
register0(ch, interestOps, task);
} else {
……
// register0
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
try {
ch.register(unwrappedSelector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
run方法
因为NioEventLoop实现了Thread的能力,因此也具备任务启动的能力
for (;;) {
try {
int strategy;
try {
// 1. 判断策略
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
……
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
执行一个死循环,首先判断策略
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
正常有任务,这里判断的应该是SELECT策略,这时执行strategy = select(curDeadlineNanos)
方法,选择自己管理的nio.channel
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
可见调用的是nio.selector的select()
方法
后面根据select结果,做对应处理
channel - nio通道封装
netty.channel并不是nio.channels的直接实现,而是对其进行功能增强版本的封装
还是从其使用开始看
.channel(NioServerSocketChannel.class)
这里调用的是ServerBootStrap#channel
方法,则先从该方法开始分析netty.channel的创建流程
netty.channel的创建流程
首先是AbstractBootStrap#channel
方法
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
可以看出来是一个工厂模式的反射工厂,看ReflectiveChannelFactory的构造函数
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
这里取到了类的constructor,备用,看它使用的地方是在下面的newChannel方法、
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
可见这里调用的是指定class的无参构造函数,还记得一开始传进去的是NioServerSocketChannel.class
,因此调用的目标就是它的无参构造,这里成功获取到了一个netty.channel的实例
相似的,客户端侧使用BootStrap#channel
方法获取的是NioSocketChannel实例
再ctrl+alt+h分析下ReflectiveChannelFactory#newChannel的调用点,可以看到两个:
BootStrap#connect():客户端与特定ip建立连接的时候
ServerBootStrap#bind():服务端开始监听指定端口的时候
可见netty.channel的构造时机是在建连的时候
NioSocketChannel与NioServerSocketChannel
从无参构造开始看
public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioServerSocketChannel(SelectorProvider provider) {
this(provider, null);
}
// 函数3
public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
this(newChannel(provider, family));
}
// 函数4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// 函数5
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
……
try {
ch.configureBlocking(false);
……
// 函数6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在函数3中调用newChannel()
方法构造nio.channels
实例
在函数4中指定关心的selectKey为OP_ACCEPT
在函数5中已经调用到父类AbstractNioChannel,这里把传进来的nio.channels实例绑定到成员属性中,同时还设置了非阻塞
在函数6中初始化了自己的channelPipeline,是后面加handler用的
看newInstance方法
private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
ServerSocketChannel channel =
SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
return channel == null ? provider.openServerSocketChannel() : channel;
……
这里可以得到结论:NioServerSocketChannel是对ServerSocketChannel的封装,而非简单的重写,同理,NioSocketChannel也是对SocketChannel的封装
Future&Promise - netty的异步任务增强
前面已经看到了可见netty.channel的构造时机是在建连的时候,服务端是在ServerBootStrap#bind()
方法,客户端是在BootStrap#connect()
方法,以connect方法为例看下
connect - 连接建立的流程
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
validate();
return doResolveAndConnect(remoteAddress, localAddress);
}
校验,剩下流程继续向下看
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 1. 初始化netty.channel,包括注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 2. 如果完成了就直接连接服务端即可
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 连接服务端的具体代码
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// 3. 如果暂时没有完成注册,则添加监听器,在注册完成后执行对应操作
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
// 4. 回调的具体实现
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
首先在initAndRegister()
方法中完成netty.channel的初始化过程,生成了一个ChannelFuture
当这个ChannelFuture实例是刚初始化的,需要向其中添加listener监听器,其中使用promise做连接
那么这个Future和Promise到底是啥呢
Future和Promise相关能力
ChannelFuture
public interface ChannelFuture extends Future<Void>
ChannelFuture是netty对JDK的Future的二次封装,它提供的能力主要是对于监听器的增删改,说明了这个接口的基本功能:回调功能,以及监听器的管理(曾删改)
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
需要注意的是,这里返回的Future类型是netty.Future,已经对jdk的Future封装过一层了
GenericFutureListener
监听器类型
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
提供了一个方法,即针对成功的结果做相应的操作
Promise
Promise是对Future的拓展,比较重要的点,还是对回调监听器的管理,增加了判断状态的能力。新添加的功能就是:判断future的状态,同时决定是否要唤醒监听器,或者是否要抛异常
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
ChannelPromise
public interface ChannelPromise extends ChannelFuture, Promise<Void>
这个玩意继承了ChannelFuture和Promise的能力,可知它提供的功能包括:
Promise提供的对状态的判断,以及根据状态判断是否唤醒监听器
Future提供的回调管理,对监听器的增删改查能力
ChannelPromise = Channel +future + Promise
其实换句话说,Future提供的是回调能力,即处理完了,我要做什么,而Promise提供的是提前获取处理结果的能力,因此它还相当于一个数据容器,即我可以从容器中取出一部分数据,或发现异常,提早抛出,不必等异步完全执行结束
pipeline - netty中的处理链
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 4.1 使用一些预置的处理器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new LoggingHandler());
// 4.2 自定义处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
还是先看应用,这里通过BootStrap#childHandler
方法添加处理链,重新ChannelInitializer#initChannel
方法添加自己的链
netty系统提供了一些默认的处理器,也可以自定义处理器
处理器的分类
ChannelInboundHandlerAdapter:入站处理器,数据从网络流向应用程序的过程被称为“入站”,即这类处理器处理的是网络中发来的数据,例如:
StringDecoder用于处理传进来的ByteBuffer,对其进行解码
LoggingHandler用于接收数据后的日志记录
ChannelOutboundHandlerAdapter:出站处理器,数据从应用程序流向网络的过程被称为“出站”,即这类处理器处理的是待发送的数据,处理完,数据就可以发送到网络了
这是两个默认的实现类,提供了一些处理方法的默认实现,例如:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
……
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
……
}
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
……
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
……
}
评论区