Alps  1.5.7
AlpsKnowledgeBrokerMPI.h
Go to the documentation of this file.
1 /*===========================================================================*
2  * This file is part of the Abstract Library for Parallel Search (ALPS). *
3  * *
4  * ALPS is distributed under the Eclipse Public License as part of the *
5  * COIN-OR repository (http://www.coin-or.org). *
6  * *
7  * Authors: *
8  * *
9  * Yan Xu, Lehigh University *
10  * Ted Ralphs, Lehigh University *
11  * *
12  * Conceptual Design: *
13  * *
14  * Yan Xu, Lehigh University *
15  * Ted Ralphs, Lehigh University *
16  * Laszlo Ladanyi, IBM T.J. Watson Research Center *
17  * Matthew Saltzman, Clemson University *
18  * *
19  * *
20  * Copyright (C) 2001-2019, Lehigh University, Yan Xu, and Ted Ralphs. *
21  *===========================================================================*/
22 
23 #ifndef AlpsKnowledgeBrokerMPI_h_
24 #define AlpsKnowledgeBrokerMPI_h_
25 
26 #include <cmath>
27 #include <iosfwd>
28 
29 #undef SEEK_SET
30 #undef SEEK_END
31 #undef SEEK_CUR
32 #include "mpi.h"
33 
34 #include "AlpsEnumProcessT.h"
35 #include "AlpsKnowledge.h"
36 #include "AlpsKnowledgeBroker.h"
37 #include "AlpsParams.h"
38 
39 //#############################################################################
40 
42 
43  private:
47 
48  protected:
49 
56 
58  int hubNum_;
59 
62 
64  MPI_Comm clusterComm_;
65 
67  MPI_Comm hubComm_;
68 
70  MPI_Group hubGroup_;
71 
74 
77 
80 
82  int* hubRanks_;
83 
86 
89 
92 
95 
97  bool hubWork_;
98 
100  MPI_Request subTreeRequest_;
101 
103  MPI_Request solRequestL_;
104  MPI_Request solRequestR_;
105 
107  MPI_Request modelKnowRequestL_;
108  MPI_Request modelKnowRequestR_;
109 
111  MPI_Request forwardRequestL_;
112  MPI_Request forwardRequestR_;
114 
121 
124 
129 
135  double workQuality_;
136 
139 
142 
145 
149 
152 
155 
158 
161 
164 
168 
171 
174 
177 
180 
183 
186 
189 
193 
200 
203 
207 
211 
214 
218 
225 
232 
235 
238 
240  double rampUpTime_;
241 
244 
246  double idleTime_;
247 
249  double msgTime_;
250 
254 
257 
260 
263 
266 
269 
272 
275 
278 
281 
285 
289 
292 
295 
298 
301 
304 
305  protected:
306 
308  void init();
309 
317 
320  void hubMain();
321 
324  void workerMain();
326 
328  // The same subtree will be explored next time if it still have
329  // unexplored nodes.
331  double unitTime,
332  AlpsExitStatus & exitStatus,
333  int & numNodesProcessed,
334  int & numNodesBranched,
335  int & numNodesDiscarded,
336  int & numNodesPartial,
337  int & depth,
338  bool & betterSolution);
339 
341  void processMessages(char *&buffer,
342  MPI_Status &status,
343  MPI_Request &request);
344 
347  void rootInitHub();
349 
352  void spiralHub();
353  void spiralWorker();
354 
355  //------------------------------------------------------
356 
361  void masterAskHubDonate(int donorID,
362  int receiverID,
363  double receiverWorkload);
364 
366  void hubAskWorkerDonate(int donorID,
367  int receiverID,
368  double receiverWorkload);
369 
372 
373  virtual int getNumNodeLeftSystem()
374  { return static_cast<int>(systemWorkQuantity_); }
375 
377  void donateWork(char*& buf,
378  int tag,
379  MPI_Status* status,
380  int recvID = -1,
381  double recvWL = 0.0);
382 
384  void hubAllocateDonation(char*& buf, MPI_Status* status);
385 
388 
390  void hubSatisfyWorkerRequest(char*& buf, MPI_Status* status);
391 
393  // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
394  void hubReportStatus(int tag, MPI_Comm comm);
395 
397  // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
398  void hubUpdateCluStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
399 
401  void hubsShareWork(char*& buf, MPI_Status* status);
402 
405 
407  // NOTE: comm is hubComm or MPI_COMM_WORLD.
408  void masterUpdateSysStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
409 
412 
415 
417  // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
418  void workerReportStatus(int tag, MPI_Comm comm);
420 
421  //------------------------------------------------------
422 
429 
431  void workerRecvIndices(char *&bufLarge);
432 
434  void masterSendIndices(char *&bufLarge);
436 
437  //------------------------------------------------------
438 
444  void broadcastModel(const int id, const int source);
445 
448 
451  bool unpackSetIncumbent(char*& buf, MPI_Status* status);
452 
454  void collectBestSolution(int destination);
455 
459 
462  // Not used
463  void tellHubRecv();
464 
470  char*& buf,
471  int& size,
472  int& position,
473  MPI_Comm comm);
474 
477  int& position,
478  MPI_Comm comm,
479  int size = -1);
480 
483  // NOTE: comm is hubComm_ or clusterComm_
484  void receiveSizeBuf(char*& buf,
485  int sender,
486  int tag,
487  MPI_Comm comm,
488  MPI_Status* status);
489 
492  // NOTE: comm is hubComm_ or clusterComm_
493  void receiveRampUpNode(int sender,
494  MPI_Comm comm,
495  MPI_Status* status);
496 
499  void receiveSubTree(char*& buf, int sender, MPI_Status* status);
500 
502  // NOTE: comm is hubComm_ or clusterComm_.
503  void sendSizeBuf(char*& buf,
504  int size,
505  int position,
506  const int target,
507  const int tag,
508  MPI_Comm comm);
509 
512  // NOTE: comm is hubComm_ or clusterComm_.
513  void sendRampUpNode(const int target, MPI_Comm comm);
514 
517  void sendNodeModelGen(int receiver, int doUnitWork);
518 
520  bool sendSubTree(const int target, AlpsSubTree*& st, int tag);
521 
523  // NOTE: comm is hubComm_ or clusterComm_.
524  void sendFinishInit(const int target, MPI_Comm comm);
526 
529 
530 
532 
534  // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
535  void sendModelKnowledge(MPI_Comm comm, int receiver=-1);
536 
538  // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
539  void receiveModelKnowledge(MPI_Comm comm);
540 
545  void incSendCount(const char* how, int s = 1);
547  void decSendCount(const char* how, int s = 1);
549  void incRecvCount(const char* how, int s = 1);
551  void decRecvCount(const char* how, int s = 1);
553 
556 
559 
561  void changeWorkingSubTree(double & changeWorkThreshold);
562 
564  void sendErrorCodeToMaster(int errorCode);
565 
567  void recvErrorCode(char *& bufLarge);
568 
571 
574 
575  public:
576 
580  :
582  {
583  init();
584  }
585 
588  char* argv[],
589  AlpsModel& model)
590  :
592  {
593  init();
594  initializeSearch(argc, argv, model);
595  }
596 
599 
601  virtual int getProcRank() const { return globalRank_; }
602 
604  virtual int getMasterRank() const { return masterRank_; }
605 
607  virtual AlpsProcessType getProcType() const { return processType_; }
608 
621  void initializeSearch(int argc, char* argv[], AlpsModel& model);
622 
624  void search(AlpsModel *model);
625 
634 
638  virtual double getIncumbentValue() const {
639  double bestObj = ALPS_OBJ_MAX;
642  if (incumbentValue_ > bestObj) {
643  return bestObj;
644  }
645  }
646  return incumbentValue_;
647  }
648 
650  virtual double getBestQuality() const {
651  if (globalRank_ == masterRank_) {
654  }
655  else {
656  return ALPS_OBJ_MAX;
657  }
658  }
659  else {
660  return ALPS_OBJ_MAX;
661  }
662  }
663 
665  virtual double getBestEstimateQuality() { return systemWorkQuality_; }
666 
668  virtual void printBestSolution(char* outputFile = 0) const;
669 
671  virtual void searchLog();
673 
674  //------------------------------------------------------
675 
681  int sender,
682  int receiver,
683  char *& msgBuffer,
684  int msgSize,
685  int msgTag,
686  MPI_Comm comm,
687  bool blocking);
688 
691  int sender,
692  int receiver,
693  char *& msgBuffer,
694  int msgSize,
695  int msgTag,
696  MPI_Comm comm,
697  MPI_Status* status,
698  bool blocking);
699 
702  int sender,
703  int receiver,
704  char *& msgBuffer,
705  int msgSize,
706  int msgTag,
707  MPI_Comm comm,
708  bool blocking);
710 
711 };
712 #endif
713 
714 //#############################################################################
AlpsProcessType
This enumerative constant describes the various process types.
AlpsReturnStatus
Definition: Alps.h:118
AlpsKnowledgeType
Definition: Alps.h:88
@ AlpsKnowledgeTypeSolution
Definition: Alps.h:92
AlpsExitStatus
Definition: Alps.h:101
#define ALPS_OBJ_MAX
Definition: Alps.h:145
This data structure is to contain the packed form of an encodable knowledge.
Definition: AlpsEncoded.h:25
int clusterSize_
The actual size of the cluster to which the process belongs.
double clusterWorkQuantity_
The workload quantity of the cluster to which the process belongs.
int incumbentID_
The process id that store the incumbent.
void masterForceHubTerm()
Master tell hubs to terminate due to reaching limits or other reason.
void hubAllocateDonation(char *&buf, MPI_Status *status)
Hub allocates the donated workload to its workers.
int clusterRank_
The local rank of the process in clusterComm_.
void incSendCount(const char *how, int s=1)
Increment the number of sent message.
bool blockTermCheck_
Indicate whether do termination check.
AlpsProcessType * processTypeList_
The AlpsProcessType of all process.
bool updateIncumbent_
Indicate whether the incumbent value is updated between two checking point.
AlpsKnowledgeBrokerMPI(int argc, char *argv[], AlpsModel &model)
Useful construtor.
virtual int getProcRank() const
Query the global rank of the process.
double workQuantity_
The workload quantity of the workload on the process.
void masterMain(AlpsTreeNode *root)
Master generates subtrees and sends them to hubs in Round-Robin way.
double * workerWorkQuantities_
The workload quantities of workers in the cluster to which this proces belongs.
void rootSearch(AlpsTreeNode *root)
This function.
virtual double getBestQuality() const
The master queries the quality of the best solution it knowns.
void incRecvCount(const char *how, int s=1)
Increment the number of received message.
bool * workerReported_
Indicate which worker has been reported its work.
bool blockWorkerReport_
Indicate whether a worker need to report state to its hub.
double msgTime_
The time spent processing messages (include idle).
int sendCount_
The number of new messages sent by the process after last survey.
double systemWorkQuantityForce_
The workload quantity of the whole system before forcing termination.
void receiveRampUpNode(int sender, MPI_Comm comm, MPI_Status *status)
First receive the size and the contend of a node, then construct a subtree with this received node.
int haltSearch_
Temporily halt search.
bool blockHubReport_
Indicate whether a hub need to report state to master.
void sendIncumbent()
Sent the incumbent value and rank to its two child if eixt.
int hubNum_
The Number of hubs.
bool * hubReported_
Indicate which hub has been reported its work.
int * hubNodeProcesseds_
To record how many nodes processed for each hub.
void receiveSubTree(char *&buf, int sender, MPI_Status *status)
Receive a subtree from the sender process and add it into the subtree pool.
void deleteSubTrees()
Delete subTrees in pools and the active subtree.
MPI_Request subTreeRequest_
Send subtree request.
int masterDoBalance_
Whether master do load balance.
void search(AlpsModel *model)
Search best solution for a given model.
void workerRecvIndices(char *&bufLarge)
A worker receive node index from master.
int myHubRank_
The global rank of its hub for a worker.
void sendFinishInit(const int target, MPI_Comm comm)
Send finish initialization signal to the target process.
MPI_Comm clusterComm_
Communicator of the cluster to which the process belongs.
void packEncoded(AlpsEncoded *enc, char *&buf, int &size, int &position, MPI_Comm comm)
Pack an AlpsEncoded instance into buf.
void rootInitMaster(AlpsTreeNode *root)
Static load balancing: Root Initialization.
double systemWorkQuality_
The workload quality of the whole system.
AlpsSubTree * rampUpSubTree_
A subtree used in during up.
int recvCount_
The number of new messages received by the process after last survey.
void spiralDonateNode()
Unpack msg and donate a node.
int systemSendCount_
The total number of messages sent by the all processes.
void masterSendIndices(char *&bufLarge)
Master send a batch of node indices to the receiving worker.
MPI_Request forwardRequestL_
Forward model knoledge request.
MPI_Group hubGroup_
MPI_Group consists of all hubs.
MPI_Request modelKnowRequestL_
Send model knoledge request.
AlpsProcessType processType_
The AlpsProcessType of this process.
int modelGenPos_
Size of the shared knowledge.
int * hubRanks_
The global ranks of the hubs.
AlpsReturnStatus doOneUnitWork(int unitWork, double unitTime, AlpsExitStatus &exitStatus, int &numNodesProcessed, int &numNodesBranched, int &numNodesDiscarded, int &numNodesPartial, int &depth, bool &betterSolution)
Explore a subtree from subtree pool for certain units of work/time.
~AlpsKnowledgeBrokerMPI()
Destructor.
virtual double getIncumbentValue() const
The process queries the quality of the incumbent this process stores.
AlpsTimer masterTimer_
Master timer.
char * largeBuffer2_
Large message buffer.
int clusterNodeProcessed_
To record how many nodes by a cluster.
void spiralRecvProcessNode()
Unpack the node, explore it and send load info to master.
void refreshSysStatus()
The master re-calculate the system status.
int unitWorkNodes_
Number of nodes in one unit of work.
AlpsTimer workerTimer_
Worker timer.
double * workerWorkQualities_
The workload qualities of workers in the cluster to which this proces belongs.
void hubReportStatus(int tag, MPI_Comm comm)
A hub reports its status (workload and msg counts) to the master.
bool allHubReported_
Indicate whether all hubs have reported status to master at least once.
void receiveSizeBuf(char *&buf, int sender, int tag, MPI_Comm comm, MPI_Status *status)
Receive the size of buffer, allocate memory for buffer, then receive the message and put it in buffer...
MPI_Comm hubComm_
Communicator consists of all hubs.
AlpsKnowledgeBrokerMPI()
Default construtor.
void decSendCount(const char *how, int s=1)
Decrement the number of sent message.
bool unpackSetIncumbent(char *&buf, MPI_Status *status)
unpack the incumbent value, then store it and the id of the process having the incumbent in AlpsDataP...
void hubForceWorkerTerm()
Hub tell workers to terminate due to reaching limits or other reason.
void masterBalanceHubs()
Master balance the workload of hubs.
void workerAskIndices()
A worker ask for node index from master.
void updateWorkloadInfo()
Calculate the work quality and quantity on this process.
double * hubWorkQuantities_
The workload quantities of all clusters/hubs.
virtual int getMasterRank() const
Query the global rank of the Master.
double incumbentValue_
Incumbent value.
void donateWork(char *&buf, int tag, MPI_Status *status, int recvID=-1, double recvWL=0.0)
A worker donate its workload to the specified worker.
int processNum_
The Number of processes launched.
AlpsTimer hubTimer_
Hub timer.
void broadcastModel(const int id, const int source)
Broadcast the model from source to other processes.
int userClusterSize_
The user reqested size of a cluster.
bool sendSubTree(const int target, AlpsSubTree *&st, int tag)
Send a given subtree to the target process.
int hubDoBalance_
Whether a hub do load balance.
char * attachBuffer_
Buffer attached to MPI when sharing generated knowledge.
int modelGenID_
The global rank of the process that share generated model knowledge.
void masterUpdateSysStatus(char *&buf, MPI_Status *status, MPI_Comm comm)
Master unpack the status of a hub from buf and update system status.
double rampUpTime_
The time spent in ramp up.
double workQuality_
The workload quality of the process.
void refreshClusterStatus()
A hub adds its status to the cluster's status.
void changeWorkingSubTree(double &changeWorkThreshold)
Change subtree to be explored if it is too worse.
void sendModelKnowledge(MPI_Comm comm, int receiver=-1)
Set generated knowlege (related to model) to receiver.
void sendSizeBuf(char *&buf, int size, int position, const int target, const int tag, MPI_Comm comm)
Send the size and content of a buffer to the target process.
void workerMain()
Worker first receive subtrees, then start to explore them.
void sendRampUpNode(const int target, MPI_Comm comm)
Send the size and the content of the best node of a given subtree to the target process.
virtual AlpsProcessType getProcType() const
Query the type (master, hub, or worker) of the process.
virtual void searchLog()
Log search statistics.
void sendKnowledge(AlpsKnowledgeType type, int sender, int receiver, char *&msgBuffer, int msgSize, int msgTag, MPI_Comm comm, bool blocking)
Set knowlege.
void hubsShareWork(char *&buf, MPI_Status *status)
Two hubs share their workload.
int clusterSendCount_
The number of new messages sent by the processes in clusterComm_ after last survey.
AlpsPsStats psStats_
More statistics.
double idleTime_
The time spent waiting for work.
void recvErrorCode(char *&bufLarge)
Receive error code and set solution status.
double clusterWorkQuality_
The workload quality of the cluster to which the process belong.
void receiveModelKnowledge(MPI_Comm comm)
Receive generated knowlege (related to model) from sender.
int globalRank_
The rank of the process in MPI_COMM_WORLD.
void spiralMaster(AlpsTreeNode *root)
Static load balancing: spiral.
char * largeBuffer_
Large message buffer.
void initializeSearch(int argc, char *argv[], AlpsModel &model)
This function.
double rampDownTime_
The time spent in ramp down.
void hubUpdateCluStatus(char *&buf, MPI_Status *status, MPI_Comm comm)
A hub unpacks the status of a worker from buffer.
virtual double getBestEstimateQuality()
Get best estimalted quality in system.
void masterAskHubDonate(int donorID, int receiverID, double receiverWorkload)
Master asks a hub to donate its workload to another hub.
AlpsEncoded * unpackEncoded(char *&buf, int &position, MPI_Comm comm, int size=-1)
Unpack the given buffer into an AlpsEncoded instance.
double * hubWorkQualities_
The workload qualities of hubs.
double masterBalancePeriod_
The period that master do load balancing.
char * smallBuffer_
Small message buffer.
bool hubWork_
Whether hub should also work as a worker.
int clusterRecvCount_
The number of new messages received by the processes in clusterComm_ after last survey.
int systemRecvCount_
The total number of messages sent by the all processes.
double systemWorkQuantity_
The workload quantity of the whole system.
virtual void printBestSolution(char *outputFile=0) const
Master prints out the best solution that it knows.
void hubBalanceWorkers()
Hub balances the workloads of its workers.
void sendErrorCodeToMaster(int errorCode)
Send error code to master.
void processMessages(char *&buffer, MPI_Status &status, MPI_Request &request)
Processing messages.
void receiveKnowledge(AlpsKnowledgeType type, int sender, int receiver, char *&msgBuffer, int msgSize, int msgTag, MPI_Comm comm, MPI_Status *status, bool blocking)
Receive knowlege.
void hubSatisfyWorkerRequest(char *&buf, MPI_Status *status)
Hub satisfies the workload rquest from a worker.
void tellMasterRecv()
Inform master that a proc has received workload during a load balance initialized by master.
double hubReportPeriod_
The period that a hub load balancing and report cluster status.
void requestKnowledge(AlpsKnowledgeType type, int sender, int receiver, char *&msgBuffer, int msgSize, int msgTag, MPI_Comm comm, bool blocking)
Request knowlege.
void collectBestSolution(int destination)
Send the best solution from the process having it to destination.
void tellHubRecv()
Inform hub that a proc has received workload during a load balance initialized by a hub.
void sendNodeModelGen(int receiver, int doUnitWork)
Send a node from rampUpSubTree's node pool and generated model knowledge.
bool forceTerminate_
Terminate due to reaching limits (time and node) or other reason.
int masterRank_
The global rank of the master.
void init()
Initialize member data.
bool blockAskForWork_
Indicate whether a worker need to as for work from its hub.
void hubMain()
Hub generates subtrees and sends them to workers in Round-Robin way.
void decRecvCount(const char *how, int s=1)
Decrement the number of sent message.
MPI_Request solRequestL_
Send model knoledge request.
void hubAskWorkerDonate(int donorID, int receiverID, double receiverWorkload)
Hub asks a worker to donate its workload to another worker.
void workerReportStatus(int tag, MPI_Comm comm)
A worker report its status (workload and msg counts) to its hub.
int * workerNodeProcesseds_
To record how many nodes processed for each worker in a cluster.
The base class of knowledge broker class.
virtual bool hasKnowledge(AlpsKnowledgeType kt) const
Query whether there are knowledges in the given type of knowledge pools.
virtual int getNumKnowledges(AlpsKnowledgeType kt) const
Query the number of knowledge in the given type of a knowledge pool.
virtual std::pair< AlpsKnowledge *, double > getBestKnowledge(AlpsKnowledgeType kt) const
Get the best knowledge in the given type of knowledge pools.
This class contains the data pertaining to a particular subtree in the search tree.
Definition: AlpsSubTree.h:47
This class holds one node of the search tree.
Definition: AlpsTreeNode.h:50