1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.client;
16
17 import java.io.IOException;
18 import java.net.URLEncoder;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Queue;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import javax.servlet.http.Cookie;
29
30 import org.cometd.Bayeux;
31 import org.cometd.Client;
32 import org.cometd.ClientListener;
33 import org.cometd.Extension;
34 import org.cometd.Message;
35 import org.cometd.MessageListener;
36 import org.cometd.RemoveListener;
37 import org.mortbay.cometd.MessageImpl;
38 import org.mortbay.cometd.MessagePool;
39 import org.mortbay.component.AbstractLifeCycle;
40 import org.mortbay.io.Buffer;
41 import org.mortbay.io.ByteArrayBuffer;
42 import org.mortbay.jetty.HttpHeaders;
43 import org.mortbay.jetty.HttpSchemes;
44 import org.mortbay.jetty.HttpURI;
45 import org.mortbay.jetty.client.Address;
46 import org.mortbay.jetty.client.ContentExchange;
47 import org.mortbay.jetty.client.HttpClient;
48 import org.mortbay.jetty.client.HttpExchange;
49 import org.mortbay.log.Log;
50 import org.mortbay.util.ArrayQueue;
51 import org.mortbay.util.LazyList;
52 import org.mortbay.util.QuotedStringTokenizer;
53 import org.mortbay.util.ajax.JSON;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public class BayeuxClient extends AbstractLifeCycle implements Client
69 {
70 private final static String __TIMER="org.mortbay.cometd.client.Timer";
71 private final static String __JSON="org.mortbay.cometd.client.JSON";
72 private final static String __MSGPOOL="org.mortbay.cometd.MessagePool";
73 protected HttpClient _httpClient;
74
75 protected MessagePool _msgPool;
76 private ArrayQueue<Message> _inQ = new ArrayQueue<Message>();
77 private ArrayQueue<Message> _outQ = new ArrayQueue<Message>();
78 protected Address _cometdAddress;
79 private Exchange _pull;
80 private Exchange _push;
81 private String _path = "/cometd";
82 private boolean _initialized = false;
83 private boolean _disconnecting = false;
84 private boolean _handshook = false;
85 private String _clientId;
86 private org.cometd.Listener _listener;
87 private List<RemoveListener> _rListeners;
88 private List<MessageListener> _mListeners;
89 private int _batch;
90 private boolean _formEncoded;
91 private Map<String, Cookie> _cookies = new ConcurrentHashMap<String, Cookie>();
92 private Advice _advice;
93 private Timer _timer;
94 private int _backoffInterval = 0;
95 private int _backoffIncrement = 1000;
96 private int _backoffMaxInterval = 60000;
97 private Buffer _scheme;
98 protected Extension[] _extensions;
99 protected JSON _jsonOut;
100
101
102 public BayeuxClient(HttpClient client, String url)
103 {
104 this(client,url,null);
105 }
106
107
108 public BayeuxClient(HttpClient client, String url, Timer timer)
109 {
110 HttpURI uri = new HttpURI(url);
111 _httpClient = client;
112 _cometdAddress = new Address(uri.getHost(),uri.getPort());
113 _path=uri.getPath();
114 _timer = timer;
115 _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER;
116 }
117
118
119 public BayeuxClient(HttpClient client, Address address, String path, Timer timer)
120 {
121 _httpClient = client;
122 _cometdAddress = address;
123 _path = path;
124 _timer = timer;
125 }
126
127
128 public BayeuxClient(HttpClient client, Address address, String uri)
129 {
130 this(client,address,uri,null);
131 }
132
133
134 public void addExtension(Extension ext)
135 {
136 _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
137 }
138
139
140 Extension[] getExtensions()
141 {
142 return _extensions;
143 }
144
145
146
147
148
149
150
151
152 public void setBackOffInterval(int interval)
153 {
154 _backoffInterval = interval;
155 }
156
157
158
159
160
161
162 public int getBackoffInterval()
163 {
164 return _backoffInterval;
165 }
166
167
168
169
170
171
172 public void setBackoffMaxRetries(int retries)
173 {
174 }
175
176
177
178
179
180 public int getBackoffMaxRetries()
181 {
182 return -1;
183 }
184
185
186
187
188
189
190
191 public void setBackoffIncrement(int interval)
192 {
193 _backoffIncrement = interval;
194 }
195
196
197
198
199
200
201 public int getBackoffIncrement()
202 {
203 return _backoffIncrement;
204 }
205
206
207 public void setBackoffMaxInterval(int interval)
208 {
209 _backoffMaxInterval = interval;
210 }
211
212 public int getBackoffMaxInterval()
213 {
214 return _backoffMaxInterval;
215 }
216
217
218
219
220
221
222
223 public String getId()
224 {
225 return _clientId;
226 }
227
228
229 protected void doStart() throws Exception
230 {
231 if (!_httpClient.isStarted())
232 throw new IllegalStateException("!HttpClient.isStarted()");
233
234 synchronized (_httpClient)
235 {
236 if (_jsonOut == null)
237 {
238 _jsonOut = (JSON)_httpClient.getAttribute(__JSON);
239 if (_jsonOut==null)
240 {
241 _jsonOut = new JSON();
242 _httpClient.setAttribute(__JSON,_jsonOut);
243 }
244 }
245
246 if (_timer == null)
247 {
248 _timer = (Timer)_httpClient.getAttribute(__TIMER);
249 if (_timer==null)
250 {
251 _timer = new Timer(__TIMER+"@"+hashCode(),true);
252 _httpClient.setAttribute(__TIMER,_timer);
253 }
254 }
255
256 if (_msgPool == null)
257 {
258 _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL);
259 if (_msgPool==null)
260 {
261 _msgPool = new MessagePool();
262 _httpClient.setAttribute(__MSGPOOL,_msgPool);
263 }
264 }
265 }
266 _disconnecting=false;
267 _pull=null;
268 _push=null;
269 super.doStart();
270 synchronized (_outQ)
271 {
272 if (!_initialized && _pull == null)
273 {
274 _pull = new Handshake();
275 send((Exchange)_pull,false);
276 }
277 }
278 }
279
280
281 protected void doStop() throws Exception
282 {
283 if (!_disconnecting)
284 disconnect();
285 super.doStop();
286 }
287
288
289 public boolean isPolling()
290 {
291 synchronized (_outQ)
292 {
293 return isRunning() && (_pull != null);
294 }
295 }
296
297
298
299
300
301 public void deliver(Client from, Message message)
302 {
303 if (!isRunning())
304 throw new IllegalStateException("Not running");
305
306 synchronized (_inQ)
307 {
308 if (_mListeners == null)
309 _inQ.add(message);
310 else
311 {
312 for (MessageListener l : _mListeners)
313 l.deliver(from,this,message);
314 }
315 }
316 }
317
318
319
320
321
322
323
324
325 public void deliver(Client from, String toChannel, Object data, String id)
326 {
327 if (!isRunning())
328 throw new IllegalStateException("Not running");
329
330 MessageImpl message = _msgPool.newMessage();
331
332 message.put(Bayeux.CHANNEL_FIELD,toChannel);
333 message.put(Bayeux.DATA_FIELD,data);
334 if (id != null)
335 message.put(Bayeux.ID_FIELD,id);
336
337 synchronized (_inQ)
338 {
339 if (_mListeners == null)
340 {
341 message.incRef();
342 _inQ.add(message);
343 }
344 else
345 {
346 for (MessageListener l : _mListeners)
347 if (l instanceof MessageListener.Synchronous)
348 l.deliver(from,this,message);
349 }
350 }
351 if (_mListeners !=null)
352 for (MessageListener l : _mListeners)
353 if (!(l instanceof MessageListener.Synchronous))
354 l.deliver(from,this,message);
355 message.decRef();
356 }
357
358
359
360
361
362 public org.cometd.Listener getListener()
363 {
364 synchronized (_inQ)
365 {
366 return _listener;
367 }
368 }
369
370
371
372
373
374
375
376 public boolean hasMessages()
377 {
378 synchronized (_inQ)
379 {
380 return _inQ.size() > 0;
381 }
382 }
383
384
385
386
387
388
389
390 public boolean isLocal()
391 {
392 return false;
393 }
394
395
396
397
398
399
400
401 private void publish(MessageImpl msg)
402 {
403 msg.incRef();
404 synchronized (_outQ)
405 {
406 _outQ.add(msg);
407
408 if (_batch == 0 && _initialized && _push == null)
409 {
410 _push = new Publish();
411 try
412 {
413 send(_push);
414 }
415 catch (IOException e)
416 {
417 metaPublishFail(e,((Publish)_push).getOutboundMessages());
418 }
419 catch (IllegalStateException e)
420 {
421 metaPublishFail(e,((Publish)_push).getOutboundMessages());
422 }
423 }
424 }
425 }
426
427
428
429
430
431
432
433
434 public void publish(String toChannel, Object data, String msgId)
435 {
436 if (!isRunning() || _disconnecting)
437 throw new IllegalStateException("Not running");
438
439 MessageImpl msg = _msgPool.newMessage();
440 msg.put(Bayeux.CHANNEL_FIELD,toChannel);
441 msg.put(Bayeux.DATA_FIELD,data);
442 if (msgId != null)
443 msg.put(Bayeux.ID_FIELD,msgId);
444 publish(msg);
445 msg.decRef();
446 }
447
448
449
450
451
452
453
454 public void subscribe(String toChannel)
455 {
456 if (!isRunning() || _disconnecting)
457 throw new IllegalStateException("Not running");
458
459 MessageImpl msg = _msgPool.newMessage();
460 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
461 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
462 publish(msg);
463 msg.decRef();
464 }
465
466
467
468
469
470
471
472 public void unsubscribe(String toChannel)
473 {
474 if (!isRunning() || _disconnecting)
475 throw new IllegalStateException("Not running");
476
477 MessageImpl msg = _msgPool.newMessage();
478 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
479 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
480 publish(msg);
481 msg.decRef();
482 }
483
484
485
486
487
488
489 public void remove()
490 {
491 disconnect();
492 }
493
494
495
496
497
498 public void disconnect()
499 {
500 if (isStopped() || _disconnecting)
501 throw new IllegalStateException("Not running");
502
503 MessageImpl msg = _msgPool.newMessage();
504 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
505
506 synchronized (_outQ)
507 {
508 _outQ.add(msg);
509 _disconnecting = true;
510 if (_batch == 0 && _initialized && _push == null)
511 {
512 _push = new Publish();
513 try
514 {
515 send(_push);
516 }
517 catch (IOException e)
518 {
519 Log.warn(e.toString());
520 Log.debug(e);
521 send(_push,true);
522 }
523 }
524 _initialized = false;
525 }
526 }
527
528
529
530
531
532 public void setListener(org.cometd.Listener listener)
533 {
534 synchronized (_inQ)
535 {
536 if (_listener != null)
537 removeListener(_listener);
538 _listener = listener;
539 if (_listener != null)
540 addListener(_listener);
541 }
542 }
543
544
545
546
547
548
549
550
551 public List<Message> takeMessages()
552 {
553 final LinkedList<Message> list;
554 synchronized (_inQ)
555 {
556 list = new LinkedList<Message>(_inQ);
557 _inQ.clear();
558 }
559 for (Message m : list)
560 if (m instanceof MessageImpl)
561 ((MessageImpl)m).decRef();
562 return list;
563 }
564
565
566
567
568
569
570
571 public void endBatch()
572 {
573 synchronized (_outQ)
574 {
575 if (--_batch <= 0)
576 {
577 _batch = 0;
578 if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0)
579 {
580 _push = new Publish();
581 try
582 {
583 send(_push);
584 }
585 catch (IOException e)
586 {
587 metaPublishFail(e,((Publish)_push).getOutboundMessages());
588 }
589 }
590 }
591 }
592 }
593
594
595
596
597
598
599
600 public void startBatch()
601 {
602 if (isStopped())
603 throw new IllegalStateException("Not running");
604
605 synchronized (_outQ)
606 {
607 _batch++;
608 }
609 }
610
611
612
613
614
615
616
617 protected void customize(HttpExchange exchange)
618 {
619 StringBuilder buf = null;
620 for (Cookie cookie : _cookies.values())
621 {
622 if (buf == null)
623 buf = new StringBuilder();
624 else
625 buf.append("; ");
626 buf.append(cookie.getName());
627 buf.append("=");
628 buf.append(cookie.getValue());
629 }
630 if (buf != null)
631 exchange.setRequestHeader(HttpHeaders.COOKIE,buf.toString());
632
633 if (_scheme!=null)
634 exchange.setScheme(_scheme);
635 }
636
637
638 public void setCookie(Cookie cookie)
639 {
640 _cookies.put(cookie.getName(),cookie);
641 }
642
643
644
645
646
647
648
649 protected class Exchange extends ContentExchange
650 {
651 Message[] _responses;
652 int _connectFailures;
653 int _backoff = _backoffInterval;
654 String _json;
655
656
657 Exchange(String info)
658 {
659 setMethod("POST");
660 setScheme(HttpSchemes.HTTP_BUFFER);
661 setAddress(_cometdAddress);
662 setURI(_path + "/" + info);
663 setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
664 }
665
666
667 public int getBackoff()
668 {
669 return _backoff;
670 }
671
672
673 public void incBackoff()
674 {
675 _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval);
676 }
677
678
679 protected void setMessage(String message)
680 {
681 message=extendOut(message);
682 setJson(message);
683 }
684
685
686 protected void setJson(String json)
687 {
688 try
689 {
690 _json = json;
691
692 if (_formEncoded)
693 setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8")));
694 else
695 setRequestContent(new ByteArrayBuffer(_json,"utf-8"));
696 }
697 catch (Exception e)
698 {
699 Log.ignore(e);
700 setRequestContent(new ByteArrayBuffer(_json));
701 }
702 }
703
704
705 protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
706 {
707 super.onResponseStatus(version,status,reason);
708 }
709
710
711 protected void onResponseHeader(Buffer name, Buffer value) throws IOException
712 {
713 super.onResponseHeader(name,value);
714 if (!isRunning())
715 return;
716
717 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL)
718 {
719 String cname = null;
720 String cvalue = null;
721
722 QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false);
723 tok.setSingle(false);
724
725 if (tok.hasMoreElements())
726 cname = tok.nextToken();
727 if (tok.hasMoreElements())
728 cvalue = tok.nextToken();
729
730 Cookie cookie = new Cookie(cname,cvalue);
731
732 while (tok.hasMoreTokens())
733 {
734 String token = tok.nextToken();
735 if ("Version".equalsIgnoreCase(token))
736 cookie.setVersion(Integer.parseInt(tok.nextToken()));
737 else if ("Comment".equalsIgnoreCase(token))
738 cookie.setComment(tok.nextToken());
739 else if ("Path".equalsIgnoreCase(token))
740 cookie.setPath(tok.nextToken());
741 else if ("Domain".equalsIgnoreCase(token))
742 cookie.setDomain(tok.nextToken());
743 else if ("Expires".equalsIgnoreCase(token))
744 {
745 tok.nextToken();
746
747 }
748 else if ("Max-Age".equalsIgnoreCase(token))
749 {
750 tok.nextToken();
751
752 }
753 else if ("Secure".equalsIgnoreCase(token))
754 cookie.setSecure(true);
755 }
756
757 BayeuxClient.this.setCookie(cookie);
758 }
759 }
760
761
762 protected void onResponseComplete() throws IOException
763 {
764 if (!isRunning())
765 return;
766
767 super.onResponseComplete();
768
769 if (getResponseStatus() == 200)
770 {
771 String content = getResponseContent();
772
773 if (content == null || content.length() == 0)
774 throw new IllegalStateException();
775 _responses = _msgPool.parse(content);
776
777 if (_responses!=null)
778 for (int i=0;i<_responses.length;i++)
779 extendIn(_responses[i]);
780 }
781 }
782
783
784 protected void resend(boolean backoff)
785 {
786 if (!isRunning())
787 return;
788
789 final boolean disconnecting;
790 synchronized (_outQ)
791 {
792 disconnecting=_disconnecting;
793 }
794 if (disconnecting)
795 {
796 try{stop();}catch(Exception e){Log.ignore(e);}
797 return;
798 }
799
800 setJson(_json);
801 if (!send(this,backoff))
802 Log.warn("Retries exhausted");
803 }
804
805
806 protected void recycle()
807 {
808 if (_responses!=null)
809 for (Message msg:_responses)
810 if (msg instanceof MessageImpl)
811 ((MessageImpl)msg).decRef();
812 _responses=null;
813 }
814 }
815
816
817
818
819
820
821
822 protected class Handshake extends Exchange
823 {
824 public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]";
825
826 Handshake()
827 {
828 super("handshake");
829 setMessage(__HANDSHAKE);
830 }
831
832
833
834
835
836
837
838
839 protected void onResponseComplete() throws IOException
840 {
841 super.onResponseComplete();
842
843 if (!isRunning())
844 return;
845
846 if (_disconnecting)
847 {
848 Message error=_msgPool.newMessage();
849 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
850 error.put("failure","expired");
851 metaHandshake(false,false,error);
852 try{stop();}catch(Exception e){Log.ignore(e);}
853 return;
854 }
855
856 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
857 {
858 MessageImpl response = (MessageImpl)_responses[0];
859 boolean successful = response.isSuccessful();
860
861
862 Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD);
863 if (adviceField != null)
864 _advice = new Advice(adviceField);
865
866 if (successful)
867 {
868 _handshook = true;
869 if (Log.isDebugEnabled())
870 Log.debug("Successful handshake, sending connect");
871 _clientId = (String)response.get(Bayeux.CLIENT_FIELD);
872
873 metaHandshake(true,_handshook,response);
874 _pull = new Connect();
875 send(_pull,false);
876 }
877 else
878 {
879 metaHandshake(false,false,response);
880 _handshook = false;
881 if (_advice != null && _advice.isReconnectNone())
882 throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]);
883 else if (_advice != null && _advice.isReconnectHandshake())
884 {
885 _pull = new Handshake();
886 if (!send(_pull,true))
887 throw new IOException("Handshake, retries exhausted");
888 }
889 else
890
891 {
892 _pull = new Connect();
893 if (!send(_pull,true))
894 throw new IOException("Connect after handshake, retries exhausted");
895 }
896 }
897 }
898 else
899 {
900 Message error=_msgPool.newMessage();
901 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
902 error.put("status",new Integer(getResponseStatus()));
903 error.put("content",getResponseContent());
904
905 metaHandshake(false,false,error);
906 resend(true);
907 }
908
909 recycle();
910 }
911
912
913 protected void onExpire()
914 {
915
916 Message error=_msgPool.newMessage();
917 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
918 error.put("failure","expired");
919 metaHandshake(false,false,error);
920 resend(true);
921 }
922
923
924 protected void onConnectionFailed(Throwable ex)
925 {
926
927 Message error=_msgPool.newMessage();
928 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
929 error.put("failure",ex.toString());
930 error.put("exception",ex);
931 ex.printStackTrace();
932 metaHandshake(false,false,error);
933 resend(true);
934 }
935
936
937 protected void onException(Throwable ex)
938 {
939
940 Message error=_msgPool.newMessage();
941 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
942 error.put("failure",ex.toString());
943 error.put("exception",ex);
944 metaHandshake(false,false,error);
945 resend(true);
946 }
947 }
948
949
950
951
952
953
954 protected class Connect extends Exchange
955 {
956 String _connectString;
957
958 Connect()
959 {
960 super("connect");
961 _connectString = "{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}";
962 setMessage(_connectString);
963 }
964
965 protected void onResponseComplete() throws IOException
966 {
967 super.onResponseComplete();
968 if (!isRunning())
969 return;
970
971 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
972 {
973 try
974 {
975 startBatch();
976
977 for (int i = 0; i < _responses.length; i++)
978 {
979 Message msg = _responses[i];
980
981
982 Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
983 if (adviceField != null)
984 _advice = new Advice(adviceField);
985
986 if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
987 {
988 Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
989 if (successful != null && successful.booleanValue())
990 {
991 metaConnect(true,msg);
992
993 if (!isRunning())
994 break;
995
996 synchronized (_outQ)
997 {
998 if (_disconnecting)
999 continue;
1000
1001 if (!isInitialized())
1002 {
1003 setInitialized(true);
1004 {
1005 if (_outQ.size() > 0)
1006 {
1007 _push = new Publish();
1008 send(_push);
1009 }
1010 }
1011 }
1012
1013 }
1014
1015
1016 _pull = new Connect();
1017 send(_pull,false);
1018 }
1019 else
1020 {
1021
1022
1023
1024
1025
1026
1027
1028
1029 setInitialized(false);
1030 metaConnect(false,msg);
1031
1032 synchronized(_outQ)
1033 {
1034 if (!isRunning()||_disconnecting)
1035 break;
1036 }
1037
1038 if (_advice != null && _advice.isReconnectNone())
1039 throw new IOException("Connect failed, advice reconnect=none");
1040 else if (_advice != null && _advice.isReconnectHandshake())
1041 {
1042 if (Log.isDebugEnabled())
1043 Log.debug("connect received success=false, advice is to rehandshake");
1044 _pull = new Handshake();
1045 send(_pull,true);
1046 }
1047 else
1048 {
1049
1050 if (Log.isDebugEnabled())
1051 Log.debug("Assuming retry=reconnect");
1052 resend(true);
1053 }
1054 }
1055 }
1056 deliver(null,msg);
1057 }
1058 }
1059 finally
1060 {
1061 endBatch();
1062 }
1063 }
1064 else
1065 {
1066 Message error=_msgPool.newMessage();
1067 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1068 error.put("status",getResponseStatus());
1069 error.put("content",getResponseContent());
1070 metaConnect(false,error);
1071 resend(true);
1072 }
1073
1074 recycle();
1075 }
1076
1077
1078 protected void onExpire()
1079 {
1080
1081 setInitialized(false);
1082 Message error=_msgPool.newMessage();
1083 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1084 error.put("failure","expired");
1085 metaConnect(false,error);
1086 resend(true);
1087 }
1088
1089
1090 protected void onConnectionFailed(Throwable ex)
1091 {
1092
1093 setInitialized(false);
1094 Message error=_msgPool.newMessage();
1095 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1096 error.put("failure",ex.toString());
1097 error.put("exception",ex);
1098 metaConnect(false,error);
1099 resend(true);
1100 }
1101
1102
1103 protected void onException(Throwable ex)
1104 {
1105
1106 setInitialized(false);
1107 Message error=_msgPool.newMessage();
1108 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1109 error.put("failure",ex.toString());
1110 error.put("exception",ex);
1111 metaConnect(false,error);
1112 resend(true);
1113 }
1114 }
1115
1116
1117
1118
1119
1120
1121 protected class Publish extends Exchange
1122 {
1123 Publish()
1124 {
1125 super("publish");
1126
1127 StringBuffer json = new StringBuffer(256);
1128 synchronized (json)
1129 {
1130 synchronized (_outQ)
1131 {
1132 int s=_outQ.size();
1133 if (s == 0)
1134 return;
1135
1136 for (int i=0;i<s;i++)
1137 {
1138 Message message = _outQ.getUnsafe(i);
1139 message.put(Bayeux.CLIENT_FIELD,_clientId);
1140 extendOut(message);
1141
1142 json.append(i==0?'[':',');
1143 _jsonOut.append(json,message);
1144
1145 if (message instanceof MessageImpl)
1146 ((MessageImpl)message).decRef();
1147 }
1148 json.append(']');
1149 _outQ.clear();
1150 setJson(json.toString());
1151 }
1152 }
1153 }
1154
1155 protected Message[] getOutboundMessages()
1156 {
1157 try
1158 {
1159 return _msgPool.parse(_json);
1160 }
1161 catch (IOException e)
1162 {
1163 Log.warn("Error converting outbound messages");
1164 if (Log.isDebugEnabled())
1165 Log.debug(e);
1166 return null;
1167 }
1168 }
1169
1170
1171
1172
1173
1174
1175
1176
1177 protected void onResponseComplete() throws IOException
1178 {
1179 if (!isRunning())
1180 return;
1181
1182 super.onResponseComplete();
1183 try
1184 {
1185 synchronized (_outQ)
1186 {
1187 startBatch();
1188 _push = null;
1189 }
1190
1191 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1192 {
1193 for (int i = 0; i < _responses.length; i++)
1194 {
1195 MessageImpl msg = (MessageImpl)_responses[i];
1196
1197 deliver(null,msg);
1198 if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful())
1199 {
1200 if (isStarted())
1201 {
1202 try{stop();}catch(Exception e){Log.ignore(e);}
1203 }
1204 break;
1205 }
1206 }
1207 }
1208 else
1209 {
1210 Log.warn("Publish, error=" + getResponseStatus());
1211 }
1212 }
1213 finally
1214 {
1215 endBatch();
1216 }
1217 recycle();
1218 }
1219
1220
1221 protected void onExpire()
1222 {
1223 super.onExpire();
1224 metaPublishFail(null,this.getOutboundMessages());
1225 if (_disconnecting)
1226 {
1227 try{stop();}catch(Exception e){Log.ignore(e);}
1228 }
1229 }
1230
1231
1232 protected void onConnectionFailed(Throwable ex)
1233 {
1234 super.onConnectionFailed(ex);
1235 metaPublishFail(ex,this.getOutboundMessages());
1236 if (_disconnecting)
1237 {
1238 try{stop();}catch(Exception e){Log.ignore(e);}
1239 }
1240 }
1241
1242
1243 protected void onException(Throwable ex)
1244 {
1245 super.onException(ex);
1246 metaPublishFail(ex,this.getOutboundMessages());
1247 if (_disconnecting)
1248 {
1249 try{stop();}catch(Exception e){Log.ignore(e);}
1250 }
1251 }
1252 }
1253
1254
1255 public void addListener(ClientListener listener)
1256 {
1257 synchronized (_inQ)
1258 {
1259 boolean added=false;
1260 if (listener instanceof MessageListener)
1261 {
1262 added=true;
1263 if (_mListeners == null)
1264 _mListeners = new ArrayList<MessageListener>();
1265 _mListeners.add((MessageListener)listener);
1266 }
1267 if (listener instanceof RemoveListener)
1268 {
1269 added=true;
1270 if (_rListeners == null)
1271 _rListeners = new ArrayList<RemoveListener>();
1272 _rListeners.add((RemoveListener)listener);
1273 }
1274
1275 if (!added)
1276 throw new IllegalArgumentException();
1277 }
1278 }
1279
1280
1281 public void removeListener(ClientListener listener)
1282 {
1283 synchronized (_inQ)
1284 {
1285 if (listener instanceof MessageListener)
1286 {
1287 if (_mListeners != null)
1288 _mListeners.remove((MessageListener)listener);
1289 }
1290 if (listener instanceof RemoveListener)
1291 {
1292 if (_rListeners != null)
1293 _rListeners.remove((RemoveListener)listener);
1294 }
1295 }
1296 }
1297
1298
1299 public int getMaxQueue()
1300 {
1301 return -1;
1302 }
1303
1304
1305 public Queue<Message> getQueue()
1306 {
1307 return _inQ;
1308 }
1309
1310
1311 public void setMaxQueue(int max)
1312 {
1313 if (max != -1)
1314 throw new UnsupportedOperationException();
1315 }
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326 protected boolean send(final Exchange exchange, final boolean backoff)
1327 {
1328 long interval = (_advice != null?_advice.getInterval():0);
1329
1330 if (backoff)
1331 {
1332 int backoffInterval = exchange.getBackoff();
1333 if (Log.isDebugEnabled())
1334 Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange);
1335
1336 exchange.incBackoff();
1337
1338 interval += backoffInterval;
1339 }
1340
1341 if (interval > 0)
1342 {
1343 TimerTask task = new TimerTask()
1344 {
1345 public void run()
1346 {
1347 try
1348 {
1349 send(exchange);
1350 }
1351 catch (IOException e)
1352 {
1353 Log.warn("Delayed send, retry: "+e);
1354 Log.debug(e);
1355 send(exchange,true);
1356 }
1357 catch (IllegalStateException e)
1358 {
1359 Log.warn("Delayed send, retry: "+e);
1360 Log.debug(e);
1361 send(exchange,true);
1362 }
1363 }
1364 };
1365 if (Log.isDebugEnabled())
1366 Log.debug("Delay " + interval + " send of " + exchange);
1367 _timer.schedule(task,interval);
1368 }
1369 else
1370 {
1371 try
1372 {
1373 send(exchange);
1374 }
1375 catch (IOException e)
1376 {
1377 Log.warn("Send, retry on fail: "+e);
1378 Log.debug(e);
1379 return send(exchange,true);
1380 }
1381 catch (IllegalStateException e)
1382 {
1383 Log.warn("Send, retry on fail: "+e);
1384 Log.debug(e);
1385 return send(exchange,true);
1386 }
1387 }
1388 return true;
1389
1390 }
1391
1392
1393
1394
1395
1396
1397
1398
1399 protected void send(HttpExchange exchange) throws IOException
1400 {
1401 exchange.reset();
1402 customize(exchange);
1403 if (Log.isDebugEnabled())
1404 Log.debug("Send: using any connection=" + exchange);
1405 _httpClient.send(exchange);
1406 }
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417 protected void setInitialized(boolean b)
1418 {
1419 synchronized (_outQ)
1420 {
1421 _initialized = b;
1422 }
1423 }
1424
1425
1426 protected boolean isInitialized()
1427 {
1428 return _initialized;
1429 }
1430
1431
1432
1433
1434
1435
1436 protected void metaConnect(boolean success, Message message)
1437 {
1438 if (!success)
1439 Log.warn(this.toString()+" "+message.toString());
1440 }
1441
1442
1443
1444
1445
1446
1447
1448 protected void metaHandshake(boolean success, boolean reestablish, Message message)
1449 {
1450 if (!success)
1451 Log.warn(this.toString()+" "+message.toString());
1452 }
1453
1454
1455
1456
1457
1458 protected void metaPublishFail(Throwable e, Message[] messages)
1459 {
1460 Log.warn(this.toString()+": "+e);
1461 Log.debug(e);
1462 }
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476 protected String extendOut(String msg)
1477 {
1478 if (_extensions==null)
1479 return msg;
1480
1481 try
1482 {
1483 Message[] messages = _msgPool.parse(msg);
1484 for (int i=0; i<messages.length; i++)
1485 extendOut(messages[i]);
1486 if (messages.length==1 && msg.charAt(0)=='{')
1487 return _msgPool.getMsgJSON().toJSON(messages[0]);
1488 return _msgPool.getMsgJSON().toJSON(messages);
1489 }
1490 catch(IOException e)
1491 {
1492 Log.warn(e);
1493 return msg;
1494 }
1495 }
1496
1497
1498
1499
1500
1501
1502
1503 protected void extendOut(Message message)
1504 {
1505 if (_extensions!=null)
1506 {
1507 Message m = message;
1508 if (m.getChannel().startsWith(Bayeux.META_SLASH))
1509 for (int i=0;m!=null && i<_extensions.length;i++)
1510 m=_extensions[i].sendMeta(this,m);
1511 else
1512 for (int i=0;m!=null && i<_extensions.length;i++)
1513 m=_extensions[i].send(this,m);
1514
1515 if (message!=m)
1516 {
1517 message.clear();
1518 if (m!=null)
1519 for (Map.Entry<String,Object> entry:m.entrySet())
1520 message.put(entry.getKey(),entry.getValue());
1521 }
1522 }
1523 }
1524
1525
1526
1527
1528
1529
1530
1531 protected void extendIn(Message message)
1532 {
1533 if (_extensions!=null)
1534 {
1535 Message m = message;
1536 if (m.getChannel().startsWith(Bayeux.META_SLASH))
1537 for (int i=_extensions.length;m!=null && i-->0;)
1538 m=_extensions[i].rcvMeta(this,m);
1539 else
1540 for (int i=_extensions.length;m!=null && i-->0;)
1541 m=_extensions[i].rcv(this,m);
1542
1543 if (message!=m)
1544 {
1545 message.clear();
1546 if (m!=null)
1547 for (Map.Entry<String,Object> entry:m.entrySet())
1548 message.put(entry.getKey(),entry.getValue());
1549 }
1550 }
1551 }
1552
1553
1554 }