XRootD
Loading...
Searching...
No Matches
XrdXrootdPgrwAio.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d P g r w A i o . c c */
4/* */
5/* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cerrno>
32#include <cstdio>
33#include <sys/uio.h>
34
35#include "Xrd/XrdLink.hh"
36#include "Xrd/XrdScheduler.hh"
38#include "XrdOuc/XrdOucCRC.hh"
40#include "XrdSys/XrdSysError.hh"
48
49#define TRACELINK dataLink
50
51/******************************************************************************/
52/* G l o b a l S t a t i c s */
53/******************************************************************************/
54
56
// Objects declared here are defined outside this file. In the code visible
// here, eLog is used by CopyL2F() to log a pgWrite logic error.
57namespace XrdXrootd
58{
59extern XrdSysError eLog;
60extern XrdScheduler *Sched;
61}
62using namespace XrdXrootd;
63
64/******************************************************************************/
65/* S t a t i c M e m b e r s */
66/******************************************************************************/
67
68const char *XrdXrootdPgrwAio::TraceID = "PgrwAio";
69
70/******************************************************************************/
71/* L o c a l S t a t i c s */
72/******************************************************************************/
73
// File-local free list of recycled XrdXrootdPgrwAio objects, shared by
// Alloc() and Recycle(bool).
74namespace
75{
// fqMutex is locked around every access to fqFirst and numFree.
76XrdSysMutex fqMutex;
// Head of the free list; entries are chained through their nextPgrw member.
77XrdXrootdPgrwAio *fqFirst = 0;
// Number of objects currently on the free list.
78int numFree = 0;
79
// Recycle(bool) deletes an object instead of queueing it once numFree has
// reached this limit.
80static const int maxKeep = 64; // 4 MB to keep in reserve
81}
82
83/******************************************************************************/
84/* A l l o c */
85/******************************************************************************/
86
89 XrdXrootdFile *fP,
91{
// Hands out a request object: one is taken from the free list when available,
// otherwise a new one is created. The object is then initialized with the
// supplied protocol, response and file, and its badCSP is set to bcsP.
//
92 XrdXrootdPgrwAio *reqP;
93
94// Obtain a preallocated aio request object
95//
96 fqMutex.Lock();
97 if ((reqP = fqFirst))
98 {fqFirst = reqP->nextPgrw;
99 numFree--;
100 }
101 fqMutex.UnLock();
102
103// If we have no object, create a new one
104//
105 if (!reqP) reqP = new XrdXrootdPgrwAio;
106
107// Initialize the object and return it
108//
109 reqP->Init(protP, resp, fP);
// Detach the object from the free-list chain before handing it out.
110 reqP->nextPgrw = 0;
111 reqP->badCSP = bcsP;
112 return reqP;
113}
114
115/******************************************************************************/
116/* Private: C o p y F 2 L _ A d d 2 Q */
117/******************************************************************************/
118
119bool XrdXrootdPgrwAio::CopyF2L_Add2Q(XrdXrootdAioPgrw *aioP)
120{
121 const char *eMsg;
122 int dlen, rc;
123
124// Dispatch the requested number of aio requests if we have enough data
125//
126 if (dataLen > 0)
127 {if (!aioP && !(aioP = XrdXrootdAioPgrw::Alloc(this)))
128 {if (inFlight) return true;
129 SendError(ENOMEM, "insufficient memory");
130 return false;
131 }
132 if (!(dlen = aioP->Setup2Send(dataOffset, dataLen, eMsg)))
133 {SendError(EINVAL, eMsg);
134 aioP->Recycle();
135 return false;
136 }
137 if ((rc = dataFile->XrdSfsp->pgRead((XrdSfsAio *)aioP)) != SFS_OK)
138 {SendFSError(rc);
139 aioP->Recycle();
140 return false;
141 }
142 inFlight++;
143 TRACEP(FSAIO, "pgrd beg " <<dlen <<'@' <<dataOffset
144 <<" inF=" <<int(inFlight));
145 dataOffset += dlen;
146 dataLen -= dlen;
147 if (dataLen <= 0)
148 {dataFile->aioFob->Schedule(Protocol);
149 aioState |= aioSchd;
150 }
151 }
152 return true;
153}
154
155/******************************************************************************/
156/* Private: C o p y F 2 L */
157/******************************************************************************/
158
// Copy from file to link: collect completed page reads, send each one to the
// client via SendData() and queue further reads via CopyF2L_Add2Q(). The loop
// repeats while requests are in flight. The last buffer is parked in finalRead
// and sent as the final response after the loop, after which this object is
// recycled (or drained first when requests are still in flight).
//
159void XrdXrootdPgrwAio::CopyF2L()
160{
161 XrdXrootdAioBuff *bP;
162 XrdXrootdAioPgrw *aioP;
163
164// Pick a finished element off the pendQ. Wait for an outstanding buffer if we
165// reached our buffer limit. Otherwise, ask for a return if we can start anew.
166// Note: We asked getBuff() if it returns nil to not release the lock.
167//
// NOTE(review): the statement that opens this loop and declares doWait is not
// visible in this listing -- confirm against the original source.
169 if (!(bP = getBuff(doWait)))
170 {if (isDone || !CopyF2L_Add2Q()) break;
171 continue;
172 }
173
174// Step 1: do some tracing
175//
176 TRACEP(FSAIO,"pgrd end "<<bP->sfsAio.aio_nbytes<<'@'<<bP->sfsAio.aio_offset
177 <<" result="<<bP->Result<<" D-S="<<isDone<<'-'<<int(Status)
178 <<" inF="<<int(inFlight));
179
180// Step 2: Validate this buffer
181//
182 if (!Validate(bP))
183 {if (bP != finalRead) bP->Recycle();
184 continue;
185 }
186
187// Step 3: Get a pointer to the derived type (we avoid dynamic cast)
188//
189 aioP = bP->pgrwP;
190
191// Step 4: If this aio request was simulated (indicated by cksVec being nil)
192// we have to compute the checksums and reset the pointer via noChkSums().
193//
// NOTE(review): the head of the call whose arguments follow is not visible in
// this listing -- confirm against the original source.
194 if (aioP->noChkSums() && aioP->Result > 0)
196 aioP->sfsAio.aio_offset, aioP->Result, aioP->cksVec);
197
198// Step 5: If this is the last block to be read then save it for final status
199//
200 if (inFlight == 0 && dataLen == 0 && !finalRead)
201 {finalRead = aioP;
202 break;
203 }
204
205// Step 6: Send the data to the client and if successful, see if we need to
206// schedule more data to be read from the data source.
207//
208 if (!isDone && SendData(aioP) && dataLen) {if (!CopyF2L_Add2Q(aioP)) break;}
209 else aioP->Recycle();
210
211 } while(inFlight > 0);
212
213// If we are here then the request has finished. If all went well,
214// fire off the final response.
215//
216 if (!isDone) SendData(finalRead, true);
217 if (finalRead) finalRead->Recycle();
218
219// If we encountered a fatal link error then cancel any pending aio reads on
220// this link. Otherwise if we have not yet scheduled the next aio, do so.
221//
222 if (aioState & aioDead) dataFile->aioFob->Reset(Protocol);
223 else if (!(aioState & aioSchd)) dataFile->aioFob->Schedule(Protocol);
224
225// Do a quick drain if something is still in flight for logging purposes.
226// If the quick drain wasn't successful, then draining will be done in
227// the background; which, of course, might never complete. Otherwise, recycle.
228//
229 if (!inFlight) Recycle(true);
230 else Recycle(Drain());
231}
232
233/******************************************************************************/
234/* Private: C o p y L 2 F */
235/******************************************************************************/
236
// Copy from link to file: on each pass reuse a completed write buffer (or
// allocate one), set it up for the next segment, receive that segment from the
// link via Protocol->getData() and pass it to CopyL2F(aioP) to be written.
//
// Returns the getData() result when it is nonzero, the SendDone() result when
// the loop ends with no data left and no final status posted, and 0 in all
// other cases.
//
237int XrdXrootdPgrwAio::CopyL2F()
238{
239 XrdXrootdAioBuff *bP;
240 XrdXrootdAioPgrw *aioP;
241 const char *eMsg;
242 int dLen, ioVNum, rc;
243
244// Pick a finished element off the pendQ. If there are no elements then get
245// a new one if we can. Otherwise, we will have to wait for one to come back.
246// Unlike read() writes are bound to a socket and we cannot reliably
247// give up the thread by returning to level 0.
248//
// NOTE(review): the statement that opens this loop and declares doWait is not
// visible in this listing -- confirm against the original source.
250 if (!(bP = getBuff(doWait)))
251 {if (isDone) return 0;
252 if (!(aioP = XrdXrootdAioPgrw::Alloc(this)))
253 {SendError(ENOMEM, "insufficient memory");
254 return 0;
255 }
256 } else {
257 aioP = bP->pgrwP;
258
259 TRACEP(FSAIO,"pgwr end "<<aioP->sfsAio.aio_nbytes<<'@'
260 <<aioP->sfsAio.aio_offset<<" result="<<aioP->Result
261 <<" D-S="<<isDone<<'-'<<int(Status)<<" inF="<<int(inFlight));
262
263// If the aio failed, send an error
264//
265 if (aioP->Result <= 0)
266 {SendError(-aioP->Result, 0);
267 aioP->Recycle();
268 return 0; // Caller will drain
269 }
270
271// If we have no data or status was posted, ignore the result
272//
273 if (dataLen <= 0 || isDone)
274 {aioP->Recycle();
275 continue;
276 }
277 }
278
279// Setup the aio object
280//
281 dLen = aioP->Setup2Recv(dataOffset, dataLen, eMsg);
282 if (!dLen)
283 {SendError(EINVAL, eMsg);
284 aioP->Recycle();
285 return 0;
286 }
288 dataLen -= dLen;
289
290// Get the iovec information
291//
292 struct iovec *ioV = aioP->iov4Recv(ioVNum);
293
294// Issue the read to get the data into the buffer
295//
// A positive result parks the buffer in pendWrite; a negative one recycles the
// buffer and zeroes dataLen. Either way the result is returned to the caller.
296 if ((rc = Protocol->getData(this, "pgWrite", ioV, ioVNum)))
297 {if (rc > 0) pendWrite = aioP;
298 else {aioP->Recycle(); // rc must be < 0!
299 dataLen = 0;
300 }
301 return rc;
302 }
303
304// Complete the write operation
305//
306 if (!CopyL2F(aioP)) return 0;
307
308 } while(inFlight);
309
310// If we finished successfully, send off final response otherwise it's an error.
311//
312 if (!isDone)
313 {if (!dataLen) return SendDone();
314 SendError(EIDRM, "pgWrite encountered an impossible condition");
315 eLog.Emsg("PgrwAio", "pgWrite logic error for",
316 dataLink->ID, dataFile->FileKey);
317 }
318
319// Cleanup as we don't know where we will return
320//
321 return 0;
322}
323
324/******************************************************************************/
325
326bool XrdXrootdPgrwAio::CopyL2F(XrdXrootdAioBuff *bP)
327{
328
329// Verify the checksums. Upon success, write out the data.
330//
331 if (VerCks(bP->pgrwP))
332 {int rc = dataFile->XrdSfsp->pgWrite((XrdSfsAio *)bP);
333 if (rc != SFS_OK) {SendFSError(rc); bP->Recycle();}
334 else {inFlight++;
335 TRACEP(FSAIO, "pgwr beg " <<bP->sfsAio.aio_nbytes <<'@'
336 <<bP->sfsAio.aio_offset
337 <<" inF=" <<int(inFlight));
338 return true;
339 }
340 }
341 return false;
342}
343
344/******************************************************************************/
345/* D o I t */
346/******************************************************************************/
347
348// This method is invoked when we have run out of aio objects but have inflight
349// objects during reading. In that case, we must relinquish the thread. When an
350// aio object completes it will reschedule this object on a new thread.
351
353{
354// Reads run disconnected as they will never read from the link.
355//
// Only a read request is resumed here; for any other state this is a no-op.
356 if (aioState & aioRead) CopyF2L();
357}
358
359/******************************************************************************/
360/* R e a d */
361/******************************************************************************/
362
// Start an asynchronous page read of dlen bytes at offset offs. The request
// parameters are recorded, the link, file and request counts are raised, and
// this object is scheduled through the file's aioFob to run asynchronously.
//
363void XrdXrootdPgrwAio::Read(long long offs, int dlen)
364{
365
366// Setup the copy from the file to the network
367//
368 dataOffset = highOffset = offs;
369 dataLen = dlen;
371
372// Reads run disconnected and are self-terminating, so we need to increase the
373// refcount for the link we will be using to prevent it from disappearing.
374// Recycle will decrement it but does so only for reads. We always up the file
375// refcount and number of requests.
376//
377 dataLink->setRef(1);
378 dataFile->Ref(1);
379 Protocol->aioUpdReq(1);
380
381// Schedule ourselves to run this asynchronously and return
382//
383 dataFile->aioFob->Schedule(this);
384}
385
386/******************************************************************************/
387/* R e c y c l e */
388/******************************************************************************/
389
391{
// Releases the counts held for this request exactly once (the aioHeld flag
// records that it was done). When release is true the object is then either
// put back on the free list or, if the list already holds maxKeep objects,
// deleted. When release is false the object is left alone.
//
392// Update request count, file and link reference count
393//
394 if (!(aioState & aioHeld))
395 {Protocol->aioUpdReq(-1);
396 if (aioState & aioRead)
397 {dataLink->setRef(-1);
398 dataFile->Ref(-1);
399 }
400 aioState |= aioHeld;
401 }
402
403// Do some tracing
404//
405 TRACEP(FSAIO,"pgrw recycle "<<(release ? "" : "hold ")
406 <<(aioState & aioRead ? 'R' : 'W')<<" D-S="
407 <<isDone<<'-'<<int(Status));
408
409// Place the object on the free queue if possible
410//
411 if (release)
412 {fqMutex.Lock();
413 if (numFree >= maxKeep)
// Unlock before deleting: no member may be touched after "delete this".
414 {fqMutex.UnLock();
415 delete this;
416 } else {
417 nextPgrw = fqFirst;
418 fqFirst = this;
419 numFree++;
420 fqMutex.UnLock();
421 }
422 }
423}
424
425/******************************************************************************/
426/* Private: S e n d D a t a */
427/******************************************************************************/
428
// Send one pgRead response to the client.
//
// bP    - the buffer whose data and file offset are sent; when nil an empty
//         response carrying dataOffset is sent instead.
// final - when true the response is marked as the final result.
//
// Returns true when the send succeeded. A failed send or a final response
// marks the request done and zeroes dataLen; a failed send also sets aioDead.
//
429bool XrdXrootdPgrwAio::SendData(XrdXrootdAioBuff *bP, bool final)
430{
431 static const int infoLen = sizeof(kXR_int64);
432 struct pgReadResponse
434 kXR_int64 ofs;
435 } pgrResp;
436 int rc;
437
438// Preinitialize the header
439//
440 pgrResp.rsp.bdy.requestid = kXR_pgread - kXR_1stRequest;
441 pgrResp.rsp.bdy.resptype = (final ? XrdProto::kXR_FinalResult
443 memset(pgrResp.rsp.bdy.reserved, 0, sizeof(pgrResp.rsp.bdy.reserved));
444
445// Send the data; we might not have any (typically in a final response)
446//
447 if (bP)
448 {int iovLen, iovNum;
449 struct iovec *ioVec = bP->pgrwP->iov4Send(iovNum, iovLen, true);
450 pgrResp.ofs = htonll(bP->sfsAio.aio_offset);
451// char trBuff[512];
452// snprintf(trBuff, sizeof(trBuff), "Aio PGR: %d@%ld (%ld)\n",
453// iovLen, bP->sfsAio.aio_offset, (bP->sfsAio.aio_offset>>12));
454// std::cerr<<trBuff<<std::flush;
455 rc = Response.Send(pgrResp.rsp, infoLen, ioVec, iovNum, iovLen);
456 } else {
457 pgrResp.rsp.bdy.dlen = 0;
458 pgrResp.ofs = htonll(dataOffset);
459 rc = Response.Send(pgrResp.rsp, infoLen);
460 }
461
462// Diagnose any errors
463//
464 if (rc || final)
465 {isDone = true;
466 dataLen = 0;
467 if (rc) aioState |= aioDead;
468 }
469 return rc == 0;
470}
471
472/******************************************************************************/
473/* Private: S e n d D o n e */
474/******************************************************************************/
475
476int XrdXrootdPgrwAio::SendDone()
477{
478 static const int infoLen = sizeof(kXR_int64);
479 struct {ServerResponseStatus rsp;
480 ServerResponseBody_pgWrite info; // info.offset
481 } pgwResp;
482 char *buff;
483 int n, rc;
484
485// Preinitialize the header
486//
487 pgwResp.rsp.bdy.requestid = kXR_pgwrite - kXR_1stRequest;
488 pgwResp.rsp.bdy.resptype = XrdProto::kXR_FinalResult;
489 pgwResp.info.offset = htonll(highOffset);
490 memset(pgwResp.rsp.bdy.reserved, 0, sizeof(pgwResp.rsp.bdy.reserved));
491
492// Get any checksum correction information we should turn
493//
494 buff = badCSP->boInfo(n);
495
496// Send the final response
497//
498 if ((rc = Response.Send(pgwResp.rsp, infoLen, buff, n))) dataLen = 0;
499 isDone = true;
500 if (rc) aioState |= aioDead;
501 return rc;
502}
503
504/******************************************************************************/
505/* V e r C k s */
506/******************************************************************************/
507
508bool XrdXrootdPgrwAio::VerCks(XrdXrootdAioPgrw *aioP)
509{
510 off_t dOffset = aioP->sfsAio.aio_offset;
511 uint32_t *csVec, *csVP, csVal;
512 int ioVNum, dLen;
513
514// Get the iovec information as this will drive the checksum
515//
516 struct iovec *ioV = aioP->iov4Data(ioVNum);
517 csVP = csVec = (uint32_t*)ioV[0].iov_base;
518
519// Verify each page or page segment
520//
521 for (int i = 1; i < ioVNum; i +=2)
522 {dLen = ioV[i].iov_len;
523 csVal = ntohl(*csVP); *csVP++ = csVal;
524 if (!XrdOucCRC::Ver32C(ioV[i].iov_base, dLen, csVal))
525 {const char *eMsg = badCSP->boAdd(dataFile, dOffset, dLen);
526 if (eMsg) {SendError(ETOOMANYREFS, eMsg);
527 aioP->Recycle();
528 return false;
529 }
530 }
531 dOffset += dLen;
532 }
533
534// All done, while we may have checksum error there is nothing we can do about
535// it and it's up to the client to send corrections.
536//
537 return true;
538}
539
540/******************************************************************************/
541/* W r i t e */
542/******************************************************************************/
543
// Start a page write of dlen bytes at offset offs. The request count is
// raised, the request parameters are recorded, and the operation is driven by
// calling gdDone() directly; its result is returned to the caller.
//
544int XrdXrootdPgrwAio::Write(long long offs, int dlen)
545{
546
547// Update request count. Note that dataLink and dataFile references are
548// handled outboard as writes are inextricably tied to the data link.
549//
550 Protocol->aioUpdReq(1);
551
552// Setup the copy from the network to the file
553//
555 dataOffset = highOffset = offs;
556 dataLen = dlen;
557
558// Since this thread can't do anything else since it's blocked by the socket
559// we simply initiate the write operation via a simulated getData() callback.
560//
561 return gdDone();
562}
@ kXR_1stRequest
Definition XProtocol.hh:111
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_pgwrite
Definition XProtocol.hh:138
long long kXR_int64
Definition XPtypes.hh:98
XrdOucTrace * XrdXrootdTrace
#define eMsg(x)
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
#define SFS_OK
#define TRACEP(act, x)
static bool Ver32C(const void *data, size_t count, const uint32_t csval, uint32_t *csbad=0)
Definition XrdOucCRC.cc:222
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
uint32_t * cksVec
Definition XrdSfsAio.hh:63
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void Recycle()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
virtual void Recycle() override
XrdXrootdAioPgrw *const pgrwP
int Setup2Send(off_t offs, int dlen, const char *&eMsg)
struct iovec * iov4Send(int &iovNum, int &iovLen, bool cs2net=false)
struct iovec * iov4Data(int &iovNum)
struct iovec * iov4Recv(int &iovNum)
void Recycle() override
bool noChkSums(bool reset=true)
static XrdXrootdAioPgrw * Alloc(XrdXrootdAioTask *arp)
int Setup2Recv(off_t offs, int dlen, const char *&eMsg)
static const int aioPage
int gdDone() override
XrdXrootdFile * dataFile
bool Validate(XrdXrootdAioBuff *aioP)
XrdXrootdAioBuff * getBuff(bool wait)
void SendError(int rc, const char *eText)
XrdXrootdResponse Response
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void SendFSError(int rc)
static const int aioRead
static const int aioSchd
static const int aioHeld
RAtomic_uchar inFlight
static const int aioDead
XrdXrootdProtocol * Protocol
void DoIt() override
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
void Recycle(bool release) override
@ kXR_PartialResult
@ kXR_FinalResult
XrdScheduler * Sched
XrdSysError eLog