Windows: Kill AFS_LARGEFILES preprocessor symbol
[openafs.git] / src / WINNT / afsd / cm_dcache.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <afs/stds.h>
12
13 #include <windows.h>
14 #include <winsock2.h>
15 #include <nb30.h>
16 #ifdef COMMENT
17 #include <malloc.h>
18 #endif
19 #include <string.h>
20 #include <stdlib.h>
21 #include <osi.h>
22
23 #include "afsd.h"
24
25 #ifdef DEBUG
26 extern void afsi_log(char *pattern, ...);
27 #endif
28
29 #ifdef AFS_FREELANCE_CLIENT
30 extern osi_mutex_t cm_Freelance_Lock;
31 #endif
32
33 /* we can access connp->serverp without holding a lock because that
34    never changes since the connection is made. */
35 #define SERVERHAS64BIT(connp) (!((connp)->serverp->flags & CM_SERVERFLAG_NO64BIT))
36 #define SET_SERVERHASNO64BIT(connp) (cm_SetServerNo64Bit((connp)->serverp, TRUE))
37
38 /* functions called back from the buffer package when reading or writing data,
39  * or when holding or releasing a vnode pointer.
40  */
41 long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
42                  cm_user_t *userp, cm_req_t *reqp)
43 {
44     /* store the data back from this buffer; the buffer is locked and held,
45      * but the vnode involved isn't locked, yet.  It is held by its
46      * reference from the buffer, which won't change until the buffer is
47      * released by our caller.  Thus, we don't have to worry about holding
48      * bufp->scp.
49      */
50     long code, code1;
51     cm_scache_t *scp = vscp;
52     afs_int32 nbytes;
53     afs_int32 save_nbytes;
54     long temp;
55     AFSFetchStatus outStatus;
56     AFSStoreStatus inStatus;
57     osi_hyper_t thyper;
58     AFSVolSync volSync;
59     AFSFid tfid;
60     struct rx_call *rxcallp;
61     struct rx_connection *rxconnp;
62     osi_queueData_t *qdp;
63     cm_buf_t *bufp;
64     afs_uint32 wbytes;
65     char *bufferp;
66     cm_conn_t *connp;
67     osi_hyper_t truncPos;
68     cm_bulkIO_t biod;           /* bulk IO descriptor */
69     int require_64bit_ops = 0;
70     int call_was_64bit = 0;
71     int scp_locked = flags & CM_BUF_WRITE_SCP_LOCKED;
72
73     osi_assertx(userp != NULL, "null cm_user_t");
74     osi_assertx(scp != NULL, "null cm_scache_t");
75
76     memset(&volSync, 0, sizeof(volSync));
77
78     /* now, the buffer may or may not be filled with good data (buf_GetNew
79      * drops lots of locks, and may indeed return a properly initialized
80      * buffer, although more likely it will just return a new, empty, buffer.
81      */
82     if (!scp_locked)
83         lock_ObtainWrite(&scp->rw);
84     if (scp->flags & CM_SCACHEFLAG_DELETED) {
85         if (!scp_locked)
86             lock_ReleaseWrite(&scp->rw);
87         return CM_ERROR_NOSUCHFILE;
88     }
89
90     cm_AFSFidFromFid(&tfid, &scp->fid);
91
92     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
93     (void) cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL);
94
95     code = cm_SetupStoreBIOD(scp, offsetp, length, &biod, userp, reqp);
96     if (code) {
97         osi_Log1(afsd_logp, "cm_SetupStoreBIOD code %x", code);
98         cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
99         if (!scp_locked)
100             lock_ReleaseWrite(&scp->rw);
101         return code;
102     }
103
104     if (biod.length == 0) {
105         osi_Log0(afsd_logp, "cm_SetupStoreBIOD length 0");
106         cm_ReleaseBIOD(&biod, 1, 0, 1); /* should be a NOOP */
107         cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
108         if (!scp_locked)
109             lock_ReleaseWrite(&scp->rw);
110         return 0;
111     }
112
113     /* prepare the output status for the store */
114     scp->mask |= CM_SCACHEMASK_CLIENTMODTIME;
115     cm_StatusFromAttr(&inStatus, scp, NULL);
116     truncPos = scp->length;
117     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
118         && LargeIntegerLessThan(scp->truncPos, truncPos))
119         truncPos = scp->truncPos;
120         scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
121                 
122     /* compute how many bytes to write from this buffer */
123     thyper = LargeIntegerSubtract(scp->length, biod.offset);
124     if (LargeIntegerLessThanZero(thyper)) {
125         /* entire buffer is past EOF */
126         nbytes = 0;
127     }
128     else {
129         /* otherwise write out part of buffer before EOF, but not
130          * more than bufferSize bytes.
131          */
132         if (LargeIntegerGreaterThan(thyper,
133                                     ConvertLongToLargeInteger(biod.length))) {
134             nbytes = biod.length;
135         } else {
136             /* if thyper is less than or equal to biod.length, then we
137                can safely assume that the value fits in a long. */
138             nbytes = thyper.LowPart;
139         }
140     }
141
142     if (LargeIntegerGreaterThan(LargeIntegerAdd(biod.offset,
143                                                  ConvertLongToLargeInteger(nbytes)),
144                                  ConvertLongToLargeInteger(LONG_MAX)) ||
145          LargeIntegerGreaterThan(truncPos,
146                                  ConvertLongToLargeInteger(LONG_MAX))) {
147         require_64bit_ops = 1;
148     }
149         
150     lock_ReleaseWrite(&scp->rw);
151
152     /* now we're ready to do the store operation */
153     save_nbytes = nbytes;
154     do {
155         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
156         if (code) 
157             continue;
158
159     retry:
160         rxconnp = cm_GetRxConn(connp);
161         rxcallp = rx_NewCall(rxconnp);
162         rx_PutConnection(rxconnp);
163
164         if (SERVERHAS64BIT(connp)) {
165             call_was_64bit = 1;
166
167             osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
168                      scp, biod.offset.HighPart, biod.offset.LowPart, nbytes);
169
170             code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
171                                           biod.offset.QuadPart,
172                                           nbytes,
173                                           truncPos.QuadPart);
174             if (code)
175                 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
176             else
177                 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
178         } else {
179             call_was_64bit = 0;
180
181             if (require_64bit_ops) {
182                 osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData.  The operation requires large file support in the server.");
183                 code = CM_ERROR_TOOBIG;
184             } else {
185                 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
186                          scp, biod.offset.HighPart, biod.offset.LowPart, nbytes);
187
188                 code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
189                                             biod.offset.LowPart, nbytes, truncPos.LowPart);
190                 if (code)
191                     osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
192                 else
193                     osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
194             }
195         }
196
197         if (code == 0) {
198             /* write the data from the the list of buffers */
199             qdp = NULL;
200             while(nbytes > 0) {
201                 afs_uint32 buf_offset;
202                 if (qdp == NULL) {
203                     qdp = biod.bufListEndp;
204                     buf_offset = offsetp->LowPart % cm_data.buf_blockSize;
205                 } else {
206                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
207                     buf_offset = 0;
208                 }
209                 osi_assertx(qdp != NULL, "null osi_queueData_t");
210                 bufp = osi_GetQData(qdp);
211                 bufferp = bufp->datap + buf_offset;
212                 wbytes = nbytes;
213                 if (wbytes > cm_data.buf_blockSize - buf_offset)
214                     wbytes = cm_data.buf_blockSize - buf_offset;
215
216                 /* write out wbytes of data from bufferp */
217                 temp = rx_Write(rxcallp, bufferp, wbytes);
218                 if (temp != wbytes) {
219                     osi_Log3(afsd_logp, "rx_Write failed bp 0x%p, %d != %d",bufp,temp,wbytes);
220                     code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR;
221                     break;
222                 } else {
223                     osi_Log2(afsd_logp, "rx_Write succeeded bp 0x%p, %d",bufp,temp);
224                 }       
225                 nbytes -= wbytes;
226             }   /* while more bytes to write */
227         }       /* if RPC started successfully */
228
229         if (code == 0) {
230             if (call_was_64bit) {
231                 code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
232                 if (code)
233                     osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
234                 else
235                     osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
236             } else {
237                 code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
238                 if (code)
239                     osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
240                 else
241                     osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
242             }
243         }
244
245         code1 = rx_EndCall(rxcallp, code);
246
247         if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
248             SET_SERVERHASNO64BIT(connp);
249             qdp = NULL;
250             nbytes = save_nbytes;
251             goto retry;
252         }
253
254         /* Prefer StoreData error over rx_EndCall error */
255         if (code1 != 0)
256             code = code1;
257     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
258
259     code = cm_MapRPCError(code, reqp);
260
261     if (code)
262         osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
263     else
264         osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
265
266     /* now, clean up our state */
267     lock_ObtainWrite(&scp->rw);
268
269     cm_ReleaseBIOD(&biod, 1, code, 1);
270     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
271
272     if (code == 0) {
273         osi_hyper_t t;
274         /* now, here's something a little tricky: in AFS 3, a dirty
275          * length can't be directly stored, instead, a dirty chunk is
276          * stored that sets the file's size (by writing and by using
277          * the truncate-first option in the store call).
278          *
279          * At this point, we've just finished a store, and so the trunc
280          * pos field is clean.  If the file's size at the server is at
281          * least as big as we think it should be, then we turn off the
282          * length dirty bit, since all the other dirty buffers must
283          * precede this one in the file.
284          *
285          * The file's desired size shouldn't be smaller than what's
286          * stored at the server now, since we just did the trunc pos
287          * store.
288          *
289          * We have to turn off the length dirty bit as soon as we can,
290          * so that we see updates made by other machines.
291          */
292
293         if (call_was_64bit) {
294             t.LowPart = outStatus.Length;
295             t.HighPart = outStatus.Length_hi;
296         } else {
297             t = ConvertLongToLargeInteger(outStatus.Length);
298         }
299
300         if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
301             scp->mask &= ~CM_SCACHEMASK_LENGTH;
302
303         cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp, CM_MERGEFLAG_STOREDATA);
304     } else {
305         if (code == CM_ERROR_SPACE)
306             scp->flags |= CM_SCACHEFLAG_OUTOFSPACE;
307         else if (code == CM_ERROR_QUOTA)
308             scp->flags |= CM_SCACHEFLAG_OVERQUOTA;
309     }
310     if (!scp_locked)
311         lock_ReleaseWrite(&scp->rw);
312
313     return code;
314 }
315
316 /*
317  * Truncate the file, by sending a StoreData RPC with zero length.
318  *
319  * Called with scp locked.  Releases and re-obtains the lock.
320  */
321 long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
322 {
323     AFSFetchStatus outStatus;
324     AFSStoreStatus inStatus;
325     AFSVolSync volSync;
326     AFSFid tfid;
327     long code, code1;
328     osi_hyper_t truncPos;
329     cm_conn_t *connp;
330     struct rx_call *rxcallp;
331     struct rx_connection *rxconnp;
332     int require_64bit_ops = 0;
333     int call_was_64bit = 0;
334
335     memset(&volSync, 0, sizeof(volSync));
336
337     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
338     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
339                      CM_SCACHESYNC_STOREDATA_EXCL);
340
341     /* prepare the output status for the store */
342     inStatus.Mask = AFS_SETMODTIME;
343     inStatus.ClientModTime = scp->clientModTime;
344     scp->mask &= ~CM_SCACHEMASK_CLIENTMODTIME;
345
346     /* calculate truncation position */
347     truncPos = scp->length;
348     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
349         && LargeIntegerLessThan(scp->truncPos, truncPos))
350         truncPos = scp->truncPos;
351     scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
352
353     if (LargeIntegerGreaterThan(truncPos,
354                                 ConvertLongToLargeInteger(LONG_MAX))) {
355
356         require_64bit_ops = 1;
357     }
358
359     lock_ReleaseWrite(&scp->rw);
360
361     cm_AFSFidFromFid(&tfid, &scp->fid);
362
363     /* now we're ready to do the store operation */
364     do {
365         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
366         if (code) 
367             continue;
368
369     retry:      
370         rxconnp = cm_GetRxConn(connp);
371         rxcallp = rx_NewCall(rxconnp);
372         rx_PutConnection(rxconnp);
373
374         if (SERVERHAS64BIT(connp)) {
375             call_was_64bit = 1;
376
377             code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
378                                           0, 0, truncPos.QuadPart);
379         } else {
380             call_was_64bit = 0;
381
382             if (require_64bit_ops) {
383                 code = CM_ERROR_TOOBIG;
384             } else {
385                 code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
386                                             0, 0, truncPos.LowPart);
387             }
388         }
389
390         if (code == 0) {
391             if (call_was_64bit)
392                 code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
393             else
394                 code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
395         }
396         code1 = rx_EndCall(rxcallp, code);
397
398         if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
399             SET_SERVERHASNO64BIT(connp);
400             goto retry;
401         }
402
403         /* prefer StoreData error over rx_EndCall error */
404         if (code == 0 && code1 != 0)
405             code = code1;
406     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
407     code = cm_MapRPCError(code, reqp);
408         
409     /* now, clean up our state */
410     lock_ObtainWrite(&scp->rw);
411
412     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
413
414     if (code == 0) {
415         osi_hyper_t t;
416         /*
417          * For explanation of handling of CM_SCACHEMASK_LENGTH,
418          * see cm_BufWrite().
419          */
420         if (call_was_64bit) {
421             t.HighPart = outStatus.Length_hi;
422             t.LowPart = outStatus.Length;
423         } else {
424             t = ConvertLongToLargeInteger(outStatus.Length);
425         }
426
427         if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
428             scp->mask &= ~CM_SCACHEMASK_LENGTH;
429         cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp, CM_MERGEFLAG_STOREDATA);
430     }
431
432     return code;
433 }
434
435 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
436 {
437     *bytesReadp = 0;
438
439     /* now return a code that means that I/O is done */
440     return 0;
441 }
442
443 /*
444  * stabilize scache entry with CM_SCACHESYNC_SETSIZE.  This prevents any new
445  * data buffers to be allocated, new data to be fetched from the file server,
446  * and writes to be accepted from the application but permits dirty buffers
447  * to be written to the file server.
448  *
449  * Stabilize uses cm_SyncOp to maintain the cm_scache_t in this stable state
450  * instead of holding the rwlock exclusively.  This permits background stores
451  * to be performed in parallel and in particular allow FlushFile to be
452  * implemented without violating the locking hierarchy.
453  */
454 long cm_BufStabilize(void *vscp, cm_user_t *userp, cm_req_t *reqp)
455 {
456     cm_scache_t *scp = vscp;
457     long code;
458
459     lock_ObtainWrite(&scp->rw);
460     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
461                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
462     lock_ReleaseWrite(&scp->rw);
463
464     return code;
465 }
466
467 /* undoes the work that cm_BufStabilize does: releases lock so things can change again */
468 long cm_BufUnstabilize(void *vscp, cm_user_t *userp)
469 {
470     cm_scache_t *scp = vscp;
471         
472     lock_ObtainWrite(&scp->rw);
473     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
474
475     lock_ReleaseWrite(&scp->rw);
476         
477     /* always succeeds */
478     return 0;
479 }
480
481 cm_buf_ops_t cm_bufOps = {
482     cm_BufWrite,
483     cm_BufRead,
484     cm_BufStabilize,
485     cm_BufUnstabilize
486 };
487
488 long cm_ValidateDCache(void)
489 {
490     return buf_ValidateBuffers();
491 }
492
493 long cm_ShutdownDCache(void)
494 {
495     return 0;
496 }
497
498 int cm_InitDCache(int newFile, long chunkSize, afs_uint64 nbuffers)
499 {
500     return buf_Init(newFile, &cm_bufOps, nbuffers);
501 }
502
503 /* check to see if we have an up-to-date buffer.  The buffer must have
504  * previously been obtained by calling buf_Get.
505  *
506  * Make sure we have a callback, and that the dataversion matches.
507  *
508  * Scp must be locked.
509  *
510  * Bufp *may* be locked.
511  */
512 int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
513 {
514     int code;
515     if (!cm_HaveCallback(scp))
516         return 0;
517     if ((bufp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED)) == (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
518         return 1;
519     if (bufp->dataVersion <= scp->dataVersion && bufp->dataVersion >= scp->bufDataVersionLow)
520         return 1;
521     if (bufp->offset.QuadPart >= scp->serverLength.QuadPart)
522         return 1;
523     if (!isBufLocked) {
524         code = lock_TryMutex(&bufp->mx);
525         if (code == 0) {
526             /* don't have the lock, and can't lock it, then
527              * return failure.
528              */
529             return 0;
530         }
531     }
532
533     /* remember dirty flag for later */
534     code = bufp->flags & CM_BUF_DIRTY;
535
536     /* release lock if we obtained it here */
537     if (!isBufLocked) 
538         lock_ReleaseMutex(&bufp->mx);
539
540     /* if buffer was dirty, buffer is acceptable for use */
541     if (code) 
542         return 1;
543     else 
544         return 0;
545 }
546
547 /*
548  * used when deciding whether to do a background fetch or not.
549  * call with scp->rw write-locked.
550  */
551 afs_int32
552 cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, osi_hyper_t *length,
553                         cm_user_t *userp, cm_req_t *reqp, osi_hyper_t *realBasep)
554 {
555     osi_hyper_t tbase;
556     osi_hyper_t tlength;
557     osi_hyper_t tblocksize;
558     long code;
559     cm_buf_t *bp;
560     int stop;
561         
562     /* now scan all buffers in the range, looking for any that look like
563      * they need work.
564      */
565     tbase = *startBasep;
566     tlength = *length;
567     tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
568     stop = 0;
569     while (LargeIntegerGreaterThanZero(tlength)) {
570         /* get callback so we can do a meaningful dataVersion comparison */
571         code = cm_SyncOp(scp, NULL, userp, reqp, 0,
572                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
573         if (code)
574             return code;
575                 
576         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
577             /* we're past the end of file */
578             break;
579         }
580
581         bp = buf_Find(scp, &tbase);
582         /* We cheat slightly by not locking the bp mutex. */
583         if (bp) {
584             if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING | CM_BUF_CMBKGFETCH)) == 0
585                  && (bp->dataVersion < scp->bufDataVersionLow || bp->dataVersion > scp->dataVersion))
586                 stop = 1;
587             buf_Release(bp);
588             bp = NULL;
589         }
590         else 
591             stop = 1;
592
593         /* if this buffer is essentially guaranteed to require a fetch,
594          * break out here and return this position.
595          */
596         if (stop) 
597             break;
598                 
599         tbase = LargeIntegerAdd(tbase, tblocksize);
600         tlength = LargeIntegerSubtract(tlength,  tblocksize);
601     }
602         
603     /* if we get here, either everything is fine or 'stop' stopped us at a
604      * particular buffer in the range that definitely needs to be fetched.
605      */
606     if (stop == 0) {
607         /* return non-zero code since realBasep won't be valid */
608         code = -1;
609     }   
610     else {
611         /* successfully found a page that will need fetching */
612         *realBasep = tbase;
613         code = 0;
614     }
615     return code;
616 }
617
618 afs_int32
619 cm_BkgStore(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
620             cm_user_t *userp)
621 {
622     osi_hyper_t toffset;
623     long length;
624     cm_req_t req;
625     long code = 0;
626
627     if (scp->flags & CM_SCACHEFLAG_DELETED) {
628         osi_Log4(afsd_logp, "Skipping BKG store - Deleted scp 0x%p, offset 0x%x:%08x, length 0x%x", scp, p2, p1, p3);
629     } else {
630         cm_InitReq(&req);
631
632         /* Retries will be performed by the BkgDaemon thread if appropriate */
633         req.flags |= CM_REQ_NORETRY;
634
635         toffset.LowPart = p1;
636         toffset.HighPart = p2;
637         length = p3;
638
639         osi_Log4(afsd_logp, "Starting BKG store scp 0x%p, offset 0x%x:%08x, length 0x%x", scp, p2, p1, p3);
640
641         code = cm_BufWrite(scp, &toffset, length, /* flags */ 0, userp, &req);
642
643         osi_Log4(afsd_logp, "Finished BKG store scp 0x%p, offset 0x%x:%08x, code 0x%x", scp, p2, p1, code);
644     }
645
646     /* 
647      * Keep the following list synchronized with the
648      * error code list in cm_BkgDaemon 
649      */
650     switch ( code ) {
651     case CM_ERROR_TIMEDOUT: /* or server restarting */
652     case CM_ERROR_RETRY:
653     case CM_ERROR_WOULDBLOCK:
654     case CM_ERROR_ALLBUSY:
655     case CM_ERROR_ALLDOWN:
656     case CM_ERROR_ALLOFFLINE:
657     case CM_ERROR_PARTIALWRITE:
658         break;  /* cm_BkgDaemon will re-insert the request in the queue */
659     case 0:
660     default:
661         lock_ObtainWrite(&scp->rw);
662         cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
663         lock_ReleaseWrite(&scp->rw);
664     }
665     return code;
666 }
667
668 /* Called with scp locked */
669 void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base, osi_hyper_t *length)
670 {
671     osi_hyper_t end;
672
673     if (code == 0) {
674         end =  LargeIntegerAdd(*base, *length);
675         if (LargeIntegerGreaterThan(*base, scp->prefetch.base))
676             scp->prefetch.base = *base;
677         if (LargeIntegerGreaterThan(end, scp->prefetch.end))
678             scp->prefetch.end = end;
679     }
680     scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
681 }
682
683 /* do the prefetch.  if the prefetch fails, return 0 (success)
684  * because there is no harm done.  */
685 afs_int32
686 cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
687                cm_user_t *userp)
688 {
689     osi_hyper_t length;
690     osi_hyper_t base;
691     osi_hyper_t offset;
692     osi_hyper_t end;
693     osi_hyper_t fetched;
694     osi_hyper_t tblocksize;
695     afs_int32 code;
696     int rxheld = 0;
697     cm_buf_t *bp = NULL;
698     cm_req_t req;
699
700     cm_InitReq(&req);
701
702     /* Retries will be performed by the BkgDaemon thread if appropriate */
703     req.flags |= CM_REQ_NORETRY;
704         
705     fetched.LowPart = 0;
706     fetched.HighPart = 0;
707     tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
708     base.LowPart = p1;
709     base.HighPart = p2;
710     length.LowPart = p3;
711     length.HighPart = p4;
712
713     end = LargeIntegerAdd(base, length);
714         
715     osi_Log5(afsd_logp, "Starting BKG prefetch scp 0x%p offset 0x%x:%x length 0x%x:%x",
716              scp, p2, p1, p4, p3);
717
718     for ( code = 0, offset = base;
719           code == 0 && LargeIntegerLessThan(offset, end); 
720           offset = LargeIntegerAdd(offset, tblocksize) )
721     {
722         if (rxheld) {
723             lock_ReleaseWrite(&scp->rw);
724             rxheld = 0;
725         }
726
727         code = buf_Get(scp, &offset, &req, &bp);
728         if (code)
729             break;
730
731         if (bp->cmFlags & CM_BUF_CMFETCHING) {
732             /* skip this buffer as another thread is already fetching it */
733             if (!rxheld) {
734                 lock_ObtainWrite(&scp->rw);
735                 rxheld = 1;
736             }
737             bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
738             buf_Release(bp);
739             bp = NULL;
740             continue;
741         }
742
743         if (!rxheld) {
744             lock_ObtainWrite(&scp->rw);
745             rxheld = 1;
746         }
747
748         code = cm_GetBuffer(scp, bp, NULL, userp, &req);
749         if (code == 0)
750             fetched = LargeIntegerAdd(fetched, tblocksize); 
751         buf_Release(bp);
752         bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
753     }
754     
755     if (!rxheld) {
756         lock_ObtainWrite(&scp->rw);
757         rxheld = 1;
758     }
759
760     /* Clear flag from any remaining buffers */
761     for ( ;
762           LargeIntegerLessThan(offset, end);
763           offset = LargeIntegerAdd(offset, tblocksize) )
764     {
765         bp = buf_Find(scp, &offset);
766         if (bp) {
767             bp->cmFlags &= ~CM_BUF_CMBKGFETCH;
768             buf_Release(bp);
769         }
770     }
771     cm_ClearPrefetchFlag(LargeIntegerGreaterThanZero(fetched) ? 0 : code, 
772                          scp, &base, &fetched);
773
774     /* wakeup anyone who is waiting */
775     if (scp->flags & CM_SCACHEFLAG_WAITING) {
776         osi_Log1(afsd_logp, "CM BkgPrefetch Waking scp 0x%p", scp);
777         osi_Wakeup((LONG_PTR) &scp->flags);
778     }
779     lock_ReleaseWrite(&scp->rw);
780
781     osi_Log4(afsd_logp, "Ending BKG prefetch scp 0x%p code 0x%x fetched 0x%x:%x",
782              scp, code, fetched.HighPart, fetched.LowPart);
783     return code;
784 }
785
786 /* a read was issued to offsetp, and we have to determine whether we should
787  * do a prefetch of the next chunk.
788  */
789 void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp, afs_uint32 count,
790                          cm_user_t *userp, cm_req_t *reqp)
791 {
792     long code;
793     int  rwheld = 0;
794     osi_hyper_t realBase;
795     osi_hyper_t readBase;
796     osi_hyper_t readLength;
797     osi_hyper_t readEnd;
798     osi_hyper_t offset;
799     osi_hyper_t tblocksize;             /* a long long temp variable */
800     cm_buf_t    *bp;
801
802     tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
803         
804     readBase = *offsetp;
805     /* round up to chunk boundary */
806     readBase.LowPart += (cm_chunkSize-1);
807     readBase.LowPart &= (-cm_chunkSize);
808
809     readLength = ConvertLongToLargeInteger(count);
810
811     lock_ObtainWrite(&scp->rw);
812     rwheld = 1;
813     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
814          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
815         lock_ReleaseWrite(&scp->rw);
816         return;
817     }
818     scp->flags |= CM_SCACHEFLAG_PREFETCHING;
819
820     /* start the scan at the latter of the end of this read or
821      * the end of the last fetched region.
822      */
823     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
824         readBase = scp->prefetch.end;
825
826     code = cm_CheckFetchRange(scp, &readBase, &readLength, userp, reqp,
827                               &realBase);
828     if (code) {
829         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
830         lock_ReleaseWrite(&scp->rw);
831         return; /* can't find something to prefetch */
832     }
833
834     readEnd = LargeIntegerAdd(realBase, readLength);
835
836     /*
837      * Mark each buffer in the range as queued for a
838      * background fetch
839      */
840     for ( offset = realBase;
841           LargeIntegerLessThan(offset, readEnd);
842           offset = LargeIntegerAdd(offset, tblocksize) )
843     {
844         if (rwheld) {
845             lock_ReleaseWrite(&scp->rw);
846             rwheld = 0;
847         }
848
849         bp = buf_Find(scp, &offset);
850         if (!bp)
851             continue;
852
853         if (!rwheld) {
854             lock_ObtainWrite(&scp->rw);
855             rwheld = 1;
856         }
857
858         bp->cmFlags |= CM_BUF_CMBKGFETCH;
859         buf_Release(bp);
860     }
861
862     if (rwheld)
863         lock_ReleaseWrite(&scp->rw);
864
865     osi_Log2(afsd_logp, "BKG Prefetch request scp 0x%p, base 0x%x",
866              scp, realBase.LowPart);
867
868     cm_QueueBKGRequest(scp, cm_BkgPrefetch, 
869                        realBase.LowPart, realBase.HighPart, 
870                        readLength.LowPart, readLength.HighPart, 
871                        userp);
872 }
873
874 /* scp must be locked; temporarily unlocked during processing.
875  * If returns 0, returns buffers held in biop, and with
876  * CM_BUF_CMSTORING set.
877  *
878  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
879  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
880  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
881  * must be woken, and the event must be set when the I/O is done.  All of this
882  * is required so that buf_WaitIO synchronizes properly with the buffer as it
883  * is being written out.
884  */
885 long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
886                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
887 {
888     cm_buf_t *bufp;
889     osi_queueData_t *qdp;
890     osi_hyper_t thyper;
891     osi_hyper_t tbase;
892     osi_hyper_t scanStart;              /* where to start scan for dirty pages */
893     osi_hyper_t scanEnd;                /* where to stop scan for dirty pages */
894     osi_hyper_t firstModOffset; /* offset of first modified page in range */
895     long temp;
896     long code;
897     long flags;                 /* flags to cm_SyncOp */
898         
899     /* clear things out */
900     biop->scp = scp;                    /* do not hold; held by caller */
901     biop->offset = *inOffsetp;
902     biop->length = 0;
903     biop->bufListp = NULL;
904     biop->bufListEndp = NULL;
905     biop->reserved = 0;
906
907     /* reserve a chunk's worth of buffers */
908     lock_ReleaseWrite(&scp->rw);
909     buf_ReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
910     lock_ObtainWrite(&scp->rw);
911
912     bufp = NULL;
913     for (temp = 0; temp < inSize; temp += cm_data.buf_blockSize) {
914         thyper = ConvertLongToLargeInteger(temp);
915         tbase = LargeIntegerAdd(*inOffsetp, thyper);
916
917         bufp = buf_Find(scp, &tbase);
918         if (bufp) {
919             /* get buffer mutex and scp mutex safely */
920             lock_ReleaseWrite(&scp->rw);
921             lock_ObtainMutex(&bufp->mx);
922
923             /*
924              * if the buffer is actively involved in I/O
925              * we wait for the I/O to complete.
926              */
927             if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
928                 buf_WaitIO(scp, bufp);
929
930             lock_ObtainWrite(&scp->rw);
931             flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
932             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
933             if (code) {
934                 lock_ReleaseMutex(&bufp->mx);
935                 buf_Release(bufp);
936                 bufp = NULL;
937                 buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
938                 return code;
939             }   
940                         
941             /* if the buffer is dirty, we're done */
942             if (bufp->flags & CM_BUF_DIRTY) {
943                 osi_assertx(!(bufp->flags & CM_BUF_WRITING),
944                             "WRITING w/o CMSTORING in SetupStoreBIOD");
945                 bufp->flags |= CM_BUF_WRITING;
946                 break;
947             }
948
949             /* this buffer is clean, so there's no reason to process it */
950             cm_SyncOpDone(scp, bufp, flags);
951             lock_ReleaseMutex(&bufp->mx);
952             buf_Release(bufp);
953             bufp = NULL;
954         }       
955     }
956
957     biop->reserved = 1;
958         
959     /* if we get here, if bufp is null, we didn't find any dirty buffers
960      * that weren't already being stored back, so we just quit now.
961      */
962     if (!bufp) {
963         return 0;
964     }
965
966     /* don't need buffer mutex any more */
967     lock_ReleaseMutex(&bufp->mx);
968         
969     /* put this element in the list */
970     qdp = osi_QDAlloc();
971     osi_SetQData(qdp, bufp);
972     /* don't have to hold bufp, since held by buf_Find above */
973     osi_QAddH((osi_queue_t **) &biop->bufListp,
974               (osi_queue_t **) &biop->bufListEndp,
975               &qdp->q);
976     biop->length = cm_data.buf_blockSize;
977     firstModOffset = bufp->offset;
978     biop->offset = firstModOffset;
979     bufp = NULL;        /* this buffer and reference added to the queue */
980
981     /* compute the window surrounding *inOffsetp of size cm_chunkSize */
982     scanStart = *inOffsetp;
983     scanStart.LowPart &= (-cm_chunkSize);
984     thyper = ConvertLongToLargeInteger(cm_chunkSize);
985     scanEnd = LargeIntegerAdd(scanStart, thyper);
986
987     flags = CM_SCACHESYNC_GETSTATUS
988         | CM_SCACHESYNC_STOREDATA
989         | CM_SCACHESYNC_BUFLOCKED
990         | CM_SCACHESYNC_NOWAIT;
991
992     /* start by looking backwards until scanStart */
993     /* hyper version of cm_data.buf_blockSize */
994     thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
995     tbase = LargeIntegerSubtract(firstModOffset, thyper);
996     while(LargeIntegerGreaterThanOrEqualTo(tbase, scanStart)) {
997         /* see if we can find the buffer */
998         bufp = buf_Find(scp, &tbase);
999         if (!bufp) 
1000             break;
1001
1002         /* try to lock it, and quit if we can't (simplifies locking) */
1003         lock_ReleaseWrite(&scp->rw);
1004         code = lock_TryMutex(&bufp->mx);
1005         lock_ObtainWrite(&scp->rw);
1006         if (code == 0) {
1007             buf_Release(bufp);
1008             bufp = NULL;
1009             break;
1010         }
1011                 
1012         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
1013         if (code) {
1014             lock_ReleaseMutex(&bufp->mx);
1015             buf_Release(bufp);
1016             bufp = NULL;
1017             break;
1018         }
1019                 
1020         if (!(bufp->flags & CM_BUF_DIRTY)) {
1021             /* buffer is clean, so we shouldn't add it */
1022             cm_SyncOpDone(scp, bufp, flags);
1023             lock_ReleaseMutex(&bufp->mx);
1024             buf_Release(bufp);
1025             bufp = NULL;
1026             break;
1027         }
1028
1029         /* don't need buffer mutex any more */
1030         lock_ReleaseMutex(&bufp->mx);
1031
1032         /* we have a dirty buffer ready for storing.  Add it to the tail
1033          * of the list, since it immediately precedes all of the disk
1034          * addresses we've already collected.
1035          */
1036         qdp = osi_QDAlloc();
1037         osi_SetQData(qdp, bufp);
1038         /* no buf_hold necessary, since we have it held from buf_Find */
1039         osi_QAddT((osi_queue_t **) &biop->bufListp,
1040                   (osi_queue_t **) &biop->bufListEndp,
1041                   &qdp->q);
1042         bufp = NULL;            /* added to the queue */
1043
1044         /* update biod info describing the transfer */
1045         biop->offset = LargeIntegerSubtract(biop->offset, thyper);
1046         biop->length += cm_data.buf_blockSize;
1047
1048         /* update loop pointer */
1049         tbase = LargeIntegerSubtract(tbase, thyper);
1050     }   /* while loop looking for pages preceding the one we found */
1051
1052     /* now, find later dirty, contiguous pages, and add them to the list */
1053     /* hyper version of cm_data.buf_blockSize */
1054     thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
1055     tbase = LargeIntegerAdd(firstModOffset, thyper);
1056     while(LargeIntegerLessThan(tbase, scanEnd)) {
1057         /* see if we can find the buffer */
1058         bufp = buf_Find(scp, &tbase);
1059         if (!bufp) 
1060             break;
1061
1062         /* try to lock it, and quit if we can't (simplifies locking) */
1063         lock_ReleaseWrite(&scp->rw);
1064         code = lock_TryMutex(&bufp->mx);
1065         lock_ObtainWrite(&scp->rw);
1066         if (code == 0) {
1067             buf_Release(bufp);
1068             bufp = NULL;
1069             break;
1070         }
1071
1072         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
1073         if (code) {
1074             lock_ReleaseMutex(&bufp->mx);
1075             buf_Release(bufp);
1076             bufp = NULL;
1077             break;
1078         }
1079                 
1080         if (!(bufp->flags & CM_BUF_DIRTY)) {
1081             /* buffer is clean, so we shouldn't add it */
1082             cm_SyncOpDone(scp, bufp, flags);
1083             lock_ReleaseMutex(&bufp->mx);
1084             buf_Release(bufp);
1085             bufp = NULL;
1086             break;
1087         }
1088
1089         /* don't need buffer mutex any more */
1090         lock_ReleaseMutex(&bufp->mx);
1091
1092         /* we have a dirty buffer ready for storing.  Add it to the head
1093          * of the list, since it immediately follows all of the disk
1094          * addresses we've already collected.
1095          */
1096         qdp = osi_QDAlloc();
1097         osi_SetQData(qdp, bufp);
1098         /* no buf_hold necessary, since we have it held from buf_Find */
1099         osi_QAddH((osi_queue_t **) &biop->bufListp,
1100                   (osi_queue_t **) &biop->bufListEndp,
1101                   &qdp->q);
1102         bufp = NULL;
1103
1104         /* update biod info describing the transfer */
1105         biop->length += cm_data.buf_blockSize;
1106                 
1107         /* update loop pointer */
1108         tbase = LargeIntegerAdd(tbase, thyper);
1109     }   /* while loop looking for pages following the first page we found */
1110         
1111     /* finally, we're done */
1112     return 0;
1113 }
1114
1115 /* scp must be locked; temporarily unlocked during processing.
1116  * If returns 0, returns buffers held in biop, and with
1117  * CM_BUF_CMFETCHING flags set.
1118  * If an error is returned, we don't return any buffers.
1119  */
1120 long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
1121                         cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
1122 {
1123     long code;
1124     cm_buf_t *tbp;
1125     osi_hyper_t tblocksize;             /* a long long temp variable */
1126     osi_hyper_t pageBase;               /* base offset we're looking at */
1127     osi_queueData_t *qdp;               /* one temp queue structure */
1128     osi_queueData_t *tqdp;              /* another temp queue structure */
1129     long collected;                     /* how many bytes have been collected */
1130     int isFirst;
1131     long flags;
1132     osi_hyper_t fileSize;               /* the # of bytes in the file */
1133     osi_queueData_t *heldBufListp;      /* we hold all buffers in this list */
1134     osi_queueData_t *heldBufListEndp;   /* first one */
1135     int reserving;
1136
1137     tblocksize = ConvertLongToLargeInteger(cm_data.buf_blockSize);
1138
1139     biop->scp = scp;                    /* do not hold; held by caller */
1140     biop->offset = *offsetp;
1141     /* null out the list of buffers */
1142     biop->bufListp = biop->bufListEndp = NULL;
1143     biop->reserved = 0;
1144
1145     /* first lookup the file's length, so we know when to stop */
1146     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
1147                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
1148     if (code) 
1149         return code;
1150         
1151     /* copy out size, since it may change */
1152     fileSize = scp->serverLength;
1153         
1154     lock_ReleaseWrite(&scp->rw);
1155
1156     pageBase = *offsetp;
1157     collected = pageBase.LowPart & (cm_chunkSize - 1);
1158     heldBufListp = NULL;
1159     heldBufListEndp = NULL;
1160
1161     /*
1162      * Obtaining buffers can cause dirty buffers to be recycled, which
1163      * can cause a storeback, so cannot be done while we have buffers
1164      * reserved.
1165      *
1166      * To get around this, we get buffers twice.  Before reserving buffers,
1167      * we obtain and release each one individually.  After reserving
1168      * buffers, we try to obtain them again, but only by lookup, not by
1169      * recycling.  If a buffer has gone away while we were waiting for
1170      * the others, we just use whatever buffers we already have.
1171      *
1172      * On entry to this function, we are already holding a buffer, so we
1173      * can't wait for reservation.  So we call buf_TryReserveBuffers()
1174      * instead.  Not only that, we can't really even call buf_Get(), for
1175      * the same reason.  We can't avoid that, though.  To avoid deadlock
1176      * we allow only one thread to be executing the buf_Get()-buf_Release()
1177      * sequence at a time.
1178      */
1179
1180     /* first hold all buffers, since we can't hold any locks in buf_Get */
1181     while (1) {
1182         /* stop at chunk boundary */
1183         if (collected >= cm_chunkSize) 
1184             break;
1185                 
1186         /* see if the next page would be past EOF */
1187         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
1188             break;
1189
1190         code = buf_Get(scp, &pageBase, reqp, &tbp);
1191         if (code) {
1192             lock_ObtainWrite(&scp->rw);
1193             cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
1194             return code;
1195         }
1196                 
1197         buf_Release(tbp);
1198         tbp = NULL;
1199
1200         pageBase = LargeIntegerAdd(tblocksize, pageBase);
1201         collected += cm_data.buf_blockSize;
1202     }
1203
1204     /* reserve a chunk's worth of buffers if possible */
1205     reserving = buf_TryReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1206
1207     pageBase = *offsetp;
1208     collected = pageBase.LowPart & (cm_chunkSize - 1);
1209
1210     /* now hold all buffers, if they are still there */
1211     while (1) {
1212         /* stop at chunk boundary */
1213         if (collected >= cm_chunkSize) 
1214             break;
1215                 
1216         /* see if the next page would be past EOF */
1217         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
1218             break;
1219
1220         tbp = buf_Find(scp, &pageBase);
1221         if (!tbp) 
1222             break;
1223
1224         /* add the buffer to the list */
1225         qdp = osi_QDAlloc();
1226         osi_SetQData(qdp, tbp);
1227         osi_QAddH((osi_queue_t **)&heldBufListp, 
1228                   (osi_queue_t **)&heldBufListEndp, 
1229                   &qdp->q);
1230         /* leave tbp held (from buf_Get) */
1231
1232         if (!reserving) 
1233             break;
1234
1235         collected += cm_data.buf_blockSize;
1236         pageBase = LargeIntegerAdd(tblocksize, pageBase);
1237     }
1238
1239     /* look at each buffer, adding it into the list if it looks idle and
1240      * filled with old data.  One special case: wait for idle if it is the
1241      * first buffer since we really need that one for our caller to make
1242      * any progress.
1243      */
1244     isFirst = 1;
1245     collected = 0;              /* now count how many we'll really use */
1246     for (tqdp = heldBufListEndp;
1247         tqdp;
1248           tqdp = (osi_queueData_t *) osi_QPrev(&tqdp->q)) {
1249         /* get a ptr to the held buffer */
1250         tbp = osi_GetQData(tqdp);
1251         pageBase = tbp->offset;
1252
1253         /* now lock the buffer lock */
1254         lock_ObtainMutex(&tbp->mx);
1255         lock_ObtainWrite(&scp->rw);
1256
1257         /* don't bother fetching over data that is already current */
1258         if (tbp->dataVersion <= scp->dataVersion && tbp->dataVersion >= scp->bufDataVersionLow) {
1259             /* we don't need this buffer, since it is current */
1260             lock_ReleaseWrite(&scp->rw);
1261             lock_ReleaseMutex(&tbp->mx);
1262             break;
1263         }
1264
1265         flags = CM_SCACHESYNC_FETCHDATA | CM_SCACHESYNC_BUFLOCKED;
1266         if (!isFirst) 
1267             flags |= CM_SCACHESYNC_NOWAIT;
1268
1269         /* wait for the buffer to serialize, if required.  Doesn't
1270          * release the scp or buffer lock(s) if NOWAIT is specified.
1271          */
1272         code = cm_SyncOp(scp, tbp, userp, reqp, 0, flags);
1273         if (code) {
1274             lock_ReleaseWrite(&scp->rw);
1275             lock_ReleaseMutex(&tbp->mx);
1276             break;
1277         }
1278                 
1279         /* don't fetch over dirty buffers */
1280         if (tbp->flags & CM_BUF_DIRTY) {
1281             cm_SyncOpDone(scp, tbp, flags);
1282             lock_ReleaseWrite(&scp->rw);
1283             lock_ReleaseMutex(&tbp->mx);
1284             break;
1285         }
1286
1287         /* Release locks */
1288         lock_ReleaseWrite(&scp->rw);
1289         lock_ReleaseMutex(&tbp->mx);
1290
1291         /* add the buffer to the list */
1292         qdp = osi_QDAlloc();
1293         osi_SetQData(qdp, tbp);
1294         osi_QAddH((osi_queue_t **)&biop->bufListp, 
1295                   (osi_queue_t **)&biop->bufListEndp, 
1296                   &qdp->q);
1297         buf_Hold(tbp);
1298
1299         /* from now on, a failure just stops our collection process, but
1300          * we still do the I/O to whatever we've already managed to collect.
1301          */
1302         isFirst = 0;
1303         collected += cm_data.buf_blockSize;
1304     }
1305         
1306     /* now, we've held in biop->bufListp all the buffer's we're really
1307      * interested in.  We also have holds left from heldBufListp, and we
1308      * now release those holds on the buffers.
1309      */
1310     for (qdp = heldBufListp; qdp; qdp = tqdp) {
1311         tqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1312         tbp = osi_GetQData(qdp);
1313         osi_QRemoveHT((osi_queue_t **) &heldBufListp,
1314                       (osi_queue_t **) &heldBufListEndp,
1315                       &qdp->q);
1316         osi_QDFree(qdp);
1317         buf_Release(tbp);
1318         tbp = NULL;
1319     }
1320
1321     /* Caller expects this */
1322     lock_ObtainWrite(&scp->rw);
1323  
1324     /* if we got a failure setting up the first buffer, then we don't have
1325      * any side effects yet, and we also have failed an operation that the
1326      * caller requires to make any progress.  Give up now.
1327      */
1328     if (code && isFirst) {
1329         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1330         return code;
1331     }
1332         
1333     /* otherwise, we're still OK, and should just return the I/O setup we've
1334      * got.
1335      */
1336     biop->length = collected;
1337     biop->reserved = reserving;
1338     return 0;
1339 }
1340
1341 /* release a bulk I/O structure that was setup by cm_SetupFetchBIOD or by
1342  * cm_SetupStoreBIOD
1343  */
1344 void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore, long code, int scp_locked)
1345 {
1346     cm_scache_t *scp;           /* do not release; not held in biop */
1347     cm_buf_t *bufp;
1348     osi_queueData_t *qdp;
1349     osi_queueData_t *nqdp;
1350     int flags;
1351
1352     /* Give back reserved buffers */
1353     if (biop->reserved)
1354         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1355         
1356     if (isStore)
1357         flags = CM_SCACHESYNC_STOREDATA;
1358     else
1359         flags = CM_SCACHESYNC_FETCHDATA;
1360
1361     scp = biop->scp;
1362     if (biop->bufListp) {
1363         for(qdp = biop->bufListp; qdp; qdp = nqdp) {
1364             /* lookup next guy first, since we're going to free this one */
1365             nqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1366                 
1367             /* extract buffer and free queue data */
1368             bufp = osi_GetQData(qdp);
1369             osi_QRemoveHT((osi_queue_t **) &biop->bufListp,
1370                            (osi_queue_t **) &biop->bufListEndp,
1371                            &qdp->q);
1372             osi_QDFree(qdp);
1373
1374             /* now, mark I/O as done, unlock the buffer and release it */
1375             if (scp_locked)
1376                 lock_ReleaseWrite(&scp->rw);
1377             lock_ObtainMutex(&bufp->mx);
1378             lock_ObtainWrite(&scp->rw);
1379             cm_SyncOpDone(scp, bufp, flags);
1380                 
1381             /* turn off writing and wakeup users */
1382             if (isStore) {
1383                 if (bufp->flags & CM_BUF_WAITING) {
1384                     osi_Log2(afsd_logp, "cm_ReleaseBIOD Waking [scp 0x%p] bp 0x%p", scp, bufp);
1385                     osi_Wakeup((LONG_PTR) bufp);
1386                 }
1387                 if (code) {
1388                     bufp->flags &= ~CM_BUF_WRITING;
1389                     switch (code) {
1390                     case CM_ERROR_NOSUCHFILE:
1391                     case CM_ERROR_BADFD:
1392                     case CM_ERROR_NOACCESS:
1393                     case CM_ERROR_QUOTA:
1394                     case CM_ERROR_SPACE:
1395                     case CM_ERROR_TOOBIG:
1396                     case CM_ERROR_READONLY:
1397                     case CM_ERROR_NOSUCHPATH:
1398                         /*
1399                          * Apply the fatal error to this buffer.
1400                          */
1401                         bufp->flags &= ~CM_BUF_DIRTY;
1402                         bufp->flags |= CM_BUF_ERROR;
1403                         bufp->dirty_offset = 0;
1404                         bufp->dirty_length = 0;
1405                         bufp->error = code;
1406                         bufp->dataVersion = CM_BUF_VERSION_BAD;
1407                         bufp->dirtyCounter++;
1408                         break;
1409                     case CM_ERROR_TIMEDOUT:
1410                     case CM_ERROR_ALLDOWN:
1411                     case CM_ERROR_ALLBUSY:
1412                     case CM_ERROR_ALLOFFLINE:
1413                     case CM_ERROR_CLOCKSKEW:
1414                     default:
1415                         /* do not mark the buffer in error state but do
1416                         * not attempt to complete the rest either.
1417                         */
1418                         break;
1419                     }
1420                 } else {
1421                     bufp->flags &= ~(CM_BUF_WRITING | CM_BUF_DIRTY);
1422                     bufp->dirty_offset = bufp->dirty_length = 0;
1423                 }
1424             }
1425
1426             if (!scp_locked)
1427                 lock_ReleaseWrite(&scp->rw);
1428             lock_ReleaseMutex(&bufp->mx);
1429             buf_Release(bufp);
1430             bufp = NULL;
1431         }
1432     } else {
1433         if (!scp_locked)
1434             lock_ObtainWrite(&scp->rw);
1435         cm_SyncOpDone(scp, NULL, flags);
1436         if (!scp_locked)
1437             lock_ReleaseWrite(&scp->rw);
1438     }
1439
1440     /* clean things out */
1441     biop->bufListp = NULL;
1442     biop->bufListEndp = NULL;
1443 }   
1444
1445 static int
1446 cm_CloneStatus(cm_scache_t *scp, cm_user_t *userp, int scp_locked,
1447                AFSFetchStatus *afsStatusp, AFSVolSync *volSyncp)
1448 {
1449     // setup the status based upon the scp data
1450     afsStatusp->InterfaceVersion = 0x1;
1451     switch (scp->fileType) {
1452     case CM_SCACHETYPE_FILE:
1453         afsStatusp->FileType = File;
1454         break;
1455     case CM_SCACHETYPE_DIRECTORY:
1456         afsStatusp->FileType = Directory;
1457         break;
1458     case CM_SCACHETYPE_MOUNTPOINT:
1459         afsStatusp->FileType = SymbolicLink;
1460         break;
1461     case CM_SCACHETYPE_SYMLINK:
1462     case CM_SCACHETYPE_DFSLINK:
1463         afsStatusp->FileType = SymbolicLink;
1464         break;
1465     default:
1466         afsStatusp->FileType = -1;    /* an invalid value */
1467     }
1468     afsStatusp->LinkCount = scp->linkCount;
1469     afsStatusp->Length = scp->length.LowPart;
1470     afsStatusp->DataVersion = (afs_uint32)(scp->dataVersion & MAX_AFS_UINT32);
1471     afsStatusp->Author = 0x1;
1472     afsStatusp->Owner = scp->owner;
1473     if (!scp_locked) {
1474         lock_ObtainWrite(&scp->rw);
1475         scp_locked = 1;
1476     }
1477     if (cm_FindACLCache(scp, userp, &afsStatusp->CallerAccess))
1478         afsStatusp->CallerAccess = scp->anyAccess;
1479     afsStatusp->AnonymousAccess = scp->anyAccess;
1480     afsStatusp->UnixModeBits = scp->unixModeBits;
1481     afsStatusp->ParentVnode = scp->parentVnode;
1482     afsStatusp->ParentUnique = scp->parentUnique;
1483     afsStatusp->ResidencyMask = 0;
1484     afsStatusp->ClientModTime = scp->clientModTime;
1485     afsStatusp->ServerModTime = scp->serverModTime;
1486     afsStatusp->Group = scp->group;
1487     afsStatusp->SyncCounter = 0;
1488     afsStatusp->dataVersionHigh = (afs_uint32)(scp->dataVersion >> 32);
1489     afsStatusp->lockCount = 0;
1490     afsStatusp->Length_hi = scp->length.HighPart;
1491     afsStatusp->errorCode = 0;
1492
1493     volSyncp->spare1 = scp->volumeCreationDate;
1494
1495     return scp_locked;
1496 }
1497
1498 /* Fetch a buffer.  Called with scp locked.
1499  * The scp is locked on return.
1500  */
1501 long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *userp,
1502                   cm_req_t *reqp)
1503 {
1504     long code=0, code1=0;
1505     afs_uint32 nbytes;                  /* bytes in transfer */
1506     afs_uint32 nbytes_hi = 0;            /* high-order 32 bits of bytes in transfer */
1507     afs_uint64 length_found = 0;
1508     long rbytes;                        /* bytes in rx_Read call */
1509     long temp;
1510     AFSFetchStatus afsStatus;
1511     AFSCallBack callback;
1512     AFSVolSync volSync;
1513     char *bufferp;
1514     cm_buf_t *tbufp;                    /* buf we're filling */
1515     osi_queueData_t *qdp;               /* q element we're scanning */
1516     AFSFid tfid;
1517     struct rx_call *rxcallp;
1518     struct rx_connection *rxconnp;
1519     cm_bulkIO_t biod;           /* bulk IO descriptor */
1520     cm_conn_t *connp;
1521     int getroot;
1522     afs_int32 t1,t2;
1523     int require_64bit_ops = 0;
1524     int call_was_64bit = 0;
1525     int fs_fetchdata_offset_bug = 0;
1526     int first_read = 1;
1527     int scp_locked = 1;
1528
1529     memset(&volSync, 0, sizeof(volSync));
1530
1531     /* now, the buffer may or may not be filled with good data (buf_GetNew
1532      * drops lots of locks, and may indeed return a properly initialized
1533      * buffer, although more likely it will just return a new, empty, buffer.
1534      */
1535
1536 #ifdef AFS_FREELANCE_CLIENT
1537
1538     // yj: if they're trying to get the /afs directory, we need to
1539     // handle it differently, since it's local rather than on any
1540     // server
1541
1542     getroot = (scp==cm_data.rootSCachep);
1543     if (getroot)
1544         osi_Log1(afsd_logp,"GetBuffer returns cm_data.rootSCachep=%x",cm_data.rootSCachep);
1545 #endif
1546
1547     if (cm_HaveCallback(scp) && bufp->dataVersion <= scp->dataVersion && bufp->dataVersion >= scp->bufDataVersionLow) {
1548         /* We already have this buffer don't do extra work */
1549         return 0;
1550     }
1551
1552     cm_AFSFidFromFid(&tfid, &scp->fid);
1553
1554     code = cm_SetupFetchBIOD(scp, &bufp->offset, &biod, userp, reqp);
1555     if (code) {
1556         /* couldn't even get the first page setup properly */
1557         osi_Log1(afsd_logp, "GetBuffer: SetupFetchBIOD failure code %d", code);
1558         return code;
1559     }
1560
1561     /* once we get here, we have the callback in place, we know that no one
1562      * is fetching the data now.  Check one last time that we still have
1563      * the wrong data, and then fetch it if we're still wrong.
1564      *
1565      * We can lose a race condition and end up with biod.length zero, in
1566      * which case we just retry.
1567      */
1568     if (bufp->dataVersion <= scp->dataVersion && bufp->dataVersion >= scp->bufDataVersionLow || biod.length == 0) {
1569         if ((bufp->dataVersion == CM_BUF_VERSION_BAD || bufp->dataVersion < scp->bufDataVersionLow) && 
1570              LargeIntegerGreaterThanOrEqualTo(bufp->offset, scp->serverLength)) 
1571         {
1572             osi_Log4(afsd_logp, "Bad DVs 0x%x != (0x%x -> 0x%x) or length 0x%x",
1573                      bufp->dataVersion, scp->bufDataVersionLow, scp->dataVersion, biod.length);
1574
1575             if (bufp->dataVersion == CM_BUF_VERSION_BAD)
1576                 memset(bufp->datap, 0, cm_data.buf_blockSize);
1577             bufp->dataVersion = scp->dataVersion;
1578         }
1579         cm_ReleaseBIOD(&biod, 0, 0, 1);
1580         return 0;
1581     } else if ((bufp->dataVersion == CM_BUF_VERSION_BAD || bufp->dataVersion < scp->bufDataVersionLow)
1582                 && (scp->mask & CM_SCACHEMASK_TRUNCPOS) &&
1583                 LargeIntegerGreaterThanOrEqualTo(bufp->offset, scp->truncPos)) {
1584         memset(bufp->datap, 0, cm_data.buf_blockSize);
1585         bufp->dataVersion = scp->dataVersion;
1586         cm_ReleaseBIOD(&biod, 0, 0, 1);
1587         return 0;
1588     }
1589
1590     lock_ReleaseWrite(&scp->rw);
1591     scp_locked = 0;
1592
1593     if (LargeIntegerGreaterThan(LargeIntegerAdd(biod.offset,
1594                                                 ConvertLongToLargeInteger(biod.length)),
1595                                 ConvertLongToLargeInteger(LONG_MAX))) {
1596         require_64bit_ops = 1;
1597     }
1598
1599     osi_Log2(afsd_logp, "cm_GetBuffer: fetching data scp %p bufp %p", scp, bufp);
1600     osi_Log3(afsd_logp, "cm_GetBuffer: fetching data scpDV 0x%x scpDVLow 0x%x bufDV 0x%x",
1601              scp->dataVersion, scp->bufDataVersionLow, bufp->dataVersion);
1602
1603 #ifdef AFS_FREELANCE_CLIENT
1604
1605     // yj code
1606     // if getroot then we don't need to make any calls
1607     // just return fake data
1608         
1609     if (cm_freelanceEnabled && getroot) {
1610         // setup the fake status                        
1611         afsStatus.InterfaceVersion = 0x1;
1612         afsStatus.FileType = 0x2;
1613         afsStatus.LinkCount = scp->linkCount;
1614         afsStatus.Length = cm_fakeDirSize;
1615         afsStatus.DataVersion = (afs_uint32)(cm_data.fakeDirVersion & 0xFFFFFFFF);
1616         afsStatus.Author = 0x1;
1617         afsStatus.Owner = 0x0;
1618         afsStatus.CallerAccess = 0x9;
1619         afsStatus.AnonymousAccess = 0x9;
1620         afsStatus.UnixModeBits = 0x1ff;
1621         afsStatus.ParentVnode = 0x1;
1622         afsStatus.ParentUnique = 0x1;
1623         afsStatus.ResidencyMask = 0;
1624         afsStatus.ClientModTime = (afs_uint32)FakeFreelanceModTime;
1625         afsStatus.ServerModTime = (afs_uint32)FakeFreelanceModTime;
1626         afsStatus.Group = 0;
1627         afsStatus.SyncCounter = 0;
1628         afsStatus.dataVersionHigh = (afs_uint32)(cm_data.fakeDirVersion >> 32);
1629         afsStatus.lockCount = 0;
1630         afsStatus.Length_hi = 0;
1631         afsStatus.errorCode = 0;
1632         memset(&volSync, 0, sizeof(volSync));
1633
1634         // once we're done setting up the status info,
1635         // we just fill the buffer pages with fakedata
1636         // from cm_FakeRootDir. Extra pages are set to
1637         // 0. 
1638                 
1639         lock_ObtainMutex(&cm_Freelance_Lock);
1640         t1 = bufp->offset.LowPart;
1641         qdp = biod.bufListEndp;
1642         while (qdp) {
1643             tbufp = osi_GetQData(qdp);
1644             bufferp=tbufp->datap;
1645             memset(bufferp, 0, cm_data.buf_blockSize);
1646             t2 = cm_fakeDirSize - t1;
1647             if (t2> (afs_int32)cm_data.buf_blockSize) 
1648                 t2=cm_data.buf_blockSize;
1649             if (t2 > 0) {
1650                 memcpy(bufferp, cm_FakeRootDir+t1, t2);
1651             } else {
1652                 t2 = 0;
1653             }
1654             t1+=t2;
1655             qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1656
1657         }
1658         lock_ReleaseMutex(&cm_Freelance_Lock);
1659         
1660         // once we're done, we skip over the part of the
1661         // code that does the ACTUAL fetching of data for
1662         // real files
1663
1664         goto fetchingcompleted;
1665     }
1666
1667 #endif /* AFS_FREELANCE_CLIENT */
1668
1669     /*
1670      * if the requested offset is greater than the file length,
1671      * the file server will return zero bytes of data and the
1672      * current status for the file which we already have since
1673      * we have just obtained a callback.  Instead, we can avoid
1674      * the network round trip by allocating zeroed buffers and
1675      * faking the status info.
1676      */
1677     if (biod.offset.QuadPart >= scp->length.QuadPart) {
1678         osi_Log5(afsd_logp, "SKIP FetchData64 scp 0x%p, off 0x%x:%08x > length 0x%x:%08x",
1679                  scp, biod.offset.HighPart, biod.offset.LowPart,
1680                  scp->length.HighPart, scp->length.LowPart);
1681
1682         /* Clone the current status info */
1683         scp_locked = cm_CloneStatus(scp, userp, scp_locked, &afsStatus, &volSync);
1684
1685         /* status info complete, fill pages with zeros */
1686         for (qdp = biod.bufListEndp;
1687              qdp;
1688              qdp = (osi_queueData_t *) osi_QPrev(&qdp->q)) {
1689             tbufp = osi_GetQData(qdp);
1690             bufferp=tbufp->datap;
1691             memset(bufferp, 0, cm_data.buf_blockSize);
1692         }
1693
1694         /* no need to contact the file server */
1695         goto fetchingcompleted;
1696     }
1697
1698     if (scp_locked) {
1699         lock_ReleaseWrite(&scp->rw);
1700         scp_locked = 0;
1701     }
1702
1703     /* now make the call */
1704     do {
1705         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
1706         if (code) 
1707             continue;
1708
1709         rxconnp = cm_GetRxConn(connp);
1710         rxcallp = rx_NewCall(rxconnp);
1711         rx_PutConnection(rxconnp);
1712
1713         nbytes = nbytes_hi = 0;
1714
1715         if (SERVERHAS64BIT(connp)) {
1716             call_was_64bit = 1;
1717
1718             osi_Log4(afsd_logp, "CALL FetchData64 scp 0x%p, off 0x%x:%08x, size 0x%x",
1719                      scp, biod.offset.HighPart, biod.offset.LowPart, biod.length);
1720
1721             code = StartRXAFS_FetchData64(rxcallp, &tfid, biod.offset.QuadPart, biod.length);
1722
1723             if (code == 0) {
1724                 temp = rx_Read32(rxcallp, &nbytes_hi);
1725                 if (temp == sizeof(afs_int32)) {
1726                     nbytes_hi = ntohl(nbytes_hi);
1727                 } else {
1728                     nbytes_hi = 0;
1729                     code = rxcallp->error;
1730                     code1 = rx_EndCall(rxcallp, code);
1731                     rxcallp = NULL;
1732                 }
1733             }
1734         } else {
1735             call_was_64bit = 0;
1736         }
1737
1738         if (code == RXGEN_OPCODE || !SERVERHAS64BIT(connp)) {
1739             if (require_64bit_ops) {
1740                 osi_Log0(afsd_logp, "Skipping FetchData.  Operation requires FetchData64");
1741                 code = CM_ERROR_TOOBIG;
1742             } else {
1743                 if (!rxcallp) {
1744                     rxconnp = cm_GetRxConn(connp);
1745                     rxcallp = rx_NewCall(rxconnp);
1746                     rx_PutConnection(rxconnp);
1747                 }
1748
1749                 osi_Log3(afsd_logp, "CALL FetchData scp 0x%p, off 0x%x, size 0x%x",
1750                          scp, biod.offset.LowPart, biod.length);
1751
1752                 code = StartRXAFS_FetchData(rxcallp, &tfid, biod.offset.LowPart,
1753                                             biod.length);
1754
1755                 SET_SERVERHASNO64BIT(connp);
1756             }
1757         }
1758
1759         if (code == 0) {
1760             temp  = rx_Read32(rxcallp, &nbytes);
1761             if (temp == sizeof(afs_int32)) {
1762                 nbytes = ntohl(nbytes);
1763                 FillInt64(length_found, nbytes_hi, nbytes);
1764                 if (length_found > biod.length) {
1765                     /*
1766                      * prior to 1.4.12 and 1.5.65 the file server would return
1767                      * (filesize - offset) if the requested offset was greater than
1768                      * the filesize.  The correct return value would have been zero.
1769                      * Force a retry by returning an RX_PROTOCOL_ERROR.  If the cause
1770                      * is a race between two RPCs issues by this cache manager, the
1771                      * correct thing will happen the second time.
1772                      */
1773                     osi_Log0(afsd_logp, "cm_GetBuffer length_found > biod.length");
1774                     fs_fetchdata_offset_bug = 1;
1775                 }
1776             } else {
1777                 osi_Log1(afsd_logp, "cm_GetBuffer rx_Read32 returns %d != 4", temp);
1778                 code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR;
1779             }
1780         }
1781         /* for the moment, nbytes_hi will always be 0 if code == 0
1782            because biod.length is a 32-bit quantity. */
1783
1784         if (code == 0) {
1785             qdp = biod.bufListEndp;
1786             if (qdp) {
1787                 tbufp = osi_GetQData(qdp);
1788                 bufferp = tbufp->datap;
1789             }
1790             else 
1791                 bufferp = NULL;
1792             /* fill length_found of data from the pipe into the pages.
1793              * When we stop, qdp will point at the last page we're
1794              * dealing with, and bufferp will tell us where we
1795              * stopped.  We'll need this info below when we clear
1796              * the remainder of the last page out (and potentially
1797              * clear later pages out, if we fetch past EOF).
1798              */
1799             while (length_found > 0) {
1800                 /* assert that there are still more buffers;
1801                  * our check above for length_found being less than
1802                  * biod.length should ensure this.
1803                  */
1804                 osi_assertx(bufferp != NULL, "null cm_buf_t");
1805
1806                 /* read rbytes of data */
1807                 rbytes = (afs_uint32)(length_found > cm_data.buf_blockSize ? cm_data.buf_blockSize : length_found);
1808                 temp = rx_Read(rxcallp, bufferp, rbytes);
1809                 if (temp < rbytes) {
1810                     /*
1811                      * If the file server returned (filesize - offset),
1812                      * then the first rx_Read will return zero octets of data.
1813                      * If it does, do not treat it as an error.  Correct the
1814                      * length_found and continue as if the file server said
1815                      * it was sending us zero octets of data.
1816                      */
1817                     if (fs_fetchdata_offset_bug && first_read)
1818                         length_found = 0;
1819                     else
1820                         code = (rxcallp->error < 0) ? rxcallp->error : RX_PROTOCOL_ERROR;
1821                     break;
1822                 }
1823                 first_read = 0;
1824
1825                 /* allow read-while-fetching.
1826                  * if this is the last buffer, clear the
1827                  * PREFETCHING flag, so the reader waiting for
1828                  * this buffer will start a prefetch.
1829                  */
1830                 tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
1831                 lock_ObtainWrite(&scp->rw);
1832                 if (scp->flags & CM_SCACHEFLAG_WAITING) {
1833                     osi_Log1(afsd_logp, "CM GetBuffer Waking scp 0x%p", scp);
1834                     osi_Wakeup((LONG_PTR) &scp->flags);
1835                 }
1836                 if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
1837                     osi_hyper_t tlength = ConvertLongToLargeInteger(biod.length);
1838                     *cpffp = 1;
1839                     cm_ClearPrefetchFlag(0, scp, &biod.offset, &tlength);
1840                 }
1841                 lock_ReleaseWrite(&scp->rw);
1842
1843                 /* and adjust counters */
1844                 length_found -= temp;
1845
1846                 /* and move to the next buffer */
1847                 if (length_found != 0) {
1848                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1849                     if (qdp) {
1850                         tbufp = osi_GetQData(qdp);
1851                         bufferp = tbufp->datap;
1852                     }
1853                     else 
1854                         bufferp = NULL;
1855                 } else 
1856                     bufferp += temp;
1857             }
1858
1859             /* zero out remainder of last pages, in case we are
1860              * fetching past EOF.  We were fetching an integral #
1861              * of pages, but stopped, potentially in the middle of
1862              * a page.  Zero the remainder of that page, and then
1863              * all of the rest of the pages.
1864              */
1865             /* bytes fetched */
1866             osi_assertx((bufferp - tbufp->datap) < LONG_MAX, "data >= LONG_MAX");
1867             rbytes = (long) (bufferp - tbufp->datap);
1868
1869             /* bytes left to zero */
1870             rbytes = cm_data.buf_blockSize - rbytes;
1871             while(qdp) {
1872                 if (rbytes != 0)
1873                     memset(bufferp, 0, rbytes);
1874                 qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1875                 if (qdp == NULL) 
1876                     break;
1877                 tbufp = osi_GetQData(qdp);
1878                 bufferp = tbufp->datap;
1879                 /* bytes to clear in this page */
1880                 rbytes = cm_data.buf_blockSize;
1881             }   
1882         }
1883
1884         if (code == 0) {
1885             if (call_was_64bit)
1886                 code = EndRXAFS_FetchData64(rxcallp, &afsStatus, &callback, &volSync);
1887             else
1888                 code = EndRXAFS_FetchData(rxcallp, &afsStatus, &callback, &volSync);
1889         } else {
1890             if (call_was_64bit)
1891                 osi_Log1(afsd_logp, "CALL EndRXAFS_FetchData64 skipped due to error %d", code);
1892             else
1893                 osi_Log1(afsd_logp, "CALL EndRXAFS_FetchData skipped due to error %d", code);
1894         }
1895
1896         if (rxcallp)
1897             code1 = rx_EndCall(rxcallp, code);
1898
1899         if (code1 == RXKADUNKNOWNKEY)
1900             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
1901
1902         /* If we are avoiding a file server bug, ignore the error state */
1903         if (fs_fetchdata_offset_bug && first_read && length_found == 0 && code == -451) {
1904             /* Clone the current status info and clear the error state */
1905             scp_locked = cm_CloneStatus(scp, userp, scp_locked, &afsStatus, &volSync);
1906             if (scp_locked) {
1907                 lock_ReleaseWrite(&scp->rw);
1908                 scp_locked = 0;
1909             }
1910             code = 0;
1911         /* Prefer the error value from FetchData over rx_EndCall */
1912         } else if (code == 0 && code1 != 0)
1913             code = code1;
1914         osi_Log0(afsd_logp, "CALL FetchData DONE");
1915
1916     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
1917
1918   fetchingcompleted:
1919     code = cm_MapRPCError(code, reqp);
1920
1921     if (!scp_locked)
1922         lock_ObtainWrite(&scp->rw);
1923     
1924     /* we know that no one else has changed the buffer, since we still have
1925      * the fetching flag on the buffers, and we have the scp locked again.
1926      * Copy in the version # into the buffer if we got code 0 back from the
1927      * read.
1928      */
1929     if (code == 0) {
1930         for(qdp = biod.bufListp;
1931              qdp;
1932              qdp = (osi_queueData_t *) osi_QNext(&qdp->q)) {
1933             tbufp = osi_GetQData(qdp);
1934             tbufp->dataVersion = afsStatus.dataVersionHigh;
1935             tbufp->dataVersion <<= 32;
1936             tbufp->dataVersion |= afsStatus.DataVersion;
1937
1938 #ifdef DISKCACHE95
1939             /* write buffer out to disk cache */
1940             diskcache_Update(tbufp->dcp, tbufp->datap, cm_data.buf_blockSize,
1941                               tbufp->dataVersion);
1942 #endif /* DISKCACHE95 */
1943         }
1944     }
1945
1946     /* release scatter/gather I/O structure (buffers, locks) */
1947     cm_ReleaseBIOD(&biod, 0, code, 1);
1948
1949     if (code == 0) 
1950         cm_MergeStatus(NULL, scp, &afsStatus, &volSync, userp, reqp, 0);
1951     
1952     return code;
1953 }