001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.sql;
022
023import java.sql.SQLException;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.SortedMap;
028import java.util.TreeMap;
029import java.util.concurrent.Callable;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Future;
033
034import net.sf.hajdbc.Database;
035import net.sf.hajdbc.DatabaseCluster;
036import net.sf.hajdbc.Messages;
037
038/**
039 * @author Paul Ferraro
040 * @param <D> 
041 * @param <T> 
042 * @param <R> 
043 */
044public class DatabaseWriteInvocationStrategy<D, T, R> implements InvocationStrategy<D, T, R>
045{
046        private ExecutorService executor;
047        
048        /**
049         * @param executor
050         */
051        public DatabaseWriteInvocationStrategy(ExecutorService executor)
052        {
053                this.executor = executor;
054        }
055        
056        /**
057         * @see net.sf.hajdbc.sql.InvocationStrategy#invoke(net.sf.hajdbc.sql.SQLProxy, net.sf.hajdbc.sql.Invoker)
058         */
059        @Override
060        public R invoke(SQLProxy<D, T> proxy, Invoker<D, T, R> invoker) throws Exception
061        {
062                SortedMap<Database<D>, R> map = this.invokeAll(proxy, invoker);
063                
064                return map.get(map.firstKey());
065        }
066        
067        protected SortedMap<Database<D>, R> invokeAll(SQLProxy<D, T> proxy, final Invoker<D, T, R> invoker) throws Exception
068        {
069                SortedMap<Database<D>, R> resultMap = new TreeMap<Database<D>, R>();
070                SortedMap<Database<D>, Exception> exceptionMap = new TreeMap<Database<D>, Exception>();
071                Map<Database<D>, Future<R>> futureMap = new HashMap<Database<D>, Future<R>>();
072
073                DatabaseCluster<D> cluster = proxy.getDatabaseCluster();
074                
075                Set<Database<D>> databaseSet = cluster.getBalancer().all();
076                
077                proxy.getRoot().retain(databaseSet);
078                
079                if (databaseSet.isEmpty())
080                {
081                        throw new SQLException(Messages.getMessage(Messages.NO_ACTIVE_DATABASES, cluster));
082                }
083                
084                for (final Database<D> database: databaseSet)
085                {
086                        final T object = proxy.getObject(database);
087                        
088                        Callable<R> task = new Callable<R>()
089                        {
090                                public R call() throws Exception
091                                {
092                                        return invoker.invoke(database, object);
093                                }
094                        };
095
096                        futureMap.put(database, this.executor.submit(task));
097                }
098
099                for (Map.Entry<Database<D>, Future<R>> futureMapEntry: futureMap.entrySet())
100                {
101                        Database<D> database = futureMapEntry.getKey();
102                        
103                        try
104                        {
105                                resultMap.put(database, futureMapEntry.getValue().get());
106                        }
107                        catch (ExecutionException e)
108                        {
109                                Throwable cause = e.getCause();
110                                
111                                Exception exception = (cause instanceof Exception) ? (Exception) cause : e;
112                                
113                                exceptionMap.put(database, exception);
114                        }
115                        catch (InterruptedException e)
116                        {
117                                Thread.currentThread().interrupt();
118                                
119                                exceptionMap.put(database, e);
120                        }
121                }
122                
123                // If no databases returned successfully, return an exception back to the caller
124                if (resultMap.isEmpty())
125                {
126                        proxy.handleFailures(exceptionMap);
127                }
128                
129                // If any databases failed, while others succeeded, handle the failures
130                return exceptionMap.isEmpty() ? resultMap : proxy.handlePartialFailure(resultMap, exceptionMap);
131        }
132}