XRootD
Loading...
Searching...
No Matches
XrdMpxStats.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d M p x S t a t s . c c */
4/* */
5/* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
#include <cerrno>
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <sys/uio.h>

#include "XrdApps/XrdMpxXml.hh"
#include "XrdNet/XrdNetAddr.hh"
#include "XrdNet/XrdNetOpts.hh"
#include "XrdSys/XrdSysError.hh"
45
46/******************************************************************************/
47/* G l o b a l V a r i a b l e s */
48/******************************************************************************/
49
50namespace XrdMpx
51{
53
54 XrdSysError Say(&Logger, "mpxstats");
55
56static const int addSender = 0x0001;
57
58 int Opts;
59};
60
61using namespace XrdMpx;
62
63/******************************************************************************/
64/* X r d M p x O u t */
65/******************************************************************************/
66
68{
69public:
70
74 int Dlen;
75 char Data[8190];
76 char Pad[2];
77 };
78
79void Add(statsBuff *sbP);
80
82
83void *Run(XrdMpxXml *xP);
84
85 XrdMpxOut() : Ready(0), inQ(0), Free(0) {}
87
88private:
89
90XrdSysMutex myMutex;
91XrdSysSemaphore Ready;
92
93statsBuff *inQ;
94statsBuff *Free;
95};
96
97/******************************************************************************/
98/* X r d M p x O u t : : A d d */
99/******************************************************************************/
100
102{
103
104// Add this to the queue and signal the processing thread
105//
106 myMutex.Lock();
107 sbP->Next = inQ;
108 inQ = sbP;
109 Ready.Post();
110 myMutex.UnLock();
111}
112
113/******************************************************************************/
114/* X r d M p x O u t : : g e t B u f f */
115/******************************************************************************/
116
118{
119 statsBuff *sbP;
120
121// Use an available buffer or allocate one
122//
123 myMutex.Lock();
124 if ((sbP = Free)) Free = sbP->Next;
125 else sbP = new statsBuff;
126 myMutex.UnLock();
127 return sbP;
128}
129
130/******************************************************************************/
131/* X r d M p x O u t : : R u n */
132/******************************************************************************/
133
135{
136 XrdNetAddr theAddr;
137 const char *Host = 0;
138 char *bP, obuff[sizeof(statsBuff)*2];
139 statsBuff *sbP;
140 int wLen, rc;
141
142// Simply loop formating and outputing the buffers
143//
144 while(1)
145 {Ready.Wait();
146 myMutex.Lock();
147 if ((sbP = inQ)) inQ = sbP->Next;
148 myMutex.UnLock();
149 if (!sbP) continue;
150 if (xP)
151 {if (!(Opts & addSender)) Host = 0;
152 else if (theAddr.Set(&(sbP->From.Addr))) Host = 0;
153 else Host = theAddr.Name();
154 wLen = xP->Format(Host, sbP->Data, obuff);
155 bP = obuff;
156 } else {
157 bP = sbP->Data;
158 *(bP + sbP->Dlen) = '\n';
159 wLen = sbP->Dlen+1;
160 }
161
162 while(wLen > 0)
163 {do {rc = write(STDOUT_FILENO, bP, wLen);}
164 while(rc < 0 && errno == EINTR);
165 wLen -= rc; bP += rc;
166 }
167
168 myMutex.Lock(); sbP->Next = Free; Free = sbP; myMutex.UnLock();
169 }
170
171// Should never get here
172//
173 return (void *)0;
174}
175
176/******************************************************************************/
177/* G l o b a l O b j e c t s */
178/******************************************************************************/
179
180namespace XrdMpx
181{
183};
184
185/******************************************************************************/
186/* T h r e a d I n t e r f a c e s */
187/******************************************************************************/
188
189void *mainOutput(void *parg)
190{
191 XrdMpxXml *xP = static_cast<XrdMpxXml *>(parg);
192 return statsQ.Run(xP);
193}
194
195/******************************************************************************/
196/* U s a g e */
197/******************************************************************************/
198
// Print the command synopsis on standard error and terminate the process
// with exit code rc. This function does not return.
//
void Usage(int rc)
{
   static const char *synopsis =
          "\nUsage: mpxstats [-f {cgi|flat|xml}] -p <port> [-s]";

   std::cerr <<synopsis <<std::endl;
   exit(rc);
}
204
205/******************************************************************************/
206/* m a i n */
207/******************************************************************************/
208
209int main(int argc, char *argv[])
210{
211 extern char *optarg;
212 extern int opterr, optopt;
213 sigset_t myset;
214 pthread_t tid;
216 XrdMpxOut::statsBuff *sbP = 0;
217 XrdNetSocket mySocket(&Say);
218 XrdMpxXml *xP = 0;
219 SOCKLEN_t fromLen;
220 int Port = 0, retc, udpFD;
221 char buff[64], c;
222 bool Debug;
223
224// Process the options
225//
226 opterr = 0; Debug = false; Opts = 0;
227 if (argc > 1 && '-' == *argv[1])
228 while ((c = getopt(argc,argv,"df:p:s")) && ((unsigned char)c != 0xff))
229 { switch(c)
230 {
231 case 'd': Debug = true;
232 break;
233 case 'f': if (!strcmp(optarg, "cgi" )) fType = XrdMpxXml::fmtCGI;
234 else if (!strcmp(optarg, "flat")) fType = XrdMpxXml::fmtFlat;
235 else if (!strcmp(optarg, "xml" )) fType = XrdMpxXml::fmtXML;
236 else {Say.Emsg(":", "Invalid format - ", optarg); Usage(1);}
237 break;
238 case 'h': Usage(0);
239 break;
240 case 'p': if (!(Port = atoi(optarg)))
241 {Say.Emsg(":", "Invalid port number - ", optarg); Usage(1);}
242 break;
243 case 's': Opts |= addSender;
244 break;
245 default: sprintf(buff,"'%c'", optopt);
246 if (c == ':') Say.Emsg(":", buff, "value not specified.");
247 else Say.Emsg(0, buff, "option is invalid");
248 Usage(1);
249 break;
250 }
251 }
252
253// Make sure port has been specified
254//
255 if (!Port) {Say.Emsg(":", "Port has not been specified."); Usage(1);}
256
257// Turn off sigpipe and host a variety of others before we start any threads
258//
259 signal(SIGPIPE, SIG_IGN); // Solaris optimization
260 sigemptyset(&myset);
261 sigaddset(&myset, SIGPIPE);
262 sigaddset(&myset, SIGCHLD);
263 pthread_sigmask(SIG_BLOCK, &myset, NULL);
264
265// Set the default stack size here
266//
267 if (sizeof(long) > 4) XrdSysThread::setStackSize((size_t)1048576);
268 else XrdSysThread::setStackSize((size_t)786432);
269
270// Create a UDP socket and bind it to a port
271//
272 if (mySocket.Open(0, Port, XRDNET_SERVER|XRDNET_UDPSOCKET, 0) < 0)
273 {Say.Emsg(":", -mySocket.LastError(), "create udp socket"); exit(4);}
274 udpFD = mySocket.Detach();
275
276// Establish format
277//
278 if (fType != XrdMpxXml::fmtXML) xP = new XrdMpxXml(fType, Debug);
279
280// Now run a thread to output whatever we get
281//
282 if ((retc = XrdSysThread::Run(&tid, mainOutput, (void *)xP,
283 XRDSYSTHREAD_BIND, "Output")))
284 {Say.Emsg(":", retc, "create output thread"); exit(4);}
285
286// Now simply wait for the messages
287//
288 fromLen = sizeof(sbP->From);
289 while(1)
290 {sbP = statsQ.getBuff();
291 retc = recvfrom(udpFD, sbP->Data, sizeof(sbP->Data), 0,
292 &sbP->From.Addr, &fromLen);
293 if (retc < 0) {Say.Emsg(":", retc, "recv udp message"); exit(8);}
294 sbP->Dlen = retc;
295 statsQ.Add(sbP);
296 }
297
298// Should never get here
299//
300 return 0;
301}
XrdOucPup XrdCmsParser::Pup & Say
int optopt
int main(int argc, char *argv[])
void * mainOutput(void *parg)
void Usage(int rc)
#define XRDNET_SERVER
Definition XrdNetOpts.hh:99
#define XRDNET_UDPSOCKET
Definition XrdNetOpts.hh:79
struct sockaddr Addr
#define write(a, b, c)
Definition XrdPosix.hh:115
bool Debug
#define SOCKLEN_t
#define XRDSYSTHREAD_BIND
XrdNetSockAddr From
void * Run(XrdMpxXml *xP)
void Add(statsBuff *sbP)
statsBuff * getBuff()
int Format(const char *Host, char *ibuff, char *obuff)
Definition XrdMpxXml.cc:242
const char * Name(const char *eName=0, const char **eText=0)
const char * Set(const char *hSpec, int pNum=PortInSpec)
int Open(const char *path, int port=-1, int flags=0, int sockbuffsz=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void setStackSize(size_t stsz, bool force=false)
XrdMpxOut statsQ
static const int addSender
XrdSysLogger Logger