View Javadoc

1   //========================================================================
2   //Copyright 2007 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd.continuation;
16  
17  import java.io.IOException;
18  import java.nio.ByteBuffer;
19  import javax.servlet.ServletException;
20  import javax.servlet.http.HttpServletRequest;
21  import javax.servlet.http.HttpServletResponse;
22  
23  import org.cometd.Bayeux;
24  import org.cometd.Client;
25  import org.cometd.Extension;
26  import org.cometd.Message;
27  import org.mortbay.cometd.AbstractBayeux;
28  import org.mortbay.cometd.AbstractCometdServlet;
29  import org.mortbay.cometd.ClientImpl;
30  import org.mortbay.cometd.JSONTransport;
31  import org.mortbay.cometd.MessageImpl;
32  import org.mortbay.cometd.Transport;
33  import org.mortbay.util.ArrayQueue;
34  import org.mortbay.util.StringUtil;
35  import org.mortbay.util.ajax.Continuation;
36  import org.mortbay.util.ajax.ContinuationSupport;
37  
38  public class ContinuationCometdServlet extends AbstractCometdServlet
39  {
40      /* ------------------------------------------------------------ */
41      @Override
42      protected AbstractBayeux newBayeux()
43      {
44          return new ContinuationBayeux();
45      }
46  
47      /* ------------------------------------------------------------ */
48      @Override
49      protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
50      {
51          // Look for an existing client and protect from context restarts
52          Object clientObj=request.getAttribute(CLIENT_ATTR);
53          Transport transport=null;
54          int received=-1;
55          boolean metaConnectDeliveryOnly=false;
56          boolean pendingResponse=false;
57          boolean metaConnect=false;
58  
59          // Have we seen this request before
60          ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
61          if (client != null)
62          {
63              // yes - extract saved properties
64              transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
65              transport.setResponse(response);
66              metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
67              metaConnect=true;
68          }
69          else
70          {
71              Message[] messages=getMessages(request);
72              received=messages.length;
73  
74              /* check jsonp parameter */
75              String jsonpParam=request.getParameter("jsonp");
76  
77              // Handle all received messages
78              try
79              {
80                  for (Message message : messages)
81                  {
82                      if (jsonpParam != null)
83                          message.put("jsonp",jsonpParam);
84  
85                      if (client == null)
86                      {
87                          client=(ContinuationClient)_bayeux.getClient((String)message.get(AbstractBayeux.CLIENT_FIELD));
88  
89                          // If no client, SHOULD be a handshake, so force a
90                          // transport and handle
91                          if (client == null)
92                          {
93                              // Setup a browser ID
94                              String browser_id=findBrowserId(request);
95                              if (browser_id == null)
96                                  browser_id=setBrowserId(request,response);
97  
98                              if (transport == null)
99                              {
100                                 transport=_bayeux.newTransport(client,message);
101                                 transport.setResponse(response);
102                                 metaConnectDeliveryOnly=transport.isMetaConnectDeliveryOnly();
103                             }
104                             _bayeux.handle(null,transport,message);
105                             message=null;
106                             continue;
107                         }
108                     }
109 
110                     String browser_id=findBrowserId(request);
111                     if (browser_id != null && (client.getBrowserId() == null || !client.getBrowserId().equals(browser_id)))
112                         client.setBrowserId(browser_id);
113 
114                     // resolve transport
115                     if (transport == null)
116                     {
117                         transport=_bayeux.newTransport(client,message);
118                         transport.setResponse(response);
119                         metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
120                     }
121 
122                     // Tell client to hold messages as a response is likely to
123                     // be sent.
124                     if (!metaConnectDeliveryOnly && !pendingResponse)
125                     {
126                         pendingResponse=true;
127                         client.responsePending();
128                     }
129 
130                     if (Bayeux.META_CONNECT.equals(message.getChannel()))
131                         metaConnect=true;
132 
133                     _bayeux.handle(client,transport,message);
134                 }
135             }
136             finally
137             {
138                 for (Message message : messages)
139                     ((MessageImpl)message).decRef();
140                 if (pendingResponse)
141                 {
142                     client.responded();
143                 }
144             }
145         }
146 
147         Message metaConnectReply=null;
148 
149         // Do we need to wait for messages
150         if (transport != null)
151         {
152             metaConnectReply=transport.getMetaConnectReply();
153             if (metaConnectReply != null)
154             {
155                 long timeout=client.getTimeout();
156                 if (timeout == 0)
157                     timeout=_bayeux.getTimeout();
158 
159                 Continuation continuation=ContinuationSupport.getContinuation(request,client);
160 
161                 // Get messages or wait
162                 synchronized(client)
163                 {
164                     if (!client.hasNonLazyMessages() && !continuation.isPending() && received <= 1)
165                     {
166                         // save state and suspend
167                         client.setContinuation(continuation);
168                         request.setAttribute(CLIENT_ATTR,client);
169                         request.setAttribute(TRANSPORT_ATTR,transport);
170                         continuation.suspend(timeout);
171                     }
172 
173                     if (!continuation.isPending())
174                         client.access();
175 
176                     continuation.reset();
177                 }
178 
179                 client.setContinuation(null);
180                 transport.setMetaConnectReply(null);
181             }
182             else if (client != null)
183             {
184                 client.access();
185             }
186         }
187 
188         if (client != null)
189         {
190             if (metaConnectDeliveryOnly && !metaConnect)
191             {
192                 // wake up any long poll
193                 client.resume();
194             }
195             else
196             {
197                 // Send any queued messages.
198                 synchronized(client)
199                 {
200                     client.doDeliverListeners();
201 
202                     final ArrayQueue<Message> messages=(ArrayQueue)client.getQueue();
203                     final int size=messages.size();
204 
205                     try
206                     {
207                         for (int i=0; i < size; i++)
208                         {
209                             final Message message=messages.getUnsafe(i);
210                             final MessageImpl mesgImpl=(message instanceof MessageImpl)?(MessageImpl)message:null;
211 
212                             // Can we short cut the message?
213                             if (i == 0 && size == 1 && mesgImpl != null && _refsThreshold > 0 && metaConnectReply != null && transport instanceof JSONTransport)
214                             {
215                                 // is there a response already prepared
216                                 ByteBuffer buffer=mesgImpl.getBuffer();
217                                 if (buffer != null)
218                                 {
219                                     // Send pre-prepared buffer
220                                     request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
221                                     if (metaConnectReply instanceof MessageImpl)
222                                         ((MessageImpl)metaConnectReply).decRef();
223                                     metaConnectReply=null;
224                                     transport=null;
225                                     mesgImpl.decRef();
226                                     continue;
227                                 }
228                                 else if (mesgImpl.getRefs() >= _refsThreshold)
229                                 {
230                                     // create multi-use buffer
231                                     byte[] contentBytes=("[" + mesgImpl.getJSON() + ",{\"" + Bayeux.SUCCESSFUL_FIELD + "\":true,\"" + Bayeux.CHANNEL_FIELD
232                                             + "\":\"" + Bayeux.META_CONNECT + "\"}]").getBytes(StringUtil.__UTF8);
233                                     int contentLength=contentBytes.length;
234 
235                                     String headerString="HTTP/1.1 200 OK\r\n" + "Content-Type: text/json; charset=utf-8\r\n" + "Content-Length: "
236                                             + contentLength + "\r\n" + "\r\n";
237 
238                                     byte[] headerBytes=headerString.getBytes(StringUtil.__UTF8);
239 
240                                     buffer=ByteBuffer.allocateDirect(headerBytes.length + contentLength);
241                                     buffer.put(headerBytes);
242                                     buffer.put(contentBytes);
243                                     buffer.flip();
244 
245                                     mesgImpl.setBuffer(buffer);
246                                     request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
247                                     metaConnectReply=null;
248                                     if (metaConnectReply instanceof MessageImpl)
249                                         ((MessageImpl)metaConnectReply).decRef();
250                                     transport=null;
251                                     mesgImpl.decRef();
252                                     continue;
253                                 }
254                             }
255 
256                             if (message != null)
257                                 transport.send(message);
258                             if (mesgImpl != null)
259                                 mesgImpl.decRef();
260                         }
261                     }
262                     finally
263                     {
264                         messages.clear();
265                     }
266                 }
267 
268                 if (metaConnectReply != null)
269                 {
270                     metaConnectReply=_bayeux.extendSendMeta(client,metaConnectReply);
271                     transport.send(metaConnectReply);
272                     if (metaConnectReply instanceof MessageImpl)
273                         ((MessageImpl)metaConnectReply).decRef();
274                 }
275             }
276         }
277 
278         if (transport != null)
279             transport.complete();
280     }
281 
282     public void destroy()
283     {
284         if (_bayeux != null)
285             ((ContinuationBayeux)_bayeux).destroy();
286     }
287 }