View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.ByteChannel;
19  import java.nio.channels.CancelledKeyException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.mortbay.component.AbstractLifeCycle;
30  import org.mortbay.component.LifeCycle;
31  import org.mortbay.io.Connection;
32  import org.mortbay.io.EndPoint;
33  import org.mortbay.log.Log;
34  import org.mortbay.thread.Timeout;
35  
36  
37  /* ------------------------------------------------------------ */
38  /**
39   * The Selector Manager manages and number of SelectSets to allow
40   * NIO scheduling to scale to large numbers of connections.
41   * 
42   * @author gregw
43   *
44   */
45  public abstract class SelectorManager extends AbstractLifeCycle
46  {
47      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",128).intValue();
48      private static final int __JVMBUG_THRESHHOLD2=__JVMBUG_THRESHHOLD*2;
49      private static final int __JVMBUG_THRESHHOLD1=(__JVMBUG_THRESHHOLD2+__JVMBUG_THRESHHOLD)/2;
50      private boolean _delaySelectKeyUpdate=true;
51      private long _maxIdleTime;
52      private long _lowResourcesConnections;
53      private long _lowResourcesMaxIdleTime;
54      private transient SelectSet[] _selectSet;
55      private int _selectSets=1;
56      private volatile int _set;
57      private boolean _jvmBug0;
58      private boolean _jvmBug1;
59      
60  
61      /* ------------------------------------------------------------ */
62      /**
63       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
64       * @see {@link #setLowResourcesMaxIdleTime(long)}
65       */
66      public void setMaxIdleTime(long maxIdleTime)
67      {
68          _maxIdleTime=maxIdleTime;
69      }
70      
71      /* ------------------------------------------------------------ */
72      /**
73       * @param selectSets
74       */
75      public void setSelectSets(int selectSets)
76      {
77          long lrc = _lowResourcesConnections * _selectSets; 
78          _selectSets=selectSets;
79          _lowResourcesConnections=lrc/_selectSets;
80      }
81      
82      /* ------------------------------------------------------------ */
83      /**
84       * @return
85       */
86      public long getMaxIdleTime()
87      {
88          return _maxIdleTime;
89      }
90      
91      /* ------------------------------------------------------------ */
92      /**
93       * @return
94       */
95      public int getSelectSets()
96      {
97          return _selectSets;
98      }
99      
100     /* ------------------------------------------------------------ */
101     /**
102      * @return
103      */
104     public boolean isDelaySelectKeyUpdate()
105     {
106         return _delaySelectKeyUpdate;
107     }
108 
109     /* ------------------------------------------------------------ */
110     /** Register a channel
111      * @param channel
112      * @param att Attached Object
113      * @throws IOException
114      */
115     public void register(SocketChannel channel, Object att) throws IOException
116     {
117         int s=_set++; 
118         s=s%_selectSets;
119         SelectSet[] sets=_selectSet;
120         if (sets!=null)
121         {
122             SelectSet set=sets[s];
123             set.addChange(channel,att);
124             set.wakeup();
125         }
126     }
127     
128     /* ------------------------------------------------------------ */
129     /** Register a serverchannel
130      * @param acceptChannel
131      * @return
132      * @throws IOException
133      */
134     public void register(ServerSocketChannel acceptChannel) throws IOException
135     {
136         int s=_set++; 
137         s=s%_selectSets;
138         SelectSet set=_selectSet[s];
139         set.addChange(acceptChannel);
140         set.wakeup();
141     }
142 
143     /* ------------------------------------------------------------ */
144     /**
145      * @return the lowResourcesConnections
146      */
147     public long getLowResourcesConnections()
148     {
149         return _lowResourcesConnections*_selectSets;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * Set the number of connections, which if exceeded places this manager in low resources state.
155      * This is not an exact measure as the connection count is averaged over the select sets.
156      * @param lowResourcesConnections the number of connections
157      * @see {@link #setLowResourcesMaxIdleTime(long)}
158      */
159     public void setLowResourcesConnections(long lowResourcesConnections)
160     {
161         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
162     }
163 
164     /* ------------------------------------------------------------ */
165     /**
166      * @return the lowResourcesMaxIdleTime
167      */
168     public long getLowResourcesMaxIdleTime()
169     {
170         return _lowResourcesMaxIdleTime;
171     }
172 
173     /* ------------------------------------------------------------ */
174     /**
175      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
176      * @see {@link #setMaxIdleTime(long)}
177      */
178     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
179     {
180         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
181     }
182     
183     /* ------------------------------------------------------------ */
184     /**
185      * @param acceptorID
186      * @throws IOException
187      */
188     public void doSelect(int acceptorID) throws IOException
189     {
190         SelectSet[] sets= _selectSet;
191         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
192             sets[acceptorID].doSelect();
193     }
194 
195 
196     /* ------------------------------------------------------------ */
197     /**
198      * @param delaySelectKeyUpdate
199      */
200     public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
201     {
202         _delaySelectKeyUpdate=delaySelectKeyUpdate;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /**
207      * @param key
208      * @return
209      * @throws IOException 
210      */
211     protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
212 
213     /* ------------------------------------------------------------------------------- */
214     public abstract boolean dispatch(Runnable task) throws IOException;
215 
216     /* ------------------------------------------------------------ */
217     /* (non-Javadoc)
218      * @see org.mortbay.component.AbstractLifeCycle#doStart()
219      */
220     protected void doStart() throws Exception
221     {
222         _selectSet = new SelectSet[_selectSets];
223         for (int i=0;i<_selectSet.length;i++)
224             _selectSet[i]= new SelectSet(i);
225 
226         super.doStart();
227     }
228 
229 
230     /* ------------------------------------------------------------------------------- */
231     protected void doStop() throws Exception
232     {
233         SelectSet[] sets= _selectSet;
234         _selectSet=null;
235         if (sets!=null)
236             for (int i=0;i<sets.length;i++)
237             {
238                 SelectSet set = sets[i];
239                 if (set!=null)
240                     set.stop();
241             }
242         super.doStop();
243     }
244 
245     /* ------------------------------------------------------------ */
246     /**
247      * @param endpoint
248      */
249     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * @param endpoint
254      */
255     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
256 
257     /* ------------------------------------------------------------------------------- */
258     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
259 
260     /* ------------------------------------------------------------ */
261     /**
262      * @param channel
263      * @param selectSet
264      * @param sKey
265      * @return
266      * @throws IOException
267      */
268     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269 
270     /* ------------------------------------------------------------------------------- */
271     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272     {
273         Log.warn(ex);
274     }
275     
276     /* ------------------------------------------------------------------------------- */
277     /* ------------------------------------------------------------------------------- */
278     /* ------------------------------------------------------------------------------- */
279     public class SelectSet 
280     {
281         private transient int _change;
282         private transient List[] _changes;
283         private transient Timeout _idleTimeout;
284         private transient int _nextSet;
285         private transient Timeout _retryTimeout;
286         private transient Selector _selector;
287         private transient int _setID;
288         private transient int _jvmBug;
289         private volatile boolean _selecting;
290         
291         /* ------------------------------------------------------------ */
292         SelectSet(int acceptorID) throws Exception
293         {
294             _setID=acceptorID;
295 
296             _idleTimeout = new Timeout(this);
297             _idleTimeout.setDuration(getMaxIdleTime());
298             _retryTimeout = new Timeout(this);
299             _retryTimeout.setDuration(0L);
300 
301             // create a selector;
302             _selector = Selector.open();
303             _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
304             _change=0;
305         }
306         
307         /* ------------------------------------------------------------ */
308         public void addChange(Object point)
309         {
310             synchronized (_changes)
311             {
312                 _changes[_change].add(point);
313             }
314         }
315         
316         /* ------------------------------------------------------------ */
317         public void addChange(SelectableChannel channel, Object att)
318         {   
319             if (att==null)
320                 addChange(channel);
321             else if (att instanceof EndPoint)
322                 addChange(att);
323             else
324                 addChange(new ChangeSelectableChannel(channel,att));
325         }
326         
327         /* ------------------------------------------------------------ */
328         public void cancelIdle(Timeout.Task task)
329         {
330             synchronized (this)
331             {
332                 task.cancel();
333             }
334         }
335 
336         /* ------------------------------------------------------------ */
337         /**
338          * Select and dispatch tasks found from changes and the selector.
339          * 
340          * @throws IOException
341          */
342         public void doSelect() throws IOException
343         {
344             SelectionKey key=null;
345             
346             try
347             {
348                 List changes;
349                 final Selector selector;
350                 synchronized (_changes)
351                 {
352                     changes=_changes[_change];
353                     _change=_change==0?1:0;
354                     _selecting=true;
355                     selector=_selector;
356                 }
357                 
358 
359                 // Make any key changes required
360                 for (int i = 0; i < changes.size(); i++)
361                 {
362                     try
363                     {
364                         Object o = changes.get(i);
365                         
366                         if (o instanceof EndPoint)
367                         {
368                             // Update the operations for a key.
369                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
370                             endpoint.doUpdateKey();
371                         }
372                         else if (o instanceof Runnable)
373                         {
374                             dispatch((Runnable)o);
375                         }
376                         else if (o instanceof ChangeSelectableChannel)
377                         {
378                             // finish accepting/connecting this connection
379                             final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
380                             final SelectableChannel channel=asc._channel;
381                             final Object att = asc._attachment;
382 
383                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
384                             {
385                                 key = channel.register(selector,SelectionKey.OP_READ,att);
386                                 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
387                                 key.attach(endpoint);
388                                 endpoint.dispatch();
389                             }
390                             else if (channel.isOpen())
391                             {
392                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
393                             }
394                         }
395                         else if (o instanceof SocketChannel)
396                         {
397                             final SocketChannel channel=(SocketChannel)o;
398 
399                             if (channel.isConnected())
400                             {
401                                 key = channel.register(selector,SelectionKey.OP_READ,null);
402                                 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
403                                 key.attach(endpoint);
404                                 endpoint.dispatch();
405                             }
406                             else
407                             {
408                                 channel.register(selector,SelectionKey.OP_CONNECT,null);
409                             }
410                         }
411                         else if (o instanceof ServerSocketChannel)
412                         {
413                             ServerSocketChannel channel = (ServerSocketChannel)o;
414                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
415                         }
416                         else if (o instanceof ChangeTask)
417                         {
418                             ((ChangeTask)o).run();
419                         }
420                         else
421                             throw new IllegalArgumentException(o.toString());
422                     }
423                     catch (CancelledKeyException e)
424                     {
425                         if (isRunning())
426                             Log.warn(e);
427                         else
428                             Log.debug(e);
429                     }
430                 }
431                 changes.clear();
432 
433                 long idle_next = 0;
434                 long retry_next = 0;
435                 long now=System.currentTimeMillis();
436                 synchronized (this)
437                 {
438                     _idleTimeout.setNow(now);
439                     _retryTimeout.setNow(now);
440                     if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
441                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
442                     else 
443                         _idleTimeout.setDuration(_maxIdleTime);
444                     idle_next=_idleTimeout.getTimeToNext();
445                     retry_next=_retryTimeout.getTimeToNext();
446                 }
447 
448                 // workout how low to wait in select
449                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
450                 if (idle_next >= 0 && wait > idle_next)
451                     wait = idle_next;
452                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
453                     wait = retry_next;
454     
455                 // Do the select.
456                 if (wait > 10) // TODO tune or configure this
457                 {
458                     long before=now;
459                     int selected=selector.select(wait);
460                     now = System.currentTimeMillis();
461                     _idleTimeout.setNow(now);
462                     _retryTimeout.setNow(now);
463 
464                     // Look for JVM bugs
465                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
466                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
467                     if (__JVMBUG_THRESHHOLD>0 && selected==0 && wait>__JVMBUG_THRESHHOLD && (now-before)<(wait/2) )
468                     {
469                         _jvmBug++;
470                         if (_jvmBug>=(__JVMBUG_THRESHHOLD2))
471                         {
472                             synchronized (this)
473                             {
474                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
475                                 if (_jvmBug1)
476                                     Log.debug("seeing JVM BUG(s) - recreating selector");
477                                 else
478                                 {
479                                     _jvmBug1=true;
480                                     Log.info("seeing JVM BUG(s) - recreating selector");
481                                 }
482                                 
483                                 final Selector new_selector = Selector.open();
484                                 Iterator iterator = _selector.keys().iterator();
485                                 while (iterator.hasNext())
486                                 {
487                                     SelectionKey k = (SelectionKey)iterator.next();
488                                     if (!k.isValid() || k.interestOps()==0)
489                                         continue;
490                                     
491                                     final SelectableChannel channel = k.channel();
492                                     final Object attachment = k.attachment();
493                                     
494                                     if (attachment==null)
495                                         addChange(channel);
496                                     else
497                                         addChange(channel,attachment);
498                                 }
499                                 _selector.close();
500                                 _selector=new_selector;
501                                 _jvmBug=0;
502                                 return;
503                             }
504                         }
505                         else if (_jvmBug==__JVMBUG_THRESHHOLD || _jvmBug==__JVMBUG_THRESHHOLD1)
506                         {
507                             // Cancel keys with 0 interested ops
508                             if (_jvmBug0)
509                                 Log.debug("seeing JVM BUG(s) - cancelling interestOps==0");
510                             else
511                             {
512                                 _jvmBug0=true;
513                                 Log.info("seeing JVM BUG(s) - cancelling interestOps==0");
514                             }
515                             Iterator iter = selector.keys().iterator();
516                             while(iter.hasNext())
517                             {
518                                 SelectionKey k = (SelectionKey) iter.next();
519                                 if (k.isValid()&&k.interestOps()==0)
520                                 {
521                                     k.cancel();
522                                 }
523                             }
524                             return;
525                         }
526                     }
527                     else
528                         _jvmBug=0;
529                 }
530                 else 
531                 {
532                     selector.selectNow();
533                     _jvmBug=0;
534                 }
535 
536                 // have we been destroyed while sleeping
537                 if (_selector==null || !selector.isOpen())
538                     return;
539 
540                 // Look for things to do
541                 Iterator iter = selector.selectedKeys().iterator();
542                 while (iter.hasNext())
543                 {
544                     key = (SelectionKey) iter.next();
545                                         
546                     try
547                     {
548                         if (!key.isValid())
549                         {
550                             key.cancel();
551                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
552                             if (endpoint != null)
553                                 endpoint.doUpdateKey();
554                             continue;
555                         }
556 
557                         Object att = key.attachment();
558                         if (att instanceof SelectChannelEndPoint)
559                         {
560                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
561                             endpoint.dispatch();
562                         }
563                         else if (key.isAcceptable())
564                         {
565                             SocketChannel channel = acceptChannel(key);
566                             if (channel==null)
567                                 continue;
568 
569                             channel.configureBlocking(false);
570 
571                             // TODO make it reluctant to leave 0
572                             _nextSet=++_nextSet%_selectSet.length;
573 
574                             // Is this for this selectset
575                             if (_nextSet==_setID)
576                             {
577                                 // bind connections to this select set.
578                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
579                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
580                                 cKey.attach(endpoint);
581                                 if (endpoint != null)
582                                     endpoint.dispatch();
583                             }
584                             else
585                             {
586                                 // nope - give it to another.
587                                 _selectSet[_nextSet].addChange(channel);
588                                 _selectSet[_nextSet].wakeup();
589                             }
590                         }
591                         else if (key.isConnectable())
592                         {
593                             // Complete a connection of a registered channel
594                             SocketChannel channel = (SocketChannel)key.channel();
595                             boolean connected=false;
596                             try
597                             {
598                                 connected=channel.finishConnect();
599                             }
600                             catch(Exception e)
601                             {
602                                 connectionFailed(channel,e,att);
603                             }
604                             finally
605                             {
606                                 if (connected)
607                                 {
608                                     key.interestOps(SelectionKey.OP_READ);
609                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
610                                     key.attach(endpoint);
611                                     endpoint.dispatch();
612                                 }
613                                 else
614                                 {
615                                     key.cancel();
616                                 }
617                             }
618                         }
619                         else
620                         {
621                             // Wrap readable registered channel in an endpoint
622                             SocketChannel channel = (SocketChannel)key.channel();
623                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
624                             key.attach(endpoint);
625                             if (key.isReadable())
626                                 endpoint.dispatch();                           
627                         }
628                         key = null;
629                     }
630                     catch (CancelledKeyException e)
631                     {
632                         Log.ignore(e);
633                     }
634                     catch (Exception e)
635                     {
636                         if (isRunning())
637                             Log.warn(e);
638                         else
639                             Log.ignore(e);
640 
641                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
642                         {
643                             key.interestOps(0);
644 
645                             key.cancel();
646                         } 
647                     }
648                 }
649                 
650                 // Everything always handled
651                 selector.selectedKeys().clear();
652 
653                 // tick over the timers
654                 _idleTimeout.tick(now);
655                 _retryTimeout.tick(now);
656                 
657             }
658             catch (CancelledKeyException e)
659             {
660                 Log.ignore(e);
661             }
662             finally
663             {
664                 _selecting=false;
665             }
666         }
667 
668         /* ------------------------------------------------------------ */
669         public SelectorManager getManager()
670         {
671             return SelectorManager.this;
672         }
673 
674         /* ------------------------------------------------------------ */
675         public long getNow()
676         {
677             return _idleTimeout.getNow();
678         }
679         
680         /* ------------------------------------------------------------ */
681         public void scheduleIdle(Timeout.Task task)
682         {
683             synchronized (this)
684             {
685                 if (_idleTimeout.getDuration() <= 0)
686                     return;
687                 
688                 task.schedule(_idleTimeout);
689             }
690         }
691 
692         /* ------------------------------------------------------------ */
693         public void scheduleTimeout(Timeout.Task task, long timeout)
694         {
695             synchronized (this)
696             {
697                 _retryTimeout.schedule(task, timeout);
698             }
699         }
700 
701         /* ------------------------------------------------------------ */
702         public void wakeup()
703         {
704             Selector selector = _selector;
705             if (selector!=null)
706                 selector.wakeup();
707         }
708 
709         /* ------------------------------------------------------------ */
710         Selector getSelector()
711         {
712             return _selector;
713         }
714 
715         /* ------------------------------------------------------------ */
716         void stop() throws Exception
717         {
718             boolean selecting=true;
719             while(selecting)
720             {
721                 wakeup();
722                 selecting=_selecting;
723             }
724             
725             ArrayList keys=new ArrayList(_selector.keys());
726             Iterator iter =keys.iterator();
727 
728             while (iter.hasNext())
729             {
730                 SelectionKey key = (SelectionKey)iter.next();
731                 if (key==null)
732                     continue;
733                 Object att=key.attachment();
734                 if (att instanceof EndPoint)
735                 {
736                     EndPoint endpoint = (EndPoint)att;
737                     try
738                     {
739                         endpoint.close();
740                     }
741                     catch(IOException e)
742                     {
743                         Log.ignore(e);
744                     }
745                 }
746             }
747             
748             synchronized (this)
749             {
750                 selecting=_selecting;
751                 while(selecting)
752                 {
753                     wakeup();
754                     selecting=_selecting;
755                 }
756                 
757                 _idleTimeout.cancelAll();
758                 _retryTimeout.cancelAll();
759                 try
760                 {
761                     if (_selector != null)
762                         _selector.close();
763                 }
764                 catch (IOException e)
765                 {
766                     Log.ignore(e);
767                 } 
768                 _selector=null;
769             }
770         }
771     }
772 
773     /* ------------------------------------------------------------ */
774     private static class ChangeSelectableChannel
775     {
776         final SelectableChannel _channel;
777         final Object _attachment;
778         
779         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
780         {
781             super();
782             _channel = channel;
783             _attachment = attachment;
784         }
785     }
786 
787     /* ------------------------------------------------------------ */
788     private interface ChangeTask
789     {
790         public void run();
791     }
792 }