001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.sql; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileOutputStream; 026import java.io.FileWriter; 027import java.io.IOException; 028import java.io.InputStream; 029import java.net.URL; 030import java.nio.channels.Channels; 031import java.nio.channels.FileChannel; 032import java.nio.channels.WritableByteChannel; 033import java.sql.Connection; 034import java.sql.SQLException; 035import java.sql.Statement; 036import java.util.ArrayList; 037import java.util.Collection; 038import java.util.Comparator; 039import java.util.HashMap; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Map; 043import java.util.NoSuchElementException; 044import java.util.Set; 045import java.util.TreeMap; 046import java.util.TreeSet; 047import java.util.concurrent.Callable; 048import java.util.concurrent.CopyOnWriteArrayList; 049import java.util.concurrent.ExecutionException; 050import java.util.concurrent.ExecutorService; 051import java.util.concurrent.Future; 052import java.util.concurrent.SynchronousQueue; 053import java.util.concurrent.ThreadPoolExecutor; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.locks.Lock; 056 057import javax.management.DynamicMBean; 058import javax.management.JMException; 059import javax.management.MBeanRegistration; 060import javax.management.MBeanServer; 061import javax.management.ObjectName; 062 063import net.sf.hajdbc.Balancer; 064import net.sf.hajdbc.Database; 065import net.sf.hajdbc.DatabaseActivationListener; 066import net.sf.hajdbc.DatabaseCluster; 067import net.sf.hajdbc.DatabaseClusterDecorator; 068import net.sf.hajdbc.DatabaseClusterFactory; 069import net.sf.hajdbc.DatabaseClusterMBean; 070import net.sf.hajdbc.DatabaseDeactivationListener; 071import net.sf.hajdbc.DatabaseEvent; 072import net.sf.hajdbc.DatabaseMetaDataCache; 073import net.sf.hajdbc.DatabaseMetaDataCacheFactory; 074import net.sf.hajdbc.Dialect; 075import net.sf.hajdbc.LockManager; 076import net.sf.hajdbc.Messages; 077import net.sf.hajdbc.StateManager; 078import net.sf.hajdbc.SynchronizationContext; 079import net.sf.hajdbc.SynchronizationListener; 080import net.sf.hajdbc.SynchronizationStrategy; 081import net.sf.hajdbc.local.LocalLockManager; 082import net.sf.hajdbc.local.LocalStateManager; 083import net.sf.hajdbc.sync.SynchronizationContextImpl; 084import net.sf.hajdbc.sync.SynchronizationStrategyBuilder; 085import net.sf.hajdbc.util.concurrent.CronThreadPoolExecutor; 086 087import org.jibx.runtime.BindingDirectory; 088import org.jibx.runtime.IMarshallingContext; 089import org.jibx.runtime.IUnmarshallingContext; 090import org.jibx.runtime.JiBXException; 091import org.quartz.CronExpression; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095/** 096 * @author Paul Ferraro 097 * @param <D> either java.sql.Driver or javax.sql.DataSource 098 * @since 1.0 099 */ 100public abstract class AbstractDatabaseCluster<D> implements DatabaseCluster<D>, DatabaseClusterMBean, MBeanRegistration 101{ 102 /** This is a work-around for Java 1.4, where Boolean does not implement Comparable */ 103 private static final Comparator<Boolean> booleanComparator = new Comparator<Boolean>() 104 { 105 @Override 106 public int compare(Boolean value1, Boolean value2) 107 { 108 return this.valueOf(value1) - this.valueOf(value2); 109 } 110 111 private int valueOf(Boolean value) 112 { 113 return value.booleanValue() ? 1 : 0; 114 } 115 }; 116 117 static Logger logger = LoggerFactory.getLogger(AbstractDatabaseCluster.class); 118 119// private static final Method isValidMethod = Methods.findMethod(Connection.class, "isValid", Integer.TYPE); 120 121 private String id; 122 private Balancer<D> balancer; 123 private Dialect dialect; 124 private DatabaseMetaDataCacheFactory databaseMetaDataCacheFactory; 125 private DatabaseMetaDataCache databaseMetaDataCache; 126 private String defaultSynchronizationStrategyId; 127 private CronExpression failureDetectionExpression; 128 private CronExpression autoActivationExpression; 129 private int minThreads; 130 private int maxThreads; 131 private int maxIdle; 132 private TransactionMode transactionMode; 133 private boolean identityColumnDetectionEnabled; 134 private boolean sequenceDetectionEnabled; 135 private boolean currentDateEvaluationEnabled; 136 private boolean currentTimeEvaluationEnabled; 137 private boolean currentTimestampEvaluationEnabled; 138 private boolean randEvaluationEnabled; 139 140 private MBeanServer server; 141 private URL url; 142 private Map<String, SynchronizationStrategy> synchronizationStrategyMap = new HashMap<String, SynchronizationStrategy>(); 143 private DatabaseClusterDecorator decorator; 144 private Map<String, Database<D>> databaseMap = new HashMap<String, Database<D>>(); 145 private ExecutorService executor; 146 private CronThreadPoolExecutor cronExecutor = new CronThreadPoolExecutor(2); 147 private LockManager lockManager = new LocalLockManager(); 148 private StateManager stateManager = new LocalStateManager(this); 149 private volatile boolean active = false; 150 private List<DatabaseActivationListener> activationListenerList = new CopyOnWriteArrayList<DatabaseActivationListener>(); 151 private List<DatabaseDeactivationListener> deactivationListenerList = new CopyOnWriteArrayList<DatabaseDeactivationListener>(); 152 private List<SynchronizationListener> synchronizationListenerList = new CopyOnWriteArrayList<SynchronizationListener>(); 153 154 protected AbstractDatabaseCluster(String id, URL url) 155 { 156 this.id = id; 157 this.url = url; 158 } 159 160 /** 161 * @see net.sf.hajdbc.DatabaseCluster#getId() 162 */ 163 @Override 164 public String getId() 165 { 166 return this.id; 167 } 168 169 /** 170 * @see net.sf.hajdbc.DatabaseClusterMBean#getVersion() 171 */ 172 @Override 173 public String getVersion() 174 { 175 return DatabaseClusterFactory.getVersion(); 176 } 177 178 /** 179 * @see net.sf.hajdbc.DatabaseCluster#getAliveMap(java.util.Collection) 180 */ 181 @Override 182 public Map<Boolean, List<Database<D>>> getAliveMap(Collection<Database<D>> databases) 183 { 184 Map<Database<D>, Future<Boolean>> futureMap = new TreeMap<Database<D>, Future<Boolean>>(); 185 186 for (final Database<D> database: databases) 187 { 188 Callable<Boolean> task = new Callable<Boolean>() 189 { 190 public Boolean call() throws Exception 191 { 192 return AbstractDatabaseCluster.this.isAlive(database); 193 } 194 }; 195 196 futureMap.put(database, this.executor.submit(task)); 197 } 198 199 Map<Boolean, List<Database<D>>> map = new TreeMap<Boolean, List<Database<D>>>(booleanComparator); 200 201 int size = databases.size(); 202 203 map.put(false, new ArrayList<Database<D>>(size)); 204 map.put(true, new ArrayList<Database<D>>(size)); 205 206 for (Map.Entry<Database<D>, Future<Boolean>> futureMapEntry: futureMap.entrySet()) 207 { 208 try 209 { 210 map.get(futureMapEntry.getValue().get()).add(futureMapEntry.getKey()); 211 } 212 catch (ExecutionException e) 213 { 214 // isAlive does not throw an exception 215 throw new IllegalStateException(e); 216 } 217 catch (InterruptedException e) 218 { 219 Thread.currentThread().interrupt(); 220 } 221 } 222 223 return map; 224 } 225 226 boolean isAlive(Database<D> database) 227 { 228 try 229 { 230 this.test(database); 231 232 return true; 233 } 234 catch (SQLException e) 235 { 236 logger.warn(Messages.getMessage(Messages.DATABASE_NOT_ALIVE, database, this), e); 237 238 return false; 239 } 240 } 241 242 private void test(Database<D> database) throws SQLException 243 { 244 Connection connection = null; 245 246 try 247 { 248 connection = database.connect(database.createConnectionFactory()); 249 250 Statement statement = connection.createStatement(); 251 252 statement.execute(this.dialect.getSimpleSQL()); 253 254 statement.close(); 255 } 256 finally 257 { 258 if (connection != null) 259 { 260 try 261 { 262 connection.close(); 263 } 264 catch (SQLException e) 265 { 266 logger.warn(e.toString(), e); 267 } 268 } 269 } 270 } 271/* 272 boolean isAliveNew(Database<D> database) 273 { 274 Connection connection = null; 275 276 try 277 { 278 connection = database.connect(database.createConnectionFactory()); 279 280 return this.isAlive(connection); 281 } 282 catch (SQLException e) 283 { 284 logger.warn(Messages.getMessage(Messages.DATABASE_NOT_ALIVE, database, this), e); 285 286 return false; 287 } 288 finally 289 { 290 if (connection != null) 291 { 292 try 293 { 294 connection.close(); 295 } 296 catch (SQLException e) 297 { 298 logger.warn(e.getMessage(), e); 299 } 300 } 301 } 302 } 303 304 private boolean isAlive(Connection connection) 305 { 306 if (isValidMethod != null) 307 { 308 try 309 { 310 return connection.isValid(0); 311 } 312 catch (SQLException e) 313 { 314 // isValid not yet supported 315 } 316 } 317 318 try 319 { 320 Statement statement = connection.createStatement(); 321 322 statement.execute(this.dialect.getSimpleSQL()); 323 324 statement.close(); 325 326 return true; 327 } 328 catch (SQLException e) 329 { 330 logger.warn(e.toString(), e); 331 332 return false; 333 } 334 } 335*/ 336 /** 337 * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager) 338 */ 339 @Override 340 public boolean deactivate(Database<D> database, StateManager manager) 341 { 342 synchronized (this.balancer) 343 { 344 this.unregister(database); 345 // Reregister database mbean using "inactive" interface 346 this.register(database, database.getInactiveMBean()); 347 348 boolean removed = this.balancer.remove(database); 349 350 if (removed) 351 { 352 DatabaseEvent event = new DatabaseEvent(database); 353 354 manager.deactivated(event); 355 356 for (DatabaseDeactivationListener listener: this.deactivationListenerList) 357 { 358 listener.deactivated(event); 359 } 360 } 361 362 return removed; 363 } 364 } 365 366 /** 367 * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager) 368 */ 369 @Override 370 public boolean activate(Database<D> database, StateManager manager) 371 { 372 synchronized (this.balancer) 373 { 374 this.unregister(database); 375 // Reregister database mbean using "active" interface 376 this.register(database, database.getActiveMBean()); 377 378 if (database.isDirty()) 379 { 380 this.export(); 381 382 database.clean(); 383 } 384 385 boolean added = this.balancer.add(database); 386 387 if (added) 388 { 389 DatabaseEvent event = new DatabaseEvent(database); 390 391 manager.activated(event); 392 393 for (DatabaseActivationListener listener: this.activationListenerList) 394 { 395 listener.activated(event); 396 } 397 } 398 399 return added; 400 } 401 } 402 403 /** 404 * @see net.sf.hajdbc.DatabaseClusterMBean#getActiveDatabases() 405 */ 406 @Override 407 public Set<String> getActiveDatabases() 408 { 409 Set<String> databaseSet = new TreeSet<String>(); 410 411 for (Database<D> database: this.balancer.all()) 412 { 413 databaseSet.add(database.getId()); 414 } 415 416 return databaseSet; 417 } 418 419 /** 420 * @see net.sf.hajdbc.DatabaseClusterMBean#getInactiveDatabases() 421 */ 422 @Override 423 public Set<String> getInactiveDatabases() 424 { 425 synchronized (this.databaseMap) 426 { 427 Set<String> databaseSet = new TreeSet<String>(this.databaseMap.keySet()); 428 429 for (Database<D> database: this.balancer.all()) 430 { 431 databaseSet.remove(database.getId()); 432 } 433 434 return databaseSet; 435 } 436 } 437 438 /** 439 * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String) 440 */ 441 @Override 442 public Database<D> getDatabase(String id) 443 { 444 synchronized (this.databaseMap) 445 { 446 Database<D> database = this.databaseMap.get(id); 447 448 if (database == null) 449 { 450 throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_DATABASE, id, this)); 451 } 452 453 return database; 454 } 455 } 456 457 /** 458 * @see net.sf.hajdbc.DatabaseClusterMBean#getDefaultSynchronizationStrategy() 459 */ 460 @Override 461 public String getDefaultSynchronizationStrategy() 462 { 463 return this.defaultSynchronizationStrategyId; 464 } 465 466 /** 467 * @see net.sf.hajdbc.DatabaseClusterMBean#getSynchronizationStrategies() 468 */ 469 @Override 470 public Set<String> getSynchronizationStrategies() 471 { 472 return new TreeSet<String>(this.synchronizationStrategyMap.keySet()); 473 } 474 475 /** 476 * @see net.sf.hajdbc.DatabaseCluster#getBalancer() 477 */ 478 @Override 479 public Balancer<D> getBalancer() 480 { 481 return this.balancer; 482 } 483 484 /** 485 * @see net.sf.hajdbc.DatabaseCluster#getTransactionalExecutor() 486 */ 487 @Override 488 public ExecutorService getTransactionalExecutor() 489 { 490 return this.transactionMode.getTransactionExecutor(this.executor); 491 } 492 493 /** 494 * @see net.sf.hajdbc.DatabaseCluster#getNonTransactionalExecutor() 495 */ 496 @Override 497 public ExecutorService getNonTransactionalExecutor() 498 { 499 return this.executor; 500 } 501 502 /** 503 * @see net.sf.hajdbc.DatabaseCluster#getDialect() 504 */ 505 @Override 506 public Dialect getDialect() 507 { 508 return this.dialect; 509 } 510 511 /** 512 * @see net.sf.hajdbc.DatabaseCluster#getDatabaseMetaDataCache() 513 */ 514 @Override 515 public DatabaseMetaDataCache getDatabaseMetaDataCache() 516 { 517 return this.databaseMetaDataCache; 518 } 519 520 /** 521 * @see net.sf.hajdbc.DatabaseCluster#getLockManager() 522 */ 523 @Override 524 public LockManager getLockManager() 525 { 526 return this.lockManager; 527 } 528 529 /** 530 * @see net.sf.hajdbc.DatabaseClusterMBean#isAlive(java.lang.String) 531 */ 532 @Override 533 public boolean isAlive(String id) 534 { 535 return this.isAlive(this.getDatabase(id)); 536 } 537 538 /** 539 * @see net.sf.hajdbc.DatabaseClusterMBean#deactivate(java.lang.String) 540 */ 541 @Override 542 public void deactivate(String databaseId) 543 { 544 if (this.deactivate(this.getDatabase(databaseId), this.stateManager)) 545 { 546 logger.info(Messages.getMessage(Messages.DATABASE_DEACTIVATED, databaseId, this)); 547 } 548 } 549 550 /** 551 * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String) 552 */ 553 @Override 554 public void activate(String databaseId) 555 { 556 this.activate(databaseId, this.getDefaultSynchronizationStrategy()); 557 } 558 559 /** 560 * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String, java.lang.String) 561 */ 562 @Override 563 public void activate(String databaseId, String strategyId) 564 { 565 SynchronizationStrategy strategy = this.synchronizationStrategyMap.get(strategyId); 566 567 if (strategy == null) 568 { 569 throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_SYNC_STRATEGY, strategyId)); 570 } 571 572 try 573 { 574 if (this.activate(this.getDatabase(databaseId), strategy)) 575 { 576 logger.info(Messages.getMessage(Messages.DATABASE_ACTIVATED, databaseId, this)); 577 } 578 } 579 catch (SQLException e) 580 { 581 logger.warn(Messages.getMessage(Messages.DATABASE_ACTIVATE_FAILED, databaseId, this), e); 582 583 SQLException exception = e.getNextException(); 584 585 while (exception != null) 586 { 587 logger.error(exception.getMessage(), exception); 588 589 exception = exception.getNextException(); 590 } 591 592 throw new IllegalStateException(e.toString()); 593 } 594 catch (InterruptedException e) 595 { 596 logger.warn(e.toString(), e); 597 598 Thread.currentThread().interrupt(); 599 } 600 } 601 602 protected void register(Database<D> database, DynamicMBean mbean) 603 { 604 try 605 { 606 ObjectName name = DatabaseClusterFactory.getObjectName(this.id, database.getId()); 607 608 this.server.registerMBean(mbean, name); 609 } 610 catch (JMException e) 611 { 612 logger.error(e.toString(), e); 613 614 throw new IllegalStateException(e); 615 } 616 } 617 618 /** 619 * @see net.sf.hajdbc.DatabaseClusterMBean#remove(java.lang.String) 620 */ 621 @Override 622 public void remove(String id) 623 { 624 synchronized (this.databaseMap) 625 { 626 Database<D> database = this.getDatabase(id); 627 628 if (this.balancer.all().contains(database)) 629 { 630 throw new IllegalStateException(Messages.getMessage(Messages.DATABASE_STILL_ACTIVE, id, this)); 631 } 632 633 this.unregister(database); 634 635 this.databaseMap.remove(id); 636 637 this.export(); 638 } 639 } 640 641 private void unregister(Database<D> database) 642 { 643 try 644 { 645 ObjectName name = DatabaseClusterFactory.getObjectName(this.id, database.getId()); 646 647 if (this.server.isRegistered(name)) 648 { 649 this.server.unregisterMBean(name); 650 } 651 } 652 catch (JMException e) 653 { 654 logger.error(e.toString(), e); 655 656 throw new IllegalStateException(e); 657 } 658 } 659 660 /** 661 * @see net.sf.hajdbc.DatabaseCluster#isActive() 662 */ 663 @Override 664 public boolean isActive() 665 { 666 return this.active; 667 } 668 669 /** 670 * @see net.sf.hajdbc.Lifecycle#start() 671 */ 672 public synchronized void start() throws Exception 673 { 674 if (this.active) return; 675 676 this.lockManager.start(); 677 this.stateManager.start(); 678 679 this.executor = new ThreadPoolExecutor(this.minThreads, this.maxThreads, this.maxIdle, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 680 681 Set<String> databaseSet = this.stateManager.getInitialState(); 682 683 if (databaseSet != null) 684 { 685 for (String databaseId: databaseSet) 686 { 687 Database<D> database = this.getDatabase(databaseId); 688 689 if (database != null) 690 { 691 this.activate(database, this.stateManager); 692 } 693 } 694 } 695 else 696 { 697 for (Database<D> database: this.getAliveMap(this.databaseMap.values()).get(true)) 698 { 699 this.activate(database, this.stateManager); 700 } 701 } 702 703 this.databaseMetaDataCache = this.databaseMetaDataCacheFactory.createCache(this); 704 705 try 706 { 707 this.flushMetaDataCache(); 708 } 709 catch (IllegalStateException e) 710 { 711 // Ignore - cache will initialize lazily. 712 } 713 714 if (this.failureDetectionExpression != null) 715 { 716 this.cronExecutor.schedule(new FailureDetectionTask(), this.failureDetectionExpression); 717 } 718 719 if (this.autoActivationExpression != null) 720 { 721 this.cronExecutor.schedule(new AutoActivationTask(), this.autoActivationExpression); 722 } 723 724 this.active = true; 725 } 726 727 /** 728 * @see net.sf.hajdbc.Lifecycle#stop() 729 */ 730 public synchronized void stop() 731 { 732 if (!this.active) return; 733 734 this.active = false; 735 736 this.balancer.clear(); 737 738 this.stateManager.stop(); 739 this.lockManager.stop(); 740 741 this.cronExecutor.shutdownNow(); 742 743 if (this.executor != null) 744 { 745 this.executor.shutdownNow(); 746 } 747 } 748 749 /** 750 * @see net.sf.hajdbc.DatabaseClusterMBean#flushMetaDataCache() 751 */ 752 @Override 753 public void flushMetaDataCache() 754 { 755 try 756 { 757 this.databaseMetaDataCache.flush(); 758 } 759 catch (SQLException e) 760 { 761 throw new IllegalStateException(e.toString(), e); 762 } 763 } 764 765 /** 766 * @see net.sf.hajdbc.DatabaseCluster#isIdentityColumnDetectionEnabled() 767 */ 768 @Override 769 public boolean isIdentityColumnDetectionEnabled() 770 { 771 return this.identityColumnDetectionEnabled; 772 } 773 774 /** 775 * @see net.sf.hajdbc.DatabaseCluster#isSequenceDetectionEnabled() 776 */ 777 @Override 778 public boolean isSequenceDetectionEnabled() 779 { 780 return this.sequenceDetectionEnabled; 781 } 782 783 /** 784 * @see net.sf.hajdbc.DatabaseCluster#isCurrentDateEvaluationEnabled() 785 */ 786 @Override 787 public boolean isCurrentDateEvaluationEnabled() 788 { 789 return this.currentDateEvaluationEnabled; 790 } 791 792 /** 793 * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimeEvaluationEnabled() 794 */ 795 @Override 796 public boolean isCurrentTimeEvaluationEnabled() 797 { 798 return this.currentTimeEvaluationEnabled; 799 } 800 801 /** 802 * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimestampEvaluationEnabled() 803 */ 804 @Override 805 public boolean isCurrentTimestampEvaluationEnabled() 806 { 807 return this.currentTimestampEvaluationEnabled; 808 } 809 810 /** 811 * @see net.sf.hajdbc.DatabaseCluster#isRandEvaluationEnabled() 812 */ 813 @Override 814 public boolean isRandEvaluationEnabled() 815 { 816 return this.randEvaluationEnabled; 817 } 818 819 /** 820 * @see java.lang.Object#toString() 821 */ 822 @Override 823 public String toString() 824 { 825 return this.getId(); 826 } 827 828 /** 829 * @see java.lang.Object#equals(java.lang.Object) 830 */ 831 @SuppressWarnings("unchecked") 832 @Override 833 public boolean equals(Object object) 834 { 835 if ((object == null) || !(object instanceof DatabaseCluster)) return false; 836 837 String id = ((DatabaseCluster) object).getId(); 838 839 return (id != null) && id.equals(this.id); 840 } 841 842 /** 843 * @see java.lang.Object#hashCode() 844 */ 845 @Override 846 public int hashCode() 847 { 848 return this.id.hashCode(); 849 } 850 851 protected DatabaseClusterDecorator getDecorator() 852 { 853 return this.decorator; 854 } 855 856 protected void setDecorator(DatabaseClusterDecorator decorator) 857 { 858 this.decorator = decorator; 859 } 860 861 protected void add(Database<D> database) 862 { 863 String id = database.getId(); 864 865 synchronized (this.databaseMap) 866 { 867 if (this.databaseMap.containsKey(id)) 868 { 869 throw new IllegalArgumentException(Messages.getMessage(Messages.DATABASE_ALREADY_EXISTS, id, this)); 870 } 871 872 this.register(database, database.getInactiveMBean()); 873 874 this.databaseMap.put(id, database); 875 } 876 } 877 878 protected Iterator<Database<D>> getDatabases() 879 { 880 synchronized (this.databaseMap) 881 { 882 return this.databaseMap.values().iterator(); 883 } 884 } 885 886 /** 887 * @see net.sf.hajdbc.DatabaseCluster#getStateManager() 888 */ 889 @Override 890 public StateManager getStateManager() 891 { 892 return this.stateManager; 893 } 894 895 /** 896 * @see net.sf.hajdbc.DatabaseCluster#setStateManager(net.sf.hajdbc.StateManager) 897 */ 898 @Override 899 public void setStateManager(StateManager stateManager) 900 { 901 this.stateManager = stateManager; 902 } 903 904 /** 905 * @see net.sf.hajdbc.DatabaseCluster#setLockManager(net.sf.hajdbc.LockManager) 906 */ 907 @Override 908 public void setLockManager(LockManager lockManager) 909 { 910 this.lockManager = lockManager; 911 } 912 913 /** 914 * @see net.sf.hajdbc.DatabaseClusterMBean#getUrl() 915 */ 916 @Override 917 public URL getUrl() 918 { 919 return this.url; 920 } 921 922 private boolean activate(Database<D> database, SynchronizationStrategy strategy) throws SQLException, InterruptedException 923 { 924 Lock lock = this.lockManager.writeLock(LockManager.GLOBAL); 925 926 lock.lockInterruptibly(); 927 928 try 929 { 930 SynchronizationContext<D> context = new SynchronizationContextImpl<D>(this, database); 931 932 if (context.getActiveDatabaseSet().contains(database)) 933 { 934 return false; 935 } 936 937 this.test(database); 938 939 try 940 { 941 DatabaseEvent event = new DatabaseEvent(database); 942 943 logger.info(Messages.getMessage(Messages.DATABASE_SYNC_START, database, this)); 944 945 for (SynchronizationListener listener: this.synchronizationListenerList) 946 { 947 listener.beforeSynchronization(event); 948 } 949 950 strategy.synchronize(context); 951 952 logger.info(Messages.getMessage(Messages.DATABASE_SYNC_END, database, this)); 953 954 for (SynchronizationListener listener: this.synchronizationListenerList) 955 { 956 listener.afterSynchronization(event); 957 } 958 959 return this.activate(database, this.stateManager); 960 } 961 finally 962 { 963 context.close(); 964 } 965 } 966 catch (NoSuchElementException e) 967 { 968 return this.activate(database, this.stateManager); 969 } 970 finally 971 { 972 lock.unlock(); 973 } 974 } 975 976 /** 977 * @see javax.management.MBeanRegistration#postDeregister() 978 */ 979 @Override 980 public void postDeregister() 981 { 982 this.stop(); 983 984 this.unregisterDatabases(); 985 } 986 987 private void unregisterDatabases() 988 { 989 synchronized (this.databaseMap) 990 { 991 Iterator<Database<D>> databases = this.databaseMap.values().iterator(); 992 993 while (databases.hasNext()) 994 { 995 this.unregister(databases.next()); 996 997 databases.remove(); 998 } 999 } 1000 } 1001 1002 /** 1003 * @see javax.management.MBeanRegistration#postRegister(java.lang.Boolean) 1004 */ 1005 @Override 1006 public void postRegister(Boolean registered) 1007 { 1008 if (!registered) 1009 { 1010 this.postDeregister(); 1011 } 1012 } 1013 1014 /** 1015 * @see javax.management.MBeanRegistration#preDeregister() 1016 */ 1017 @Override 1018 public void preDeregister() throws Exception 1019 { 1020 // Nothing to do 1021 } 1022 1023 /** 1024 * @see javax.management.MBeanRegistration#preRegister(javax.management.MBeanServer, javax.management.ObjectName) 1025 */ 1026 @Override 1027 public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception 1028 { 1029 this.server = server; 1030 1031 InputStream inputStream = null; 1032 1033 logger.info(Messages.getMessage(Messages.HA_JDBC_INIT, this.getVersion(), this.url)); 1034 1035 try 1036 { 1037 inputStream = this.url.openStream(); 1038 1039 IUnmarshallingContext context = BindingDirectory.getFactory(this.getClass()).createUnmarshallingContext(); 1040 1041 context.setDocument(inputStream, null); 1042 1043 context.setUserContext(this); 1044 1045 context.unmarshalElement(); 1046 1047 if (this.decorator != null) 1048 { 1049 this.decorator.decorate(this); 1050 } 1051 1052 this.start(); 1053 1054 return name; 1055 } 1056 catch (IOException e) 1057 { 1058 logger.error(Messages.getMessage(Messages.CONFIG_NOT_FOUND, this.url), e); 1059 1060 throw e; 1061 } 1062 catch (JiBXException e) 1063 { 1064 logger.error(Messages.getMessage(Messages.CONFIG_LOAD_FAILED, this.url), e); 1065 1066 this.unregisterDatabases(); 1067 1068 throw e; 1069 } 1070 catch (Exception e) 1071 { 1072 logger.error(Messages.getMessage(Messages.CLUSTER_START_FAILED, this), e); 1073 1074 this.postDeregister(); 1075 1076 throw e; 1077 } 1078 finally 1079 { 1080 if (inputStream != null) 1081 { 1082 try 1083 { 1084 inputStream.close(); 1085 } 1086 catch (IOException e) 1087 { 1088 logger.warn(e.toString(), e); 1089 } 1090 } 1091 } 1092 } 1093 1094 private void export() 1095 { 1096 File file = null; 1097 WritableByteChannel outputChannel = null; 1098 FileChannel fileChannel = null; 1099 1100 try 1101 { 1102 file = File.createTempFile("ha-jdbc", ".xml"); //$NON-NLS-1$ //$NON-NLS-2$ 1103 1104 IMarshallingContext context = BindingDirectory.getFactory(this.getClass()).createMarshallingContext(); 1105 1106 context.setIndent(1, System.getProperty("line.separator"), '\t'); //$NON-NLS-1$ 1107 1108 // This method closes the writer 1109 context.marshalDocument(this, null, null, new FileWriter(file)); 1110 1111 fileChannel = new FileInputStream(file).getChannel(); 1112 1113 outputChannel = this.getOutputChannel(this.url); 1114 1115 fileChannel.transferTo(0, file.length(), outputChannel); 1116 } 1117 catch (Exception e) 1118 { 1119 logger.warn(Messages.getMessage(Messages.CONFIG_STORE_FAILED, this.url), e); 1120 } 1121 finally 1122 { 1123 if (outputChannel != null) 1124 { 1125 try 1126 { 1127 outputChannel.close(); 1128 } 1129 catch (IOException e) 1130 { 1131 logger.warn(e.getMessage(), e); 1132 } 1133 } 1134 1135 if (fileChannel != null) 1136 { 1137 try 1138 { 1139 fileChannel.close(); 1140 } 1141 catch (IOException e) 1142 { 1143 logger.warn(e.getMessage(), e); 1144 } 1145 } 1146 1147 if (file != null) 1148 { 1149 file.delete(); 1150 } 1151 } 1152 } 1153 1154 /** 1155 * We cannot use URLConnection for files because Sun's implementation does not support output. 1156 */ 1157 private WritableByteChannel getOutputChannel(URL url) throws IOException 1158 { 1159 return this.isFile(url) ? new FileOutputStream(this.toFile(url)).getChannel() : Channels.newChannel(url.openConnection().getOutputStream()); 1160 } 1161 1162 private boolean isFile(URL url) 1163 { 1164 return url.getProtocol().equals("file"); //$NON-NLS-1$ 1165 } 1166 1167 private File toFile(URL url) 1168 { 1169 return new File(url.getPath()); 1170 } 1171 1172 protected void addSynchronizationStrategyBuilder(SynchronizationStrategyBuilder builder) throws Exception 1173 { 1174 this.synchronizationStrategyMap.put(builder.getId(), builder.buildStrategy()); 1175 } 1176 1177 protected Iterator<SynchronizationStrategyBuilder> getSynchronizationStrategyBuilders() throws Exception 1178 { 1179 List<SynchronizationStrategyBuilder> builderList = new ArrayList<SynchronizationStrategyBuilder>(this.synchronizationStrategyMap.size()); 1180 1181 for (Map.Entry<String, SynchronizationStrategy> mapEntry: this.synchronizationStrategyMap.entrySet()) 1182 { 1183 builderList.add(SynchronizationStrategyBuilder.getBuilder(mapEntry.getKey(), mapEntry.getValue())); 1184 } 1185 1186 return builderList.iterator(); 1187 } 1188 1189 /** 1190 * @see net.sf.hajdbc.DatabaseClusterMBean#addActivationListener(net.sf.hajdbc.DatabaseActivationListener) 1191 */ 1192 @Override 1193 public void addActivationListener(DatabaseActivationListener listener) 1194 { 1195 this.activationListenerList.add(listener); 1196 } 1197 1198 /** 1199 * @see net.sf.hajdbc.DatabaseClusterMBean#addDeactivationListener(net.sf.hajdbc.DatabaseDeactivationListener) 1200 */ 1201 @Override 1202 public void addDeactivationListener(DatabaseDeactivationListener listener) 1203 { 1204 this.deactivationListenerList.add(listener); 1205 } 1206 1207 /** 1208 * @see net.sf.hajdbc.DatabaseClusterMBean#addSynchronizationListener(net.sf.hajdbc.SynchronizationListener) 1209 */ 1210 @Override 1211 public void addSynchronizationListener(SynchronizationListener listener) 1212 { 1213 this.synchronizationListenerList.add(listener); 1214 } 1215 1216 /** 1217 * @see net.sf.hajdbc.DatabaseClusterMBean#removeActivationListener(net.sf.hajdbc.DatabaseActivationListener) 1218 */ 1219 @Override 1220 public void removeActivationListener(DatabaseActivationListener listener) 1221 { 1222 this.activationListenerList.remove(listener); 1223 } 1224 1225 /** 1226 * @see net.sf.hajdbc.DatabaseClusterMBean#removeDeactivationListener(net.sf.hajdbc.DatabaseDeactivationListener) 1227 */ 1228 @Override 1229 public void removeDeactivationListener(DatabaseDeactivationListener listener) 1230 { 1231 this.deactivationListenerList.remove(listener); 1232 } 1233 1234 /** 1235 * @see net.sf.hajdbc.DatabaseClusterMBean#removeSynchronizationListener(net.sf.hajdbc.SynchronizationListener) 1236 */ 1237 @Override 1238 public void removeSynchronizationListener(SynchronizationListener listener) 1239 { 1240 this.synchronizationListenerList.remove(listener); 1241 } 1242 1243 class FailureDetectionTask implements Runnable 1244 { 1245 /** 1246 * @see java.lang.Runnable#run() 1247 */ 1248 @Override 1249 public void run() 1250 { 1251 Set<Database<D>> databaseSet = AbstractDatabaseCluster.this.getBalancer().all(); 1252 1253 if (databaseSet.size() > 1) 1254 { 1255 Map<Boolean, List<Database<D>>> aliveMap = AbstractDatabaseCluster.this.getAliveMap(databaseSet); 1256 1257 // Deactivate the dead databases, so long as at least one is alive 1258 // Skip deactivation if membership is empty in case of cluster panic 1259 if (!aliveMap.get(true).isEmpty() && !AbstractDatabaseCluster.this.getStateManager().isMembershipEmpty()) 1260 { 1261 for (Database<D> database: aliveMap.get(false)) 1262 { 1263 if (AbstractDatabaseCluster.this.deactivate(database, AbstractDatabaseCluster.this.getStateManager())) 1264 { 1265 logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this)); 1266 } 1267 } 1268 } 1269 } 1270 } 1271 } 1272 1273 class AutoActivationTask implements Runnable 1274 { 1275 /** 1276 * @see java.lang.Runnable#run() 1277 */ 1278 @Override 1279 public void run() 1280 { 1281 for (String databaseId: AbstractDatabaseCluster.this.getInactiveDatabases()) 1282 { 1283 AbstractDatabaseCluster.this.activate(databaseId); 1284 } 1285 } 1286 } 1287}