xrootd
|
00001 00002 // // 00003 // XrdClientPhyConnection // 00004 // Author: Fabrizio Furano (INFN Padova, 2004) // 00005 // Adapted from TXNetFile (root.cern.ch) originally done by // 00006 // Alvise Dorigo, Fabrizio Furano // 00007 // INFN Padova, 2003 // 00008 // // 00009 // Class handling physical connections to xrootd servers // 00010 // // 00012 00013 // $Id$ 00014 00015 #ifndef _XrdClientPhyConnection 00016 #define _XrdClientPhyConnection 00017 00018 #include "XrdClient/XrdClientPSock.hh" 00019 #include "XrdClient/XrdClientMessage.hh" 00020 #include "XrdClient/XrdClientUnsolMsg.hh" 00021 #include "XrdClient/XrdClientInputBuffer.hh" 00022 #include "XrdClient/XrdClientUrlInfo.hh" 00023 #include "XrdClient/XrdClientThread.hh" 00024 #include "XrdSys/XrdSysPthread.hh" 00025 #include "XrdSys/XrdSysSemWait.hh" 00026 00027 #include <time.h> // for time_t data type 00028 00029 enum ELoginState { 00030 kNo = 0, 00031 kYes = 1, 00032 kPending = 2 00033 }; 00034 00035 enum ERemoteServerType { 00036 kSTError = -1, // Some error occurred: server type undetermined 00037 kSTNone = 0, // Remote server type un-recognized 00038 kSTRootd = 1, // Remote server type: old rootd server 00039 kSTBaseXrootd = 2, // Remote server type: xrootd dynamic load balancer 00040 kSTDataXrootd = 3 // Remote server type: xrootd data server 00041 }; 00042 00043 class XrdClientSid; 00044 class XrdSecProtocol; 00045 00046 class XrdClientPhyConnection: public XrdClientUnsolMsgSender { 00047 00048 private: 00049 time_t fLastUseTimestamp; 00050 enum ELoginState fLogged; // only 1 login/auth is needed for physical 00051 XrdSecProtocol *fSecProtocol; // authentication protocol 00052 00053 XrdClientInputBuffer 00054 fMsgQ; // The queue used to hold incoming messages 00055 00056 int fRequestTimeout; 00057 bool fMStreamsGoing; 00058 XrdSysRecMutex fRwMutex; // Lock before using the physical channel 00059 // (for reading and/or writing) 00060 00061 XrdSysRecMutex fMutex; 00062 XrdSysRecMutex fMultireadMutex; // Used to arbitrate between multiple 00063 // threads reading msgs from the same conn 00064 00065 XrdClientThread *fReaderthreadhandler[64]; // The thread which is going to pump 00066 // out the data from the socket 00067 00068 int fReaderthreadrunning; 00069 00070 XrdClientUrlInfo fServer; 00071 00072 XrdClientSock *fSocket; 00073 00074 UnsolRespProcResult HandleUnsolicited(XrdClientMessage *m); 00075 00076 XrdSysSemWait fReaderCV; 00077 00078 short fLogConnCnt; // Number of logical connections using this phyconn 00079 00080 XrdClientSid *fSidManager; 00081 00082 public: 00083 long fServerProto; // The server protocol 00084 ERemoteServerType fServerType; 00085 long fTTLsec; 00086 00087 XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h, XrdClientSid *sid); 00088 ~XrdClientPhyConnection(); 00089 00090 XrdClientMessage *BuildMessage(bool IgnoreTimeouts, bool Enqueue); 00091 bool CheckAutoTerm(); 00092 00093 bool Connect(XrdClientUrlInfo RemoteHost, bool isUnix = 0); 00094 void CountLogConn(int d = 1); 00095 void Disconnect(); 00096 00097 ERemoteServerType 00098 DoHandShake(ServerInitHandShake &xbody, 00099 int substreamid = 0); 00100 00101 bool ExpiredTTL(); 00102 short GetLogConnCnt() const { return fLogConnCnt; } 00103 int GetReaderThreadsCnt() { XrdSysMutexHelper l(fMutex); return fReaderthreadrunning; } 00104 00105 long GetTTL() { return fTTLsec; } 00106 00107 XrdSecProtocol *GetSecProtocol() const { return fSecProtocol; } 00108 int GetSocket() { return fSocket ? fSocket->fSocket : -1; } 00109 00110 // Tells to the sock to rebuild the list of interesting selectors 00111 void ReinitFDTable() { if (fSocket) fSocket->ReinitFDTable(); } 00112 00113 int SaveSocket() { fTTLsec = 0; return fSocket ? (fSocket->SaveSocket()) : -1; } 00114 void SetInterrupt() { if (fSocket) fSocket->SetInterrupt(); } 00115 void SetSecProtocol(XrdSecProtocol *sp) { fSecProtocol = sp; } 00116 00117 void StartedReader(); 00118 00119 bool IsAddress(const XrdOucString &addr) { 00120 return ( (fServer.Host == addr) || 00121 (fServer.HostAddr == addr) ); 00122 } 00123 00124 ELoginState IsLogged(); 00125 00126 bool IsPort(int port) { return (fServer.Port == port); }; 00127 bool IsUser(const XrdOucString &usr) { return (fServer.User == usr); }; 00128 bool IsValid(); 00129 00130 00131 void LockChannel(); 00132 00133 // see XrdClientSock for the meaning of the parameters 00134 int ReadRaw(void *buffer, int BufferLength, int substreamid = -1, 00135 int *usedsubstreamid = 0); 00136 00137 XrdClientMessage *ReadMessage(int streamid); 00138 bool ReConnect(XrdClientUrlInfo RemoteHost); 00139 void SetLogged(ELoginState status) { fLogged = status; } 00140 inline void SetTTL(long ttl) { fTTLsec = ttl; } 00141 void StartReader(); 00142 void Touch(); 00143 void UnlockChannel(); 00144 int WriteRaw(const void *buffer, int BufferLength, int substreamid = 0); 00145 00146 int TryConnectParallelStream(int port, int windowsz, int sockid) { return ( fSocket ? fSocket->TryConnectParallelSock(port, windowsz, sockid) : -1); } 00147 int EstablishPendingParallelStream(int tmpid, int newid) { return ( fSocket ? fSocket->EstablishParallelSock(tmpid, newid) : -1); } 00148 void RemoveParallelStream(int substreamid) { if (fSocket) fSocket->RemoveParallelSock(substreamid); } 00149 // Tells if the attempt to establish the parallel streams is ongoing or was done 00150 // and mark it as ongoing or done 00151 bool TestAndSetMStreamsGoing(); 00152 00153 int GetSockIdHint(int reqsperstream) { return ( fSocket ? fSocket->GetSockIdHint(reqsperstream) : 0); } 00154 int GetSockIdCount() {return ( fSocket ? fSocket->GetSockIdCount() : 0); } 00155 void PauseSelectOnSubstream(int substreamid) { if (fSocket) fSocket->PauseSelectOnSubstream(substreamid); } 00156 void RestartSelectOnSubstream(int substreamid) { if (fSocket) fSocket->RestartSelectOnSubstream(substreamid); } 00157 00158 // To prohibit/re-enable a socket descriptor from being looked at by the reader threads 00159 virtual void BanSockDescr(int sockdescr, int sockid) { if (fSocket) fSocket->BanSockDescr(sockdescr, sockid); } 00160 virtual void UnBanSockDescr(int sockdescr) { if (fSocket) fSocket->UnBanSockDescr(sockdescr); } 00161 00162 void ReadLock() { fMultireadMutex.Lock(); } 00163 void ReadUnLock() { fMultireadMutex.UnLock(); } 00164 00165 int WipeStreamid(int streamid) { return fMsgQ.WipeStreamid(streamid); } 00166 }; 00167 00168 00169 00170 00171 // 00172 // Class implementing a trick to automatically unlock an XrdClientPhyConnection 00173 // 00174 class XrdClientPhyConnLocker { 00175 private: 00176 XrdClientPhyConnection *phyconn; 00177 00178 public: 00179 XrdClientPhyConnLocker(XrdClientPhyConnection *phyc) { 00180 // Constructor 00181 phyconn = phyc; 00182 phyconn->LockChannel(); 00183 } 00184 00185 ~XrdClientPhyConnLocker(){ 00186 // Destructor. 00187 phyconn->UnlockChannel(); 00188 } 00189 00190 }; 00191 00192 00193 #endif