Comet技术在项目中的使用

本文深入探讨了Comet技术及其框架Cometd,特别是使用Jetty作为服务器端推实现的详细过程。文章阐述了Comet与Jetty的结合如何优化长轮询操作,减少系统资源浪费,以及如何通过Jetty的组件化特性灵活构建应用。此外,还介绍了Jetty的工作原理,包括NIO模型下任务分配和请求处理流程,以及环境配置步骤和客户端开发方法。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

转载地址:http://my.oschina.net/ITBoy/blog/23927

Comet是一种服务器端推的技术,所谓服务器端推也就是当有事件要通知给某个用户的时候,是由服务器端直接发送到用户的浏览器。
服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology

有一个Comet的框架叫做Cometd,使用的方式为Long polling。它是使用了jetty continuations特性,jetty continuations使得异步的request成为可能,这里我们来讨论下为何需要jetty continuations呢?

比如我们的浏览器的一个请求发送到服务器端了,并进行长轮询,保持了连接不结束,直到一次长轮询timeout或者有事件发生,并接收到服务端推来的消息,所以在一次长轮询的过程中,大部分时间都是在等待,如果使用老式同步的方式进行编程的话,那么有多少个连接就需要多少个线程在那里,而大都数都是在等待,所以这无疑是系统资源的巨大浪费。
jetty continuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。
当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。

Jetty是一个纯java实现的非常轻量级的web容器,高度组件化,可以很方便的将各种组件进行组装,而且可以非常容易的将jetty嵌入到自己的应用中。
jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。
下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。

在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用java io实现的好),
我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。
这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。

换话题了,我们都知道请求最终都是在Servlet中被处理的,而Servlet得到的是request,response,这些对象什么时候出来的呢?不急,上面不是说到一个EndPoint(实现了Runnable接口)EndPoint对象在被初始化的时候就对其_connection成员进行了初始化,生成一个HttpConnection对象,newConnection的方法其实在SelectChannelConnector中被覆盖了。然后这个EndPoint对象不是被分配到ThreadPool了么,ThreadPool将其加入到队列中,当有空闲线程的时候,就对这个endPoint对象进行处理了,运行EndPoint的run方法,然后会调用自己的connection对象的handle方法,最终将connection对象交给Server的handler进行处理。Server本身继承自HandlerWrapper,自己的_handler是一个HandlerCollection的实例,HandlerCollection实例的配置在jetty.xml中有配置,在处理httpconnection对象的时候所配置的handler会依次被执行。
DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。

环境配置方法:
服务器端:
    类库清单:WEB-INF/lib
        jetty-6.1.9.jar
        jetty-util-6.1.9.jar
        servlet-api-2.5-6.1.9.jar
        (以上Jetty服务器自带)
        cometd-api-0.9.20080221.jar
        cometd-bayeux-6.1.9.jar

    web.xml配置:

   <!--  配置ContinuationCometdServlet, 这个是必须的。配置后就可以支持comted  --> 
   < servlet > 
     < servlet-name > cometd </ servlet-name > 
     < servlet-class > org.mortbay.cometd.continuation.ContinuationCometdServlet </ servlet-class > 
     <!--  对队列的内容进行过滤  --> 
     < init-param > 
       < param-name > filters </ param-name > 
       < param-value > /WEB-INF/filters.json </ param-value > 
     </ init-param > 
     <!--  超时设置The server side poll timeout in milliseconds (default 250000). This is how long the server will 
hold a reconnect request before responding.  --> 
     < init-param > 
       < param-name > timeout </ param-name > 
       < param-value > 120000 </ param-value > 
     </ init-param > 
     <!--  The client side poll timeout in milliseconds (default 0). How long a client will wait between 
reconnects  --> 
     < init-param > 
       < param-name > interval </ param-name > 
       < param-value > 0 </ param-value > 
     </ init-param > 
     <!--  the client side poll timeout if multiple connections are detected from the same browser 
(default 1500).  --> 
     < init-param > 
       < param-name > multiFrameInterval </ param-name > 
       < param-value > 1500 </ param-value > 
     </ init-param > 
     <!--  0=none, 1=info, 2=debug  --> 
     < init-param > 
       < param-name > logLevel </ param-name > 
       < param-value > 0 </ param-value > 
     </ init-param > 
     <!--  If "true" then the server will accept JSON wrapped in a comment and will generate JSON wrapped 
in a comment. This is a defence against Ajax Hijacking.  --> 
     < init-param > 
       < param-name > JSONCommented </ param-name > 
       < param-value > true </ param-value > 
     </ init-param > 

     < init-param > 
       < param-name > alwaysResumePoll </ param-name > 
       < param-value > false </ param-value >   <!--  use true for x-site cometd  --> 
     </ init-param > 
     < load-on-startup > 1 </ load-on-startup > 
   </ servlet > 

   < servlet-mapping > 
     < servlet-name > cometd </ servlet-name > 
     < url-pattern > /cometd/* </ url-pattern > 
   </ servlet-mapping >   

filters.json内容如下:

格式如下:
    {
        "channels": "/**", --要过滤的队列(支持通配符)
        "filter":"org.mortbay.cometd.filter.NoMarkupFilter", --使用的过滤器,实现接口dojox.cometd.DataFilter
        "init"    : {} --初始化的值,调用 DataFilter.init方法传入
    }

示例内容如下:

[
  {
     " channels " :  " /** " , 
     " filter "   :  " org.mortbay.cometd.filter.NoMarkupFilter " , 
     " init "     : {}
  } , 

  {
     " channels " :  " /chat/* " , 
     " filter "    :  " org.mortbay.cometd.filter.RegexFilter " , 
     " init "     :  [
                   [  "[fF ] .ck " , " dang "  ],
                  [  " teh  " , " the  " ]
                ]
  },
 
  {
     " channels " :  " /chat/** " ,
     " filter "    :  " org.mortbay.cometd.filter.RegexFilter " ,
     " init "     : [
                  [  " [ Mm ] icrosoft " ,  " Micro\\$oft "  ],
                  [  " .*tomcat.* " , null ]
                ]
  }
]

这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。
客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。

代码开发:

接下来,我们要准备客户端(使用dojo来实现)

一共三个文件
index.html
chat.js
chat.css(不是必须)

下面来看一下这两个文件的内容(加入注释)
index.html
< html > 
< head > 
     < title > Cometd chat </ title > 
     < script  type ="text/javascript"  src ="../dojo/dojo/dojo.js" ></ script > <!--  dojo类库  --> 
     < script  type ="text/javascript"  src ="../dojo/dojox/cometd.js.uncompressed.js" ></ script > <!--  dojo-cometd类库  --> 
     < script  type ="text/javascript"  src ="chat.js" ></ script > <!--  chat js文件,控制cometd的连接,消息的发送与接收  --> 
     < link  rel ="stylesheet"  type ="text/css"  href ="chat.css" > 
</ head > 
< body > 
< h1 > Cometd Chat </ h1 > 

< div  id ="chatroom" > 
  < div  id ="chat" ></ div > 
  < div  id ="input" > 
    < div  id ="join"   > <!--  未登录时,显示的登录名和登录按钮  --> 
     Username:   < input  id ="username"  type ="text" />
< input  id ="joinB"  class ="button"  type ="submit"  name ="join"  value ="Join" /> 
    </ div > 
    < div  id ="joined"  class ="hidden" > <!--  登录后,显示的消息框和发送,退出按钮(默认为隐藏)  --> 
     Chat:   < input  id ="phrase"  type ="text" ></ input > 
      < input  id ="sendB"  class ="button"  type ="submit"  name ="join"  value ="Send" /> 
      < input  id ="leaveB"  class ="button"  type ="submit"  name ="join"  value ="Leave" /> 
    </ div > 
   </ div > 
  </ div > 

</ body >

chat.js文件

1  // 引入所需要的类 
  2  dojo.require( " dojox.cometd " );
  3  dojo.require( " dojox.cometd.timestamp " );
  4  
  5  // 定义一个room类 
  6  var  room  =  {
  7       // 定义属性 
  8      _last:  "" ,  // 最后发送消息的人员(如果不是本人,则显示为空)  
  9      _username:  null ,  // 当前的用户名 
 10      _connected:  true ,  // 当前的连接状态 true已经连接, false表示未连接 
 11      groupName:  " whimsical " ,  // 组名(未知) 
 12  
 13       // 登录操作 
 14      join:  function (name){
 15  
 16           if (name  ==   null   ||  name.length == 0  ){
 17              alert('Please enter a username ! ');
 18          } else {
 19  
 20              dojox.cometd.init(
new  String(document.location).replace( / http:\ / \ / [ ^ \ / ] */ ,'').replace( / \ / examples\ / . * $ / ,'') + " /cometd " );
 21               //  dojox.cometd.init("http://127.0.0.2:8080/cometd"); 
 22               this ._connected  =   true ;
 23  
 24               this ._username  =  name;
 25              dojo.byId('join').className = 'hidden';
 26              dojo.byId('joined').className = '';
 27              dojo.byId('phrase').focus();
 28  
 29               //  subscribe and join 
 30              dojox.cometd.startBatch();
 31              dojox.cometd.subscribe( " /chat/demo " , room,  " _chat " , { groupName:  this .groupName});
 32              dojox.cometd.publish( " /chat/demo " , { 
 33                  user: room._username,
 34                  join:  true ,
 35                  chat : room._username + "  has joined " 
 36              }, { groupName:  this .groupName });
 37              dojox.cometd.endBatch();
 38  
 39               //  handle cometd failures while in the room 
 40              room._meta  =  dojo.subscribe( " /cometd/meta " ,  this ,  function (event){
 41                  console.debug(event);   
 42                   if (event.action  ==   " handshake " ){
 43                      room._chat({ data: {
 44                          join:  true ,
 45                          user: " SERVER " ,
 46                          chat: " reinitialized " 
 47                      } });
 48                      dojox.cometd.subscribe( " /chat/demo " , room,  " _chat " , { groupName:  this .groupName });
 49                  } else   if (event.action  ==   " connect " ){
 50                       if (event.successful  &&   ! this ._connected){
 51                          room._chat({ data: {
 52                              leave:  true ,
 53                              user:  " SERVER " ,
 54                              chat:  " reconnected! " 
 55                          } });
 56                      }
 57                       if ( ! event.successful  &&   this ._connected){
 58                          room._chat({ data: {
 59                              leave:  true ,
 60                              user:  " SERVER " ,
 61                              chat:  " disconnected! " 
 62                          } });
 63                      }
 64                       this ._connected  =  event.successful;
 65                  }
 66              }, {groupName:  this .groupName });
 67          }
 68      },
 69  
 70       // 离开操作 
 71      leave:  function (){
 72           if ( ! room._username){
 73               return ;
 74          }
 75  
 76           if (room._meta){
 77              dojo.unsubscribe(room._meta,  null ,  null , { groupName:  this .groupName });
 78          }
 79          room._meta = null ;
 80  
 81          dojox.cometd.startBatch();
 82          dojox.cometd.unsubscribe( " /chat/demo " , room,  " _chat " , { groupName:  this .groupName });
 83          dojox.cometd.publish( " /chat/demo " , { 
 84              user: room._username,
 85              leave:  true ,
 86              chat : room._username + "  has left " 
 87          }, { groupName:  this .groupName });
 88          dojox.cometd.endBatch();
 89  
 90           //  switch the input form 
 91          dojo.byId('join').className = '';
 92          dojo.byId('joined').className = 'hidden';
 93          dojo.byId('username').focus();
 94          room._username  =   null ;
 95          dojox.cometd.disconnect();
 96      },
 97  
 98       // 发送消息 
 99      chat:  function (text){
100           if ( ! text  ||   ! text.length){
101               return   false ;
102          }
103          dojox.cometd.publish( " /chat/demo " , { user: room._username, chat: text}, { groupName:  this .groupName });
104      },
105  
106       // 从服务器收到消息后,回调的方法 
107      _chat:  function (message){
108           var  chat = dojo.byId('chat');
109           if ( ! message.data){
110              console.debug( " bad message format  " + message);
111               return ;
112          }
113           var  from = message.data.user;
114           var  special = message.data.join  ||  message.data.leave;
115           var  text = message.data.chat;
116           if ( ! text){  return ; }
117  
118           if (  ! special  &&  from  ==  room._last ){
119              from = "   " ;
120          } else {
121              room._last = from;
122              from += " : " ;
123          }
124  
125           if (special){
126              chat.innerHTML  +=   " <span class=\ " alert\ " ><span class=\ " from\ " > " + from + "  
</span><span class=\ " text\ " > " + text + " </span></span><br/> " ;
127              room._last = "" ;
128          } else {
129              chat.innerHTML  +=   " <span class=\ " from\ " > " + from + "  </span><span class=\ " text\ " > " + text + " </span><br/> " ;
130          } 
131          chat.scrollTop  =  chat.scrollHeight  -  chat.clientHeight;    
132      },
133      
134       // 初始操作 
135      _init:  function (){
136          dojo.byId('join').className = '';
137          dojo.byId('joined').className = 'hidden';
138          dojo.byId('username').focus();
139  
140           var  element = dojo.byId('username');
141          element.setAttribute( " autocomplete " , " OFF " ); 
142          dojo.connect(element,  " onkeyup " ,  function (e){  // 支持回车,登录   
143               if (e.keyCode  ==  dojo.keys.ENTER){
144                  room.join(dojo.byId('username').value);
145                   return   false ;
146              }
147               return   true ;
148          });
149  
150          dojo.connect(dojo.byId('joinB'),  " onclick " ,  function (e){  // 绑定 room.join方法到 Join按扭 
151              room.join(dojo.byId('username').value);
152              e.preventDefault();
153          });
154  
155          element = dojo.byId('phrase'); // 取得消息框 
156          element.setAttribute( " autocomplete " , " OFF " );
157          dojo.connect(element,  " onkeyup " ,  function (e){  // 支持回车发送消息  
158               if (e.keyCode  ==  dojo.keys.ENTER){
159                  room.chat(dojo.byId('phrase').value);
160                  dojo.byId('phrase').value = '';
161                  e.preventDefault();
162              }
163          });
164  
165      dojo.connect(dojo.byId('sendB'),  " onclick " ,  function (e){   // 绑定 room.chat方法到 sendB按扭  
166              room.chat(dojo.byId('phrase').value);
167              dojo.byId('phrase').value = '';
168      });
169          dojo.connect(dojo.byId('leaveB'),  " onclick " , room,  " leave " );  // 绑定 room.leave方法到 leaveB按扭  
170      } 
171  };
172  
173  // 页面装载时,调用room._init方法 
174  dojo.addOnLoad(room,  " _init " );
175  // 页面关闭时,调用 room.leave方法 
176  dojo.addOnUnload(room, " leave " );
177  
178  // vim:ts=4:noet:

补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作
要进行 监控消息队列,以及进行订阅,发送消息操作的关键就是取得 Bayeux接口实现类 的实例
可以通过 ServletContextAttributeListener 这个监听器接口,通过attributeAdded方式加入
实现方法如下:

1  public   class  BayeuxStartupListener  implements  ServletContextAttributeListener
 2  {
 3       public   void  initialize(Bayeux bayeux)
 4      {
 5           synchronized (bayeux)
 6          {
 7               if  ( ! bayeux.hasChannel( " /service/echo " ))
 8              {
 9                                   // 取得 bayeux实例                
10              }
11          }
12      }
13      
14       public   void  attributeAdded(ServletContextAttributeEvent scab)
15      {
16           if  (scab.getName().equals(Bayeux.DOJOX_COMETD_BAYEUX))
17          {
18              Bayeux bayeux = (Bayeux) scab.getValue();
19              initialize(bayeux);
20          }
21      }
22  
23       public   void  attributeRemoved(ServletContextAttributeEvent scab)
24      {
25  
26      }
27  
28       public   void  attributeReplaced(ServletContextAttributeEvent scab)
29      {
30  
31      }
32  }

取到 Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息

1       public   void  initialize(Bayeux bayeux)
 2      {
 3           synchronized (bayeux)
 4          {
 5               if  ( ! bayeux.hasChannel( " /service/echo " ))
 6              {
 7                                   // 取得 bayeux实例   
 8                                   new  ChatService(bayeux);             
 9              }
10          }
11      }

具体方法请看下面这段代码:

1  // 定义 ChatService类,继承 BayeuxService 
 2  public   static   class  ChatService  extends  BayeuxService {
 3  
 4      ConcurrentMap < String,Set < String >>  _members  =   new  ConcurrentHashMap < String,Set < String >> ();
 5      
 6       public  ChatService(Bayeux bayeux)
 7      {
 8           super (bayeux,  " chat " ); // 必须,把 Bayeux传入到 BayeuxService对象中 
 9          subscribe( " /chat/** " ,  " trackMembers " );  // 订阅队列,收到消息后,会回调trackMembers方法 
10           /* 
11              subscribe支持回调的方法如下:
12              # myMethod(Client fromClient, Object data)
13                      # myMethod(Client fromClient, Object data, String id)
14                      # myMethod(Client fromClient, String channel, Object data,String id)
15              # myMethod(Client fromClient, Message message)
16              
17                  参数:
18                      Client fromClient 发送消息的客户端
19                      Object data 消息内容
20                      id The id of the message 
21                      channel 队列名称
22                      Message message 消息对象。继承于Map
23          
24           */ 
25      }
26      
27       // 发布消息到队列 
28       public   void  sendMessage(String message) {
29              Map < String,Object >  mydata  =   new  HashMap < String, Object > ();
30              mydata.put( " chat " , message);
31              
32              Client sender  =  getBayeux().newClient( " server " );
33              
34              getBayeux().getChannel( " /chat/demo " ,  false ).publish(sender, mydata,  " 0 " /* null */ );
35  
36      }
37      
38       // 发送消息给指定的client(非广播方式) 
39       public   void  sendMessageToClient(Client joiner, String message) {
40              Map < String,Object >  mydata  =   new  HashMap < String, Object > ();
41              mydata.put( " chat " , message);
42             
43              send(joiner,  " /chat/demo " , mydata,  " 0 " /* null */ );
44      }    
45      
46       // 订阅消息回调方法 
47       public   void  trackMembers(Client joiner, String channel, Map < String,Object >  data, String id)
48      {
49               // 解释消息内容,如果消息内容中 有 join这个字段且值为true 
50           if  (Boolean.TRUE.equals(data.get( " join " )))
51          {
52                   // 根据队列,取得当前登录的人员 
53              Set < String >  m  =  _members.get(channel);
54               if  (m == null )
55              {
56                       // 如果为空,则创建一个新的Set实现 
57                  Set < String >  new_list = new  CopyOnWriteArraySet < String > ();
58                  m = _members.putIfAbsent(channel,new_list);
59                   if  (m == null )
60                      m = new_list;
61              }
62              
63               final  Set < String >  members = m;
64               final  String username = (String)data.get( " user " );
65              
66              members.add(username);
67                           // 为该client增加事件,Remove事件。当用户退出时,触发该方法。             
68              joiner.addListener( new  RemoveListener(){
69                   public   void  removed(String clientId,  boolean  timeout)
70                  {
71                      members.remove(username);
72                  }
73              });
74  
75                           // 为该client增加事件,消息的发送和接收事件。当用户退出时,触发该方法。 
76              joiner.addListener( new  MessageListener() {
77                                   public   void  deliver(Client fromClient, Client toClient, Message message) {
78                                      System.out.println( " message from  "   +  fromClient.getId()  +   "  to  " 
79                                               +  toClient.getId()  +   "  message is  "   +  message.getData());
80                                  }      
81              });
82  
83              Map < String,Object >  mydata  =   new  HashMap < String, Object > ();
84              mydata.put( " chat " ,  " members= "   +  members);
85               // 把已经登录的人员信息列表,发送回给消息发送者 
86              send(joiner,channel,mydata,id);
87           
88          }
89      }
90  }
91 


评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值