windows-misc-20050102
[openafs.git] / src / WINNT / afsd / cm_dcache.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <afs/stds.h>
12
13 #ifndef DJGPP
14 #include <windows.h>
15 #include <winsock2.h>
16 #include <nb30.h>
17 #endif /* !DJGPP */
18 #ifdef COMMENT
19 #include <malloc.h>
20 #endif
21 #include <string.h>
22 #include <stdlib.h>
23 #include <osi.h>
24
25 #include "afsd.h"
26
27 #ifdef DEBUG
28 extern void afsi_log(char *pattern, ...);
29 #endif
30
31 osi_mutex_t cm_bufGetMutex;
32 #ifdef AFS_FREELANCE_CLIENT
33 extern osi_mutex_t cm_Freelance_Lock;
34 #endif
35
36 /* functions called back from the buffer package when reading or writing data,
37  * or when holding or releasing a vnode pointer.
38  */
39 long cm_BufWrite(void *vfidp, osi_hyper_t *offsetp, long length, long flags,
40                  cm_user_t *userp, cm_req_t *reqp)
41 {
42     /* store the data back from this buffer; the buffer is locked and held,
43      * but the vnode involved isn't locked, yet.  It is held by its
44      * reference from the buffer, which won't change until the buffer is
45      * released by our caller.  Thus, we don't have to worry about holding
46      * bufp->scp.
47      */
48     long code;
49     cm_fid_t *fidp = vfidp;
50     cm_scache_t *scp;
51     long nbytes;
52     long temp;
53     AFSFetchStatus outStatus;
54     AFSStoreStatus inStatus;
55     osi_hyper_t thyper;
56     AFSVolSync volSync;
57     AFSFid tfid;
58     struct rx_call *callp;
59     osi_queueData_t *qdp;
60     cm_buf_t *bufp;
61     long wbytes;
62     char *bufferp;
63     cm_conn_t *connp;
64     long truncPos;
65     cm_bulkIO_t biod;           /* bulk IO descriptor */
66
67     osi_assert(userp != NULL);
68
69     /* now, the buffer may or may not be filled with good data (buf_GetNew
70      * drops lots of locks, and may indeed return a properly initialized
71      * buffer, although more likely it will just return a new, empty, buffer.
72      */
73     scp = cm_FindSCache(fidp);
74     if (scp == NULL)
75         return CM_ERROR_NOSUCHFILE;     /* shouldn't happen */
76
77     cm_AFSFidFromFid(&tfid, fidp);
78
79     lock_ObtainMutex(&scp->mx);
80         
81     code = cm_SetupStoreBIOD(scp, offsetp, length, &biod, userp, reqp);
82     if (code) {
83         osi_Log1(afsd_logp, "cm_SetupStoreBIOD code %x", code);
84         lock_ReleaseMutex(&scp->mx);
85         cm_ReleaseSCache(scp);
86         return code;
87     }
88
89     if (biod.length == 0) {
90         osi_Log0(afsd_logp, "cm_SetupStoreBIOD length 0");
91         lock_ReleaseMutex(&scp->mx);
92         cm_ReleaseBIOD(&biod, 1);       /* should be a NOOP */
93         cm_ReleaseSCache(scp);
94         return 0;
95     }   
96
97     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
98     (void) cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL);
99
100     /* prepare the output status for the store */
101     scp->mask |= CM_SCACHEMASK_CLIENTMODTIME;
102     cm_StatusFromAttr(&inStatus, scp, NULL);
103     truncPos = scp->length.LowPart;
104     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
105          && scp->truncPos.LowPart < (unsigned long) truncPos)
106         truncPos = scp->truncPos.LowPart;
107         scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
108                 
109     /* compute how many bytes to write from this buffer */
110     thyper = LargeIntegerSubtract(scp->length, biod.offset);
111     if (LargeIntegerLessThanZero(thyper)) {
112         /* entire buffer is past EOF */
113         nbytes = 0;
114     }
115     else {
116         /* otherwise write out part of buffer before EOF, but not
117          * more than bufferSize bytes.
118          */
119         nbytes = thyper.LowPart;
120         if (nbytes > biod.length) 
121             nbytes = biod.length;
122     }
123
124     lock_ReleaseMutex(&scp->mx);
125         
126     /* now we're ready to do the store operation */
127     do {
128         code = cm_Conn(&scp->fid, userp, reqp, &connp);
129         if (code) 
130             continue;
131                 
132         lock_ObtainMutex(&connp->mx);
133         callp = rx_NewCall(connp->callp);
134         lock_ReleaseMutex(&connp->mx);
135
136         osi_Log3(afsd_logp, "CALL StoreData vp %x, off 0x%x, size 0x%x",
137                  (long) scp, biod.offset.LowPart, nbytes);
138
139         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
140                                     biod.offset.LowPart, nbytes, truncPos);
141
142         if (code == 0) {
143             /* write the data from the the list of buffers */
144             qdp = NULL;
145             while(nbytes > 0) {
146                 if (qdp == NULL)
147                     qdp = biod.bufListEndp;
148                 else
149                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
150                 osi_assert(qdp != NULL);
151                 bufp = osi_GetQData(qdp);
152                 bufferp = bufp->datap;
153                 wbytes = nbytes;
154                 if (wbytes > buf_bufferSize) 
155                     wbytes = buf_bufferSize;
156
157                 /* write out wbytes of data from bufferp */
158                 temp = rx_Write(callp, bufferp, wbytes);
159                 if (temp != wbytes) {
160                     osi_Log2(afsd_logp, "rx_Write failed %d != %d",temp,wbytes);
161                     code = -1;
162                     break;
163                 } else {
164                     osi_Log1(afsd_logp, "rx_Write succeeded %d",temp);
165                 }       
166                 nbytes -= wbytes;
167             }   /* while more bytes to write */
168         }               /* if RPC started successfully */
169         else {
170             osi_Log1(afsd_logp, "StartRXAFS_StoreData failed (%lX)",code);
171         }
172         if (code == 0) {
173             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
174             if (code)
175                 osi_Log1(afsd_logp, "EndRXAFS_StoreData failed (%lX)",code);
176         }
177         code = rx_EndCall(callp, code);
178         osi_Log0(afsd_logp, "CALL StoreData DONE");
179                 
180     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
181     code = cm_MapRPCError(code, reqp);
182         
183     /* now, clean up our state */
184     lock_ObtainMutex(&scp->mx);
185
186     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
187
188     if (code == 0) {
189         /* now, here's something a little tricky: in AFS 3, a dirty
190          * length can't be directly stored, instead, a dirty chunk is
191          * stored that sets the file's size (by writing and by using
192          * the truncate-first option in the store call).
193          *
194          * At this point, we've just finished a store, and so the trunc
195          * pos field is clean.  If the file's size at the server is at
196          * least as big as we think it should be, then we turn off the
197          * length dirty bit, since all the other dirty buffers must
198          * precede this one in the file.
199          *
200          * The file's desired size shouldn't be smaller than what's
201          * stored at the server now, since we just did the trunc pos
202          * store.
203          *
204          * We have to turn off the length dirty bit as soon as we can,
205          * so that we see updates made by other machines.
206          */
207         if (outStatus.Length >= scp->length.LowPart)
208             scp->mask &= ~CM_SCACHEMASK_LENGTH;
209         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
210     } else {
211         if (code == CM_ERROR_SPACE)
212             scp->flags |= CM_SCACHEFLAG_OUTOFSPACE;
213         else if (code == CM_ERROR_QUOTA)
214             scp->flags |= CM_SCACHEFLAG_OVERQUOTA;
215     }
216     lock_ReleaseMutex(&scp->mx);
217     cm_ReleaseBIOD(&biod, 1);
218     cm_ReleaseSCache(scp);
219
220     return code;
221 }
222
223 /*
224  * Truncate the file, by sending a StoreData RPC with zero length.
225  *
226  * Called with scp locked.  Releases and re-obtains the lock.
227  */
228 long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
229 {
230     AFSFetchStatus outStatus;
231     AFSStoreStatus inStatus;
232     AFSVolSync volSync;
233     AFSFid tfid;
234     long code;
235     long truncPos;
236     cm_conn_t *connp;
237     struct rx_call *callp;
238
239     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
240     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
241                      CM_SCACHESYNC_STOREDATA_EXCL);
242
243     /* prepare the output status for the store */
244     inStatus.Mask = AFS_SETMODTIME;
245     inStatus.ClientModTime = scp->clientModTime;
246     scp->mask &= ~CM_SCACHEMASK_CLIENTMODTIME;
247
248     /* calculate truncation position */
249     truncPos = scp->length.LowPart;
250     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
251          && scp->truncPos.LowPart < (unsigned long) truncPos)
252         truncPos = scp->truncPos.LowPart;
253     scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
254
255     lock_ReleaseMutex(&scp->mx);
256
257     cm_AFSFidFromFid(&tfid, &scp->fid);
258
259     /* now we're ready to do the store operation */
260     do {
261         code = cm_Conn(&scp->fid, userp, reqp, &connp);
262         if (code) 
263             continue;
264                 
265         lock_ObtainMutex(&connp->mx);
266         callp = rx_NewCall(connp->callp);
267         lock_ReleaseMutex(&connp->mx);
268
269         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
270                                     0, 0, truncPos);
271
272         if (code == 0)
273             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
274         code = rx_EndCall(callp, code);
275
276     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
277     code = cm_MapRPCError(code, reqp);
278         
279     /* now, clean up our state */
280     lock_ObtainMutex(&scp->mx);
281
282     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
283
284     if (code == 0) {
285         /*
286          * For explanation of handling of CM_SCACHEMASK_LENGTH,
287          * see cm_BufWrite().
288          */
289         if (outStatus.Length >= scp->length.LowPart)
290             scp->mask &= ~CM_SCACHEMASK_LENGTH;
291         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
292     }
293
294     return code;
295 }
296
297 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
298 {
299     *bytesReadp = buf_bufferSize;
300
301     /* now return a code that means that I/O is done */
302     return 0;
303 }
304
305 /* stabilize scache entry, and return with it locked so 
306  * it stays stable.
307  */
308 long cm_BufStabilize(void *parmp, cm_user_t *userp, cm_req_t *reqp)
309 {
310     cm_scache_t *scp;
311     long code;
312
313     scp = parmp;
314         
315     lock_ObtainMutex(&scp->mx);
316     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
317                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
318     if (code) {
319         lock_ReleaseMutex(&scp->mx);
320         return code;
321     }
322         
323     return 0;
324 }
325
326 /* undoes the work that cm_BufStabilize does: releases lock so things can change again */
327 long cm_BufUnstabilize(void *parmp, cm_user_t *userp)
328 {
329     cm_scache_t *scp;
330         
331     scp = parmp;
332         
333     lock_ReleaseMutex(&scp->mx);
334         
335     /* always succeeds */
336     return 0;
337 }
338
339 cm_buf_ops_t cm_bufOps = {
340     cm_BufWrite,
341     cm_BufRead,
342     cm_BufStabilize,
343     cm_BufUnstabilize
344 };
345
346 int cm_InitDCache(long chunkSize, long nbuffers)
347 {
348     lock_InitializeMutex(&cm_bufGetMutex, "buf_Get mutex");
349     if (nbuffers) 
350         buf_nbuffers = nbuffers;
351     return buf_Init(&cm_bufOps);
352 }
353
354 /* check to see if we have an up-to-date buffer.  The buffer must have
355  * previously been obtained by calling buf_Get.
356  *
357  * Make sure we have a callback, and that the dataversion matches.
358  *
359  * Scp must be locked.
360  *
361  * Bufp *may* be locked.
362  */
363 int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
364 {
365     int code;
366     if (!cm_HaveCallback(scp))
367         return 0;
368     if ((bufp->cmFlags
369           & (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
370          == (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
371         return 1;
372     if (bufp->dataVersion == scp->dataVersion)
373         return 1;
374     if (!isBufLocked) {
375         code = lock_TryMutex(&bufp->mx);
376         if (code == 0) {
377             /* don't have the lock, and can't lock it, then
378              * return failure.
379              */
380             return 0;
381         }
382     }
383
384     /* remember dirty flag for later */
385     code = bufp->flags & CM_BUF_DIRTY;
386
387     /* release lock if we obtained it here */
388     if (!isBufLocked) 
389         lock_ReleaseMutex(&bufp->mx);
390
391     /* if buffer was dirty, buffer is acceptable for use */
392     if (code) 
393         return 1;
394     else 
395         return 0;
396 }
397
398 /* used when deciding whether to do a prefetch or not */
399 long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, long length,
400                         cm_user_t *up, cm_req_t *reqp, osi_hyper_t *realBasep)
401 {
402     osi_hyper_t toffset;
403     osi_hyper_t tbase;
404     long code;
405     cm_buf_t *bp;
406     int stop;
407         
408     /* now scan all buffers in the range, looking for any that look like
409      * they need work.
410      */
411     tbase = *startBasep;
412     stop = 0;
413     lock_ObtainMutex(&scp->mx);
414     while(length > 0) {
415         /* get callback so we can do a meaningful dataVersion comparison */
416         code = cm_SyncOp(scp, NULL, up, reqp, 0,
417                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
418         if (code) {
419             scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
420             lock_ReleaseMutex(&scp->mx);
421             return code;
422         }
423                 
424         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
425             /* we're past the end of file */
426             break;
427         }
428
429         bp = buf_Find(scp, &tbase);
430         /* We cheat slightly by not locking the bp mutex. */
431         if (bp) {
432             if ((bp->cmFlags
433                   & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
434                  && bp->dataVersion != scp->dataVersion)
435                 stop = 1;
436             buf_Release(bp);
437         }
438         else 
439             stop = 1;
440
441         /* if this buffer is essentially guaranteed to require a fetch,
442          * break out here and return this position.
443          */
444         if (stop) 
445             break;
446                 
447         toffset.LowPart = buf_bufferSize;
448         toffset.HighPart = 0;
449         tbase = LargeIntegerAdd(toffset, tbase);
450         length -= buf_bufferSize;
451     }
452         
453     /* if we get here, either everything is fine or stop stopped us at a
454      * particular buffer in the range that definitely needs to be fetched.
455      */
456     if (stop == 0) {
457         /* return non-zero code since realBasep won't be valid */
458         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
459         code = -1;
460     }   
461     else {
462         /* successfully found a page that will need fetching */
463         *realBasep = tbase;
464         code = 0;
465     }
466     lock_ReleaseMutex(&scp->mx);
467     return code;
468 }
469
470 void cm_BkgStore(cm_scache_t *scp, long p1, long p2, long p3, long p4,
471                  cm_user_t *userp)
472 {
473     osi_hyper_t toffset;
474     long length;
475     cm_req_t req;
476
477     cm_InitReq(&req);
478     req.flags |= CM_REQ_NORETRY;
479
480     toffset.LowPart = p1;
481     toffset.HighPart = p2;
482     length = p3;
483
484     osi_Log2(afsd_logp, "Starting BKG store vp 0x%x, base 0x%x", scp, p1);
485
486     cm_BufWrite(&scp->fid, &toffset, length, /* flags */ 0, userp, &req);
487
488     lock_ObtainMutex(&scp->mx);
489     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
490     lock_ReleaseMutex(&scp->mx);
491 }
492
493 void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base)
494 {
495     osi_hyper_t thyper;
496
497     if (code == 0) {
498         thyper.LowPart = cm_chunkSize;
499         thyper.HighPart = 0;
500         thyper =  LargeIntegerAdd(*base, thyper);
501         thyper.LowPart &= (-cm_chunkSize);
502         if (LargeIntegerGreaterThan(*base, scp->prefetch.base))
503             scp->prefetch.base = *base;
504         if (LargeIntegerGreaterThan(thyper, scp->prefetch.end))
505             scp->prefetch.end = thyper;
506     }
507     scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
508 }
509
510 /* do the prefetch */
511 void cm_BkgPrefetch(cm_scache_t *scp, long p1, long p2, long p3, long p4,
512                     cm_user_t *userp)
513 {
514     long length;
515     osi_hyper_t base;
516     long code;
517     cm_buf_t *bp;
518     int cpff = 0;                       /* cleared prefetch flag */
519     cm_req_t req;
520
521     cm_InitReq(&req);
522     req.flags |= CM_REQ_NORETRY;
523         
524     base.LowPart = p1;
525     base.HighPart = p2;
526     length = p3;
527         
528     osi_Log2(afsd_logp, "Starting BKG prefetch vp 0x%x, base 0x%x", scp, p1);
529
530     code = buf_Get(scp, &base, &bp);
531
532     lock_ObtainMutex(&scp->mx);
533
534     if (code || (bp->cmFlags & CM_BUF_CMFETCHING)) {
535         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
536         lock_ReleaseMutex(&scp->mx);
537         return;
538     }
539
540     code = cm_GetBuffer(scp, bp, &cpff, userp, &req);
541     if (!cpff) 
542         cm_ClearPrefetchFlag(code, scp, &base);
543     lock_ReleaseMutex(&scp->mx);
544     buf_Release(bp);
545     return;
546 }
547
548 /* a read was issued to offsetp, and we have to determine whether we should
549  * do a prefetch.
550  */
551 void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp,
552                          cm_user_t *userp, cm_req_t *reqp)
553 {
554     long code;
555     osi_hyper_t realBase;
556     osi_hyper_t readBase;
557         
558     readBase = *offsetp;
559     /* round up to chunk boundary */
560     readBase.LowPart += (cm_chunkSize-1);
561     readBase.LowPart &= (-cm_chunkSize);
562
563     lock_ObtainMutex(&scp->mx);
564     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
565          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
566         lock_ReleaseMutex(&scp->mx);
567         return;
568     }
569     scp->flags |= CM_SCACHEFLAG_PREFETCHING;
570
571     /* start the scan at the latter of the end of this read or
572      * the end of the last fetched region.
573      */
574     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
575         readBase = scp->prefetch.end;
576
577     lock_ReleaseMutex(&scp->mx);
578
579     code = cm_CheckFetchRange(scp, &readBase, cm_chunkSize, userp, reqp,
580                               &realBase);
581     if (code) 
582         return; /* can't find something to prefetch */
583
584     osi_Log2(afsd_logp, "BKG Prefetch request vp 0x%x, base 0x%x",
585              scp, realBase.LowPart);
586
587     cm_QueueBKGRequest(scp, cm_BkgPrefetch, realBase.LowPart,
588                        realBase.HighPart, cm_chunkSize, 0, userp);
589 }
590
591 /* scp must be locked; temporarily unlocked during processing.
592  * If returns 0, returns buffers held in biop, and with
593  * CM_BUF_CMSTORING set.
594  *
595  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
596  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
597  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
598  * must be woken, and the event must be set when the I/O is done.  All of this
599  * is required so that buf_WaitIO synchronizes properly with the buffer as it
600  * is being written out.
601  */
602 long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
603                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
604 {
605     cm_buf_t *bufp;
606     osi_queueData_t *qdp;
607     osi_hyper_t thyper;
608     osi_hyper_t tbase;
609     osi_hyper_t scanStart;              /* where to start scan for dirty pages */
610     osi_hyper_t scanEnd;                /* where to stop scan for dirty pages */
611     osi_hyper_t firstModOffset; /* offset of first modified page in range */
612     long temp;
613     long code;
614     long flags;                 /* flags to cm_SyncOp */
615         
616     /* clear things out */
617     biop->scp = scp;            /* don't hold */
618     biop->offset = *inOffsetp;
619     biop->length = 0;
620     biop->bufListp = NULL;
621     biop->bufListEndp = NULL;
622     biop->reserved = 0;
623
624     /* reserve a chunk's worth of buffers */
625     lock_ReleaseMutex(&scp->mx);
626     buf_ReserveBuffers(cm_chunkSize / buf_bufferSize);
627     lock_ObtainMutex(&scp->mx);
628
629     bufp = NULL;
630     for (temp = 0; temp < inSize; temp += buf_bufferSize, bufp = NULL) {
631         thyper.HighPart = 0;
632         thyper.LowPart = temp;
633         tbase = LargeIntegerAdd(*inOffsetp, thyper);
634
635         bufp = buf_Find(scp, &tbase);
636         if (bufp) {
637             /* get buffer mutex and scp mutex safely */
638             lock_ReleaseMutex(&scp->mx);
639             lock_ObtainMutex(&bufp->mx);
640             lock_ObtainMutex(&scp->mx);
641
642             flags = CM_SCACHESYNC_NEEDCALLBACK
643                 | CM_SCACHESYNC_GETSTATUS
644                     | CM_SCACHESYNC_STOREDATA
645                         | CM_SCACHESYNC_BUFLOCKED;
646             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
647             if (code) {
648                 lock_ReleaseMutex(&bufp->mx);
649                 buf_Release(bufp);
650                 buf_UnreserveBuffers(cm_chunkSize / buf_bufferSize);
651                 return code;
652             }   
653                         
654             /* if the buffer is dirty, we're done */
655             if (bufp->flags & CM_BUF_DIRTY) {
656                 osi_assertx(!(bufp->flags & CM_BUF_WRITING),
657                             "WRITING w/o CMSTORING in SetupStoreBIOD");
658                 bufp->flags |= CM_BUF_WRITING;
659                 break;
660             }
661
662             /* this buffer is clean, so there's no reason to process it */
663             cm_SyncOpDone(scp, bufp, flags);
664             lock_ReleaseMutex(&bufp->mx);
665             buf_Release(bufp);
666         }       
667     }
668
669     biop->reserved = 1;
670         
671     /* if we get here, if bufp is null, we didn't find any dirty buffers
672      * that weren't already being stored back, so we just quit now.
673      */
674     if (!bufp) {
675         return 0;
676     }
677
678     /* don't need buffer mutex any more */
679     lock_ReleaseMutex(&bufp->mx);
680         
681     /* put this element in the list */
682     qdp = osi_QDAlloc();
683     osi_SetQData(qdp, bufp);
684     /* don't have to hold bufp, since held by buf_Find above */
685     osi_QAddH((osi_queue_t **) &biop->bufListp,
686               (osi_queue_t **) &biop->bufListEndp,
687               &qdp->q);
688     biop->length = buf_bufferSize;
689     firstModOffset = bufp->offset;
690     biop->offset = firstModOffset;
691
692     /* compute the window surrounding *inOffsetp of size cm_chunkSize */
693     scanStart = *inOffsetp;
694     scanStart.LowPart &= (-cm_chunkSize);
695     thyper.LowPart = cm_chunkSize;
696     thyper.HighPart = 0;
697     scanEnd = LargeIntegerAdd(scanStart, thyper);
698
699     flags = CM_SCACHESYNC_NEEDCALLBACK
700         | CM_SCACHESYNC_GETSTATUS
701         | CM_SCACHESYNC_STOREDATA
702         | CM_SCACHESYNC_BUFLOCKED
703         | CM_SCACHESYNC_NOWAIT;
704
705     /* start by looking backwards until scanStart */
706     thyper.HighPart = 0;                /* hyper version of buf_bufferSize */
707     thyper.LowPart = buf_bufferSize;
708     tbase = LargeIntegerSubtract(firstModOffset, thyper);
709     while(LargeIntegerGreaterThanOrEqualTo(tbase, scanStart)) {
710         /* see if we can find the buffer */
711         bufp = buf_Find(scp, &tbase);
712         if (!bufp) 
713             break;
714
715         /* try to lock it, and quit if we can't (simplifies locking) */
716         code = lock_TryMutex(&bufp->mx);
717         if (code == 0) {
718             buf_Release(bufp);
719             break;
720         }
721                 
722         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
723         if (code) {
724             lock_ReleaseMutex(&bufp->mx);
725             buf_Release(bufp);
726             break;
727         }
728                 
729         if (!(bufp->flags & CM_BUF_DIRTY)) {
730             /* buffer is clean, so we shouldn't add it */
731             cm_SyncOpDone(scp, bufp, flags);
732             lock_ReleaseMutex(&bufp->mx);
733             buf_Release(bufp);
734             break;
735         }
736
737         /* don't need buffer mutex any more */
738         lock_ReleaseMutex(&bufp->mx);
739
740         /* we have a dirty buffer ready for storing.  Add it to the tail
741          * of the list, since it immediately precedes all of the disk
742          * addresses we've already collected.
743          */
744         qdp = osi_QDAlloc();
745         osi_SetQData(qdp, bufp);
746         /* no buf_hold necessary, since we have it held from buf_Find */
747         osi_QAddT((osi_queue_t **) &biop->bufListp,
748                   (osi_queue_t **) &biop->bufListEndp,
749                   &qdp->q);
750
751         /* update biod info describing the transfer */
752         biop->offset = LargeIntegerSubtract(biop->offset, thyper);
753         biop->length += buf_bufferSize;
754
755         /* update loop pointer */
756         tbase = LargeIntegerSubtract(tbase, thyper);
757     }   /* while loop looking for pages preceding the one we found */
758
759     /* now, find later dirty, contiguous pages, and add them to the list */
760     thyper.HighPart = 0;                /* hyper version of buf_bufferSize */
761     thyper.LowPart = buf_bufferSize;
762     tbase = LargeIntegerAdd(firstModOffset, thyper);
763     while(LargeIntegerLessThan(tbase, scanEnd)) {
764         /* see if we can find the buffer */
765         bufp = buf_Find(scp, &tbase);
766         if (!bufp) 
767             break;
768
769         /* try to lock it, and quit if we can't (simplifies locking) */
770         code = lock_TryMutex(&bufp->mx);
771         if (code == 0) {
772             buf_Release(bufp);
773             break;
774         }
775
776         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
777         if (code) {
778             lock_ReleaseMutex(&bufp->mx);
779             buf_Release(bufp);
780             break;
781         }
782                 
783         if (!(bufp->flags & CM_BUF_DIRTY)) {
784             /* buffer is clean, so we shouldn't add it */
785             cm_SyncOpDone(scp, bufp, flags);
786             lock_ReleaseMutex(&bufp->mx);
787             buf_Release(bufp);
788             break;
789         }
790
791         /* don't need buffer mutex any more */
792         lock_ReleaseMutex(&bufp->mx);
793
794         /* we have a dirty buffer ready for storing.  Add it to the head
795          * of the list, since it immediately follows all of the disk
796          * addresses we've already collected.
797          */
798         qdp = osi_QDAlloc();
799         osi_SetQData(qdp, bufp);
800         /* no buf_hold necessary, since we have it held from buf_Find */
801         osi_QAddH((osi_queue_t **) &biop->bufListp,
802                   (osi_queue_t **) &biop->bufListEndp,
803                   &qdp->q);
804
805         /* update biod info describing the transfer */
806         biop->length += buf_bufferSize;
807                 
808         /* update loop pointer */
809         tbase = LargeIntegerAdd(tbase, thyper);
810     }   /* while loop looking for pages following the first page we found */
811         
812     /* finally, we're done */
813     return 0;
814 }
815
816 /* scp must be locked; temporarily unlocked during processing.
817  * If returns 0, returns buffers held in biop, and with
818  * CM_BUF_CMFETCHING flags set.
819  * If an error is returned, we don't return any buffers.
820  */
821 long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
822                         cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp)
823 {
824     long code;
825     cm_buf_t *tbp;
826     osi_hyper_t toffset;                /* a long long temp variable */
827     osi_hyper_t pageBase;               /* base offset we're looking at */
828     osi_queueData_t *qdp;               /* one temp queue structure */
829     osi_queueData_t *tqdp;              /* another temp queue structure */
830     long collected;                     /* how many bytes have been collected */
831     int isFirst;
832     long flags;
833     osi_hyper_t fileSize;               /* the # of bytes in the file */
834     osi_queueData_t *heldBufListp;      /* we hold all buffers in this list */
835     osi_queueData_t *heldBufListEndp;   /* first one */
836     int reserving;
837
838     biop->scp = scp;
839     biop->offset = *offsetp;
840     /* null out the list of buffers */
841     biop->bufListp = biop->bufListEndp = NULL;
842     biop->reserved = 0;
843
844     /* first lookup the file's length, so we know when to stop */
845     code = cm_SyncOp(scp, NULL, up, reqp, 0, 
846                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
847     if (code) 
848         return code;
849         
850     /* copy out size, since it may change */
851     fileSize = scp->serverLength;
852         
853     lock_ReleaseMutex(&scp->mx);
854
855     pageBase = *offsetp;
856     collected = pageBase.LowPart & (cm_chunkSize - 1);
857     heldBufListp = NULL;
858     heldBufListEndp = NULL;
859
860     /*
861      * Obtaining buffers can cause dirty buffers to be recycled, which
862      * can cause a storeback, so cannot be done while we have buffers
863      * reserved.
864      *
865      * To get around this, we get buffers twice.  Before reserving buffers,
866      * we obtain and release each one individually.  After reserving
867      * buffers, we try to obtain them again, but only by lookup, not by
868      * recycling.  If a buffer has gone away while we were waiting for
869      * the others, we just use whatever buffers we already have.
870      *
871      * On entry to this function, we are already holding a buffer, so we
872      * can't wait for reservation.  So we call buf_TryReserveBuffers()
873      * instead.  Not only that, we can't really even call buf_Get(), for
874      * the same reason.  We can't avoid that, though.  To avoid deadlock
875      * we allow only one thread to be executing the buf_Get()-buf_Release()
876      * sequence at a time.
877      */
878
879     /* first hold all buffers, since we can't hold any locks in buf_Get */
880     while (1) {
881         /* stop at chunk boundary */
882         if (collected >= cm_chunkSize) break;
883                 
884         /* see if the next page would be past EOF */
885         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) break;
886
887         lock_ObtainMutex(&cm_bufGetMutex);
888
889         code = buf_Get(scp, &pageBase, &tbp);
890         if (code) {
891             lock_ReleaseMutex(&cm_bufGetMutex);
892             lock_ObtainMutex(&scp->mx);
893             return code;
894         }
895                 
896         buf_Release(tbp);
897
898         lock_ReleaseMutex(&cm_bufGetMutex);
899
900         toffset.HighPart = 0;
901         toffset.LowPart = buf_bufferSize;
902         pageBase = LargeIntegerAdd(toffset, pageBase);
903         collected += buf_bufferSize;
904     }
905
906     /* reserve a chunk's worth of buffers if possible */
907     reserving = buf_TryReserveBuffers(cm_chunkSize / buf_bufferSize);
908
909     pageBase = *offsetp;
910     collected = pageBase.LowPart & (cm_chunkSize - 1);
911
912     /* now hold all buffers, if they are still there */
913     while (1) {
914         /* stop at chunk boundary */
915         if (collected >= cm_chunkSize) 
916             break;
917                 
918         /* see if the next page would be past EOF */
919         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
920             break;
921
922         tbp = buf_Find(scp, &pageBase);
923         if (!tbp) 
924             break;
925
926         /* add the buffer to the list */
927         qdp = osi_QDAlloc();
928         osi_SetQData(qdp, tbp);
929         osi_QAdd((osi_queue_t **)&heldBufListp, &qdp->q);
930         if (!heldBufListEndp) heldBufListEndp = qdp;
931         /* leave tbp held (from buf_Get) */
932
933         if (!reserving) 
934             break;
935
936         collected += buf_bufferSize;
937         toffset.HighPart = 0;
938         toffset.LowPart = buf_bufferSize;
939         pageBase = LargeIntegerAdd(toffset, pageBase);
940     }
941
942     /* look at each buffer, adding it into the list if it looks idle and
943      * filled with old data.  One special case: wait for idle if it is the
944      * first buffer since we really need that one for our caller to make
945      * any progress.
946      */
947     isFirst = 1;
948     collected = 0;              /* now count how many we'll really use */
949     for (tqdp = heldBufListEndp;
950         tqdp;
951           tqdp = (osi_queueData_t *) osi_QPrev(&tqdp->q)) {
952         /* get a ptr to the held buffer */
953         tbp = osi_GetQData(tqdp);
954         pageBase = tbp->offset;
955
956         /* now lock the buffer lock */
957         lock_ObtainMutex(&tbp->mx);
958         lock_ObtainMutex(&scp->mx);
959
960         /* don't bother fetching over data that is already current */
961         if (tbp->dataVersion == scp->dataVersion) {
962             /* we don't need this buffer, since it is current */
963             lock_ReleaseMutex(&scp->mx);
964             lock_ReleaseMutex(&tbp->mx);
965             break;
966         }
967
968         flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_FETCHDATA
969             | CM_SCACHESYNC_BUFLOCKED;
970         if (!isFirst) 
971             flags |= CM_SCACHESYNC_NOWAIT;
972
973         /* wait for the buffer to serialize, if required.  Doesn't
974          * release the scp or buffer lock(s) if NOWAIT is specified.
975          */
976         code = cm_SyncOp(scp, tbp, up, reqp, 0, flags);
977         if (code) {
978             lock_ReleaseMutex(&scp->mx);
979             lock_ReleaseMutex(&tbp->mx);
980             break;
981         }
982                 
983         /* don't fetch over dirty buffers */
984         if (tbp->flags & CM_BUF_DIRTY) {
985             cm_SyncOpDone(scp, tbp, flags);
986             lock_ReleaseMutex(&scp->mx);
987             lock_ReleaseMutex(&tbp->mx);
988             break;
989         }
990
991         /* Release locks */
992         lock_ReleaseMutex(&scp->mx);
993         lock_ReleaseMutex(&tbp->mx);
994
995         /* add the buffer to the list */
996         qdp = osi_QDAlloc();
997         osi_SetQData(qdp, tbp);
998         osi_QAdd((osi_queue_t **)&biop->bufListp, &qdp->q);
999         if (!biop->bufListEndp) 
1000             biop->bufListEndp = qdp;
1001         buf_Hold(tbp);
1002
1003         /* from now on, a failure just stops our collection process, but
1004          * we still do the I/O to whatever we've already managed to collect.
1005          */
1006         isFirst = 0;
1007         collected += buf_bufferSize;
1008     }
1009         
1010     /* now, we've held in biop->bufListp all the buffer's we're really
1011      * interested in.  We also have holds left from heldBufListp, and we
1012      * now release those holds on the buffers.
1013      */
1014     for (qdp = heldBufListp; qdp; qdp = tqdp) {
1015         tqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1016         tbp = osi_GetQData(qdp);
1017         osi_QDFree(qdp);
1018         buf_Release(tbp);
1019     }
1020
1021     /* Caller expects this */
1022     lock_ObtainMutex(&scp->mx);
1023  
1024     /* if we got a failure setting up the first buffer, then we don't have
1025      * any side effects yet, and we also have failed an operation that the
1026      * caller requires to make any progress.  Give up now.
1027      */
1028     if (code && isFirst) {
1029         buf_UnreserveBuffers(cm_chunkSize / buf_bufferSize);
1030         return code;
1031     }
1032         
1033     /* otherwise, we're still OK, and should just return the I/O setup we've
1034      * got.
1035      */
1036     biop->length = collected;
1037     biop->reserved = reserving;
1038     return 0;
1039 }
1040
1041 /* release a bulk I/O structure that was setup by cm_SetupFetchBIOD or by
1042  * cm_SetupStoreBIOD
1043  */
1044 void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore)
1045 {
1046     cm_scache_t *scp;
1047     cm_buf_t *bufp;
1048     osi_queueData_t *qdp;
1049     osi_queueData_t *nqdp;
1050     int flags;
1051
1052     /* Give back reserved buffers */
1053     if (biop->reserved)
1054         buf_UnreserveBuffers(cm_chunkSize / buf_bufferSize);
1055         
1056     flags = CM_SCACHESYNC_NEEDCALLBACK;
1057     if (isStore)
1058         flags |= CM_SCACHESYNC_STOREDATA;
1059     else
1060         flags |= CM_SCACHESYNC_FETCHDATA;
1061
1062     scp = biop->scp;
1063     for(qdp = biop->bufListp; qdp; qdp = nqdp) {
1064         /* lookup next guy first, since we're going to free this one */
1065         nqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1066                 
1067         /* extract buffer and free queue data */
1068         bufp = osi_GetQData(qdp);
1069         osi_QDFree(qdp);
1070
1071         /* now, mark I/O as done, unlock the buffer and release it */
1072         lock_ObtainMutex(&bufp->mx);
1073         lock_ObtainMutex(&scp->mx);
1074         cm_SyncOpDone(scp, bufp, flags);
1075         lock_ReleaseMutex(&scp->mx);
1076                 
1077         /* turn off writing and wakeup users */
1078         if (isStore) {
1079             if (bufp->flags & CM_BUF_WAITING) {
1080                 osi_Wakeup((long) bufp);
1081             }
1082             bufp->flags &= ~(CM_BUF_WAITING | CM_BUF_WRITING | CM_BUF_DIRTY);
1083         }
1084
1085         lock_ReleaseMutex(&bufp->mx);
1086         buf_Release(bufp);
1087     }
1088
1089     /* clean things out */
1090     biop->bufListp = NULL;
1091     biop->bufListEndp = NULL;
1092 }   
1093
1094 /* Fetch a buffer.  Called with scp locked.
1095  * The scp is locked on return.
1096  */
1097 long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *up,
1098                   cm_req_t *reqp)
1099 {
1100     long code;
1101     long nbytes;                        /* bytes in transfer */
1102     long rbytes;                        /* bytes in rx_Read call */
1103     long temp;
1104     AFSFetchStatus afsStatus;
1105     AFSCallBack callback;
1106     AFSVolSync volSync;
1107     char *bufferp;
1108     cm_buf_t *tbufp;            /* buf we're filling */
1109     osi_queueData_t *qdp;               /* q element we're scanning */
1110     AFSFid tfid;
1111     struct rx_call *callp;
1112     cm_bulkIO_t biod;           /* bulk IO descriptor */
1113     cm_conn_t *connp;
1114     int getroot;
1115     long t1, t2;
1116
1117     /* now, the buffer may or may not be filled with good data (buf_GetNew
1118      * drops lots of locks, and may indeed return a properly initialized
1119      * buffer, although more likely it will just return a new, empty, buffer.
1120      */
1121
1122 #ifdef AFS_FREELANCE_CLIENT
1123
1124     // yj: if they're trying to get the /afs directory, we need to
1125     // handle it differently, since it's local rather than on any
1126     // server
1127
1128     getroot = (scp==cm_rootSCachep);
1129     if (getroot)
1130         osi_Log1(afsd_logp,"GetBuffer returns cm_rootSCachep=%x",cm_rootSCachep);
1131 #endif
1132
1133     cm_AFSFidFromFid(&tfid, &scp->fid);
1134
1135     code = cm_SetupFetchBIOD(scp, &bufp->offset, &biod, up, reqp);
1136     if (code) {
1137         /* couldn't even get the first page setup properly */
1138         osi_Log1(afsd_logp, "SetupFetchBIOD failure code %d", code);
1139         return code;
1140     }
1141
1142     /* once we get here, we have the callback in place, we know that no one
1143      * is fetching the data now.  Check one last time that we still have
1144      * the wrong data, and then fetch it if we're still wrong.
1145      *
1146      * We can lose a race condition and end up with biod.length zero, in
1147      * which case we just retry.
1148      */
1149     if (bufp->dataVersion == scp->dataVersion || biod.length == 0) {
1150         osi_Log3(afsd_logp, "Bad DVs %d, %d or length 0x%x",
1151                  bufp->dataVersion, scp->dataVersion, biod.length);
1152         if ((bufp->dataVersion == -1
1153              || bufp->dataVersion < scp->dataVersion)
1154              && LargeIntegerGreaterThanOrEqualTo(bufp->offset,
1155                                                  scp->serverLength)) {
1156             if (bufp->dataVersion == -1)
1157                 memset(bufp->datap, 0, buf_bufferSize);
1158             bufp->dataVersion = scp->dataVersion;
1159         }
1160         lock_ReleaseMutex(&scp->mx);
1161         cm_ReleaseBIOD(&biod, 0);
1162         lock_ObtainMutex(&scp->mx);
1163         return 0;
1164     }
1165         
1166     lock_ReleaseMutex(&scp->mx);
1167
1168 #ifdef DISKCACHE95
1169     DPRINTF("cm_GetBuffer: fetching data scpDV=%d bufDV=%d scp=%x bp=%x dcp=%x\n",
1170             scp->dataVersion, bufp->dataVersion, scp, bufp, bufp->dcp);
1171 #endif /* DISKCACHE95 */
1172
1173 #ifdef AFS_FREELANCE_CLIENT
1174
1175     // yj code
1176     // if getroot then we don't need to make any calls
1177     // just return fake data
1178         
1179     if (cm_freelanceEnabled && getroot) {
1180         // setup the fake status                        
1181         afsStatus.InterfaceVersion = 0x1;
1182         afsStatus.FileType = 0x2;
1183         afsStatus.LinkCount = scp->linkCount;
1184         afsStatus.Length = cm_fakeDirSize;
1185         afsStatus.DataVersion = scp->dataVersion;
1186         afsStatus.Author = 0x1;
1187         afsStatus.Owner = 0x0;
1188         afsStatus.CallerAccess = 0x9;
1189         afsStatus.AnonymousAccess = 0x9;
1190         afsStatus.UnixModeBits = 0x1ff;
1191         afsStatus.ParentVnode = 0x1;
1192         afsStatus.ParentUnique = 0x1;
1193         afsStatus.ResidencyMask = 0;
1194         afsStatus.ClientModTime = FakeFreelanceModTime;
1195         afsStatus.ServerModTime = FakeFreelanceModTime;
1196         afsStatus.Group = 0;
1197         afsStatus.SyncCounter = 0;
1198         afsStatus.dataVersionHigh = 0;
1199         
1200         // once we're done setting up the status info,
1201         // we just fill the buffer pages with fakedata
1202         // from cm_FakeRootDir. Extra pages are set to
1203         // 0. 
1204                 
1205         lock_ObtainMutex(&cm_Freelance_Lock);
1206         t1 = bufp->offset.LowPart;
1207         qdp = biod.bufListEndp;
1208         while (qdp) {
1209             tbufp = osi_GetQData(qdp);
1210             bufferp=tbufp->datap;
1211             memset(bufferp, 0, buf_bufferSize);
1212             t2 = cm_fakeDirSize - t1;
1213             if (t2>buf_bufferSize) t2=buf_bufferSize;
1214             if (t2 > 0) {
1215                 memcpy(bufferp, cm_FakeRootDir+t1, t2);
1216             } else {
1217                 t2 = 0;
1218             }
1219             t1+=t2;
1220             qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1221
1222         }
1223         lock_ReleaseMutex(&cm_Freelance_Lock);
1224         
1225         // once we're done, we skip over the part of the
1226         // code that does the ACTUAL fetching of data for
1227         // real files
1228
1229         goto fetchingcompleted;
1230     }
1231
1232 #endif /* AFS_FREELANCE_CLIENT */
1233
1234         /* now make the call */
1235     do {
1236         code = cm_Conn(&scp->fid, up, reqp, &connp);
1237         if (code) 
1238             continue;
1239                 
1240         lock_ObtainMutex(&connp->mx);
1241         callp = rx_NewCall(connp->callp);
1242         lock_ReleaseMutex(&connp->mx);
1243
1244         osi_Log3(afsd_logp, "CALL FetchData vp %x, off 0x%x, size 0x%x",
1245                   (long) scp, biod.offset.LowPart, biod.length);
1246
1247         code = StartRXAFS_FetchData(callp, &tfid, biod.offset.LowPart,
1248                                     biod.length);
1249
1250         /* now copy the data out of the pipe and put it in the buffer */
1251         temp  = rx_Read(callp, (char *)&nbytes, 4);
1252         if (temp == 4) {
1253             nbytes = ntohl(nbytes);
1254             if (nbytes > biod.length) 
1255                 code = (callp->error < 0) ? callp->error : -1;
1256         }
1257         else 
1258             code = (callp->error < 0) ? callp->error : -1;
1259
1260         if (code == 0) {
1261             qdp = biod.bufListEndp;
1262             if (qdp) {
1263                 tbufp = osi_GetQData(qdp);
1264                 bufferp = tbufp->datap;
1265             }
1266             else 
1267                 bufferp = NULL;
1268             /* fill nbytes of data from the pipe into the pages.
1269              * When we stop, qdp will point at the last page we're
1270              * dealing with, and bufferp will tell us where we
1271              * stopped.  We'll need this info below when we clear
1272              * the remainder of the last page out (and potentially
1273              * clear later pages out, if we fetch past EOF).
1274              */
1275             while (nbytes > 0) {
1276                 /* assert that there are still more buffers;
1277                  * our check above for nbytes being less than
1278                  * biod.length should ensure this.
1279                  */
1280                 osi_assert(bufferp != NULL);
1281
1282                 /* read rbytes of data */
1283                 rbytes = (nbytes > buf_bufferSize? buf_bufferSize : nbytes);
1284                 temp = rx_Read(callp, bufferp, rbytes);
1285                 if (temp < rbytes) {
1286                     code = (callp->error < 0) ? callp->error : -1;
1287                     break;
1288                 }
1289
1290                 /* allow read-while-fetching.
1291                  * if this is the last buffer, clear the
1292                  * PREFETCHING flag, so the reader waiting for
1293                  * this buffer will start a prefetch.
1294                  */
1295                 tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
1296                 lock_ObtainMutex(&scp->mx);
1297                 if (scp->flags & CM_SCACHEFLAG_WAITING) {
1298                     scp->flags &= ~CM_SCACHEFLAG_WAITING;
1299                     osi_Wakeup((long) &scp->flags);
1300                 }
1301                 if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
1302                     *cpffp = 1;
1303                     cm_ClearPrefetchFlag(0, scp, &biod.offset);
1304                 }
1305                 lock_ReleaseMutex(&scp->mx);
1306
1307                 /* and adjust counters */
1308                 nbytes -= temp;
1309
1310                 /* and move to the next buffer */
1311                 if (nbytes != 0) {
1312                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1313                     if (qdp) {
1314                         tbufp = osi_GetQData(qdp);
1315                         bufferp = tbufp->datap;
1316                     }
1317                     else 
1318                         bufferp = NULL;
1319                 } else 
1320                     bufferp += temp;
1321             }
1322
1323             /* zero out remainder of last pages, in case we are
1324              * fetching past EOF.  We were fetching an integral #
1325              * of pages, but stopped, potentially in the middle of
1326              * a page.  Zero the remainder of that page, and then
1327              * all of the rest of the pages.
1328              */
1329             /* bytes fetched */
1330             rbytes = bufferp - tbufp->datap;
1331             /* bytes left to zero */
1332             rbytes = buf_bufferSize - rbytes;
1333             while(qdp) {
1334                 if (rbytes != 0)
1335                     memset(bufferp, 0, rbytes);
1336                 qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1337                 if (qdp == NULL) 
1338                     break;
1339                 tbufp = osi_GetQData(qdp);
1340                 bufferp = tbufp->datap;
1341                 /* bytes to clear in this page */
1342                 rbytes = buf_bufferSize;
1343             }   
1344         }
1345
1346         if (code == 0)
1347             code = EndRXAFS_FetchData(callp, &afsStatus, &callback, &volSync);
1348         else
1349             osi_Log0(afsd_logp, "CALL EndRXAFS_FetchData skipped due to error");
1350         code = rx_EndCall(callp, code);
1351         if (code == RXKADUNKNOWNKEY)
1352             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
1353         osi_Log0(afsd_logp, "CALL FetchData DONE");
1354
1355     } while (cm_Analyze(connp, up, reqp, &scp->fid, &volSync, NULL, NULL, code));
1356
1357   fetchingcompleted:
1358     code = cm_MapRPCError(code, reqp);
1359
1360     lock_ObtainMutex(&scp->mx);
1361     /* we know that no one else has changed the buffer, since we still have
1362      * the fetching flag on the buffers, and we have the scp locked again.
1363      * Copy in the version # into the buffer if we got code 0 back from the
1364      * read.
1365      */
1366     if (code == 0) {
1367         for(qdp = biod.bufListp;
1368              qdp;
1369              qdp = (osi_queueData_t *) osi_QNext(&qdp->q)) {
1370             tbufp = osi_GetQData(qdp);
1371             tbufp->dataVersion = afsStatus.DataVersion;
1372
1373 #ifdef DISKCACHE95
1374             /* write buffer out to disk cache */
1375             diskcache_Update(tbufp->dcp, tbufp->datap, buf_bufferSize,
1376                               tbufp->dataVersion);
1377 #endif /* DISKCACHE95 */
1378         }
1379     }
1380
1381     /* release scatter/gather I/O structure (buffers, locks) */
1382     lock_ReleaseMutex(&scp->mx);
1383     cm_ReleaseBIOD(&biod, 0);
1384     lock_ObtainMutex(&scp->mx);
1385
1386     if (code == 0) 
1387         cm_MergeStatus(scp, &afsStatus, &volSync, up, 0);
1388     return code;
1389 }