1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.io.IOException;
18 import java.security.SecureRandom;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import javax.servlet.ServletContext;
31 import javax.servlet.http.HttpServletRequest;
32
33 import org.cometd.Bayeux;
34 import org.cometd.BayeuxListener;
35 import org.cometd.Channel;
36 import org.cometd.ChannelBayeuxListener;
37 import org.cometd.Client;
38 import org.cometd.ClientBayeuxListener;
39 import org.cometd.Extension;
40 import org.cometd.Message;
41 import org.cometd.SecurityPolicy;
42 import org.mortbay.util.LazyList;
43 import org.mortbay.util.ajax.JSON;
44
45
46
47
48
49
50
51 public abstract class AbstractBayeux extends MessagePool implements Bayeux
52 {
53 public static final ChannelId META_ID=new ChannelId(META);
54 public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
55 public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
56 public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
57 public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
58 public static final ChannelId META_PING_ID=new ChannelId(META_PING);
59 public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
60 public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
61 public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);
62
63 private HashMap<String,Handler> _handlers=new HashMap<String,Handler>();
64
65 private ChannelImpl _root=new ChannelImpl("/",this);
66 private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
67 protected SecurityPolicy _securityPolicy=new DefaultPolicy();
68 protected JSON.Literal _advice;
69 protected JSON.Literal _multiFrameAdvice;
70 protected int _adviceVersion=0;
71 protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
72 protected int _logLevel;
73 protected long _timeout=30000;
74 protected long _interval=0;
75 protected long _maxInterval=10000;
76 protected boolean _initialized;
77 protected ConcurrentHashMap<String,List<String>> _browser2client=new ConcurrentHashMap<String,List<String>>();
78 protected int _multiFrameInterval=-1;
79
80 protected boolean _requestAvailable;
81 protected ThreadLocal<HttpServletRequest> _request=new ThreadLocal<HttpServletRequest>();
82
83 transient ServletContext _context;
84 transient Random _random;
85 transient ConcurrentHashMap<String,ChannelId> _channelIdCache;
86 protected Handler _publishHandler;
87 protected Handler _metaPublishHandler;
88 protected int _maxClientQueue=-1;
89
90 protected Extension[] _extensions;
91 protected JSON.Literal _transports=new JSON.Literal("[\"" + Bayeux.TRANSPORT_LONG_POLL + "\",\"" + Bayeux.TRANSPORT_CALLBACK_POLL + "\"]");
92 protected JSON.Literal _replyExt=new JSON.Literal("{\"ack\":\"true\"}");
93 protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
94 protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
95
96 protected int _maxLazyLatency=5000;
97
98
99
100
101
102 protected AbstractBayeux()
103 {
104 _publishHandler=new PublishHandler();
105 _metaPublishHandler=new MetaPublishHandler();
106 _handlers.put(META_HANDSHAKE,new HandshakeHandler());
107 _handlers.put(META_CONNECT,new ConnectHandler());
108 _handlers.put(META_DISCONNECT,new DisconnectHandler());
109 _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
110 _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
111 _handlers.put(META_PING,new PingHandler());
112
113 setTimeout(getTimeout());
114 }
115
116
117 public void addExtension(Extension ext)
118 {
119 _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
120 }
121
122
123
124
125
126
127 public ChannelImpl getChannel(ChannelId id)
128 {
129 return _root.getChild(id);
130 }
131
132
133 public ChannelImpl getChannel(String id)
134 {
135 ChannelId cid=getChannelId(id);
136 if (cid.depth() == 0)
137 return null;
138 return _root.getChild(cid);
139 }
140
141
142 public Channel getChannel(String id, boolean create)
143 {
144 synchronized(this)
145 {
146 ChannelImpl channel=getChannel(id);
147
148 if (channel == null && create)
149 {
150 channel=new ChannelImpl(id,this);
151 _root.addChild(channel);
152
153 if (isLogInfo())
154 logInfo("newChannel: " + channel);
155 }
156 return channel;
157 }
158 }
159
160
161 public ChannelId getChannelId(String id)
162 {
163 ChannelId cid=_channelIdCache.get(id);
164 if (cid == null)
165 {
166
167 cid=new ChannelId(id);
168 _channelIdCache.put(id,cid);
169 }
170 return cid;
171 }
172
173
174
175
176
177
178
179 public Client getClient(String client_id)
180 {
181 synchronized(this)
182 {
183 if (client_id == null)
184 return null;
185 Client client=_clients.get(client_id);
186 return client;
187 }
188 }
189
190
191 public Set<String> getClientIDs()
192 {
193 return _clients.keySet();
194 }
195
196
197
198
199
200
201 public long getMaxInterval()
202 {
203 return _maxInterval;
204 }
205
206
207
208
209
210 public int getLogLevel()
211 {
212 return _logLevel;
213 }
214
215
216
217
218
219
220
221 public SecurityPolicy getSecurityPolicy()
222 {
223 return _securityPolicy;
224 }
225
226
227 public long getTimeout()
228 {
229 return _timeout;
230 }
231
232
233 public long getInterval()
234 {
235 return _interval;
236 }
237
238
239
240
241
242
243
244 public boolean isDirectDeliver()
245 {
246 return false;
247 }
248
249
250
251
252
253
254
255
256
257 public void setDirectDeliver(boolean directDeliver)
258 {
259 _context.log("directDeliver is deprecated");
260 }
261
262
263
264
265
266
267
268
269
270
271
272
273
274 public String handle(ClientImpl client, Transport transport, Message message) throws IOException
275 {
276 String channel_id=message.getChannel();
277
278 Handler handler=(Handler)_handlers.get(channel_id);
279 if (handler != null)
280 {
281 message=extendRcvMeta(client,message);
282 handler.handle(client,transport,message);
283 _metaPublishHandler.handle(client,transport,message);
284 }
285 else if (channel_id.startsWith(META_SLASH))
286 {
287 message=extendRcvMeta(client,message);
288 _metaPublishHandler.handle(client,transport,message);
289 }
290 else
291 {
292
293 handler=_publishHandler;
294 message=extendRcv(client,message);
295 handler.handle(client,transport,message);
296 }
297
298 return channel_id;
299 }
300
301
302 public boolean hasChannel(String id)
303 {
304 ChannelId cid=getChannelId(id);
305 return _root.getChild(cid) != null;
306 }
307
308
309 public boolean isInitialized()
310 {
311 return _initialized;
312 }
313
314
315
316
317
318
319 public boolean isJSONCommented()
320 {
321 return false;
322 }
323
324
325 public boolean isLogDebug()
326 {
327 return _logLevel > 1;
328 }
329
330
331 public boolean isLogInfo()
332 {
333 return _logLevel > 0;
334 }
335
336
337 public void logDebug(String message)
338 {
339 if (_logLevel > 1)
340 _context.log(message);
341 }
342
343
344 public void logDebug(String message, Throwable th)
345 {
346 if (_logLevel > 1)
347 _context.log(message,th);
348 }
349
350
351 public void logWarn(String message, Throwable th)
352 {
353 _context.log(message + ": " + th.toString());
354 }
355
356
357 public void logWarn(String message)
358 {
359 _context.log(message);
360 }
361
362
363 public void logInfo(String message)
364 {
365 if (_logLevel > 0)
366 _context.log(message);
367 }
368
369
370 public Client newClient(String idPrefix)
371 {
372 ClientImpl client=new ClientImpl(this,idPrefix);
373 return client;
374 }
375
376
/**
 * Creates the client object for a remote peer. HandshakeHandler calls this
 * after the security policy has accepted a handshake; when the result is
 * null the handshake reply is marked unsuccessful.
 *
 * @return the new remote client
 */
377 public abstract ClientImpl newRemoteClient();
378
379
380
381
382
383
384
385
386
387
388
389 public Transport newTransport(ClientImpl client, Map<?,?> message)
390 {
391 if (isLogDebug())
392 logDebug("newTransport: client=" + client + ",message=" + message);
393
394 Transport result;
395
396 String type = client == null ? null : client.getConnectionType();
397 if (type == null)
398 {
399
400 type = (String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
401 }
402 if (type == null)
403 {
404
405 Object types = message.get(Bayeux.SUPPORTED_CONNECTION_TYPES_FIELD);
406 if (types != null)
407 {
408 List supportedTypes;
409 if (types instanceof Object[]) supportedTypes = Arrays.asList((Object[])types);
410 else if (types instanceof List) supportedTypes = (List)types;
411 else if (types instanceof Map) supportedTypes = new ArrayList(((Map)types).values());
412 else supportedTypes = Collections.emptyList();
413
414 if (supportedTypes.contains(Bayeux.TRANSPORT_LONG_POLL)) type = Bayeux.TRANSPORT_LONG_POLL;
415 else if (supportedTypes.contains(Bayeux.TRANSPORT_CALLBACK_POLL)) type = Bayeux.TRANSPORT_CALLBACK_POLL;
416 }
417 }
418 if (type == null)
419 {
420
421 String jsonp = (String) message.get(Bayeux.JSONP_PARAMETER);
422 type = jsonp != null ? Bayeux.TRANSPORT_CALLBACK_POLL : Bayeux.TRANSPORT_LONG_POLL;
423 }
424
425 if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type))
426 {
427 String jsonp = (String)message.get(Bayeux.JSONP_PARAMETER);
428 if (jsonp == null) throw new IllegalArgumentException("Missing 'jsonp' field in message " + message + " for transport " + type);
429 result = new JSONPTransport(jsonp);
430 }
431 else if (Bayeux.TRANSPORT_LONG_POLL.equals(type))
432 {
433 result = new JSONTransport();
434 }
435 else
436 {
437 throw new IllegalArgumentException("Unsupported transport type " + type);
438 }
439
440 if (isLogDebug())
441 logDebug("newTransport: result="+result);
442
443 return result;
444 }
445
446
447
448
449
450
451
452
453
454
455
456 protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
457 {
458 final MessageImpl message=newMessage();
459 message.put(CHANNEL_FIELD,to.toString());
460
461 if (msgId == null)
462 {
463 long id=message.hashCode() ^ (to == null?0:to.hashCode()) ^ (from == null?0:from.hashCode());
464 id=id < 0?-id:id;
465 message.put(ID_FIELD,Long.toString(id,36));
466 }
467 else
468 message.put(ID_FIELD,msgId);
469 message.put(DATA_FIELD,data);
470
471 message.setLazy(lazy);
472
473 final Message m=extendSendBayeux(from,message);
474
475 if (m != null)
476 _root.doDelivery(to,from,m);
477 if (m instanceof MessageImpl)
478 ((MessageImpl)m).decRef();
479 }
480
481
482 public boolean removeChannel(ChannelImpl channel)
483 {
484 return _root.doRemove(channel,_channelListeners);
485 }
486
487
488 public void addChannel(ChannelImpl channel)
489 {
490 for (ChannelBayeuxListener l : _channelListeners)
491 l.channelAdded(channel);
492 }
493
494
495 protected String newClientId(long variation, String idPrefix)
496 {
497 if (idPrefix == null)
498 return Long.toString(getRandom(),36) + Long.toString(variation,36);
499 else
500 return idPrefix + "_" + Long.toString(getRandom(),36);
501 }
502
503
504 protected void addClient(ClientImpl client, String idPrefix)
505 {
506 while(true)
507 {
508 String id=newClientId(client.hashCode(),idPrefix);
509 client.setId(id);
510
511 ClientImpl other=_clients.putIfAbsent(id,client);
512 if (other == null)
513 {
514 for (ClientBayeuxListener l : _clientListeners)
515 l.clientAdded((Client)client);
516
517 return;
518 }
519 }
520 }
521
522
523
524
525
526
527
528 public Client removeClient(String client_id)
529 {
530 ClientImpl client;
531 synchronized(this)
532 {
533 if (client_id == null)
534 return null;
535 client=_clients.remove(client_id);
536 }
537 if (client != null)
538 {
539 for (ClientBayeuxListener l : _clientListeners)
540 l.clientRemoved((Client)client);
541 client.unsubscribeAll();
542 }
543 return client;
544 }
545
546
547
548
549
550
551
552 public void setMaxInterval(long ms)
553 {
554 _maxInterval=ms;
555 }
556
557
558
559
560
561 public void setJSONCommented(boolean commented)
562 {
563 if (commented)
564 _context.log("JSONCommented is deprecated");
565 }
566
567
568
569
570
571
572 public void setLogLevel(int logLevel)
573 {
574 _logLevel=logLevel;
575 }
576
577
578
579
580
581
582
583
584
585 public void setSecurityPolicy(SecurityPolicy securityPolicy)
586 {
587 _securityPolicy=securityPolicy;
588 }
589
590
591 public void setTimeout(long ms)
592 {
593 _timeout=ms;
594 generateAdvice();
595 }
596
597
598 public void setInterval(long ms)
599 {
600 _interval=ms;
601 generateAdvice();
602 }
603
604
605
606
607
608
609
610
611
612
613 public void setMultiFrameInterval(int multiFrameInterval)
614 {
615 _multiFrameInterval=multiFrameInterval;
616 generateAdvice();
617 }
618
619
620
621
622
623 public int getMultiFrameInterval()
624 {
625 return _multiFrameInterval;
626 }
627
628
629 void generateAdvice()
630 {
631 setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":" + getInterval() + ",\"timeout\":" + getTimeout() + "}"));
632 }
633
634
635 public void setAdvice(JSON.Literal advice)
636 {
637 synchronized(this)
638 {
639 _adviceVersion++;
640 _advice=advice;
641 _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
642 }
643 }
644
645
646 private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
647 {
648 Map<String,Object> a=(Map<String,Object>)JSON.parse(_advice.toString());
649 a.put("multiple-clients",Boolean.TRUE);
650 if (_multiFrameInterval > 0)
651 {
652 a.put("reconnect","retry");
653 a.put("interval",_multiFrameInterval);
654 }
655 else
656 a.put("reconnect","none");
657 return a;
658 }
659
660
661 public JSON.Literal getAdvice()
662 {
663 return _advice;
664 }
665
666
667
668
669
670
671 public boolean isRequestAvailable()
672 {
673 return _requestAvailable;
674 }
675
676
677
678
679
680
681
682 public void setRequestAvailable(boolean requestAvailable)
683 {
684 _requestAvailable=requestAvailable;
685 }
686
687
688
689
690
691
692 public HttpServletRequest getCurrentRequest()
693 {
694 return _request.get();
695 }
696
697
698
699
700
701
702 void setCurrentRequest(HttpServletRequest request)
703 {
704 _request.set(request);
705 }
706
707
708 public Collection<Channel> getChannels()
709 {
710 List<Channel> channels=new ArrayList<Channel>();
711 _root.getChannels(channels);
712 return channels;
713 }
714
715
716
717
718
719 public int getChannelCount()
720 {
721 return _root.getChannelCount();
722 }
723
724
725 public Collection<Client> getClients()
726 {
727 synchronized(this)
728 {
729 return new ArrayList<Client>(_clients.values());
730 }
731 }
732
733
734
735
736
737 public int getClientCount()
738 {
739 synchronized(this)
740 {
741 return _clients.size();
742 }
743 }
744
745
746 public boolean hasClient(String clientId)
747 {
748 synchronized(this)
749 {
750 if (clientId == null)
751 return false;
752 return _clients.containsKey(clientId);
753 }
754 }
755
756
757 public Channel removeChannel(String channelId)
758 {
759 Channel channel=getChannel(channelId);
760
761 boolean removed=false;
762 if (channel != null)
763 removed=channel.remove();
764
765 if (removed)
766 return channel;
767 else
768 return null;
769 }
770
771
772 protected void initialize(ServletContext context)
773 {
774 synchronized(this)
775 {
776 _initialized=true;
777 _context=context;
778 try
779 {
780 _random=SecureRandom.getInstance("SHA1PRNG");
781 }
782 catch(Exception e)
783 {
784 context.log("Could not get secure random for ID generation",e);
785 _random=new Random();
786 }
787 _random.setSeed(_random.nextLong() ^ hashCode() ^ System.nanoTime() ^ Runtime.getRuntime().freeMemory());
788 _channelIdCache=new ConcurrentHashMap<String,ChannelId>();
789
790 _root.addChild(new ServiceChannel(Bayeux.SERVICE));
791
792 }
793 }
794
795
796 long getRandom()
797 {
798 long l=_random.nextLong();
799 return l < 0?-l:l;
800 }
801
802
803 void clientOnBrowser(String browserId, String clientId)
804 {
805 List<String> clients=_browser2client.get(browserId);
806 if (clients == null)
807 {
808 List<String> new_clients=new CopyOnWriteArrayList<String>();
809 clients=_browser2client.putIfAbsent(browserId,new_clients);
810 if (clients == null)
811 clients=new_clients;
812 }
813 clients.add(clientId);
814 }
815
816
817 void clientOffBrowser(String browserId, String clientId)
818 {
819 List<String> clients=_browser2client.get(browserId);
820
821 if (clients != null)
822 clients.remove(clientId);
823 }
824
825
826 List<String> clientsOnBrowser(String browserId)
827 {
828 List<String> clients=_browser2client.get(browserId);
829
830 if (clients == null)
831 return Collections.emptyList();
832 return clients;
833 }
834
835
836 public void addListener(BayeuxListener listener)
837 {
838 if (listener instanceof ClientBayeuxListener)
839 _clientListeners.add((ClientBayeuxListener)listener);
840 if (listener instanceof ChannelBayeuxListener)
841 _channelListeners.add((ChannelBayeuxListener)listener);
842 }
843
844
845 public int getMaxClientQueue()
846 {
847 return _maxClientQueue;
848 }
849
850
851 public void setMaxClientQueue(int size)
852 {
853 _maxClientQueue=size;
854 }
855
856
857 protected Message extendRcv(ClientImpl from, Message message)
858 {
859 if (_extensions != null)
860 {
861 for (int i=_extensions.length; message != null && i-- > 0;)
862 message=_extensions[i].rcv(from,message);
863 }
864
865 if (from != null)
866 {
867 Extension[] client_exs=from.getExtensions();
868 if (client_exs != null)
869 {
870 for (int i=client_exs.length; message != null && i-- > 0;)
871 message=client_exs[i].rcv(from,message);
872 }
873 }
874
875 return message;
876 }
877
878
879 protected Message extendRcvMeta(ClientImpl from, Message message)
880 {
881 if (_extensions != null)
882 {
883 for (int i=_extensions.length; message != null && i-- > 0;)
884 message=_extensions[i].rcvMeta(from,message);
885 }
886
887 if (from != null)
888 {
889 Extension[] client_exs=from.getExtensions();
890 if (client_exs != null)
891 {
892 for (int i=client_exs.length; message != null && i-- > 0;)
893 message=client_exs[i].rcvMeta(from,message);
894 }
895 }
896 return message;
897 }
898
899
900 protected Message extendSendBayeux(Client from, Message message)
901 {
902 if (_extensions != null)
903 {
904 for (int i=0; message != null && i < _extensions.length; i++)
905 {
906 message=_extensions[i].send(from,message);
907 }
908 }
909
910 return message;
911 }
912
913
914 public Message extendSendClient(Client from, ClientImpl to, Message message)
915 {
916 if (to != null)
917 {
918 Extension[] client_exs=to.getExtensions();
919 if (client_exs != null)
920 {
921 for (int i=0; message != null && i < client_exs.length; i++)
922 message=client_exs[i].send(from,message);
923 }
924 }
925
926 return message;
927 }
928
929
930 public Message extendSendMeta(ClientImpl from, Message message)
931 {
932 if (_extensions != null)
933 {
934 for (int i=0; message != null && i < _extensions.length; i++)
935 message=_extensions[i].sendMeta(from,message);
936 }
937
938 if (from != null)
939 {
940 Extension[] client_exs=from.getExtensions();
941 if (client_exs != null)
942 {
943 for (int i=0; message != null && i < client_exs.length; i++)
944 message=client_exs[i].sendMeta(from,message);
945 }
946 }
947
948 return message;
949 }
950
951
952
953
954
955
956 public int getMaxLazyLatency()
957 {
958 return _maxLazyLatency;
959 }
960
961
962
963
964
965
966 public void setMaxLazyLatency(int ms)
967 {
968 _maxLazyLatency = ms;
969 }
970
971
972
973 public static class DefaultPolicy implements SecurityPolicy
974 {
975 public boolean canHandshake(Message message)
976 {
977 return true;
978 }
979
980 public boolean canCreate(Client client, String channel, Message message)
981 {
982 return client != null && !channel.startsWith(Bayeux.META_SLASH);
983 }
984
985 public boolean canSubscribe(Client client, String channel, Message message)
986 {
987 if (client != null && ("/**".equals(channel) || "/*".equals(channel)))
988 return false;
989 return client != null && !channel.startsWith(Bayeux.META_SLASH);
990 }
991
992 public boolean canPublish(Client client, String channel, Message message)
993 {
994 return client != null || client == null && Bayeux.META_HANDSHAKE.equals(channel);
995 }
996
997 }
998
999
1000
1001 protected abstract class Handler
1002 {
1003 abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
1004
1005 abstract ChannelId getMetaChannelId();
1006
1007 void unknownClient(Transport transport, String channel) throws IOException
1008 {
1009 MessageImpl reply=newMessage();
1010
1011 reply.put(CHANNEL_FIELD,channel);
1012 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1013 reply.put(ERROR_FIELD,"402::Unknown client");
1014 reply.put("advice",_handshakeAdvice);
1015 transport.send(reply);
1016 reply.decRef();
1017 }
1018
1019 void sendMetaReply(final ClientImpl client, Message reply, final Transport transport) throws IOException
1020 {
1021 reply=extendSendMeta(client,reply);
1022 if (reply != null)
1023 {
1024 transport.send(reply);
1025 if (reply instanceof MessageImpl)
1026 ((MessageImpl)reply).decRef();
1027 }
1028 }
1029 }
1030
1031
1032
1033 protected class ConnectHandler extends Handler
1034 {
1035 protected String _metaChannel=META_CONNECT;
1036
1037 @Override
1038 ChannelId getMetaChannelId()
1039 {
1040 return META_CONNECT_ID;
1041 }
1042
1043 @Override
1044 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1045 {
1046 if (client == null)
1047 {
1048 unknownClient(transport,_metaChannel);
1049 return;
1050 }
1051
1052
1053 String type=client.getConnectionType();
1054 boolean polling=true;
1055 if (type == null)
1056 {
1057 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
1058 client.setConnectionType(type);
1059 polling=false;
1060 }
1061
1062 Object advice=message.get(ADVICE_FIELD);
1063 if (advice != null)
1064 {
1065 Long timeout=(Long)((Map)advice).get("timeout");
1066 if (timeout != null && timeout.longValue() > 0)
1067 client.setTimeout(timeout.longValue());
1068 else
1069 client.setTimeout(0);
1070
1071 Long interval=(Long)((Map)advice).get("interval");
1072 if (interval != null && interval.longValue() > 0)
1073 client.setInterval(interval.longValue());
1074 else
1075 client.setInterval(0);
1076 }
1077 else
1078 {
1079 client.setTimeout(0);
1080 client.setInterval(0);
1081 }
1082
1083 advice=null;
1084
1085
1086 if (polling && _multiFrameInterval > 0 && client.getBrowserId() != null)
1087 {
1088 List<String> clients=clientsOnBrowser(client.getBrowserId());
1089 int count=clients.size();
1090 if (count > 1)
1091 {
1092 polling=clients.get(0).equals(client.getId());
1093 advice=client.getAdvice();
1094 if (advice == null)
1095 advice=_multiFrameAdvice;
1096 else
1097
1098 advice=multiFrameAdvice((JSON.Literal)advice);
1099 }
1100 }
1101
1102 synchronized(this)
1103 {
1104 if (advice == null)
1105 {
1106 if (_adviceVersion != client._adviseVersion)
1107 {
1108 advice=_advice;
1109 client._adviseVersion=_adviceVersion;
1110 }
1111 }
1112 else
1113 client._adviseVersion=-1;
1114 }
1115
1116
1117 String id=message.getId();
1118
1119 Message reply=newMessage(message);
1120
1121 reply.put(CHANNEL_FIELD,META_CONNECT);
1122 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1123 if (advice != null)
1124 reply.put(ADVICE_FIELD,advice);
1125 if (id != null)
1126 reply.put(ID_FIELD,id);
1127
1128 if (polling)
1129 transport.setMetaConnectReply(reply);
1130 else
1131 sendMetaReply(client,reply,transport);
1132 }
1133 }
1134
1135
1136
1137 protected class DisconnectHandler extends Handler
1138 {
1139 @Override
1140 ChannelId getMetaChannelId()
1141 {
1142 return META_DISCONNECT_ID;
1143 }
1144
1145 @Override
1146 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1147 {
1148 if (client == null)
1149 {
1150 unknownClient(transport,META_DISCONNECT);
1151 return;
1152 }
1153 if (isLogInfo())
1154 logInfo("Disconnect " + client.getId());
1155
1156 client.remove(false);
1157
1158 Message reply=newMessage(message);
1159 reply.put(CHANNEL_FIELD,META_DISCONNECT);
1160 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1161 String id=message.getId();
1162 if (id != null)
1163 reply.put(ID_FIELD,id);
1164
1165 reply=extendSendMeta(client,reply);
1166
1167 Message pollReply=transport.getMetaConnectReply();
1168 if (pollReply != null)
1169 {
1170 transport.setMetaConnectReply(null);
1171 sendMetaReply(client,pollReply,transport);
1172 }
1173 sendMetaReply(client,reply,transport);
1174 }
1175 }
1176
1177
1178
1179 protected class HandshakeHandler extends Handler
1180 {
1181 @Override
1182 ChannelId getMetaChannelId()
1183 {
1184 return META_HANDSHAKE_ID;
1185 }
1186
1187 @Override
1188 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1189 {
1190 if (client != null)
1191 throw new IllegalStateException();
1192
1193 if (_securityPolicy != null && !_securityPolicy.canHandshake(message))
1194 {
1195 Message reply=newMessage(message);
1196 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1197 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1198 reply.put(ERROR_FIELD,"403::Handshake denied");
1199
1200 sendMetaReply(client,reply,transport);
1201 return;
1202 }
1203
1204 client=newRemoteClient();
1205
1206 Message reply=newMessage(message);
1207 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1208 reply.put(VERSION_FIELD,"1.0");
1209 reply.put(MIN_VERSION_FIELD,"0.9");
1210
1211 if (client != null)
1212 {
1213 reply.put(SUPPORTED_CONNECTION_TYPES_FIELD,_transports);
1214 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1215 reply.put(CLIENT_FIELD,client.getId());
1216 if (_advice != null)
1217 reply.put(ADVICE_FIELD,_advice);
1218 }
1219 else
1220 {
1221 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1222 if (_advice != null)
1223 reply.put(ADVICE_FIELD,_advice);
1224 }
1225
1226 if (isLogDebug())
1227 logDebug("handshake.handle: reply=" + reply);
1228
1229 String id=message.getId();
1230 if (id != null)
1231 reply.put(ID_FIELD,id);
1232
1233 sendMetaReply(client,reply,transport);
1234 }
1235 }
1236
1237
1238
1239 protected class PublishHandler extends Handler
1240 {
1241 @Override
1242 ChannelId getMetaChannelId()
1243 {
1244 return null;
1245 }
1246
1247 @Override
1248 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1249 {
1250 String channel_id=message.getChannel();
1251
1252 if (client == null && message.containsKey(CLIENT_FIELD))
1253 {
1254 unknownClient(transport,channel_id);
1255 return;
1256 }
1257
1258 String id=message.getId();
1259
1260 ChannelId cid=getChannelId(channel_id);
1261 Object data=message.get(Bayeux.DATA_FIELD);
1262
1263 Message reply=newMessage(message);
1264 reply.put(CHANNEL_FIELD,channel_id);
1265 if (id != null)
1266 reply.put(ID_FIELD,id);
1267
1268 if (data == null)
1269 {
1270 message=null;
1271 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1272 reply.put(ERROR_FIELD,"403::No data");
1273 }
1274 else if (!_securityPolicy.canPublish(client,channel_id,message))
1275 {
1276 message=null;
1277 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1278 reply.put(ERROR_FIELD,"403::Publish denied");
1279 }
1280 else
1281 {
1282 message.remove(CLIENT_FIELD);
1283 message=extendSendBayeux(client,message);
1284
1285 if (message != null)
1286 {
1287 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1288 }
1289 else
1290 {
1291 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1292 reply.put(ERROR_FIELD,"404::Message deleted");
1293 }
1294 }
1295
1296 sendMetaReply(client,reply,transport);
1297
1298 if (message != null)
1299 _root.doDelivery(cid,client,message);
1300 }
1301 }
1302
1303
1304
1305 protected class MetaPublishHandler extends Handler
1306 {
1307 @Override
1308 ChannelId getMetaChannelId()
1309 {
1310 return null;
1311 }
1312
1313 @Override
1314 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1315 {
1316 String channel_id=message.getChannel();
1317
1318 if (client == null && !META_HANDSHAKE.equals(channel_id))
1319 {
1320
1321 return;
1322 }
1323
1324 if (_securityPolicy.canPublish(client,channel_id,message))
1325 {
1326 _root.doDelivery(getChannelId(channel_id),client,message);
1327 }
1328 }
1329 }
1330
1331
1332
1333 protected class SubscribeHandler extends Handler
1334 {
1335 @Override
1336 ChannelId getMetaChannelId()
1337 {
1338 return META_SUBSCRIBE_ID;
1339 }
1340
1341 @Override
1342 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1343 {
1344 if (client == null)
1345 {
1346 unknownClient(transport,META_SUBSCRIBE);
1347 return;
1348 }
1349
1350 String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1351
1352
1353 if (subscribe_id == null)
1354 {
1355 subscribe_id=Long.toString(getRandom(),36);
1356 while(getChannel(subscribe_id) != null)
1357 subscribe_id=Long.toString(getRandom(),36);
1358 }
1359
1360 ChannelId cid=null;
1361 boolean can_subscribe=false;
1362
1363 if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1364 {
1365 can_subscribe=true;
1366 }
1367 else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1368 {
1369 can_subscribe=false;
1370 }
1371 else
1372 {
1373 cid=getChannelId(subscribe_id);
1374 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1375 }
1376
1377 Message reply=newMessage(message);
1378 reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1379 reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1380
1381 if (can_subscribe)
1382 {
1383 if (cid != null)
1384 {
1385 ChannelImpl channel=getChannel(cid);
1386 if (channel == null && _securityPolicy.canCreate(client,subscribe_id,message))
1387 channel=(ChannelImpl)getChannel(subscribe_id,true);
1388
1389 if (channel != null)
1390 channel.subscribe(client);
1391 else
1392 can_subscribe=false;
1393 }
1394
1395 if (can_subscribe)
1396 {
1397 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1398 }
1399 else
1400 {
1401 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1402 reply.put(ERROR_FIELD,"403::cannot create");
1403 }
1404 }
1405 else
1406 {
1407 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1408 reply.put(ERROR_FIELD,"403::cannot subscribe");
1409
1410 }
1411
1412 String id=message.getId();
1413 if (id != null)
1414 reply.put(ID_FIELD,id);
1415
1416 sendMetaReply(client,reply,transport);
1417 }
1418 }
1419
1420
1421
1422 protected class UnsubscribeHandler extends Handler
1423 {
1424 @Override
1425 ChannelId getMetaChannelId()
1426 {
1427 return META_UNSUBSCRIBE_ID;
1428 }
1429
1430 @Override
1431 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1432 {
1433 if (client == null)
1434 {
1435 unknownClient(transport,META_UNSUBSCRIBE);
1436 return;
1437 }
1438
1439 String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1440 ChannelImpl channel=getChannel(channel_id);
1441 if (channel != null)
1442 channel.unsubscribe(client);
1443
1444 Message reply=newMessage(message);
1445 reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1446 reply.put(SUBSCRIPTION_FIELD,channel_id);
1447 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1448
1449 String id=message.getId();
1450 if (id != null)
1451 reply.put(ID_FIELD,id);
1452
1453 sendMetaReply(client,reply,transport);
1454 }
1455 }
1456
1457
1458
1459 protected class PingHandler extends Handler
1460 {
1461 @Override
1462 ChannelId getMetaChannelId()
1463 {
1464 return META_PING_ID;
1465 }
1466
1467 @Override
1468 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1469 {
1470 Message reply=newMessage(message);
1471 reply.put(CHANNEL_FIELD,META_PING);
1472 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1473
1474 String id=message.getId();
1475 if (id != null)
1476 reply.put(ID_FIELD,id);
1477
1478 sendMetaReply(client,reply,transport);
1479 }
1480 }
1481
1482
1483
1484 protected class ServiceChannel extends ChannelImpl
1485 {
1486 ServiceChannel(String id)
1487 {
1488 super(id,AbstractBayeux.this);
1489 }
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499 @Override
1500 public void addChild(ChannelImpl channel)
1501 {
1502 super.addChild(channel);
1503 setPersistent(true);
1504 }
1505
1506
1507 @Override
1508 public void subscribe(Client client)
1509 {
1510 if (client.isLocal())
1511 super.subscribe(client);
1512 }
1513
1514 }
1515 }