SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,
SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,
socketReadThread():负责读socket的数据;
socketWriteThread():负责写入socket数据;
ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听;
SocketThread :: private static SocketThread[] socketReadThread = null; private static SocketThread[] socketWriteThread = null; private static ThreadPoolExecutor executor = null; private static CompletionService<IOService<?>> completionService = null; //下面是实例属性 private Selector clientsSel = null; private boolean reading = false; private boolean writing = false; static { if (socketReadThread == null) { int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1; executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); completionService = new ExecutorCompletionService<IOService<?>>(executor); //执行任务的线程池 socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据; socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据; for (int i = 0; i < socketReadThread.length; i++) { socketReadThread[i] = new SocketThread("socketReadThread-" + i); socketReadThread[i].reading = true; Thread thrd = new Thread(socketReadThread[i]); thrd.setName("socketReadThread-" + i); thrd.start();//启动,会执行run() } log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length); for (int i = 0; i < socketWriteThread.length; i++) { socketWriteThread[i] = new SocketThread("socketWriteThread-" + i); socketWriteThread[i].writing = true; Thread thrd = new Thread(socketWriteThread[i]); thrd.setName("socketWriteThread-" + i); thrd.start();////启动,会执行run() } log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length); } // end of if (acceptThread == null) } //生成每一个SocketThread都会有一个对应ResultsListener线程 private SocketThread(String name) { try { clientsSel = Selector.open(); } catch (Exception e) { log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e); stopping = true; } // end of try-catch new ResultsListener("ResultsListener-" + name).start(); } public void SocketThread.run() { while ( !stopping) { try { clientsSel.select(); if (log.isLoggable(Level.FINEST)) { log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel); } //等到已选择的key,证明有数据要处理 Set<SelectionKey> selected = clientsSel.selectedKeys(); int selectedKeys = selected.size(); if ((selectedKeys == 0) && (waiting.size() == 0)) { if (log.isLoggable(Level.FINEST)) { log.finest("Selected keys = 0!!! a bug again?"); } if ((++empty_selections) > MAX_EMPTY_SELECTIONS) { recreateSelector(); } } else { empty_selections = 0; if (selectedKeys > 0) { for (SelectionKey sk : selected) { //得到ConnectionListenerImpl.accept()中绑定的ioservice IOService s = (IOService) sk.attachment(); try { ..... //下一次socket从selector监听队列中移除 sk.cancel(); forCompletion.add(s); } catch (CancelledKeyException e) { ... } } } // Clean-up cancelled keys... clientsSel.selectNow(); } //注册新的socket到selector中进行监听 addAllWaiting(); IOService serv = null; while ((serv = forCompletion.pollFirst()) != null) { //放线程沲中执行,调用了IOService.call()进行数据处理 completionService.submit(serv); } // clientsSel.selectNow(); } catch (CancelledKeyException brokene) { 。。 } catch (IOException ioe) { 。。 } catch (Exception exe) { .. } } } //ResultsListener.run() public void ResultsListener.run() { for (;;) { try { //CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 //其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。 IOService<?> service = completionService.take().get(); if (service != null) { if (service.isConnected()) {//只要连接没关闭 addSocketService(service);//就再次注册到线程的Selector中 ............ } } }
相关推荐
tigase相关jar包
Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...
该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...
tigase 7.10 mongodb 3 配置
Tigase开源项目,使用java编写,是个标准的Jabber(XMPP)协议服务端项目,用户数,均衡,符合要求。主页http://www.tigase.org/ 除了tigase开源项目还有: Openfire (Wildfire) 3.x(http://www.igniterealtime.org/)...
描述了如何部署tigase http-api模块,此方式为源码部署
Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...
Tigase 概述,描述了1、为什么选择Tigase 2、RFC的实现 3、Tigase实现的XMPP扩展协议等
tigase-server-tigase-server-8.0.0.zip 源码,不知道怎么设置不用积分下载,不还意思。。。。。。。。。
QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...
tigase-local
tigase-server 配置相关内容 https://blog.csdn.net/w690333243/article/details/90550837
全面:tigase 完全实现了XMPP协议,除了全面实施的两个核心协议,它支持大多数的你可能永远都需要的扩展协议。 开源:Tigase是开源的,如果你有有那能力,你可以定制自己的XMPPServer,虽然经过了很多次此时,但是...
Spark连接Tigase服务器,完整的步骤,很清晰的看到。大家可以参考。
Tigase Server 是一个轻量级的可伸缩的 Jabber/XMPP 服务器。无需其他第三方库支持,可以处理非常高的复杂和大量的用户数,可以根据需要进行水平扩展。
tigase 集群设置,已实践测试过,本次测试 以两台机器测试的。
tigase 内部处理流程 详解,适合初学者参考。
Tigase XMPP 服务器 Docker 映像 安装了 Tigase XMPP 服务器 (5.2.3) 的 Docker 映像用于评估目的。 请勿在生产环境中使用。 为帐户注册和配置存储设置了非持久性 Derby 数据库。 在此设置中创建了一个不存在的...
tigase 组件源代码,未编译,大家方便下载
tigase-utils-3.4.4.jar(Tigase相关客户端,java语言需要用到的工具类jar包,希望大家喜欢) 正好下载到,同步发出来给更多需要的人吧