xrootd
|
00001 00002 // // 00003 // XrdClientConn // 00004 // // 00005 // Author: Fabrizio Furano (INFN Padova, 2004) // 00006 // Adapted from TXNetFile (root.cern.ch) originally done by // 00007 // Alvise Dorigo, Fabrizio Furano // 00008 // INFN Padova, 2003 // 00009 // // 00010 // High level handler of connections to xrootd. // 00011 // // 00013 00014 // $Id$ 00015 00016 #ifndef XRD_CONN_H 00017 #define XRD_CONN_H 00018 00019 00020 #include "XrdClient/XrdClientConst.hh" 00021 00022 #include "time.h" 00023 #include "XrdClient/XrdClientConnMgr.hh" 00024 #include "XrdClient/XrdClientMessage.hh" 00025 #include "XrdClient/XrdClientUrlInfo.hh" 00026 #include "XrdClient/XrdClientReadCache.hh" 00027 #include "XrdOuc/XrdOucHash.hh" 00028 00029 #define ConnectionManager XrdClientConn::GetConnectionMgr() 00030 00031 class XrdClientAbs; 00032 class XrdSecProtocol; 00033 00034 class XrdClientConn { 00035 00036 public: 00037 00038 enum ESrvErrorHandlerRetval { 00039 kSEHRReturnMsgToCaller = 0, 00040 kSEHRBreakLoop = 1, 00041 kSEHRContinue = 2, 00042 kSEHRReturnNoMsgToCaller = 3, 00043 kSEHRRedirLimitReached = 4 00044 }; 00045 enum EThreeStateReadHandler { 00046 kTSRHReturnMex = 0, 00047 kTSRHReturnNullMex = 1, 00048 kTSRHContinue = 2 00049 }; 00050 00051 // To keep info about an open session 00052 struct SessionIDInfo { 00053 char id[16]; 00054 }; 00055 00056 int fLastDataBytesRecv; 00057 int fLastDataBytesSent; 00058 XErrorCode fOpenError; 00059 00060 XrdOucString fRedirOpaque; // Opaque info returned by the server when 00061 00062 // redirecting. To be used in the next opens 00063 XrdClientConn(); 00064 virtual ~XrdClientConn(); 00065 00066 inline bool CacheWillFit(long long bytes) { 00067 if (!fMainReadCache) 00068 return FALSE; 00069 return fMainReadCache->WillFit(bytes); 00070 } 00071 00072 bool CheckHostDomain(XrdOucString hostToCheck); 00073 short Connect(XrdClientUrlInfo Host2Conn, 00074 XrdClientAbsUnsolMsgHandler *unsolhandler); 00075 void Disconnect(bool ForcePhysicalDisc); 00076 virtual bool GetAccessToSrv(); 00077 XReqErrorType GoBackToRedirector(); 00078 00079 XrdOucString GetClientHostDomain() { return fgClientHostDomain; } 00080 00081 00082 static XrdClientPhyConnection *GetPhyConn(int LogConnID); 00083 00084 00085 // --------- Cache related stuff 00086 00087 long GetDataFromCache(const void *buffer, 00088 long long begin_offs, 00089 long long end_offs, 00090 bool PerfCalc, 00091 XrdClientIntvList &missingblks, 00092 long &outstandingblks ); 00093 00094 bool SubmitDataToCache(XrdClientMessage *xmsg, 00095 long long begin_offs, 00096 long long end_offs); 00097 00098 bool SubmitRawDataToCache(const void *buffer, 00099 long long begin_offs, 00100 long long end_offs); 00101 00102 void SubmitPlaceholderToCache(long long begin_offs, 00103 long long end_offs) { 00104 if (fMainReadCache) 00105 fMainReadCache->PutPlaceholder(begin_offs, end_offs); 00106 } 00107 00108 00109 void RemoveAllDataFromCache(bool keepwriteblocks=true) { 00110 if (fMainReadCache) 00111 fMainReadCache->RemoveItems(keepwriteblocks); 00112 } 00113 00114 void RemoveDataFromCache(long long begin_offs, 00115 long long end_offs, bool remove_overlapped = false) { 00116 if (fMainReadCache) 00117 fMainReadCache->RemoveItems(begin_offs, end_offs, remove_overlapped); 00118 } 00119 00120 void RemovePlaceholdersFromCache() { 00121 if (fMainReadCache) 00122 fMainReadCache->RemovePlaceholders(); 00123 } 00124 00125 void PrintCache() { 00126 if (fMainReadCache) 00127 fMainReadCache->PrintCache(); 00128 } 00129 00130 00131 bool GetCacheInfo( 00132 // The actual cache size 00133 int &size, 00134 00135 // The number of bytes submitted since the beginning 00136 long long &bytessubmitted, 00137 00138 // The number of bytes found in the cache (estimate) 00139 long long &byteshit, 00140 00141 // The number of reads which did not find their data 00142 // (estimate) 00143 long long &misscount, 00144 00145 // miss/totalreads ratio (estimate) 00146 float &missrate, 00147 00148 // number of read requests towards the cache 00149 long long &readreqcnt, 00150 00151 // ratio between bytes found / bytes submitted 00152 float &bytesusefulness 00153 ) { 00154 if (!fMainReadCache) return false; 00155 00156 fMainReadCache->GetInfo(size, 00157 bytessubmitted, 00158 byteshit, 00159 misscount, 00160 missrate, 00161 readreqcnt, 00162 bytesusefulness); 00163 return true; 00164 } 00165 00166 00167 void SetCacheSize(int CacheSize) { 00168 if (!fMainReadCache && CacheSize) 00169 fMainReadCache = new XrdClientReadCache(); 00170 00171 if (fMainReadCache) 00172 fMainReadCache->SetSize(CacheSize); 00173 } 00174 00175 void SetCacheRmPolicy(int RmPolicy) { 00176 if (fMainReadCache) 00177 fMainReadCache->SetBlkRemovalPolicy(RmPolicy); 00178 } 00179 00180 void UnPinCacheBlk(long long begin_offs, long long end_offs) { 00181 fMainReadCache->UnPinCacheBlk(begin_offs, end_offs); 00182 // Also use this to signal the possibility to proceed for a hard checkpoint 00183 fWriteWaitAck->Broadcast(); 00184 } 00185 00186 00187 // ------------------- 00188 00189 00190 int GetLogConnID() const { return fLogConnID; } 00191 00192 ERemoteServerType GetServerType() const { return fServerType; } 00193 00194 kXR_unt16 GetStreamID() const { return fPrimaryStreamid; } 00195 00196 inline XrdClientUrlInfo *GetLBSUrl() { return fLBSUrl; } 00197 inline XrdClientUrlInfo GetCurrentUrl() { return fUrl; } 00198 inline XrdClientUrlInfo GetRedirUrl() { return fREQUrl; } 00199 00200 XErrorCode GetOpenError() const { return fOpenError; } 00201 virtual XReqErrorType GoToAnotherServer(XrdClientUrlInfo &newdest); 00202 bool IsConnected() const { return fConnected; } 00203 bool IsPhyConnConnected(); 00204 00205 struct ServerResponseHeader 00206 LastServerResp; 00207 00208 struct ServerResponseBody_Error 00209 LastServerError; 00210 00211 void ClearLastServerError() { 00212 memset(&LastServerError, 0, sizeof(LastServerError)); 00213 LastServerError.errnum = kXR_noErrorYet; 00214 } 00215 00216 UnsolRespProcResult ProcessAsynResp(XrdClientMessage *unsolmsg); 00217 00218 virtual bool SendGenCommand(ClientRequest *req, 00219 const void *reqMoreData, 00220 void **answMoreDataAllocated, 00221 void *answMoreData, bool HasToAlloc, 00222 char *CmdName, int substreamid = 0); 00223 00224 int GetOpenSockFD() const { return fOpenSockFD; } 00225 00226 void SetClientHostDomain(const char *src) { fgClientHostDomain = src; } 00227 void SetConnected(bool conn) { fConnected = conn; } 00228 00229 void SetOpenError(XErrorCode err) { fOpenError = err; } 00230 00231 // Gets a parallel stream id to use to set the return path for a re 00232 int GetParallelStreamToUse(int reqsperstream); 00233 int GetParallelStreamCount(); // Returns the total number of connected streams 00234 00235 void SetRedirHandler(XrdClientAbs *rh) { fRedirHandler = rh; } 00236 00237 void SetRequestedDestHost(char *newh, kXR_int32 port) { 00238 fREQUrl = fUrl; 00239 fREQUrl.Host = newh; 00240 fREQUrl.Port = port; 00241 fREQUrl.SetAddrFromHost(); 00242 } 00243 00244 // Puts this instance in pause state for wsec seconds. 00245 // A value <= 0 revokes immediately the pause state 00246 void SetREQPauseState(kXR_int32 wsec) { 00247 // Lock mutex 00248 fREQWait->Lock(); 00249 00250 if (wsec > 0) 00251 fREQWaitTimeLimit = time(0) + wsec; 00252 else { 00253 fREQWaitTimeLimit = 0; 00254 fREQWait->Broadcast(); 00255 } 00256 00257 // UnLock mutex 00258 fREQWait->UnLock(); 00259 } 00260 00261 // Puts this instance in connect-pause state for wsec seconds. 00262 // Any future connection attempt will not happen before wsec 00263 // and the first one will be towards the given host 00264 void SetREQDelayedConnectState(kXR_int32 wsec) { 00265 // Lock mutex 00266 fREQConnectWait->Lock(); 00267 00268 if (wsec > 0) 00269 fREQConnectWaitTimeLimit = time(0) + wsec; 00270 else { 00271 fREQConnectWaitTimeLimit = 0; 00272 fREQConnectWait->Broadcast(); 00273 } 00274 00275 // UnLock mutex 00276 fREQConnectWait->UnLock(); 00277 } 00278 00279 void SetSID(kXR_char *sid); 00280 inline void SetUrl(XrdClientUrlInfo thisUrl) { fUrl = thisUrl; } 00281 00282 // Sends the request to the server, through logconn with ID LogConnID 00283 // The request is sent with a streamid 'child' of the current one, then marked as pending 00284 // Its answer will be caught asynchronously 00285 XReqErrorType WriteToServer_Async(ClientRequest *req, 00286 const void* reqMoreData, 00287 int substreamid = 0); 00288 00289 static XrdClientConnectionMgr *GetConnectionMgr() 00290 { return fgConnectionMgr;} //Instance of the conn manager 00291 00292 void GetSessionID(SessionIDInfo &sess) { 00293 XrdOucString sessname; 00294 char buf[20]; 00295 00296 snprintf(buf, 20, "%d", fUrl.Port); 00297 00298 sessname = fUrl.HostAddr; 00299 if (sessname.length() <= 0) 00300 sessname = fUrl.Host; 00301 00302 sessname += ":"; 00303 sessname += buf; 00304 00305 sess = *( fSessionIDRepo.Find(sessname.c_str()) ); 00306 } 00307 00308 long GetServerProtocol() { return fServerProto; } 00309 00310 short GetMaxRedirCnt() const { return fMaxGlobalRedirCnt; } 00311 void SetMaxRedirCnt(short mx) {fMaxGlobalRedirCnt = mx; } 00312 short GetRedirCnt() const { return fGlobalRedirCnt; } 00313 00314 bool DoWriteSoftCheckPoint(); 00315 bool DoWriteHardCheckPoint(); 00316 void UnPinCacheBlk(); 00317 00318 00319 // To give a max number of seconds for an operation to complete, no matter what happens inside 00320 // e.g. redirections, sleeps, failed connection attempts etc. 00321 void SetOpTimeLimit(int delta_secs); 00322 bool IsOpTimeLimitElapsed(time_t timenow); 00323 00324 00325 protected: 00326 void SetLogConnID(int cid) { fLogConnID = cid; } 00327 void SetStreamID(kXR_unt16 sid) { fPrimaryStreamid = sid; } 00328 00329 00330 00331 // The handler which first tried to connect somewhere 00332 XrdClientAbsUnsolMsgHandler *fUnsolMsgHandler; 00333 00334 XrdClientUrlInfo fUrl; // The current URL 00335 XrdClientUrlInfo *fLBSUrl; // Needed to save the load balancer url 00336 XrdClientUrlInfo fREQUrl; // For explicitly requested redirs 00337 00338 short fGlobalRedirCnt; // Number of redirections 00339 00340 private: 00341 00342 static XrdOucString fgClientHostDomain; // Save the client's domain name 00343 bool fConnected; 00344 bool fGettingAccessToSrv; // To avoid recursion in desperate situations 00345 time_t fGlobalRedirLastUpdateTimestamp; // Timestamp of last redirection 00346 00347 int fLogConnID; // Logical connection ID used 00348 kXR_unt16 fPrimaryStreamid; // Streamid used for normal communication 00349 // NB it's a copy of the one contained in 00350 // the logconn 00351 00352 short fMaxGlobalRedirCnt; 00353 XrdClientReadCache *fMainReadCache; 00354 00355 // The time limit for a transaction 00356 time_t fOpTimeLimit; 00357 00358 XrdClientAbs *fRedirHandler; // Pointer to a class inheriting from 00359 // XrdClientAbs providing methods 00360 // to handle a redir at higher level 00361 00362 XrdOucString fRedirInternalToken; // Token returned by the server when 00363 // redirecting. To be used in the next logins 00364 00365 XrdSysCondVar *fREQWaitResp; // For explicitly requested delayed async responses 00366 ServerResponseBody_Attn_asynresp * 00367 fREQWaitRespData; // For explicitly requested delayed async responses 00368 00369 time_t fREQWaitTimeLimit; // For explicitly requested pause state 00370 XrdSysCondVar *fREQWait; // For explicitly requested pause state 00371 time_t fREQConnectWaitTimeLimit; // For explicitly requested delayed reconnect 00372 XrdSysCondVar *fREQConnectWait; // For explicitly requested delayed reconnect 00373 00374 long fServerProto; // The server protocol 00375 ERemoteServerType fServerType; // Server type as returned by doHandShake() 00376 00377 static XrdOucHash<SessionIDInfo> 00378 fSessionIDRepo; // The repository of session IDs, shared. 00379 // Association between 00380 // <hostname>:<port> and a SessionIDInfo struct 00381 00382 int fOpenSockFD; // Descriptor of the underlying socket 00383 static XrdClientConnectionMgr *fgConnectionMgr; //Instance of the Connection Manager 00384 00385 XrdSysCondVar *fWriteWaitAck; 00386 XrdClientVector<ClientRequest> fWriteReqsToRetry; // To store the write reqs to retry in case of a disconnection 00387 00388 bool CheckErrorStatus(XrdClientMessage *, short &, char *); 00389 void CheckPort(int &port); 00390 void CheckREQPauseState(); 00391 void CheckREQConnectWaitState(); 00392 bool CheckResp(struct ServerResponseHeader *resp, const char *method); 00393 XrdClientMessage *ClientServerCmd(ClientRequest *req, 00394 const void *reqMoreData, 00395 void **answMoreDataAllocated, 00396 void *answMoreData, 00397 bool HasToAlloc, 00398 int substreamid = 0); 00399 XrdSecProtocol *DoAuthentication(char *plist, int plsiz); 00400 00401 ERemoteServerType DoHandShake(short log); 00402 00403 bool DoLogin(); 00404 bool DomainMatcher(XrdOucString dom, XrdOucString domlist); 00405 00406 XrdOucString GetDomainToMatch(XrdOucString hostname); 00407 00408 ESrvErrorHandlerRetval HandleServerError(XReqErrorType &, XrdClientMessage *, 00409 ClientRequest *); 00410 bool MatchStreamid(struct ServerResponseHeader *ServerResponse); 00411 00412 // Sends a close request, without waiting for an answer 00413 // useful (?) to be sent just before closing a badly working stream 00414 bool PanicClose(); 00415 00416 XrdOucString ParseDomainFromHostname(XrdOucString hostname); 00417 00418 XrdClientMessage *ReadPartialAnswer(XReqErrorType &, size_t &, 00419 ClientRequest *, bool, void**, 00420 EThreeStateReadHandler &); 00421 00422 void ClearSessionID(); 00423 00424 XReqErrorType WriteToServer(ClientRequest *req, 00425 const void* reqMoreData, 00426 short LogConnID, 00427 int substreamid = 0); 00428 00429 bool WaitResp(int secsmax); 00430 }; 00431 00432 00433 00434 #endif