XRootD
Loading...
Searching...
No Matches
XrdCmsBaseFS.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s B a s e F S . c c */
4/* */
5/* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cerrno>
32#include <ctime>
33#include <sys/time.h>
34#include <sys/types.h>
35#include <cstdio>
36
38#include "XProtocol/XPtypes.hh"
39
43#include "XrdCms/XrdCmsTrace.hh"
44
45#include "XrdOss/XrdOss.hh"
46
47#include "XrdSfs/XrdSfsFlags.hh"
48
49#include "XrdSys/XrdSysError.hh"
50#include "XrdSys/XrdSysTimer.hh"
51
52using namespace XrdCms;
53
54/******************************************************************************/
55/* E x t e r n a l T h r e a d I n t e r f a c e s */
56/******************************************************************************/
57
58void *XrdCmsBasePacer(void *carg)
59 {((XrdCmsBaseFS *)carg)->Pacer();
60 return (void *)0;
61 }
62
63void *XrdCmsBaseRunner(void *carg)
64 {((XrdCmsBaseFS *)carg)->Runner();
65 return (void *)0;
66 }
67
68/******************************************************************************/
69/* Private: B y p a s s */
70/******************************************************************************/
71
72int XrdCmsBaseFS::Bypass()
73{
74 static XrdSysTimer Window;
75
76// If we are not timing requests, we can bypass (typically checked beforehand)
77//
78 if (!theQ.rLimit) return 1;
79
80// If this is a fixed rate queue then we cannot bypass
81//
82 if (Fixed) return 0;
83
84// Check if we can reset the number of requests that can be issued inline. We
85// do this to bypass the queue unless until we get flooded by requests.
86//
87 theQ.Mutex.Lock();
88 if (!theQ.rLeft && !theQ.pqFirst)
89 {unsigned long Interval = 0;
90 Window.Report(Interval);
91 if (Interval >= 450)
92 {theQ.rLeft = theQ.rAgain;
93 Window.Reset();
94 std::cerr <<"BYPASS " <<Interval <<"ms left=" <<theQ.rLeft <<std::endl;
95 }
96 }
97
98// At this point we may or may not have freebies left
99//
100 if (theQ.rLeft > 0)
101 {theQ.rLeft--; theQ.Mutex.UnLock();
102 return 1;
103 }
104
105// This request must be queued
106//
107 theQ.Mutex.UnLock();
108 return 0;
109}
110
111/******************************************************************************/
112/* Public: E x i s t s */
113/******************************************************************************/
114
116{
117 int aOK, fnPos;
118
119// If we cannot do this locally, then we need to forward the request but only
120// if we have a route. Otherwise, just indicate that queueing is necessary.
121//
122 if (!lclStat)
123 {aOK = (!theQ.rLimit || noLim || (!Fixed && Bypass()));
124 if (Who.rovec) Queue(Arg, Who, -(Arg.PathLen-1), !aOK);
125 return 0;
126 }
127
128// If directory checking is enabled, find where the directory component ends
129// and then check if we even have this directory.
130//
131 if (dmLife)
132 {for (fnPos=Arg.PathLen-2;fnPos >= 0 && Arg.Path[fnPos] != '/';fnPos--) {}
133 if (fnPos > 0 && !hasDir(Arg.Path, fnPos)) return -1;
134 } else fnPos = 0;
135
136// If we are not limiting requests, or not limiting everyone and this is not
137// a meta-manager, or we are not timing requests and can skip the queue; then
138// issue the fstat() inline and report back the result.
139//
140 if (!theQ.rLimit || noLim || (Fixed && Bypass()))
141 return Exists(Arg.Path, fnPos);
142
143// We can't do this now, so forcibly queue the request
144//
145 if (Who.rovec) Queue(Arg, Who, fnPos, 1);
146 return 0;
147}
148
149/******************************************************************************/
150
151int XrdCmsBaseFS::Exists(char *Path, int fnPos, int UpAT)
152{
153 EPNAME("Exists");
154 static struct dMoP dirMiss = {0}, dirPres = {1};
155 static int badDStat = 0;
156 static int badFStat = 0;
157 struct stat buf;
158 int eCnt, fRC, dRC;
159 int Opts = (UpAT ? XRDOSS_resonly|XRDOSS_updtatm : XRDOSS_resonly);
160
161// If directory checking is enabled, find where the directory component ends
162// if so requested.
163//
164 if (fnPos < 0 && dmLife)
165 {for (fnPos = -(fnPos+1); fnPos >= 0 && Path[fnPos] != '/'; fnPos--) {}
166 if (fnPos > 0 && !hasDir(Path, fnPos)) return -1;
167 }
168
169// Issue stat() via oss plugin. If it succeeds, return result.
170//
171 if (!(fRC = Config.ossFS->Stat(Path, &buf, Opts)))
172 {if ((buf.st_mode & S_IFMT) == S_IFREG)
173 return (buf.st_mode & XRDSFS_POSCPEND ? CmsHaveRequest::Pending
175
176 return (buf.st_mode & S_IFMT) == S_IFDIR ? CmsHaveRequest::Online : -1;
177 }
178
179// The entry does not exist but if we are a staging server then it may be in
180// the prepare queue in which case we must say that it is pending arrival.
181//
182 if (Config.DiskSS && PrepQ.Exists(Path)) return CmsHaveRequest::Pending;
183
184// The entry does not exist. Check if the directory exists and if not, put it
185// in our directory missing table so we don't keep hitting this directory.
186// This is disabled by default and enabled by the cms.dfs directive.
187//
188 if (fnPos > 0 && dmLife)
189 {struct dMoP *xVal = &dirMiss;
190 int xLife = dmLife;
191 Path[fnPos] = '\0';
192 if (!(dRC = Config.ossFS->Stat(Path, &buf, XRDOSS_resonly)))
193 {xLife = dpLife; xVal = &dirPres;}
194 if (dRC && dRC != -ENOENT)
195 {fsMutex.Lock(); eCnt = badDStat++; fsMutex.UnLock();
196 if (!(eCnt & 0xff))
197 {char buff[80];
198 snprintf(buff, sizeof(buff), "to stat dir (events=%d)", eCnt+1);
199 Say.Emsg("Exists", dRC, buff, Path);
200 Path[fnPos] = '/';
201 }
202 } else {
203 fsMutex.Lock();
204 fsDirMP.Rep(Path, xVal, xLife, Hash_keepdata);
205 fsMutex.UnLock();
206 DEBUG("add " <<xLife <<(xVal->Present ? " okdir ":" nodir ") <<Path);
207 Path[fnPos] = '/';
208 }
209 } else {
210 if (fRC && fRC != -ENOENT)
211 {fsMutex.Lock(); eCnt = badFStat++; fsMutex.UnLock();
212 if (!(eCnt & 0xff))
213 {char buff[80];
214 snprintf(buff, sizeof(buff), "to stat file (events=%d)", eCnt+1);
215 Say.Emsg("Exists", fRC, buff, Path);
216 }
217 }
218 }
219 return -1;
220}
221
222/******************************************************************************/
223/* Private: h a s D i r */
224/******************************************************************************/
225
226int XrdCmsBaseFS::hasDir(char *Path, int fnPos)
227{
228 struct dMoP *dP;
229 int Have;
230
231// Strip to directory and check if we have it
232//
233 Path[fnPos] = '\0';
234 fsMutex.Lock();
235 Have = ((dP = fsDirMP.Find(Path)) ? dP->Present : 1);
236 fsMutex.UnLock();
237 Path[fnPos] = '/';
238 return Have;
239}
240
241/******************************************************************************/
242/* I n i t */
243/******************************************************************************/
244
245void XrdCmsBaseFS::Init(int Opts, int DMLife, int DPLife)
246{
247
248// Set values.
249//
250 dmLife = DMLife;
251 dpLife = DPLife ? DPLife : DMLife * 10;
252 Server = (Opts & Servr) != 0;
253 lclStat = (Opts & Cntrl) != 0 || Server;
254 preSel = (Opts & Immed) == 0;
255 dfsSys = (Opts & DFSys) != 0;
256}
257
258/******************************************************************************/
259/* L i m i t */
260/******************************************************************************/
261
262void XrdCmsBaseFS::Limit(int rLim, int Qmax)
263{
264
265// Establish the limits
266//
267 if (rLim < 0) {theQ.rAgain=theQ.rLeft = -1; rLim = -rLim; Fixed = 1;}
268 else {theQ.rAgain = theQ.rLeft = (rLim > 1 ? rLim/2 : 1); Fixed = 0;}
269 theQ.rLimit = (rLim <= 1000 ? rLim : 0);
270 if (Qmax > 0) theQ.qMax = Qmax;
271 else if (!(theQ.qMax = theQ.rLimit*2 + theQ.rLimit/2)) theQ.qMax = 1;
272}
273
274/******************************************************************************/
275/* P a c e r */
276/******************************************************************************/
277
279{
280 XrdCmsBaseFR *rP;
281 int inQ, rqRate = 1000/theQ.rLimit;
282
283// Process requests at the given rate
284//
285do{theQ.pqAvail.Wait();
286 theQ.Mutex.Lock(); inQ = 1;
287 while((rP = theQ.pqFirst))
288 {if (!(theQ.pqFirst = rP->Next)) {theQ.pqLast = 0; inQ = 0;}
289 theQ.Mutex.UnLock();
290 if (rP->PDirLen > 0 && !hasDir(rP->Path, rP->PDirLen))
291 {delete rP; continue;}
292 theQ.Mutex.Lock();
293 if (theQ.rqFirst) {theQ.rqLast->Next = rP; theQ.rqLast = rP;}
294 else {theQ.rqFirst = theQ.rqLast = rP; theQ.rqAvail.Post();}
295 theQ.Mutex.UnLock();
296 XrdSysTimer::Wait(rqRate);
297 if (!inQ) break;
298 theQ.Mutex.Lock();
299 }
300 if (inQ) theQ.Mutex.UnLock();
301 } while(1);
302}
303
304/******************************************************************************/
305/* Q u e u e */
306/******************************************************************************/
307
308void XrdCmsBaseFS::Queue(XrdCmsRRData &Arg, XrdCmsPInfo &Who,
309 int fnpos, int Force)
310{
311 EPNAME("Queue");
312 static int noMsg = 1;
313 XrdCmsBaseFR *rP;
314 int Msg, n, prevHWM;
315
316// If we can bypass the queue and execute this now. Avoid the grabbing the buff.
317//
318 if (!Force)
319 {XrdCmsBaseFR myReq(&Arg, Who, fnpos);
320 Xeq(&myReq);
321 return;
322 }
323
324// Queue this request for callback after an appropriate time.
325// We will also steal the underlying data buffer from the Arg.
326//
327 DEBUG("inq " <<theQ.qNum <<" pace " <<Arg.Path);
328 rP = new XrdCmsBaseFR(Arg, Who, fnpos);
329
330// Add the element to the queue
331//
332 theQ.Mutex.Lock();
333 n = ++theQ.qNum; prevHWM = theQ.qHWM;
334 if ((Msg = (n > prevHWM))) theQ.qHWM = n;
335 if (theQ.pqFirst) {theQ.pqLast->Next = rP; theQ.pqLast = rP;}
336 else {theQ.pqFirst = theQ.pqLast = rP; theQ.pqAvail.Post();}
337 theQ.Mutex.UnLock();
338
339// Issue a warning message if we have an excessive number of requests queued
340//
341 if (n > theQ.qMax && Msg && (n-prevHWM > 3 || noMsg))
342 {int Pct = n/theQ.qMax;
343 char Buff[80];
344 noMsg = 0;
345 sprintf(Buff, "Queue overrun %d%%; %d requests now queued.", Pct, n);
346 Say.Emsg("Pacer", Buff);
347 }
348}
349
350/******************************************************************************/
351/* R u n n e r */
352/******************************************************************************/
353
355{
356 XrdCmsBaseFR *rP;
357 int inQ;
358
359// Process requests at the given rate
360//
361do{theQ.rqAvail.Wait();
362 theQ.Mutex.Lock(); inQ = 1;
363 while((rP = theQ.rqFirst))
364 {if (!(theQ.rqFirst = rP->Next)) {theQ.rqLast = 0; inQ = 0;}
365 theQ.qNum--;
366 theQ.Mutex.UnLock();
367 Xeq(rP); delete rP;
368 if (!inQ) break;
369 theQ.Mutex.Lock();
370 }
371 if (inQ) theQ.Mutex.UnLock();
372 } while(1);
373}
374
375/******************************************************************************/
376/* S t a r t */
377/******************************************************************************/
378
380{
381 EPNAME("Start");
382 void *Me = (void *)this;
383 pthread_t tid;
384
385// Issue some debugging here so we know how we are starting up
386//
387 DEBUG("Srv=" <<int(Server) <<" dfs=" <<int(dfsSys) <<" lcl=" <<int(lclStat)
388 <<" Pre=" <<int(preSel) <<" dmLife=" <<dmLife <<' ' <<dpLife);
389 DEBUG("Lim=" <<theQ.rLimit <<' ' <<theQ.rAgain <<" fix=" <<int(Fixed)
390 <<" Qmax=" <<theQ.qMax);
391
392// Set the passthru option if we can't do this locally and have no limit
393//
394 Punt = (!theQ.rLimit && !lclStat);
395
396// If we need to throttle we will need two threads for the queue. The first is
397// the pacer thread that feeds the runner thread at a fixed rate.
398//
399 if (theQ.rLimit)
400 {if (XrdSysThread::Run(&tid, XrdCmsBasePacer, Me, 0, "fsQ pacer")
401 || XrdSysThread::Run(&tid, XrdCmsBaseRunner, Me, 0, "fsQ runner"))
402 {Say.Emsg("cmsd", errno, "start baseFS queue handler");
403 theQ.rLimit = 0;
404 }
405 }
406}
407
408/******************************************************************************/
409/* Pricate: X e q */
410/******************************************************************************/
411
412void XrdCmsBaseFS::Xeq(XrdCmsBaseFR *rP)
413{
414 int rc;
415
416// If we are not doing local stat calls, callback indicating a forward is needed
417//
418 if (!lclStat)
419 {if (cBack) (*cBack)(rP, 0);
420 return;
421 }
422
423// Check if we can avoid doing a stat()
424//
425 if (dmLife && rP->PDirLen > 0 && !hasDir(rP->Path, rP->PDirLen))
426 {if (cBack) (*cBack)(rP, -1);
427 return;
428 }
429
430// If we have exceeded the queue limit and this is a meta-manager request
431// then just deep-six it. Local requests must complete
432//
433 if (theQ.qNum > theQ.qMax)
434 {Say.Emsg("Xeq", "Queue limit exceeded; ignoring lkup for", rP->Path);
435 return;
436 }
437
438// Perform a local stat() and if we don't have the file
439//
440 rc = Exists(rP->Path, rP->PDirLen);
441 if (cBack) (*cBack)(rP, rc);
442}
#define DEBUG(x)
#define EPNAME(x)
void * XrdCmsBaseRunner(void *carg)
void * XrdCmsBasePacer(void *carg)
#define XRDOSS_resonly
Definition XrdOss.hh:486
#define XRDOSS_updtatm
Definition XrdOss.hh:487
@ Hash_keepdata
Definition XrdOucHash.hh:57
#define stat(a, b)
Definition XrdPosix.hh:101
bool Exists
XrdOucString Path
bool Force
#define XRDSFS_POSCPEND
XrdCmsBaseFR * Next
static const int Immed
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
static const int Servr
void Init(int Opts, int DMlife, int DPLife)
static const int Cntrl
static const int DFSys
SMask_t rovec
T * Find(const char *KeyVal, time_t *KeyTime=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
unsigned long Report(double &)
static void Wait(int milliseconds)
XrdSysError Say
XrdCmsPrepare PrepQ
XrdCmsConfig Config