00001 #ifndef _qpid_agent_ManagementAgentImpl_
00002 #define _qpid_agent_ManagementAgentImpl_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "ManagementAgent.h"
00024 #include "qpid/client/Connection.h"
00025 #include "qpid/client/SubscriptionManager.h"
00026 #include "qpid/client/Session.h"
00027 #include "qpid/client/AsyncSession.h"
00028 #include "qpid/client/Message.h"
00029 #include "qpid/client/MessageListener.h"
00030 #include "qpid/sys/Thread.h"
00031 #include "qpid/sys/Runnable.h"
00032 #include "qpid/sys/Mutex.h"
00033 #include "qpid/framing/Uuid.h"
00034 #include <iostream>
00035 #include <sstream>
00036 #include <deque>
00037
00038 namespace qpid {
00039 namespace management {
00040
00041 class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
00042 {
00043 public:
00044
00045 ManagementAgentImpl();
00046 virtual ~ManagementAgentImpl() {};
00047
00048
00049
00050
00051 int getMaxThreads() { return 1; }
00052 void init(std::string brokerHost = "localhost",
00053 uint16_t brokerPort = 5672,
00054 uint16_t intervalSeconds = 10,
00055 bool useExternalThread = false,
00056 std::string storeFile = "");
00057 bool isConnected() { return connected; }
00058 std::string& getLastFailure() { return lastFailure; }
00059 void registerClass(std::string& packageName,
00060 std::string& className,
00061 uint8_t* md5Sum,
00062 management::ManagementObject::writeSchemaCall_t schemaCall);
00063 void registerEvent(std::string& packageName,
00064 std::string& eventName,
00065 uint8_t* md5Sum,
00066 management::ManagementObject::writeSchemaCall_t schemaCall);
00067 ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
00068 void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
00069 uint32_t pollCallbacks(uint32_t callLimit = 0);
00070 int getSignalFd();
00071
00072 uint16_t getInterval() { return interval; }
00073 void periodicProcessing();
00074
00075 private:
00076
00077 struct SchemaClassKey {
00078 std::string name;
00079 uint8_t hash[16];
00080 };
00081
00082 struct SchemaClassKeyComp {
00083 bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
00084 {
00085 if (lhs.name != rhs.name)
00086 return lhs.name < rhs.name;
00087 else
00088 for (int i = 0; i < 16; i++)
00089 if (lhs.hash[i] != rhs.hash[i])
00090 return lhs.hash[i] < rhs.hash[i];
00091 return false;
00092 }
00093 };
00094
00095 struct SchemaClass {
00096 management::ManagementObject::writeSchemaCall_t writeSchemaCall;
00097 uint8_t kind;
00098
00099 SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
00100 const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
00101 };
00102
00103 struct QueuedMethod {
00104 QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
00105 sequence(_seq), replyTo(_reply), body(_body) {}
00106
00107 uint32_t sequence;
00108 std::string replyTo;
00109 std::string body;
00110 };
00111
00112 typedef std::deque<QueuedMethod*> MethodQueue;
00113 typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
00114 typedef std::map<std::string, ClassMap> PackageMap;
00115
00116 PackageMap packages;
00117 AgentAttachment attachment;
00118 management::ManagementObjectMap managementObjects;
00119 management::ManagementObjectMap newManagementObjects;
00120 MethodQueue methodQueue;
00121
00122 void received (client::Message& msg);
00123
00124 uint16_t interval;
00125 bool extThread;
00126 int writeFd;
00127 int readFd;
00128 uint64_t nextObjectId;
00129 std::string storeFile;
00130 sys::Mutex agentLock;
00131 sys::Mutex addLock;
00132 framing::Uuid systemId;
00133 std::string host;
00134 uint16_t port;
00135 bool connected;
00136 std::string lastFailure;
00137
00138 bool clientWasAdded;
00139 uint32_t requestedBrokerBank;
00140 uint32_t requestedAgentBank;
00141 uint32_t assignedBrokerBank;
00142 uint32_t assignedAgentBank;
00143 uint16_t bootSequence;
00144
00145 # define MA_BUFFER_SIZE 65536
00146 char outputBuffer[MA_BUFFER_SIZE];
00147 char eventBuffer[MA_BUFFER_SIZE];
00148
00149 friend class ConnectionThread;
00150 class ConnectionThread : public sys::Runnable
00151 {
00152 bool operational;
00153 ManagementAgentImpl& agent;
00154 framing::Uuid sessionId;
00155 client::Connection connection;
00156 client::Session session;
00157 client::SubscriptionManager* subscriptions;
00158 std::stringstream queueName;
00159 sys::Mutex connLock;
00160 void run();
00161 public:
00162 ConnectionThread(ManagementAgentImpl& _agent) :
00163 operational(false), agent(_agent), subscriptions(0) {}
00164 ~ConnectionThread();
00165 void sendBuffer(qpid::framing::Buffer& buf,
00166 uint32_t length,
00167 const std::string& exchange,
00168 const std::string& routingKey);
00169 void bindToBank(uint32_t brokerBank, uint32_t agentBank);
00170 };
00171
00172 class PublishThread : public sys::Runnable
00173 {
00174 ManagementAgentImpl& agent;
00175 void run();
00176 public:
00177 PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {}
00178 };
00179
00180 ConnectionThread connThreadBody;
00181 sys::Thread connThread;
00182 PublishThread pubThreadBody;
00183 sys::Thread pubThread;
00184
00185 static const std::string storeMagicNumber;
00186
00187 void startProtocol();
00188 void storeData(bool requested=false);
00189 void retrieveData();
00190 PackageMap::iterator findOrAddPackage(const std::string& name);
00191 void moveNewObjectsLH();
00192 void addClassLocal (uint8_t classKind,
00193 PackageMap::iterator pIter,
00194 const std::string& className,
00195 uint8_t* md5Sum,
00196 management::ManagementObject::writeSchemaCall_t schemaCall);
00197 void encodePackageIndication (framing::Buffer& buf,
00198 PackageMap::iterator pIter);
00199 void encodeClassIndication (framing::Buffer& buf,
00200 PackageMap::iterator pIter,
00201 ClassMap::iterator cIter);
00202 void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
00203 bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
00204 void sendCommandComplete (std::string replyToKey, uint32_t sequence,
00205 uint32_t code = 0, std::string text = std::string("OK"));
00206 void handleAttachResponse (qpid::framing::Buffer& inBuffer);
00207 void handlePackageRequest (qpid::framing::Buffer& inBuffer);
00208 void handleClassQuery (qpid::framing::Buffer& inBuffer);
00209 void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
00210 void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
00211 void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
00212 void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
00213 void handleConsoleAddedIndication();
00214 };
00215
00216 }}
00217
00218 #endif