`
jianfulove
  • 浏览: 118261 次
  • 性别: Icon_minigender_1
  • 来自: 湛江
社区版块
存档分类
最新评论

tigase源码分析3:SocketThread

阅读更多

 

SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,

SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,

socketReadThread():负责读socket的数据;

socketWriteThread():负责写入socket数据;

ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听

 

SocketThread ::
private static SocketThread[] socketReadThread = null;
private static SocketThread[] socketWriteThread = null;
private static ThreadPoolExecutor executor = null;
private static CompletionService<IOService<?>> completionService = null;
//下面是实例属性
private Selector clientsSel = null;
private boolean reading = false;
private boolean writing = false;

static {
		if (socketReadThread == null) {
			int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;

			executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
					new LinkedBlockingQueue<Runnable>());
			completionService = new ExecutorCompletionService<IOService<?>>(executor);
                        //执行任务的线程池
			socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据; 
                         socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据;
			for (int i = 0; i < socketReadThread.length; i++) {
				socketReadThread[i] = new SocketThread("socketReadThread-" + i);
				socketReadThread[i].reading = true;

				Thread thrd = new Thread(socketReadThread[i]);

				thrd.setName("socketReadThread-" + i);
				thrd.start();//启动,会执行run()
			}

			log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length);

			for (int i = 0; i < socketWriteThread.length; i++) {
				socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);
				socketWriteThread[i].writing = true;

				Thread thrd = new Thread(socketWriteThread[i]);

				thrd.setName("socketWriteThread-" + i);
				thrd.start();////启动,会执行run()
			}

			log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length);
		}    // end of if (acceptThread == null)
	}


       //生成每一个SocketThread都会有一个对应ResultsListener线程
	private SocketThread(String name) {
		try {
			clientsSel = Selector.open();
		} catch (Exception e) {
			log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
			stopping = true;
		}    // end of try-catch

		new ResultsListener("ResultsListener-" + name).start();
	}



  public void SocketThread.run() {
		while ( !stopping) {
			try {
				clientsSel.select();

				if (log.isLoggable(Level.FINEST)) {
					log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel);
				}
                              //等到已选择的key,证明有数据要处理
				Set<SelectionKey> selected = clientsSel.selectedKeys();
				int selectedKeys = selected.size();

				if ((selectedKeys == 0) && (waiting.size() == 0)) {
					if (log.isLoggable(Level.FINEST)) {
						log.finest("Selected keys = 0!!! a bug again?");
					}

					if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {
						recreateSelector();
					}
				} else {
					empty_selections = 0;

					if (selectedKeys > 0) {

						for (SelectionKey sk : selected) {
                                                //得到ConnectionListenerImpl.accept()中绑定的ioservice
							IOService s = (IOService) sk.attachment();

							try {
							
								.....
                                                       //下一次socket从selector监听队列中移除
								sk.cancel();
                                                           
								forCompletion.add(s);

								
							} catch (CancelledKeyException e) {
							...
							}
						}
					}

					// Clean-up cancelled keys...
					clientsSel.selectNow();
				}
                              //注册新的socket到selector中进行监听     
				addAllWaiting();

				IOService serv = null;

				while ((serv = forCompletion.pollFirst()) != null) {
                                       //放线程沲中执行,调用了IOService.call()进行数据处理
					completionService.submit(serv);
				}

				// clientsSel.selectNow();
			} catch (CancelledKeyException brokene) {

				。。
			} catch (IOException ioe) {
                           。。
			} catch (Exception exe) {
			..
			}
		}
	}



//ResultsListener.run()
    public void ResultsListener.run() {
    for (;;) {
	try {
//CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
      //其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。

	IOService<?> service = completionService.take().get();
		if (service != null) {
		if (service.isConnected()) {//只要连接没关闭
			addSocketService(service);//就再次注册到线程的Selector中
         ............
                  }
           }
        }

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics