1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import javax.servlet.http.Cookie;
23
24 import org.mortbay.io.Buffer;
25 import org.mortbay.io.ByteArrayBuffer;
26 import org.mortbay.jetty.HttpHeaders;
27 import org.mortbay.jetty.client.security.Authorization;
28 import org.mortbay.jetty.client.security.SecurityListener;
29 import org.mortbay.jetty.servlet.PathMap;
30 import org.mortbay.log.Log;
31
32
33
34
35
36 public class HttpDestination
37 {
38 private ByteArrayBuffer _hostHeader;
39 private final Address _address;
40 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42 private final HttpClient _client;
43 private final boolean _ssl;
44 private int _maxConnections;
45 private int _pendingConnections=0;
46 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
47 private int _newConnection=0;
48 private Address _proxy;
49 private Authorization _proxyAuthentication;
50 private PathMap _authorizations;
51 private List<Cookie> _cookies;
52
53 public void dump() throws IOException
54 {
55 synchronized (this)
56 {
57 System.err.println(this);
58 System.err.println("connections="+_connections.size());
59 System.err.println("idle="+_idle.size());
60 System.err.println("pending="+_pendingConnections);
61 for (HttpConnection c : _connections)
62 {
63 if (!c.isIdle())
64 c.dump();
65 }
66 }
67 }
68
69
70 private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
71
72
73 HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
74 {
75 _client=pool;
76 _address=address;
77 _ssl=ssl;
78 _maxConnections=maxConnections;
79 String addressString = address.getHost();
80 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81 _hostHeader = new ByteArrayBuffer(addressString);
82 }
83
84
85 public Address getAddress()
86 {
87 return _address;
88 }
89
90
91 public Buffer getHostHeader()
92 {
93 return _hostHeader;
94 }
95
96
97 public HttpClient getHttpClient()
98 {
99 return _client;
100 }
101
102
103 public boolean isSecure()
104 {
105 return _ssl;
106 }
107
108
109 public void addAuthorization(String pathSpec,Authorization authorization)
110 {
111 synchronized (this)
112 {
113 if (_authorizations==null)
114 _authorizations=new PathMap();
115 _authorizations.put(pathSpec,authorization);
116 }
117
118
119 }
120
121
122 public void addCookie(Cookie cookie)
123 {
124 synchronized (this)
125 {
126 if (_cookies==null)
127 _cookies=new ArrayList<Cookie>();
128 _cookies.add(cookie);
129 }
130
131
132 }
133
134
135
136
137
138
139
140
141
142
143 private HttpConnection getConnection(long timeout) throws IOException
144 {
145 HttpConnection connection = null;
146
147 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
148 {
149 int totalConnections = 0;
150 boolean starting = false;
151 synchronized (this)
152 {
153 totalConnections = _connections.size() + _pendingConnections;
154 if (totalConnections < _maxConnections)
155 {
156 _newConnection++;
157 startNewConnection();
158 starting = true;
159 }
160 }
161
162 if (!starting)
163 {
164 try
165 {
166 Thread.currentThread().sleep(200);
167 timeout-=200;
168 }
169 catch (InterruptedException e)
170 {
171 Log.ignore(e);
172 }
173 }
174 else
175 {
176 try
177 {
178 Object o = _newQueue.take();
179 if (o instanceof HttpConnection)
180 {
181 connection = (HttpConnection)o;
182 }
183 else
184 throw (IOException)o;
185 }
186 catch (InterruptedException e)
187 {
188 Log.ignore(e);
189 }
190 }
191 }
192 return connection;
193 }
194
195
196 public HttpConnection reserveConnection(long timeout) throws IOException
197 {
198 HttpConnection connection = getConnection(timeout);
199 if (connection != null)
200 connection.setReserved(true);
201 return connection;
202 }
203
204
205 public HttpConnection getIdleConnection() throws IOException
206 {
207 long now = System.currentTimeMillis();
208 long idleTimeout=_client.getIdleTimeout();
209 HttpConnection connection = null;
210 while (true)
211 {
212 synchronized (this)
213 {
214 if (connection!=null)
215 {
216 _connections.remove(connection);
217 connection.close();
218 connection=null;
219 }
220 if (_idle.size() > 0)
221 connection = _idle.remove(_idle.size()-1);
222 }
223
224 if (connection==null)
225 return null;
226
227 long last = connection.getLast();
228 if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
229 return connection;
230
231 }
232 }
233
234
235 protected void startNewConnection()
236 {
237 try
238 {
239 synchronized (this)
240 {
241 _pendingConnections++;
242 }
243 _client._connector.startConnection(this);
244 }
245 catch(Exception e)
246 {
247 onConnectionFailed(e);
248 }
249 }
250
251
252 public void onConnectionFailed(Throwable throwable)
253 {
254 Throwable connect_failure=null;
255
256 synchronized (this)
257 {
258 _pendingConnections--;
259 if (_newConnection>0)
260 {
261 connect_failure=throwable;
262 _newConnection--;
263 }
264 else if (_queue.size()>0)
265 {
266 HttpExchange ex=_queue.removeFirst();
267 ex.getEventListener().onConnectionFailed(throwable);
268 }
269 }
270
271 if(connect_failure!=null)
272 {
273 try
274 {
275 _newQueue.put(connect_failure);
276 }
277 catch (InterruptedException e)
278 {
279 Log.ignore(e);
280 }
281 }
282 }
283
284
285 public void onException(Throwable throwable)
286 {
287 synchronized (this)
288 {
289 _pendingConnections--;
290 if (_queue.size()>0)
291 {
292 HttpExchange ex=_queue.removeFirst();
293 ex.getEventListener().onException(throwable);
294 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
295 }
296 }
297 }
298
299
300 public void onNewConnection(HttpConnection connection) throws IOException
301 {
302 HttpConnection q_connection=null;
303
304 synchronized (this)
305 {
306 _pendingConnections--;
307 _connections.add(connection);
308
309 if (_newConnection>0)
310 {
311 q_connection=connection;
312 _newConnection--;
313 }
314 else if (_queue.size()==0)
315 {
316 _idle.add(connection);
317 }
318 else
319 {
320 HttpExchange ex=_queue.removeFirst();
321 connection.send(ex);
322 }
323 }
324
325 if (q_connection!=null)
326 {
327 try
328 {
329 _newQueue.put(q_connection);
330 }
331 catch (InterruptedException e)
332 {
333 Log.ignore(e);
334 }
335 }
336 }
337
338
339 public void returnConnection(HttpConnection connection, boolean close) throws IOException
340 {
341 if (connection.isReserved())
342 connection.setReserved(false);
343
344 if (close)
345 {
346 try
347 {
348 connection.close();
349 }
350 catch(IOException e)
351 {
352 Log.ignore(e);
353 }
354 }
355
356 if (!_client.isStarted())
357 return;
358
359 if (!close && connection.getEndPoint().isOpen())
360 {
361 synchronized (this)
362 {
363 if (_queue.size()==0)
364 {
365 connection.setLast(System.currentTimeMillis());
366 _idle.add(connection);
367 }
368 else
369 {
370 HttpExchange ex = _queue.removeFirst();
371 connection.send(ex);
372 }
373 this.notifyAll();
374 }
375 }
376 else
377 {
378 synchronized (this)
379 {
380 _connections.remove(connection);
381 if (!_queue.isEmpty())
382 startNewConnection();
383 }
384 }
385 }
386
387
388 public void send(HttpExchange ex) throws IOException
389 {
390 LinkedList<String> listeners = _client.getRegisteredListeners();
391
392 if (listeners != null)
393 {
394
395 for (int i = listeners.size(); i > 0; --i)
396 {
397 String listenerClass = listeners.get(i - 1);
398
399 try
400 {
401 Class listener = Class.forName(listenerClass);
402 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
403 HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
404 ex.setEventListener(elistener);
405 }
406 catch (Exception e)
407 {
408 Log.debug(e);
409 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
410 }
411 }
412 }
413
414
415 if ( _client.hasRealms() )
416 {
417 ex.setEventListener( new SecurityListener( this, ex ) );
418 }
419
420 doSend(ex);
421 }
422
423
424 public void resend(HttpExchange ex) throws IOException
425 {
426 ex.getEventListener().onRetry();
427 doSend(ex);
428 }
429
430
431 protected void doSend(HttpExchange ex) throws IOException
432 {
433
434
435 if (_cookies!=null)
436 {
437 StringBuilder buf=null;
438 for (Cookie cookie : _cookies)
439 {
440 if (buf==null)
441 buf=new StringBuilder();
442 else
443 buf.append("; ");
444 buf.append(cookie.getName());
445 buf.append("=");
446 buf.append(cookie.getValue());
447 }
448 if (buf!=null)
449 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
450 }
451
452
453 if (_authorizations!=null)
454 {
455 Authorization auth= (Authorization)_authorizations.match(ex.getURI());
456 if (auth !=null)
457 ((Authorization)auth).setCredentials(ex);
458 }
459
460 HttpConnection connection = getIdleConnection();
461 if (connection != null)
462 {
463 boolean sent = connection.send(ex);
464 if (!sent) connection = null;
465 }
466
467 if (connection == null)
468 {
469 synchronized (this)
470 {
471 _queue.add(ex);
472 if (_connections.size() + _pendingConnections < _maxConnections)
473 {
474 startNewConnection();
475 }
476 }
477 }
478 }
479
480
481 public synchronized String toString()
482 {
483 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
484 }
485
486
487 public synchronized String toDetailString()
488 {
489 StringBuilder b = new StringBuilder();
490 b.append(toString());
491 b.append('\n');
492 synchronized(this)
493 {
494 for (HttpConnection connection : _connections)
495 {
496 if (connection._exchange!=null)
497 {
498 b.append(connection.toDetailString());
499 if (_idle.contains(connection))
500 b.append(" IDLE");
501 b.append('\n');
502 }
503 }
504 }
505 b.append("--");
506 b.append('\n');
507
508 return b.toString();
509 }
510
511
512 public void setProxy(Address proxy)
513 {
514 _proxy=proxy;
515 }
516
517
518 public Address getProxy()
519 {
520 return _proxy;
521 }
522
523
524 public Authorization getProxyAuthentication()
525 {
526 return _proxyAuthentication;
527 }
528
529
530 public void setProxyAuthentication(Authorization authentication)
531 {
532 _proxyAuthentication = authentication;
533 }
534
535
536 public boolean isProxied()
537 {
538 return _proxy!=null;
539 }
540
541
542 public void close() throws IOException
543 {
544 synchronized (this)
545 {
546 for (HttpConnection connection : _connections)
547 {
548 connection.close();
549 }
550 }
551 }
552
553 }