/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.commons.collections.buffer;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;

import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferUnderflowException;

/**
 * Decorates another <code>Buffer</code> to make {@link #get()} and
 * {@link #remove()} block when the <code>Buffer</code> is empty.
 * <p>
 * If either <code>get</code> or <code>remove</code> is called on an empty
 * <code>Buffer</code>, the calling thread waits for notification that
 * an <code>add</code> or <code>addAll</code> operation has completed.
 * <p>
 * Every <code>add</code> or <code>addAll</code> call notifies all threads
 * blocked in <code>get</code> or <code>remove</code>.
 * There is no guarantee that concurrent blocked <code>get</code> or
 * <code>remove</code> requests will be "unblocked" and receive data in the
 * order that they arrive.
 * <p>
 * If a blocked thread is interrupted, the call fails with a
 * <code>BufferUnderflowException</code> whose message contains the stack
 * trace of the <code>InterruptedException</code>. The interrupt status of
 * the thread is not restored by this class.
 * <p>
 * This class is Serializable from Commons Collections 3.1.
 * This class contains an extra field in 3.2; however, the serialization
 * specification will handle this gracefully.
 *
 * @author Stephen Colebourne
 * @author Janek Bogucki
 * @author Phil Steitz
 * @author James Carman
 * @version $Revision: 646777 $ $Date: 2008-04-10 13:33:15 +0100 (Thu, 10 Apr 2008) $
 * @since Commons Collections 3.0
 */
public class BlockingBuffer extends SynchronizedBuffer {

    /** Serialization version. */
    private static final long serialVersionUID = 1719328905017860541L;
    /** The default timeout value in milliseconds; zero means wait without a time limit. */
    private final long timeout;

    /**
     * Factory method to create a blocking buffer.
     *
     * @param buffer  the buffer to decorate, must not be null
     * @return a new blocking Buffer
     * @throws IllegalArgumentException if buffer is null
     */
    public static Buffer decorate(Buffer buffer) {
        return new BlockingBuffer(buffer);
    }

    /**
     * Factory method to create a blocking buffer with a timeout value.
     *
     * @param buffer  the buffer to decorate, must not be null
     * @param timeoutMillis  the timeout value in milliseconds, zero or less for no timeout
     * @return a new blocking buffer
     * @throws IllegalArgumentException if the buffer is null
     * @since Commons Collections 3.2
     */
    public static Buffer decorate(Buffer buffer, long timeoutMillis) {
        return new BlockingBuffer(buffer, timeoutMillis);
    }

    //-----------------------------------------------------------------------
    /**
     * Constructor that wraps (not copies).
     *
     * @param buffer  the buffer to decorate, must not be null
     * @throws IllegalArgumentException if the buffer is null
     */
    protected BlockingBuffer(Buffer buffer) {
        super(buffer);
        this.timeout = 0;
    }

    /**
     * Constructor that wraps (not copies).
     *
     * @param buffer  the buffer to decorate, must not be null
     * @param timeoutMillis  the timeout value in milliseconds, zero or less for no timeout
     * @throws IllegalArgumentException if the buffer is null
     * @since Commons Collections 3.2
     */
    protected BlockingBuffer(Buffer buffer, long timeoutMillis) {
        super(buffer);
        // Negative values are normalised to zero, which means "no timeout".
        this.timeout = (timeoutMillis < 0 ? 0 : timeoutMillis);
    }

    //-----------------------------------------------------------------------
    /**
     * Adds an object to the decorated buffer and wakes all threads that are
     * waiting in <code>get</code> or <code>remove</code>.
     *
     * @param o  the object to add
     * @return the result of the decorated collection's <code>add</code>
     */
    public boolean add(Object o) {
        synchronized (lock) {
            boolean result = collection.add(o);
            lock.notifyAll();
            return result;
        }
    }

    /**
     * Adds all objects of a collection to the decorated buffer and wakes all
     * threads that are waiting in <code>get</code> or <code>remove</code>.
     *
     * @param c  the collection whose elements are added
     * @return the result of the decorated collection's <code>addAll</code>
     */
    public boolean addAll(Collection c) {
        synchronized (lock) {
            boolean result = collection.addAll(c);
            lock.notifyAll();
            return result;
        }
    }

    /**
     * Gets the next value from the buffer, waiting until an object is
     * added if the buffer is empty. This method uses the default timeout
     * set in the constructor.
     *
     * @return the next value of the decorated buffer
     * @throws BufferUnderflowException if an interrupt is received
     * @throws BufferUnderflowException if a default timeout is set and it expires
     */
    public Object get() {
        synchronized (lock) {
            while (collection.isEmpty()) {
                if (timeout > 0) {
                    // A default timeout is configured: the timed variant does the waiting.
                    return get(timeout);
                }
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    throw interruptedUnderflow(e);
                }
            }
            return getBuffer().get();
        }
    }

    /**
     * Gets the next value from the buffer, waiting until an object is
     * added for up to the specified timeout value if the buffer is empty.
     * <p>
     * Unlike the constructor timeout, a value of zero or less does not wait
     * at all: an empty buffer fails immediately.
     *
     * @param timeout  the timeout value in milliseconds
     * @return the next value of the decorated buffer
     * @throws BufferUnderflowException if an interrupt is received
     * @throws BufferUnderflowException if the timeout expires
     * @since Commons Collections 3.2
     */
    public Object get(final long timeout) {
        synchronized (lock) {
            awaitElement(timeout);
            return getBuffer().get();
        }
    }

    /**
     * Removes the next value from the buffer, waiting until an object is
     * added if the buffer is empty. This method uses the default timeout
     * set in the constructor.
     *
     * @return the value removed from the decorated buffer
     * @throws BufferUnderflowException if an interrupt is received
     * @throws BufferUnderflowException if a default timeout is set and it expires
     */
    public Object remove() {
        synchronized (lock) {
            while (collection.isEmpty()) {
                if (timeout > 0) {
                    // A default timeout is configured: the timed variant does the waiting.
                    return remove(timeout);
                }
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    throw interruptedUnderflow(e);
                }
            }
            return getBuffer().remove();
        }
    }

    /**
     * Removes the next value from the buffer, waiting until an object is
     * added for up to the specified timeout value if the buffer is empty.
     * <p>
     * Unlike the constructor timeout, a value of zero or less does not wait
     * at all: an empty buffer fails immediately.
     *
     * @param timeout  the timeout value in milliseconds
     * @return the value removed from the decorated buffer
     * @throws BufferUnderflowException if an interrupt is received
     * @throws BufferUnderflowException if the timeout expires
     * @since Commons Collections 3.2
     */
    public Object remove(final long timeout) {
        synchronized (lock) {
            awaitElement(timeout);
            return getBuffer().remove();
        }
    }

    //-----------------------------------------------------------------------
    /**
     * Waits on the lock until the decorated collection is non-empty or the
     * timeout has elapsed. Must be called while holding <code>lock</code>.
     *
     * @param timeout  the maximum time to wait in milliseconds
     * @throws BufferUnderflowException if an interrupt is received
     * @throws BufferUnderflowException if the collection is still empty
     *  once the timeout has elapsed
     */
    private void awaitElement(final long timeout) {
        final long expiration = expirationFor(timeout);
        long timeLeft = expiration - System.currentTimeMillis();
        // Re-check after every wake-up: another consumer may have taken the
        // element, or the wake-up may have been caused by the timeout itself.
        while (timeLeft > 0 && collection.isEmpty()) {
            try {
                lock.wait(timeLeft);
            } catch (InterruptedException e) {
                throw interruptedUnderflow(e);
            }
            timeLeft = expiration - System.currentTimeMillis();
        }
        if (collection.isEmpty()) {
            throw new BufferUnderflowException("Timeout expired");
        }
    }

    /**
     * Computes the absolute expiration time for a relative timeout,
     * saturating at <code>Long.MAX_VALUE</code> instead of overflowing
     * to a negative value for very large timeouts.
     *
     * @param timeout  the relative timeout in milliseconds
     * @return the expiration time in milliseconds since the epoch
     */
    private static long expirationFor(final long timeout) {
        final long now = System.currentTimeMillis();
        if (timeout > 0 && now > Long.MAX_VALUE - timeout) {
            return Long.MAX_VALUE;
        }
        return now + timeout;
    }

    /**
     * Creates the exception reported when a waiting thread is interrupted.
     * The stack trace is read from the <code>StringWriter</code>, because
     * <code>PrintWriter.toString()</code> does not return the written text.
     *
     * @param e  the interrupt that ended the wait
     * @return the exception to throw, with the stack trace of <code>e</code>
     *  embedded in its message
     */
    private static BufferUnderflowException interruptedUnderflow(InterruptedException e) {
        StringWriter trace = new StringWriter();
        PrintWriter out = new PrintWriter(trace);
        e.printStackTrace(out);
        out.flush();
        return new BufferUnderflowException("Caused by InterruptedException: " + trace.toString());
    }

}