netty源代码解析(2)——客户端流程.doc

上传人:11****ws 文档编号:2428548 上传时间:2019-05-12 格式:DOC 页数:7 大小:40.41KB
下载 相关 举报
netty源代码解析(2)——客户端流程.doc_第1页
第1页 / 共7页
netty源代码解析(2)——客户端流程.doc_第2页
第2页 / 共7页
netty源代码解析(2)——客户端流程.doc_第3页
第3页 / 共7页
netty源代码解析(2)——客户端流程.doc_第4页
第4页 / 共7页
netty源代码解析(2)——客户端流程.doc_第5页
第5页 / 共7页
点击查看更多>>
资源描述

1、前一篇文章分析了 netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下 Java代码 1. ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 2. Executors.newCachedThreadPool(); 3. 4. bootstrap.setPipelineFactory(new ChannelPipelineFactory() 5. 6. Override 7. public Cha

2、nnelPipeline getPipeline() throws Exception 8. ChannelPipeline pipleline = pipeline(); 9. pipleline.addLast(“encode“, new ObjectEncoder(1048576 * 16); 10. pipleline.addLast(“decode“, new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null); 11. pipleline.addLast(“handler“,

3、handler); 12. return pipleline; 13. 14. ); 15. 16. bootstrap.setOption(“receiveBufferSize“, 1048576 * 64); 17. bootstrap.setOption(“child.tcpNoDelay“, true); /关闭 Nagle算法 18. /tcp定期发送心跳包 比如 IM里边定期探测对方是否下线 19. /只有 tcp长连接下才有意义 20. / bootstrap.setOption(“child.keepAlive“, true); 21. ChannelFuture future

4、 = bootstrap.connect(new InetSocketAddress(address, port); 22. Channel channel = future.awaitUninterruptibly().getChannel(); 客户端事件处理顺序如下: UpStream.ChannelState.OPEN(已经 open) DownStream.ChannelState.BOUND(需要绑定 )DownStream.CONNECTED(需要连接 )UpStream.ChannelState.BOUND(已经绑定 )-UpStream.CONNECTED(连接成功 ) 在

5、connect 的时候做了如下处理 Java代码 1. public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) 2. 3. if (remoteAddress = null) 4. throw new NullPointerException(“remoteAddress“); 5. 6. 7. ChannelPipeline pipeline; 8. try 9. pipeline = getPipelineFactory().getPipeline()

6、; 10. catch (Exception e) 11. throw new ChannelPipelineException(“Failed to initialize a pipeline.“, e); 12. 13. 14. / Set the options.先创建 Channel 15. Channel ch = getFactory().newChannel(pipeline); 16. ch.getConfig().setOptions(getOptions(); 17. 18. / Bind. 19. if (localAddress != null) 20. ch.bind

7、(localAddress); 21. 22. 23. / Connect. 再进行连接 24. return ch.connect(remoteAddress); 25. 首先要创建出 Channel Java代码 1. NioClientSocketChannel( 2. ChannelFactory factory, ChannelPipeline pipeline, 3. ChannelSink sink, NioWorker worker) 4. 5. super(null, factory, pipeline, sink, newSocket(), worker); 6. fire

8、ChannelOpen(this); 7. 紧接着会 fire 一个 ChannelOpen 事件, Java代码 1. if (channel.getParent() != null) 2. fireChildChannelStateChanged(channel.getParent(), channel); 3. 4. 5. channel.getPipeline().sendUpstream( 6. new UpstreamChannelStateEvent( 7. channel, ChannelState.OPEN, Boolean.TRUE); 这样会出发 Upstream 的 C

9、hannelState.OPEN 事件。 接下来要继续 connect 了 Java代码 1. if (remoteAddress = null) 2. throw new NullPointerException(“remoteAddress“); 3. 4. ChannelFuture future = future(channel, true); 5. channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent( 6. channel, future, ChannelState.CONNECTED, remot

10、eAddress); 7. return future; 这样就会出发 Downstream 的 ChannelState.CONNECTED 事件。 接下来就要由 NioClientSocketPipelineSink 来进行处理了 Java代码 1. switch (state) 2. case OPEN: 3. if (Boolean.FALSE.equals(value) 4. channel.worker.close(channel, future); 5. 6. break; 7. case BOUND: 8. if (value != null) 9. bind(channel,

11、 future, (SocketAddress) value); 10. else 11. channel.worker.close(channel, future); 12. 13. break; 14. case CONNECTED: 15. if (value != null) 16. connect(channel, future, (SocketAddress) value); 17. else 18. channel.worker.close(channel, future); 19. 20. break; 21. case INTEREST_OPS: 22. channel.wo

12、rker.setInterestOps(channel, future, (Integer) value).intValue(); 23. break; 下面看下 channel 注册到 worker 的代码,连接的时候是在内部的一个 Boss 类里处理的 所有的连接 connect 操作都被封装成一个 RegisterTask 对象, Boss 类持有 registerTask 队列,在 loop 中不断的去进行 select Java代码 1. private static final class RegisterTask implements Runnable 2. private fi

13、nal Boss boss; 3. private final NioClientSocketChannel channel; 4. 5. RegisterTask(Boss boss, NioClientSocketChannel channel) 6. this.boss = boss; 7. this.channel = channel; 8. 9. 10. public void run() 11. try 12. channel.socket.register( 13. boss.selector, SelectionKey.OP_CONNECT, channel); 14. cat

14、ch (ClosedChannelException e) 15. channel.worker.close(channel, succeededFuture(channel); 16. 17. 18. int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); 19. if (connectTimeout 0) 20. channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; 21. 22. 23. register 方

15、法 Java代码 1. void register(NioClientSocketChannel channel) 2. Runnable registerTask = new RegisterTask(this, channel); 3. Selector selector; 4. 5. synchronized (startStopLock) 6. if (!started) 7. / Open a selector if this worker didnt start yet. 8. try 9. this.selector = selector = Selector.open(); 1

16、0. catch (Throwable t) 11. throw new ChannelException( 12. “Failed to create a selector.“, t); 13. 14. 15. / Start the worker thread with the new Selector. 16. boolean success = false; 17. try 18. DeadLockProofWorker.start( 19. bossExecutor, 20. new ThreadRenamingRunnable( 21. this, “New I/O client

17、boss #“ + id + - + subId); 22. success = true; 23. finally 24. if (!success) 25. / Release the Selector if the execution fails. 26. try 27. selector.close(); 28. catch (Throwable t) 29. logger.warn(“Failed to close a selector.“, t); 30. 31. this.selector = selector = null; 32. / The method will retu

18、rn to the caller at this point. 33. 34. 35. else 36. / Use the existing selector if this worker has been started. 37. selector = this.selector; 38. 39. 40. assert selector != null 41. 42. started = true; 43. boolean offered = registerTaskQueue.offer(registerTask); 44. assert offered; 45. RegisterTas

19、k,放到 Boss 类持有的 registerTaskQueue 之后, Boss 类会从 boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。 然后 run 方法处理感兴趣的事件 Java代码 1. public void run() 2. boolean shutdown = false; 3. Selector selector = this.selector; 4. long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); 5. for (;) 6. wakenUp.set(false); 7.

20、8. try 9. int selectedKeyCount = selector.select(500); 10. . 11. 12. 13. processRegisterTaskQueue(); 14. 15. if (selectedKeyCount 0) 16. processSelectedKeys(selector.selectedKeys(); 17. 在 loop 中, processRegisterTaskQueue 会处理需要注册的任务, processSelectedKeys 处理连接事件 Java代码 1. private void processSelectedKe

21、ys(Set selectedKeys) 2. for (Iterator i = selectedKeys.iterator(); i.hasNext();) 3. SelectionKey k = i.next(); 4. i.remove(); 5. 6. if (!k.isValid() 7. close(k); 8. continue; 9. 10. 11. if (k.isConnectable() 12. connect(k); 13. 14. 15. 将连接上的 Channel 注册到 worker 中,交给 worker 去注册 read 和 write Java代码 1.

22、private void connect(SelectionKey k) 2. NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); 3. try 4. if (ch.socket.finishConnect() 5. k.cancel(); 6. ch.worker.register(ch, ch.connectFuture); 7. 8. catch (Throwable t) 9. ch.connectFuture.setFailure(t); 10. fireExceptionCaught(ch, t); 11. k.cancel(); / Some JDK implementations run into an infinite loop without this. 12. ch.worker.close(ch, succeededFuture(ch); 13. 14. 在这一系列初始化都完成之后, channel 就可以拿来 write 和接收 read 数据了。 http:/

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 教育教学资料库 > 精品笔记

Copyright © 2018-2021 Wenke99.com All rights reserved

工信部备案号浙ICP备20026746号-2  

公安局备案号:浙公网安备33038302330469号

本站为C2C交文档易平台,即用户上传的文档直接卖给下载用户,本站只是网络服务中间平台,所有原创文档下载所得归上传人所有,若您发现上传作品侵犯了您的权利,请立刻联系网站客服并提供证据,平台将在3个工作日内予以改正。