1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22
23 import org.cometd.Bayeux;
24 import org.cometd.Channel;
25 import org.cometd.ChannelBayeuxListener;
26 import org.cometd.ChannelListener;
27 import org.cometd.Client;
28 import org.cometd.DataFilter;
29 import org.cometd.Message;
30 import org.cometd.SubscriptionListener;
31 import org.mortbay.log.Log;
32 import org.mortbay.util.LazyList;
33
34
35
36
37
38
39
40
41 public class ChannelImpl implements Channel
42 {
43 protected AbstractBayeux _bayeux;
44 private volatile ClientImpl[] _subscribers=new ClientImpl[0];
45
46 private volatile DataFilter[] _dataFilters=new DataFilter[0];
47
48 private volatile SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0];
49
50
51 private ChannelId _id;
52 private ConcurrentMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
53 private ChannelImpl _wild;
54 private ChannelImpl _wildWild;
55 private boolean _persistent;
56 private int _split;
57 private boolean _lazy;
58
59
60 protected ChannelImpl(String id, AbstractBayeux bayeux)
61 {
62 _id=new ChannelId(id);
63 _bayeux=bayeux;
64 }
65
66
67
68
69
70
71
72
73 public boolean isLazy()
74 {
75 return _lazy;
76 }
77
78
79
80
81
82
83
84
85
86 public void setLazy(boolean lazy)
87 {
88 _lazy=lazy;
89 }
90
91
92 public void addChild(ChannelImpl channel)
93 {
94 ChannelId child=channel.getChannelId();
95 if (!_id.isParentOf(child))
96 {
97 throw new IllegalArgumentException(_id + " not parent of " + child);
98 }
99
100 String next=child.getSegment(_id.depth());
101
102 if ((child.depth() - _id.depth()) == 1)
103 {
104
105 synchronized(this)
106 {
107
108 ChannelImpl old=_children.putIfAbsent(next,channel);
109 if (old != null)
110 throw new IllegalArgumentException("Already Exists");
111
112 if (ChannelId.WILD.equals(next))
113 _wild=channel;
114 else if (ChannelId.WILDWILD.equals(next))
115 _wildWild=channel;
116 _bayeux.addChannel(channel);
117 }
118 }
119 else
120 {
121 ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
122 branch.addChild(channel);
123 }
124 }
125
126
127
128
129
130 public void addDataFilter(DataFilter filter)
131 {
132 synchronized(this)
133 {
134 _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
135 }
136 }
137
138
139
140
141
142 public ChannelId getChannelId()
143 {
144 return _id;
145 }
146
147
148 public ChannelImpl getChild(ChannelId id)
149 {
150 String next=id.getSegment(_id.depth());
151 if (next == null)
152 return null;
153
154 ChannelImpl channel=_children.get(next);
155
156 if (channel == null || channel.getChannelId().depth() == id.depth())
157 {
158 return channel;
159 }
160 return channel.getChild(id);
161 }
162
163
164 public void getChannels(List<Channel> list)
165 {
166 synchronized(this)
167 {
168 list.add(this);
169 for (ChannelImpl channel : _children.values())
170 channel.getChannels(list);
171 }
172 }
173
174
175 public int getChannelCount()
176 {
177 return _children.size();
178 }
179
180
181
182
183
184 public String getId()
185 {
186 return _id.toString();
187 }
188
189
190 public boolean isPersistent()
191 {
192 return _persistent;
193 }
194
195
196 public void deliver(Client from, Iterable<Client> to, Object data, String id)
197 {
198 MessageImpl message=_bayeux.newMessage();
199 message.put(Bayeux.CHANNEL_FIELD,getId());
200 message.put(Bayeux.DATA_FIELD,data);
201 if (id != null)
202 message.put(Bayeux.ID_FIELD,id);
203
204 Message m=_bayeux.extendSendBayeux(from,message);
205
206 if (m != null)
207 {
208 for (Client t : to)
209 ((ClientImpl)t).doDelivery(from,m);
210 }
211 if (m instanceof MessageImpl)
212 ((MessageImpl)m).decRef();
213 }
214
215
216 public void publish(Client fromClient, Object data, String msgId)
217 {
218 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
219 }
220
221
222 public void publishLazy(Client fromClient, Object data, String msgId)
223 {
224 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
225 }
226
227
228 public boolean remove()
229 {
230 return _bayeux.removeChannel(this);
231 }
232
233
234 public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
235 {
236 ChannelId channelId=channel.getChannelId();
237 int diff=channel._id.depth() - _id.depth();
238
239 if (diff >= 1)
240 {
241 String key=channelId.getSegment(_id.depth());
242 ChannelImpl child=_children.get(key);
243
244 if (child != null)
245 {
246
247 if (diff == 1)
248 {
249 if (!child.isPersistent())
250 {
251
252
253 synchronized(this)
254 {
255 if (child.getChannelCount() > 0)
256 {
257
258 for (ChannelImpl c : child._children.values())
259 child.doRemove(c,listeners);
260 }
261
262
263 _children.remove(key);
264 }
265 for (ChannelBayeuxListener l : listeners)
266 l.channelRemoved(channel);
267 return true;
268 }
269 return false;
270 }
271
272 boolean removed=child.doRemove(channel,listeners);
273 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
274 {
275
276 synchronized(this)
277 {
278 _children.remove(key);
279 }
280 for (ChannelBayeuxListener l : listeners)
281 l.channelRemoved(channel);
282 }
283
284 return removed;
285 }
286
287 }
288 return false;
289 }
290
291
292
293
294
295 public DataFilter removeDataFilter(DataFilter filter)
296 {
297 synchronized(this)
298 {
299 _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
300 return filter;
301 }
302 }
303
304
305 public void setPersistent(boolean persistent)
306 {
307 _persistent=persistent;
308 }
309
310
311
312
313
314 public void subscribe(Client client)
315 {
316 if (!(client instanceof ClientImpl))
317 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
318
319 synchronized(this)
320 {
321 for (ClientImpl c : _subscribers)
322 {
323 if (client.equals(c))
324 return;
325 }
326 _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
327
328 for (SubscriptionListener l : _subscriptionListeners)
329 l.subscribed(client,this);
330 }
331
332 ((ClientImpl)client).addSubscription(this);
333 }
334
335
336 @Override
337 public String toString()
338 {
339 return _id.toString();
340 }
341
342
343
344
345
346 public void unsubscribe(Client client)
347 {
348 if (!(client instanceof ClientImpl))
349 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
350 ((ClientImpl)client).removeSubscription(this);
351 synchronized(this)
352 {
353 _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
354
355 for (SubscriptionListener l : _subscriptionListeners)
356 l.unsubscribed(client,this);
357
358 if (!_persistent && _subscribers.length == 0 && _children.size() == 0)
359 remove();
360 }
361 }
362
363
364 protected void doDelivery(ChannelId to, Client from, Message msg)
365 {
366 int tail=to.depth() - _id.depth();
367
368 Object data=msg.getData();
369
370
371 if (data != null)
372 {
373 Object old=data;
374
375 try
376 {
377 switch(tail)
378 {
379 case 0:
380 {
381 final DataFilter[] filters=_dataFilters;
382 for (DataFilter filter : filters)
383 {
384 data=filter.filter(from,this,data);
385 if (data == null)
386 return;
387 }
388 }
389 break;
390
391 case 1:
392 if (_wild != null)
393 {
394 final DataFilter[] filters=_wild._dataFilters;
395 for (DataFilter filter : filters)
396 {
397 data=filter.filter(from,this,data);
398 if (data == null)
399 return;
400 }
401 }
402
403 default:
404 if (_wildWild != null)
405 {
406 final DataFilter[] filters=_wildWild._dataFilters;
407 for (DataFilter filter : filters)
408 {
409 data=filter.filter(from,this,data);
410 if (data == null)
411 return;
412 }
413 }
414 }
415 }
416 catch(IllegalStateException e)
417 {
418 Log.ignore(e);
419 return;
420 }
421
422
423
424 if (data != old)
425 msg.put(AbstractBayeux.DATA_FIELD,data);
426 }
427
428 switch(tail)
429 {
430 case 0:
431 {
432 if (_lazy && msg instanceof MessageImpl)
433 ((MessageImpl)msg).setLazy(true);
434
435 final ClientImpl[] subscribers=_subscribers;
436 if (subscribers.length > 0)
437 {
438
439 int split=_split++ % _subscribers.length;
440 for (int i=split; i < subscribers.length; i++)
441 subscribers[i].doDelivery(from,msg);
442 for (int i=0; i < split; i++)
443 subscribers[i].doDelivery(from,msg);
444 }
445 break;
446 }
447
448 case 1:
449 if (_wild != null)
450 {
451 if (_wild._lazy && msg instanceof MessageImpl)
452 ((MessageImpl)msg).setLazy(true);
453 final ClientImpl[] subscribers=_wild._subscribers;
454 for (ClientImpl client : subscribers)
455 client.doDelivery(from,msg);
456 }
457
458 default:
459 {
460 if (_wildWild != null)
461 {
462 if (_wildWild._lazy && msg instanceof MessageImpl)
463 ((MessageImpl)msg).setLazy(true);
464 final ClientImpl[] subscribers=_wildWild._subscribers;
465 for (ClientImpl client : subscribers)
466 client.doDelivery(from,msg);
467 }
468 String next=to.getSegment(_id.depth());
469 ChannelImpl channel=_children.get(next);
470 if (channel != null)
471 channel.doDelivery(to,from,msg);
472 }
473 }
474 }
475
476
477 public Collection<Client> getSubscribers()
478 {
479 synchronized(this)
480 {
481 return Arrays.asList((Client[])_subscribers);
482 }
483 }
484
485
486 public int getSubscriberCount()
487 {
488 synchronized(this)
489 {
490 return _subscribers.length;
491 }
492 }
493
494
495
496
497
498
499
500 public Collection<DataFilter> getDataFilters()
501 {
502 synchronized(this)
503 {
504 return Arrays.asList(_dataFilters);
505 }
506 }
507
508
509 public void addListener(ChannelListener listener)
510 {
511 synchronized(this)
512 {
513 if (listener instanceof SubscriptionListener)
514 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
515 }
516 }
517
518 public void removeListener(ChannelListener listener)
519 {
520 synchronized(this)
521 {
522 if (listener instanceof SubscriptionListener)
523 {
524 _subscriptionListeners=(SubscriptionListener[])LazyList.removeFromArray(_subscriptionListeners,listener);
525 }
526 }
527 }
528 }