dmlite  0.4
poolcontainer.h
Go to the documentation of this file.
1 /// @file include/dmlite/cpp/utils/poolcontainer.h
2 /// @brief Pooling
3 /// @author Alejandro Álvarez Ayllón <aalvarez@cern.ch>
4 #ifndef DMLITE_CPP_UTILS_POOLCONTAINER_H
5 #define DMLITE_CPP_UTILS_POOLCONTAINER_H
6 
7 #include <errno.h>
8 #include <map>
9 #include <pthread.h>
10 #include <semaphore.h>
11 #include <syslog.h>
12 #include <queue>
13 #include "../exceptions.h"
14 
15 namespace dmlite {
16 
/// Interface of the factories used by a pool to manage its elements.
/// The pool itself is agnostic of the element type, so the classes
/// implementing this interface are the ones that create, destroy and
/// validate the actual elements.
/// @tparam E The element type. It is passed and returned by value.
template <class E>
class PoolElementFactory {
 public:
  /// Destructor
  virtual ~PoolElementFactory() {}

  /// Creates an element
  /// @return The newly created element.
  virtual E create() = 0;

  /// Destroys an element
  virtual void destroy(E) = 0;

  /// Checks whether an element is still valid
  /// @return true if the element is still valid, false otherwise.
  virtual bool isValid(E) = 0;
};
34 
35 
36  /// Implements a pool of whichever resource
37  template <class E>
38  class PoolContainer {
39  public:
40  /// Constructor
41  /// @param factory The factory to use when spawning a new resource.
42  /// @param n The number of resources to keep.
43  PoolContainer(PoolElementFactory<E>* factory, int n): max_(n), factory_(factory)
44  {
45  pthread_mutex_init(&mutex_, NULL);
46  sem_init(&available_, 0, n);
47  }
48 
49  /// Destructor
51  {
52  // Free 'free'
53  while (free_.size() > 0) {
54  E e = free_.front();
55  free_.pop();
56  factory_->destroy(e);
57  }
58  // Freeing used is dangerous, as we might block if the client code
59  // forgot about something. Assume the memory leak :(
60  if (used_.size() > 0) {
61  syslog(LOG_USER | LOG_WARNING, "%ld used elements from a pool not released on destruction!", (long)used_.size());
62  }
63  // Destroy locks
64  pthread_mutex_destroy(&mutex_);
65  sem_destroy(&available_);
66  }
67 
68  /// Acquires a free resource.
69  E acquire(bool block = true)
70  {
71  E e;
72  // Wait for one free
73  if (!block) {
74  int v;
75  sem_getvalue(&available_, &v);
76  if (v <= 0)
77  throw DmException(DM_RESOURCE_UNAVAILABLE, std::string("No resources available"));
78  }
79  sem_wait(&available_);
80  // Critical section
81  pthread_mutex_lock(&mutex_);
82  // If there is any in the queue, give one from there
83  if (free_.size() > 0) {
84  e = free_.front();
85  free_.pop();
86  // May have expired!
87  if (!factory_->isValid(e)) {
88  factory_->destroy(e);
89  e = factory_->create();
90  }
91  }
92  else {
93  // None created, so create it now
94  e = factory_->create();
95  }
96  // Keep track of used
97  used_.insert(std::pair<E, unsigned>(e, 1));
98  // End of critical section
99  pthread_mutex_unlock(&mutex_);
100  return e;
101  }
102 
103  /// Increases the reference count of a resource.
104  E acquire(E e)
105  {
106  // Critical section
107  pthread_mutex_lock(&mutex_);
108 
109  // Make sure it is there
110  typename std::map<E, unsigned>::const_iterator i = used_.find(e);
111  if (i == used_.end())
112  throw DmException(DM_INVALID_VALUE, std::string("The resource has not been locked previously!"));
113 
114  // Increase
115  used_[e]++;
116 
117  // End
118  pthread_mutex_unlock(&mutex_);
119  return e;
120  }
121 
122  /// Releases a resource
123  /// @param e The resource to release.
124  /// @return The reference count after releasing.
125  unsigned release(E e)
126  {
127  // Critical section
128  pthread_mutex_lock(&mutex_);
129  // Decrease reference count
130  unsigned remaining = --used_[e];
131  // No one else using it (hopefully...)
132  if (used_[e] == 0) {
133  // Remove from used
134  used_.erase(e);
135  // If the free size is less than the maximum, push to free and notify
136  if ((long)free_.size() < max_) {
137  free_.push(e);
138  sem_post(&available_);
139  }
140  else {
141  // If we are fine, destroy
142  factory_->destroy(e);
143  }
144  }
145  // End of critical section
146  pthread_mutex_unlock(&mutex_);
147 
148  return remaining;
149  }
150 
151  /// Count the number of instances
152  unsigned refCount(E e)
153  {
154  typename std::map<E, unsigned>::const_iterator i = used_.find(e);
155  if (i == used_.end())
156  return 0;
157  return used_[e];
158  }
159 
160  /// Change the pool size
161  /// @param ns The new size.
162  void resize(int ns)
163  {
164  int total, sv;
165  // The resizing will be done as we get requests
166  pthread_mutex_lock(&mutex_);
167  max_ = ns;
168  // Reduce the semaphore size if needed
169  // Do NOT take into account used, as it will auto-regulate as they free
170  sem_getvalue(&available_, &sv);
171  while (sv > max_) {
172  sem_wait(&available_);
173  --sv;
174  }
175  // Increment the semaphore size if needed
176  // Take into account the used
177  total = sv + used_.size();
178  while (total < max_) {
179  sem_post(&available_);
180  ++total;
181  }
182  // End of critical
183  pthread_mutex_unlock(&mutex_);
184  }
185 
186  private:
187  int max_;
188 
190 
191  std::queue<E> free_;
192  std::map<E, unsigned> used_;
193 
194  pthread_mutex_t mutex_;
195  sem_t available_;
196  };
197 
198 };
199 
200 #endif // DMLITE_CPP_UTILS_POOLCONTAINER_H