001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.distributable; 022 023import java.lang.reflect.Method; 024import java.text.MessageFormat; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.TreeMap; 029import java.util.Vector; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.Lock; 034 035import net.sf.hajdbc.DatabaseCluster; 036import net.sf.hajdbc.LockManager; 037 038import org.jgroups.Address; 039import org.jgroups.Message; 040import org.jgroups.MessageListener; 041import org.jgroups.blocks.GroupRequest; 042import org.jgroups.blocks.MethodCall; 043import org.jgroups.blocks.RpcDispatcher; 044import org.jgroups.util.Rsp; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * LockManager implementation that leverages a JGroups 2-phase voting adapter for obtain remote write locks. 050 * 051 * @author Paul Ferraro 052 */ 053public class DistributableLockManager extends AbstractMembershipListener implements LockManager, MessageListener 054{ 055 private static final String CHANNEL = "{0}-lock"; //$NON-NLS-1$ 056 057 static Logger logger = LoggerFactory.getLogger(DistributableLockManager.class); 058 059 protected RpcDispatcher dispatcher; 060 protected int timeout; 061 private LockManager lockManager; 062 private Map<Address, Map<String, Lock>> addressMap = new ConcurrentHashMap<Address, Map<String, Lock>>(); 063 064 /** 065 * Constructs a new DistributableLock. 066 * @param <D> either java.sql.Driver or javax.sql.Datasource 067 * @param databaseCluster a database cluster 068 * @param decorator a decorator 069 * @throws Exception 070 */ 071 public <D> DistributableLockManager(DatabaseCluster<D> databaseCluster, DistributableDatabaseClusterDecorator decorator) throws Exception 072 { 073 super(decorator.createChannel(MessageFormat.format(CHANNEL, databaseCluster.getId()))); 074 075 this.lockManager = databaseCluster.getLockManager(); 076 077 this.timeout = decorator.getTimeout(); 078 079 this.dispatcher = new RpcDispatcher(this.channel, this, this, this); 080 } 081 082 /** 083 * @see net.sf.hajdbc.Lifecycle#start() 084 */ 085 @Override 086 public void start() throws Exception 087 { 088 this.channel.connect(this.channel.getClusterName()); 089 090 this.lockManager.start(); 091 } 092 093 /** 094 * @see net.sf.hajdbc.Lifecycle#stop() 095 */ 096 @Override 097 public void stop() 098 { 099 this.channel.close(); 100 101 this.lockManager.stop(); 102 } 103 104 /** 105 * Read locks are local. 106 * @see net.sf.hajdbc.LockManager#readLock(java.lang.String) 107 */ 108 @Override 109 public Lock readLock(String object) 110 { 111 return this.lockManager.readLock(object); 112 } 113 114 /** 115 * Write locks are distributed. 116 * @see net.sf.hajdbc.LockManager#writeLock(java.lang.String) 117 */ 118 @Override 119 public Lock writeLock(String object) 120 { 121 return new DistributableLock(object, this.lockManager.writeLock(object)); 122 } 123 124 /** 125 * Votes on the specified decree. 126 * @param decree a lock decree 127 * @return true, if success, false if failure 128 */ 129 public boolean vote(LockDecree decree) 130 { 131 Map<String, Lock> lockMap = this.addressMap.get(decree.getAddress()); 132 133 // Vote negatively for decrees from non-members 134 if (lockMap == null) 135 { 136 return false; 137 } 138 139 return decree.vote(this.lockManager, lockMap); 140 } 141 142 /** 143 * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberJoined(org.jgroups.Address) 144 */ 145 @Override 146 protected void memberJoined(Address address) 147 { 148 this.addressMap.put(address, new HashMap<String, Lock>()); 149 } 150 151 /** 152 * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberLeft(org.jgroups.Address) 153 */ 154 @Override 155 protected void memberLeft(Address address) 156 { 157 Map<String, Lock> lockMap = this.addressMap.remove(address); 158 159 for (Lock lock: lockMap.values()) 160 { 161 lock.unlock(); 162 } 163 } 164 165 /** 166 * @see org.jgroups.MessageListener#getState() 167 */ 168 @Override 169 public byte[] getState() 170 { 171 return null; 172 } 173 174 /** 175 * @see org.jgroups.MessageListener#receive(org.jgroups.Message) 176 */ 177 @Override 178 public void receive(Message message) 179 { 180 // Do nothing 181 } 182 183 /** 184 * @see org.jgroups.MessageListener#setState(byte[]) 185 */ 186 @Override 187 public void setState(byte[] arg0) 188 { 189 // Do nothing 190 } 191 192 private class DistributableLock implements Lock 193 { 194 private LockDecree acquireDecree; 195 private LockDecree releaseDecree; 196 private Lock lock; 197 198 /** 199 * @param object 200 * @param lock 201 */ 202 public DistributableLock(String object, Lock lock) 203 { 204 Address address = DistributableLockManager.this.channel.getLocalAddress(); 205 206 this.acquireDecree = new AcquireLockDecree(object, address); 207 this.releaseDecree = new ReleaseLockDecree(object, address); 208 209 this.lock = lock; 210 } 211 212 /** 213 * @see java.util.concurrent.locks.Lock#lock() 214 */ 215 @Override 216 public void lock() 217 { 218 while (!DistributableLockManager.this.isMembershipEmpty()) 219 { 220 if (this.tryLockFairly()) return; 221 222 Thread.yield(); 223 } 224 225 this.lock.lock(); 226 } 227 228 /** 229 * @see java.util.concurrent.locks.Lock#lockInterruptibly() 230 */ 231 @Override 232 public void lockInterruptibly() throws InterruptedException 233 { 234 while (!DistributableLockManager.this.isMembershipEmpty()) 235 { 236 if (this.tryLockFairly()) return; 237 238 if (Thread.currentThread().isInterrupted()) 239 { 240 throw new InterruptedException(); 241 } 242 243 Thread.yield(); 244 } 245 246 this.lock.lockInterruptibly(); 247 } 248 249 /** 250 * @see java.util.concurrent.locks.Lock#tryLock() 251 */ 252 @Override 253 public boolean tryLock() 254 { 255 if (this.lock.tryLock()) 256 { 257 if (this.tryRemoteLock()) 258 { 259 return true; 260 } 261 262 this.lock.unlock(); 263 } 264 265 return false; 266 } 267 268 /** 269 * Like {@link #tryLock()}, but do not barge on other waiting threads 270 * @return true, if lock acquired, false otherwise 271 * @throws InterruptedException 272 */ 273 private boolean tryLockFairly() 274 { 275 try 276 { 277 if (this.lock.tryLock(0, TimeUnit.SECONDS)) 278 { 279 if (this.tryRemoteLock()) 280 { 281 return true; 282 } 283 284 this.lock.unlock(); 285 } 286 } 287 catch (InterruptedException e) 288 { 289 Thread.currentThread().interrupt(); 290 } 291 292 return false; 293 } 294 295 /** 296 * @see java.util.concurrent.locks.Lock#tryLock(long, java.util.concurrent.TimeUnit) 297 */ 298 @Override 299 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException 300 { 301 // Convert timeout to milliseconds 302 long ms = unit.toMillis(timeout); 303 304 long stopTime = System.currentTimeMillis() + ms; 305 306 do 307 { 308 if (DistributableLockManager.this.isMembershipEmpty()) 309 { 310 return this.lock.tryLock(ms, TimeUnit.MILLISECONDS); 311 } 312 313 if (this.tryLockFairly()) 314 { 315 return true; 316 } 317 318 if (Thread.currentThread().isInterrupted()) 319 { 320 throw new InterruptedException(); 321 } 322 323 ms = stopTime - System.currentTimeMillis(); 324 } 325 while (ms >= 0); 326 327 return false; 328 } 329 330 /** 331 * @see java.util.concurrent.locks.Lock#unlock() 332 */ 333 @Override 334 public void unlock() 335 { 336 this.remoteUnlock(); 337 338 this.lock.unlock(); 339 } 340 341 private boolean tryRemoteLock() 342 { 343 Map<Boolean, Vector<Address>> map = null; 344 345 try 346 { 347 map = this.remoteLock(); 348 349 return map.get(false).isEmpty(); 350 } 351 finally 352 { 353 if (map != null) 354 { 355 this.remoteUnlock(map.get(true)); 356 } 357 } 358 } 359 360 private Map<Boolean, Vector<Address>> remoteLock() 361 { 362 return DistributableLockManager.this.remoteVote(this.acquireDecree, null, DistributableLockManager.this.timeout); 363 } 364 365 private Map<Boolean, Vector<Address>> remoteUnlock() 366 { 367 return this.remoteUnlock(null); 368 } 369 370 private Map<Boolean, Vector<Address>> remoteUnlock(Vector<Address> address) 371 { 372 return DistributableLockManager.this.remoteVote(this.releaseDecree, address, 0); 373 } 374 375 /** 376 * @see java.util.concurrent.locks.Lock#newCondition() 377 */ 378 @Override 379 public Condition newCondition() 380 { 381 throw new UnsupportedOperationException(); 382 } 383 } 384 385 Map<Boolean, Vector<Address>> remoteVote(LockDecree decree, Vector<Address> addresses, long timeout) 386 { 387 Map<Boolean, Vector<Address>> map = new TreeMap<Boolean, Vector<Address>>(); 388 389 int size = (addresses != null) ? addresses.size() : this.getMembershipSize(); 390 391 map.put(true, new Vector<Address>(size)); 392 map.put(false, new Vector<Address>(size)); 393 394 if (size > 0) 395 { 396 try 397 { 398 Method method = this.getClass().getMethod("vote", LockDecree.class); //$NON-NLS-1$ 399 400 MethodCall call = new MethodCall(method, new Object[] { decree }); 401 402 Collection<Rsp> responses = this.dispatcher.callRemoteMethods(addresses, call, GroupRequest.GET_ALL, timeout).values(); 403 404 for (Rsp response: responses) 405 { 406 Object value = response.wasReceived() ? response.getValue() : false; 407 408 map.get((value != null) ? value : false).add(response.getSender()); 409 } 410 } 411 catch (NoSuchMethodException e) 412 { 413 throw new IllegalStateException(e); 414 } 415 } 416 417 return map; 418 } 419}