windows-combined-20041010
[openafs.git] / src / WINNT / afsd / cm_dcache.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <afs/stds.h>
12
13 #ifndef DJGPP
14 #include <windows.h>
15 #include <winsock2.h>
16 #include <nb30.h>
17 #endif /* !DJGPP */
18 #ifdef COMMENT
19 #include <malloc.h>
20 #endif
21 #include <string.h>
22 #include <stdlib.h>
23 #include <osi.h>
24
25 #include "afsd.h"
26
27 #ifdef DEBUG
28 extern void afsi_log(char *pattern, ...);
29 #endif
30
31 osi_mutex_t cm_bufGetMutex;
32 #ifdef AFS_FREELANCE_CLIENT
33 extern osi_mutex_t cm_Freelance_Lock;
34 #endif
35
36 /* functions called back from the buffer package when reading or writing data,
37  * or when holding or releasing a vnode pointer.
38  */
39 long cm_BufWrite(void *vfidp, osi_hyper_t *offsetp, long length, long flags,
40                  cm_user_t *userp, cm_req_t *reqp)
41 {
42     /* store the data back from this buffer; the buffer is locked and held,
43      * but the vnode involved isn't locked, yet.  It is held by its
44      * reference from the buffer, which won't change until the buffer is
45      * released by our caller.  Thus, we don't have to worry about holding
46      * bufp->scp.
47      */
48     long code;
49     cm_fid_t *fidp = vfidp;
50     cm_scache_t *scp;
51     long nbytes;
52     long temp;
53     AFSFetchStatus outStatus;
54     AFSStoreStatus inStatus;
55     osi_hyper_t thyper;
56     AFSVolSync volSync;
57     AFSFid tfid;
58     struct rx_call *callp;
59     osi_queueData_t *qdp;
60     cm_buf_t *bufp;
61     long wbytes;
62     char *bufferp;
63     cm_conn_t *connp;
64     long truncPos;
65     cm_bulkIO_t biod;           /* bulk IO descriptor */
66
67     osi_assert(userp != NULL);
68
69     /* now, the buffer may or may not be filled with good data (buf_GetNew
70      * drops lots of locks, and may indeed return a properly initialized
71      * buffer, although more likely it will just return a new, empty, buffer.
72      */
73     scp = cm_FindSCache(fidp);
74     if (scp == NULL)
75         return CM_ERROR_NOSUCHFILE;     /* shouldn't happen */
76
77     cm_AFSFidFromFid(&tfid, fidp);
78
79     lock_ObtainMutex(&scp->mx);
80         
81     code = cm_SetupStoreBIOD(scp, offsetp, length, &biod, userp, reqp);
82     if (code) {
83         osi_Log1(afsd_logp, "cm_SetupStoreBIOD code %x", code);
84         lock_ReleaseMutex(&scp->mx);
85         cm_ReleaseSCache(scp);
86         return code;
87     }
88
89     if (biod.length == 0) {
90         osi_Log0(afsd_logp, "cm_SetupStoreBIOD length 0");
91         lock_ReleaseMutex(&scp->mx);
92         cm_ReleaseBIOD(&biod, 1);       /* should be a NOOP */
93         cm_ReleaseSCache(scp);
94         return 0;
95     }   
96
97     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
98     (void) cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL);
99
100     /* prepare the output status for the store */
101     scp->mask |= CM_SCACHEMASK_CLIENTMODTIME;
102     cm_StatusFromAttr(&inStatus, scp, NULL);
103     truncPos = scp->length.LowPart;
104     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
105          && scp->truncPos.LowPart < (unsigned long) truncPos)
106         truncPos = scp->truncPos.LowPart;
107         scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
108                 
109     /* compute how many bytes to write from this buffer */
110     thyper = LargeIntegerSubtract(scp->length, biod.offset);
111     if (LargeIntegerLessThanZero(thyper)) {
112         /* entire buffer is past EOF */
113         nbytes = 0;
114     }
115     else {
116         /* otherwise write out part of buffer before EOF, but not
117          * more than bufferSize bytes.
118          */
119         nbytes = thyper.LowPart;
120         if (nbytes > biod.length) 
121             nbytes = biod.length;
122     }
123
124     lock_ReleaseMutex(&scp->mx);
125         
126     /* now we're ready to do the store operation */
127     do {
128         code = cm_Conn(&scp->fid, userp, reqp, &connp);
129         if (code) 
130             continue;
131                 
132         callp = rx_NewCall(connp->callp);
133
134         osi_Log3(afsd_logp, "CALL StoreData vp %x, off 0x%x, size 0x%x",
135                  (long) scp, biod.offset.LowPart, nbytes);
136
137         lock_ObtainMutex(&connp->mx);
138         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
139                                     biod.offset.LowPart, nbytes, truncPos);
140
141         if (code == 0) {
142             /* write the data from the the list of buffers */
143             qdp = NULL;
144             while(nbytes > 0) {
145                 if (qdp == NULL)
146                     qdp = biod.bufListEndp;
147                 else
148                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
149                 osi_assert(qdp != NULL);
150                 bufp = osi_GetQData(qdp);
151                 bufferp = bufp->datap;
152                 wbytes = nbytes;
153                 if (wbytes > buf_bufferSize) 
154                     wbytes = buf_bufferSize;
155
156                 /* write out wbytes of data from bufferp */
157                 temp = rx_Write(callp, bufferp, wbytes);
158                 if (temp != wbytes) {
159                     osi_Log2(afsd_logp, "rx_Write failed %d != %d",temp,wbytes);
160                     code = -1;
161                     break;
162                 } else {
163                     osi_Log1(afsd_logp, "rx_Write succeeded %d",temp);
164                 }       
165                 nbytes -= wbytes;
166             }   /* while more bytes to write */
167         }               /* if RPC started successfully */
168         else {
169             osi_Log1(afsd_logp, "StartRXAFS_StoreData failed (%lX)",code);
170         }
171         if (code == 0) {
172             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
173             if (code)
174                 osi_Log1(afsd_logp, "EndRXAFS_StoreData failed (%lX)",code);
175         }
176         code = rx_EndCall(callp, code);
177         lock_ReleaseMutex(&connp->mx);
178
179         osi_Log0(afsd_logp, "CALL StoreData DONE");
180                 
181     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
182     code = cm_MapRPCError(code, reqp);
183         
184     /* now, clean up our state */
185     lock_ObtainMutex(&scp->mx);
186
187     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
188
189     if (code == 0) {
190         /* now, here's something a little tricky: in AFS 3, a dirty
191          * length can't be directly stored, instead, a dirty chunk is
192          * stored that sets the file's size (by writing and by using
193          * the truncate-first option in the store call).
194          *
195          * At this point, we've just finished a store, and so the trunc
196          * pos field is clean.  If the file's size at the server is at
197          * least as big as we think it should be, then we turn off the
198          * length dirty bit, since all the other dirty buffers must
199          * precede this one in the file.
200          *
201          * The file's desired size shouldn't be smaller than what's
202          * stored at the server now, since we just did the trunc pos
203          * store.
204          *
205          * We have to turn off the length dirty bit as soon as we can,
206          * so that we see updates made by other machines.
207          */
208         if (outStatus.Length >= scp->length.LowPart)
209             scp->mask &= ~CM_SCACHEMASK_LENGTH;
210         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
211     } else {
212         if (code == CM_ERROR_SPACE)
213             scp->flags |= CM_SCACHEFLAG_OUTOFSPACE;
214         else if (code == CM_ERROR_QUOTA)
215             scp->flags |= CM_SCACHEFLAG_OVERQUOTA;
216     }
217     lock_ReleaseMutex(&scp->mx);
218     cm_ReleaseBIOD(&biod, 1);
219     cm_ReleaseSCache(scp);
220
221     return code;
222 }
223
224 /*
225  * Truncate the file, by sending a StoreData RPC with zero length.
226  *
227  * Called with scp locked.  Releases and re-obtains the lock.
228  */
229 long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
230 {
231     AFSFetchStatus outStatus;
232     AFSStoreStatus inStatus;
233     AFSVolSync volSync;
234     AFSFid tfid;
235     long code;
236     long truncPos;
237     cm_conn_t *connp;
238     struct rx_call *callp;
239
240     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
241     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
242                      CM_SCACHESYNC_STOREDATA_EXCL);
243
244     /* prepare the output status for the store */
245     inStatus.Mask = AFS_SETMODTIME;
246     inStatus.ClientModTime = scp->clientModTime;
247     scp->mask &= ~CM_SCACHEMASK_CLIENTMODTIME;
248
249     /* calculate truncation position */
250     truncPos = scp->length.LowPart;
251     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
252          && scp->truncPos.LowPart < (unsigned long) truncPos)
253         truncPos = scp->truncPos.LowPart;
254     scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
255
256     lock_ReleaseMutex(&scp->mx);
257
258     cm_AFSFidFromFid(&tfid, &scp->fid);
259
260     /* now we're ready to do the store operation */
261     do {
262         code = cm_Conn(&scp->fid, userp, reqp, &connp);
263         if (code) 
264             continue;
265                 
266         callp = rx_NewCall(connp->callp);
267
268         lock_ObtainMutex(&connp->mx);
269         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
270                                     0, 0, truncPos);
271
272         if (code == 0)
273             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
274         code = rx_EndCall(callp, code);
275
276         lock_ReleaseMutex(&connp->mx);
277
278     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
279     code = cm_MapRPCError(code, reqp);
280         
281     /* now, clean up our state */
282     lock_ObtainMutex(&scp->mx);
283
284     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
285
286     if (code == 0) {
287         /*
288          * For explanation of handling of CM_SCACHEMASK_LENGTH,
289          * see cm_BufWrite().
290          */
291         if (outStatus.Length >= scp->length.LowPart)
292             scp->mask &= ~CM_SCACHEMASK_LENGTH;
293         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
294     }
295
296     return code;
297 }
298
299 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
300 {
301     *bytesReadp = buf_bufferSize;
302
303     /* now return a code that means that I/O is done */
304     return 0;
305 }
306
307 /* stabilize scache entry, and return with it locked so 
308  * it stays stable.
309  */
310 long cm_BufStabilize(void *parmp, cm_user_t *userp, cm_req_t *reqp)
311 {
312     cm_scache_t *scp;
313     long code;
314
315     scp = parmp;
316         
317     lock_ObtainMutex(&scp->mx);
318     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
319                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
320     if (code) {
321         lock_ReleaseMutex(&scp->mx);
322         return code;
323     }
324         
325     return 0;
326 }
327
328 /* undoes the work that cm_BufStabilize does: releases lock so things can change again */
329 long cm_BufUnstabilize(void *parmp, cm_user_t *userp)
330 {
331     cm_scache_t *scp;
332         
333     scp = parmp;
334         
335     lock_ReleaseMutex(&scp->mx);
336         
337     /* always succeeds */
338     return 0;
339 }
340
341 cm_buf_ops_t cm_bufOps = {
342     cm_BufWrite,
343     cm_BufRead,
344     cm_BufStabilize,
345     cm_BufUnstabilize
346 };
347
348 int cm_InitDCache(long chunkSize, long nbuffers)
349 {
350     lock_InitializeMutex(&cm_bufGetMutex, "buf_Get mutex");
351     if (nbuffers) 
352         buf_nbuffers = nbuffers;
353     return buf_Init(&cm_bufOps);
354 }
355
356 /* check to see if we have an up-to-date buffer.  The buffer must have
357  * previously been obtained by calling buf_Get.
358  *
359  * Make sure we have a callback, and that the dataversion matches.
360  *
361  * Scp must be locked.
362  *
363  * Bufp *may* be locked.
364  */
365 int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
366 {
367     int code;
368     if (!cm_HaveCallback(scp))
369         return 0;
370     if ((bufp->cmFlags
371           & (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
372          == (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
373         return 1;
374     if (bufp->dataVersion == scp->dataVersion)
375         return 1;
376     if (!isBufLocked) {
377         code = lock_TryMutex(&bufp->mx);
378         if (code == 0) {
379             /* don't have the lock, and can't lock it, then
380              * return failure.
381              */
382             return 0;
383         }
384     }
385
386     /* remember dirty flag for later */
387     code = bufp->flags & CM_BUF_DIRTY;
388
389     /* release lock if we obtained it here */
390     if (!isBufLocked) 
391         lock_ReleaseMutex(&bufp->mx);
392
393     /* if buffer was dirty, buffer is acceptable for use */
394     if (code) 
395         return 1;
396     else 
397         return 0;
398 }
399
400 /* used when deciding whether to do a prefetch or not */
401 long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, long length,
402                         cm_user_t *up, cm_req_t *reqp, osi_hyper_t *realBasep)
403 {
404     osi_hyper_t toffset;
405     osi_hyper_t tbase;
406     long code;
407     cm_buf_t *bp;
408     int stop;
409         
410     /* now scan all buffers in the range, looking for any that look like
411      * they need work.
412      */
413     tbase = *startBasep;
414     stop = 0;
415     lock_ObtainMutex(&scp->mx);
416     while(length > 0) {
417         /* get callback so we can do a meaningful dataVersion comparison */
418         code = cm_SyncOp(scp, NULL, up, reqp, 0,
419                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
420         if (code) {
421             scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
422             lock_ReleaseMutex(&scp->mx);
423             return code;
424         }
425                 
426         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
427             /* we're past the end of file */
428             break;
429         }
430
431         bp = buf_Find(scp, &tbase);
432         /* We cheat slightly by not locking the bp mutex. */
433         if (bp) {
434             if ((bp->cmFlags
435                   & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
436                  && bp->dataVersion != scp->dataVersion)
437                 stop = 1;
438             buf_Release(bp);
439         }
440         else 
441             stop = 1;
442
443         /* if this buffer is essentially guaranteed to require a fetch,
444          * break out here and return this position.
445          */
446         if (stop) 
447             break;
448                 
449         toffset.LowPart = buf_bufferSize;
450         toffset.HighPart = 0;
451         tbase = LargeIntegerAdd(toffset, tbase);
452         length -= buf_bufferSize;
453     }
454         
455     /* if we get here, either everything is fine or stop stopped us at a
456      * particular buffer in the range that definitely needs to be fetched.
457      */
458     if (stop == 0) {
459         /* return non-zero code since realBasep won't be valid */
460         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
461         code = -1;
462     }   
463     else {
464         /* successfully found a page that will need fetching */
465         *realBasep = tbase;
466         code = 0;
467     }
468     lock_ReleaseMutex(&scp->mx);
469     return code;
470 }
471
472 void cm_BkgStore(cm_scache_t *scp, long p1, long p2, long p3, long p4,
473                  cm_user_t *userp)
474 {
475     osi_hyper_t toffset;
476     long length;
477     cm_req_t req;
478
479     cm_InitReq(&req);
480     req.flags |= CM_REQ_NORETRY;
481
482     toffset.LowPart = p1;
483     toffset.HighPart = p2;
484     length = p3;
485
486     osi_Log2(afsd_logp, "Starting BKG store vp 0x%x, base 0x%x", scp, p1);
487
488     cm_BufWrite(&scp->fid, &toffset, length, /* flags */ 0, userp, &req);
489
490     lock_ObtainMutex(&scp->mx);
491     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
492     lock_ReleaseMutex(&scp->mx);
493 }
494
495 void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base)
496 {
497     osi_hyper_t thyper;
498
499     if (code == 0) {
500         thyper.LowPart = cm_chunkSize;
501         thyper.HighPart = 0;
502         thyper =  LargeIntegerAdd(*base, thyper);
503         thyper.LowPart &= (-cm_chunkSize);
504         if (LargeIntegerGreaterThan(*base, scp->prefetch.base))
505             scp->prefetch.base = *base;
506         if (LargeIntegerGreaterThan(thyper, scp->prefetch.end))
507             scp->prefetch.end = thyper;
508     }
509     scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
510 }
511
512 /* do the prefetch */
513 void cm_BkgPrefetch(cm_scache_t *scp, long p1, long p2, long p3, long p4,
514                     cm_user_t *userp)
515 {
516     long length;
517     osi_hyper_t base;
518     long code;
519     cm_buf_t *bp;
520     int cpff = 0;                       /* cleared prefetch flag */
521     cm_req_t req;
522
523     cm_InitReq(&req);
524     req.flags |= CM_REQ_NORETRY;
525         
526     base.LowPart = p1;
527     base.HighPart = p2;
528     length = p3;
529         
530     osi_Log2(afsd_logp, "Starting BKG prefetch vp 0x%x, base 0x%x", scp, p1);
531
532     code = buf_Get(scp, &base, &bp);
533
534     lock_ObtainMutex(&scp->mx);
535
536     if (code || (bp->cmFlags & CM_BUF_CMFETCHING)) {
537         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
538         lock_ReleaseMutex(&scp->mx);
539         return;
540     }
541
542     code = cm_GetBuffer(scp, bp, &cpff, userp, &req);
543     if (!cpff) 
544         cm_ClearPrefetchFlag(code, scp, &base);
545     lock_ReleaseMutex(&scp->mx);
546     buf_Release(bp);
547     return;
548 }
549
550 /* a read was issued to offsetp, and we have to determine whether we should
551  * do a prefetch.
552  */
553 void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp,
554                          cm_user_t *userp, cm_req_t *reqp)
555 {
556     long code;
557     osi_hyper_t realBase;
558     osi_hyper_t readBase;
559         
560     readBase = *offsetp;
561     /* round up to chunk boundary */
562     readBase.LowPart += (cm_chunkSize-1);
563     readBase.LowPart &= (-cm_chunkSize);
564
565     lock_ObtainMutex(&scp->mx);
566     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
567          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
568         lock_ReleaseMutex(&scp->mx);
569         return;
570     }
571     scp->flags |= CM_SCACHEFLAG_PREFETCHING;
572
573     /* start the scan at the latter of the end of this read or
574      * the end of the last fetched region.
575      */
576     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
577         readBase = scp->prefetch.end;
578
579     lock_ReleaseMutex(&scp->mx);
580
581     code = cm_CheckFetchRange(scp, &readBase, cm_chunkSize, userp, reqp,
582                               &realBase);
583     if (code) 
584         return; /* can't find something to prefetch */
585
586     osi_Log2(afsd_logp, "BKG Prefetch request vp 0x%x, base 0x%x",
587              scp, realBase.LowPart);
588
589     cm_QueueBKGRequest(scp, cm_BkgPrefetch, realBase.LowPart,
590                        realBase.HighPart, cm_chunkSize, 0, userp);
591 }
592
593 /* scp must be locked; temporarily unlocked during processing.
594  * If returns 0, returns buffers held in biop, and with
595  * CM_BUF_CMSTORING set.
596  *
597  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
598  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
599  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
600  * must be woken, and the event must be set when the I/O is done.  All of this
601  * is required so that buf_WaitIO synchronizes properly with the buffer as it
602  * is being written out.
603  */
604 long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
605                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
606 {
607     cm_buf_t *bufp;
608     osi_queueData_t *qdp;
609     osi_hyper_t thyper;
610     osi_hyper_t tbase;
611     osi_hyper_t scanStart;              /* where to start scan for dirty pages */
612     osi_hyper_t scanEnd;                /* where to stop scan for dirty pages */
613     osi_hyper_t firstModOffset; /* offset of first modified page in range */
614     long temp;
615     long code;
616     long flags;                 /* flags to cm_SyncOp */
617         
618     /* clear things out */
619     biop->scp = scp;            /* don't hold */
620     biop->offset = *inOffsetp;
621     biop->length = 0;
622     biop->bufListp = NULL;
623     biop->bufListEndp = NULL;
624     biop->reserved = 0;
625
626     /* reserve a chunk's worth of buffers */
627     lock_ReleaseMutex(&scp->mx);
628     buf_ReserveBuffers(cm_chunkSize / buf_bufferSize);
629     lock_ObtainMutex(&scp->mx);
630
631     bufp = NULL;
632     for (temp = 0; temp < inSize; temp += buf_bufferSize, bufp = NULL) {
633         thyper.HighPart = 0;
634         thyper.LowPart = temp;
635         tbase = LargeIntegerAdd(*inOffsetp, thyper);
636
637         bufp = buf_Find(scp, &tbase);
638         if (bufp) {
639             /* get buffer mutex and scp mutex safely */
640             lock_ReleaseMutex(&scp->mx);
641             lock_ObtainMutex(&bufp->mx);
642             lock_ObtainMutex(&scp->mx);
643
644             flags = CM_SCACHESYNC_NEEDCALLBACK
645                 | CM_SCACHESYNC_GETSTATUS
646                     | CM_SCACHESYNC_STOREDATA
647                         | CM_SCACHESYNC_BUFLOCKED;
648             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
649             if (code) {
650                 lock_ReleaseMutex(&bufp->mx);
651                 buf_Release(bufp);
652                 buf_UnreserveBuffers(cm_chunkSize / buf_bufferSize);
653                 return code;
654             }   
655                         
656             /* if the buffer is dirty, we're done */
657             if (bufp->flags & CM_BUF_DIRTY) {
658                 osi_assertx(!(bufp->flags & CM_BUF_WRITING),
659                             "WRITING w/o CMSTORING in SetupStoreBIOD");
660                 bufp->flags |= CM_BUF_WRITING;
661                 break;
662             }
663
664             /* this buffer is clean, so there's no reason to process it */
665             cm_SyncOpDone(scp, bufp, flags);
666             lock_ReleaseMutex(&bufp->mx);
667             buf_Release(bufp);
668         }       
669     }
670
671     biop->reserved = 1;
672         
673     /* if we get here, if bufp is null, we didn't find any dirty buffers
674      * that weren't already being stored back, so we just quit now.
675      */
676     if (!bufp) {
677         return 0;
678     }
679
680     /* don't need buffer mutex any more */
681     lock_ReleaseMutex(&bufp->mx);
682         
683     /* put this element in the list */
684     qdp = osi_QDAlloc();
685     osi_SetQData(qdp, bufp);
686     /* don't have to hold bufp, since held by buf_Find above */
687     osi_QAddH((osi_queue_t **) &biop->bufListp,
688               (osi_queue_t **) &biop->bufListEndp,
689               &qdp->q);
690     biop->length = buf_bufferSize;
691     firstModOffset = bufp->offset;
692     biop->offset = firstModOffset;
693
694     /* compute the window surrounding *inOffsetp of size cm_chunkSize */
695     scanStart = *inOffsetp;
696     scanStart.LowPart &= (-cm_chunkSize);
697     thyper.LowPart = cm_chunkSize;
698     thyper.HighPart = 0;
699     scanEnd = LargeIntegerAdd(scanStart, thyper);
700
701     flags = CM_SCACHESYNC_NEEDCALLBACK
702         | CM_SCACHESYNC_GETSTATUS
703         | CM_SCACHESYNC_STOREDATA
704         | CM_SCACHESYNC_BUFLOCKED
705         | CM_SCACHESYNC_NOWAIT;
706
707     /* start by looking backwards until scanStart */
708     thyper.HighPart = 0;                /* hyper version of buf_bufferSize */
709     thyper.LowPart = buf_bufferSize;
710     tbase = LargeIntegerSubtract(firstModOffset, thyper);
711     while(LargeIntegerGreaterThanOrEqualTo(tbase, scanStart)) {
712         /* see if we can find the buffer */
713         bufp = buf_Find(scp, &tbase);
714         if (!bufp) 
715             break;
716
717         /* try to lock it, and quit if we can't (simplifies locking) */
718         code = lock_TryMutex(&bufp->mx);
719         if (code == 0) {
720             buf_Release(bufp);
721             break;
722         }
723                 
724         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
725         if (code) {
726             lock_ReleaseMutex(&bufp->mx);
727             buf_Release(bufp);
728             break;
729         }
730                 
731         if (!(bufp->flags & CM_BUF_DIRTY)) {
732             /* buffer is clean, so we shouldn't add it */
733             cm_SyncOpDone(scp, bufp, flags);
734             lock_ReleaseMutex(&bufp->mx);
735             buf_Release(bufp);
736             break;
737         }
738
739         /* don't need buffer mutex any more */
740         lock_ReleaseMutex(&bufp->mx);
741
742         /* we have a dirty buffer ready for storing.  Add it to the tail
743          * of the list, since it immediately precedes all of the disk
744          * addresses we've already collected.
745          */
746         qdp = osi_QDAlloc();
747         osi_SetQData(qdp, bufp);
748         /* no buf_hold necessary, since we have it held from buf_Find */
749         osi_QAddT((osi_queue_t **) &biop->bufListp,
750                   (osi_queue_t **) &biop->bufListEndp,
751                   &qdp->q);
752
753         /* update biod info describing the transfer */
754         biop->offset = LargeIntegerSubtract(biop->offset, thyper);
755         biop->length += buf_bufferSize;
756
757         /* update loop pointer */
758         tbase = LargeIntegerSubtract(tbase, thyper);
759     }   /* while loop looking for pages preceding the one we found */
760
761     /* now, find later dirty, contiguous pages, and add them to the list */
762     thyper.HighPart = 0;                /* hyper version of buf_bufferSize */
763     thyper.LowPart = buf_bufferSize;
764     tbase = LargeIntegerAdd(firstModOffset, thyper);
765     while(LargeIntegerLessThan(tbase, scanEnd)) {
766         /* see if we can find the buffer */
767         bufp = buf_Find(scp, &tbase);
768         if (!bufp) 
769             break;
770
771         /* try to lock it, and quit if we can't (simplifies locking) */
772         code = lock_TryMutex(&bufp->mx);
773         if (code == 0) {
774             buf_Release(bufp);
775             break;
776         }
777
778         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
779         if (code) {
780             lock_ReleaseMutex(&bufp->mx);
781             buf_Release(bufp);
782             break;
783         }
784                 
785         if (!(bufp->flags & CM_BUF_DIRTY)) {
786             /* buffer is clean, so we shouldn't add it */
787             cm_SyncOpDone(scp, bufp, flags);
788             lock_ReleaseMutex(&bufp->mx);
789             buf_Release(bufp);
790             break;
791         }
792
793         /* don't need buffer mutex any more */
794         lock_ReleaseMutex(&bufp->mx);
795
796         /* we have a dirty buffer ready for storing.  Add it to the head
797          * of the list, since it immediately follows all of the disk
798          * addresses we've already collected.
799          */
800         qdp = osi_QDAlloc();
801         osi_SetQData(qdp, bufp);
802         /* no buf_hold necessary, since we have it held from buf_Find */
803         osi_QAddH((osi_queue_t **) &biop->bufListp,
804                   (osi_queue_t **) &biop->bufListEndp,
805                   &qdp->q);
806
807         /* update biod info describing the transfer */
808         biop->length += buf_bufferSize;
809                 
810         /* update loop pointer */
811         tbase = LargeIntegerAdd(tbase, thyper);
812     }   /* while loop looking for pages following the first page we found */
813         
814     /* finally, we're done */
815     return 0;
816 }
817
818 /* scp must be locked; temporarily unlocked during processing.
819  * If returns 0, returns buffers held in biop, and with
820  * CM_BUF_CMFETCHING flags set.
821  * If an error is returned, we don't return any buffers.
822  */
823 long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
824                         cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp)
825 {
826     long code;
827     cm_buf_t *tbp;
828     osi_hyper_t toffset;                /* a long long temp variable */
829     osi_hyper_t pageBase;               /* base offset we're looking at */
830     osi_queueData_t *qdp;               /* one temp queue structure */
831     osi_queueData_t *tqdp;              /* another temp queue structure */
832     long collected;                     /* how many bytes have been collected */
833     int isFirst;
834     long flags;
835     osi_hyper_t fileSize;               /* the # of bytes in the file */
836     osi_queueData_t *heldBufListp;      /* we hold all buffers in this list */
837     osi_queueData_t *heldBufListEndp;   /* first one */
838     int reserving;
839
840     biop->scp = scp;
841     biop->offset = *offsetp;
842     /* null out the list of buffers */
843     biop->bufListp = biop->bufListEndp = NULL;
844     biop->reserved = 0;
845
846     /* first lookup the file's length, so we know when to stop */
847     code = cm_SyncOp(scp, NULL, up, reqp, 0, 
848                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
849     if (code) 
850         return code;
851         
852     /* copy out size, since it may change */
853     fileSize = scp->serverLength;
854         
855     lock_ReleaseMutex(&scp->mx);
856
857     pageBase = *offsetp;
858     collected = pageBase.LowPart & (cm_chunkSize - 1);
859     heldBufListp = NULL;
860     heldBufListEndp = NULL;
861
862     /*
863      * Obtaining buffers can cause dirty buffers to be recycled, which
864      * can cause a storeback, so cannot be done while we have buffers
865      * reserved.
866      *
867      * To get around this, we get buffers twice.  Before reserving buffers,
868      * we obtain and release each one individually.  After reserving
869      * buffers, we try to obtain them again, but only by lookup, not by
870      * recycling.  If a buffer has gone away while we were waiting for
871      * the others, we just use whatever buffers we already have.
872      *
873      * On entry to this function, we are already holding a buffer, so we
874      * can't wait for reservation.  So we call buf_TryReserveBuffers()
875      * instead.  Not only that, we can't really even call buf_Get(), for
876      * the same reason.  We can't avoid that, though.  To avoid deadlock
877      * we allow only one thread to be executing the buf_Get()-buf_Release()
878      * sequence at a time.
879      */
880
881     /* first hold all buffers, since we can't hold any locks in buf_Get */
882     while (1) {
883         /* stop at chunk boundary */
884         if (collected >= cm_chunkSize) break;
885                 
886         /* see if the next page would be past EOF */
887         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) break;
888
889         lock_ObtainMutex(&cm_bufGetMutex);
890
891         code = buf_Get(scp, &pageBase, &tbp);
892         if (code) {
893             lock_ReleaseMutex(&cm_bufGetMutex);
894             lock_ObtainMutex(&scp->mx);
895             return code;
896         }
897                 
898         buf_Release(tbp);
899
900         lock_ReleaseMutex(&cm_bufGetMutex);
901
902         toffset.HighPart = 0;
903         toffset.LowPart = buf_bufferSize;
904         pageBase = LargeIntegerAdd(toffset, pageBase);
905         collected += buf_bufferSize;
906     }
907
908     /* reserve a chunk's worth of buffers if possible */
909     reserving = buf_TryReserveBuffers(cm_chunkSize / buf_bufferSize);
910
911     pageBase = *offsetp;
912     collected = pageBase.LowPart & (cm_chunkSize - 1);
913
914     /* now hold all buffers, if they are still there */
915     while (1) {
916         /* stop at chunk boundary */
917         if (collected >= cm_chunkSize) 
918             break;
919                 
920         /* see if the next page would be past EOF */
921         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
922             break;
923
924         tbp = buf_Find(scp, &pageBase);
925         if (!tbp) 
926             break;
927
928         /* add the buffer to the list */
929         qdp = osi_QDAlloc();
930         osi_SetQData(qdp, tbp);
931         osi_QAdd((osi_queue_t **)&heldBufListp, &qdp->q);
932         if (!heldBufListEndp) heldBufListEndp = qdp;
933         /* leave tbp held (from buf_Get) */
934
935         if (!reserving) 
936             break;
937
938         collected += buf_bufferSize;
939         toffset.HighPart = 0;
940         toffset.LowPart = buf_bufferSize;
941         pageBase = LargeIntegerAdd(toffset, pageBase);
942     }
943
944     /* look at each buffer, adding it into the list if it looks idle and
945      * filled with old data.  One special case: wait for idle if it is the
946      * first buffer since we really need that one for our caller to make
947      * any progress.
948      */
949     isFirst = 1;
950     collected = 0;              /* now count how many we'll really use */
951     for (tqdp = heldBufListEndp;
952         tqdp;
953           tqdp = (osi_queueData_t *) osi_QPrev(&tqdp->q)) {
954         /* get a ptr to the held buffer */
955         tbp = osi_GetQData(tqdp);
956         pageBase = tbp->offset;
957
958         /* now lock the buffer lock */
959         lock_ObtainMutex(&tbp->mx);
960         lock_ObtainMutex(&scp->mx);
961
962         /* don't bother fetching over data that is already current */
963         if (tbp->dataVersion == scp->dataVersion) {
964             /* we don't need this buffer, since it is current */
965             lock_ReleaseMutex(&scp->mx);
966             lock_ReleaseMutex(&tbp->mx);
967             break;
968         }
969
970         flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_FETCHDATA
971             | CM_SCACHESYNC_BUFLOCKED;
972         if (!isFirst) 
973             flags |= CM_SCACHESYNC_NOWAIT;
974
975         /* wait for the buffer to serialize, if required.  Doesn't
976          * release the scp or buffer lock(s) if NOWAIT is specified.
977          */
978         code = cm_SyncOp(scp, tbp, up, reqp, 0, flags);
979         if (code) {
980             lock_ReleaseMutex(&scp->mx);
981             lock_ReleaseMutex(&tbp->mx);
982             break;
983         }
984                 
985         /* don't fetch over dirty buffers */
986         if (tbp->flags & CM_BUF_DIRTY) {
987             cm_SyncOpDone(scp, tbp, flags);
988             lock_ReleaseMutex(&scp->mx);
989             lock_ReleaseMutex(&tbp->mx);
990             break;
991         }
992
993         /* Release locks */
994         lock_ReleaseMutex(&scp->mx);
995         lock_ReleaseMutex(&tbp->mx);
996
997         /* add the buffer to the list */
998         qdp = osi_QDAlloc();
999         osi_SetQData(qdp, tbp);
1000         osi_QAdd((osi_queue_t **)&biop->bufListp, &qdp->q);
1001         if (!biop->bufListEndp) 
1002             biop->bufListEndp = qdp;
1003         buf_Hold(tbp);
1004
1005         /* from now on, a failure just stops our collection process, but
1006          * we still do the I/O to whatever we've already managed to collect.
1007          */
1008         isFirst = 0;
1009         collected += buf_bufferSize;
1010     }
1011         
1012     /* now, we've held in biop->bufListp all the buffer's we're really
1013      * interested in.  We also have holds left from heldBufListp, and we
1014      * now release those holds on the buffers.
1015      */
1016     for (qdp = heldBufListp; qdp; qdp = tqdp) {
1017         tqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1018         tbp = osi_GetQData(qdp);
1019         osi_QDFree(qdp);
1020         buf_Release(tbp);
1021     }
1022
1023     /* Caller expects this */
1024     lock_ObtainMutex(&scp->mx);
1025  
1026     /* if we got a failure setting up the first buffer, then we don't have
1027      * any side effects yet, and we also have failed an operation that the
1028      * caller requires to make any progress.  Give up now.
1029      */
1030     if (code && isFirst) {
1031         buf_UnreserveBuffers(cm_chunkSize / buf_bufferSize);
1032         return code;
1033     }
1034         
1035     /* otherwise, we're still OK, and should just return the I/O setup we've
1036      * got.
1037      */
1038     biop->length = collected;
1039     biop->reserved = reserving;
1040     return 0;
1041 }
1042
1043 /* release a bulk I/O structure that was setup by cm_SetupFetchBIOD or by
1044  * cm_SetupStoreBIOD
1045  */
1046 void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore)
1047 {
1048     cm_scache_t *scp;
1049     cm_buf_t *bufp;
1050     osi_queueData_t *qdp;
1051     osi_queueData_t *nqdp;
1052     int flags;
1053
1054     /* Give back reserved buffers */
1055     if (biop->reserved)
1056         buf_UnreserveBuffers(cm_chunkSize / buf_bufferSize);
1057         
1058     flags = CM_SCACHESYNC_NEEDCALLBACK;
1059     if (isStore)
1060         flags |= CM_SCACHESYNC_STOREDATA;
1061     else
1062         flags |= CM_SCACHESYNC_FETCHDATA;
1063
1064     scp = biop->scp;
1065     for(qdp = biop->bufListp; qdp; qdp = nqdp) {
1066         /* lookup next guy first, since we're going to free this one */
1067         nqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1068                 
1069         /* extract buffer and free queue data */
1070         bufp = osi_GetQData(qdp);
1071         osi_QDFree(qdp);
1072
1073         /* now, mark I/O as done, unlock the buffer and release it */
1074         lock_ObtainMutex(&bufp->mx);
1075         lock_ObtainMutex(&scp->mx);
1076         cm_SyncOpDone(scp, bufp, flags);
1077         lock_ReleaseMutex(&scp->mx);
1078                 
1079         /* turn off writing and wakeup users */
1080         if (isStore) {
1081             if (bufp->flags & CM_BUF_WAITING) {
1082                 osi_Wakeup((long) bufp);
1083             }
1084             bufp->flags &= ~(CM_BUF_WAITING | CM_BUF_WRITING | CM_BUF_DIRTY);
1085         }
1086
1087         lock_ReleaseMutex(&bufp->mx);
1088         buf_Release(bufp);
1089     }
1090
1091     /* clean things out */
1092     biop->bufListp = NULL;
1093     biop->bufListEndp = NULL;
1094 }   
1095
1096 /* Fetch a buffer.  Called with scp locked.
1097  * The scp is locked on return.
1098  */
1099 long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *up,
1100                   cm_req_t *reqp)
1101 {
1102     long code;
1103     long nbytes;                        /* bytes in transfer */
1104     long rbytes;                        /* bytes in rx_Read call */
1105     long temp;
1106     AFSFetchStatus afsStatus;
1107     AFSCallBack callback;
1108     AFSVolSync volSync;
1109     char *bufferp;
1110     cm_buf_t *tbufp;            /* buf we're filling */
1111     osi_queueData_t *qdp;               /* q element we're scanning */
1112     AFSFid tfid;
1113     struct rx_call *callp;
1114     cm_bulkIO_t biod;           /* bulk IO descriptor */
1115     cm_conn_t *connp;
1116     int getroot;
1117     long t1, t2;
1118
1119     /* now, the buffer may or may not be filled with good data (buf_GetNew
1120      * drops lots of locks, and may indeed return a properly initialized
1121      * buffer, although more likely it will just return a new, empty, buffer.
1122      */
1123
1124 #ifdef AFS_FREELANCE_CLIENT
1125
1126     // yj: if they're trying to get the /afs directory, we need to
1127     // handle it differently, since it's local rather than on any
1128     // server
1129
1130     getroot = (scp==cm_rootSCachep);
1131     if (getroot)
1132         osi_Log1(afsd_logp,"GetBuffer returns cm_rootSCachep=%x",cm_rootSCachep);
1133 #endif
1134
1135     cm_AFSFidFromFid(&tfid, &scp->fid);
1136
1137     code = cm_SetupFetchBIOD(scp, &bufp->offset, &biod, up, reqp);
1138     if (code) {
1139         /* couldn't even get the first page setup properly */
1140         osi_Log1(afsd_logp, "SetupFetchBIOD failure code %d", code);
1141         return code;
1142     }
1143
1144     /* once we get here, we have the callback in place, we know that no one
1145      * is fetching the data now.  Check one last time that we still have
1146      * the wrong data, and then fetch it if we're still wrong.
1147      *
1148      * We can lose a race condition and end up with biod.length zero, in
1149      * which case we just retry.
1150      */
1151     if (bufp->dataVersion == scp->dataVersion || biod.length == 0) {
1152         osi_Log3(afsd_logp, "Bad DVs %d, %d or length 0x%x",
1153                  bufp->dataVersion, scp->dataVersion, biod.length);
1154         if ((bufp->dataVersion == -1
1155              || bufp->dataVersion < scp->dataVersion)
1156              && LargeIntegerGreaterThanOrEqualTo(bufp->offset,
1157                                                  scp->serverLength)) {
1158             if (bufp->dataVersion == -1)
1159                 memset(bufp->datap, 0, buf_bufferSize);
1160             bufp->dataVersion = scp->dataVersion;
1161         }
1162         lock_ReleaseMutex(&scp->mx);
1163         cm_ReleaseBIOD(&biod, 0);
1164         lock_ObtainMutex(&scp->mx);
1165         return 0;
1166     }
1167         
1168     lock_ReleaseMutex(&scp->mx);
1169
1170 #ifdef DISKCACHE95
1171     DPRINTF("cm_GetBuffer: fetching data scpDV=%d bufDV=%d scp=%x bp=%x dcp=%x\n",
1172             scp->dataVersion, bufp->dataVersion, scp, bufp, bufp->dcp);
1173 #endif /* DISKCACHE95 */
1174
1175 #ifdef AFS_FREELANCE_CLIENT
1176
1177     // yj code
1178     // if getroot then we don't need to make any calls
1179     // just return fake data
1180         
1181     if (cm_freelanceEnabled && getroot) {
1182         // setup the fake status                        
1183         afsStatus.InterfaceVersion = 0x1;
1184         afsStatus.FileType = 0x2;
1185         afsStatus.LinkCount = scp->linkCount;
1186         afsStatus.Length = cm_fakeDirSize;
1187         afsStatus.DataVersion = cm_fakeDirVersion;
1188         afsStatus.Author = 0x1;
1189         afsStatus.Owner = 0x0;
1190         afsStatus.CallerAccess = 0x9;
1191         afsStatus.AnonymousAccess = 0x9;
1192         afsStatus.UnixModeBits = 0x1ff;
1193         afsStatus.ParentVnode = 0x1;
1194         afsStatus.ParentUnique = 0x1;
1195         afsStatus.ResidencyMask = 0;
1196         afsStatus.ClientModTime = FakeFreelanceModTime;
1197         afsStatus.ServerModTime = FakeFreelanceModTime;
1198         afsStatus.Group = 0;
1199         afsStatus.SyncCounter = 0;
1200         afsStatus.dataVersionHigh = 0;
1201         
1202         // once we're done setting up the status info,
1203         // we just fill the buffer pages with fakedata
1204         // from cm_FakeRootDir. Extra pages are set to
1205         // 0. 
1206                 
1207         lock_ObtainMutex(&cm_Freelance_Lock);
1208         t1 = bufp->offset.LowPart;
1209         qdp = biod.bufListEndp;
1210         while (qdp) {
1211             tbufp = osi_GetQData(qdp);
1212             bufferp=tbufp->datap;
1213             memset(bufferp, 0, buf_bufferSize);
1214             t2 = cm_fakeDirSize - t1;
1215             if (t2>buf_bufferSize) t2=buf_bufferSize;
1216             if (t2 > 0) {
1217                 memcpy(bufferp, cm_FakeRootDir+t1, t2);
1218             } else {
1219                 t2 = 0;
1220             }
1221             t1+=t2;
1222             qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1223
1224         }
1225         lock_ReleaseMutex(&cm_Freelance_Lock);
1226         
1227         // once we're done, we skip over the part of the
1228         // code that does the ACTUAL fetching of data for
1229         // real files
1230
1231         goto fetchingcompleted;
1232     }
1233
1234 #endif /* AFS_FREELANCE_CLIENT */
1235
1236         /* now make the call */
1237     do {
1238         code = cm_Conn(&scp->fid, up, reqp, &connp);
1239         if (code) 
1240             continue;
1241                 
1242         lock_ObtainMutex(&connp->mx);
1243         callp = rx_NewCall(connp->callp);
1244
1245         osi_Log3(afsd_logp, "CALL FetchData vp %x, off 0x%x, size 0x%x",
1246                   (long) scp, biod.offset.LowPart, biod.length);
1247
1248         code = StartRXAFS_FetchData(callp, &tfid, biod.offset.LowPart,
1249                                     biod.length);
1250
1251         /* now copy the data out of the pipe and put it in the buffer */
1252         temp  = rx_Read(callp, (char *)&nbytes, 4);
1253         if (temp == 4) {
1254             nbytes = ntohl(nbytes);
1255             if (nbytes > biod.length) 
1256                 code = (callp->error < 0) ? callp->error : -1;
1257         }
1258         else 
1259             code = (callp->error < 0) ? callp->error : -1;
1260
1261         if (code == 0) {
1262             qdp = biod.bufListEndp;
1263             if (qdp) {
1264                 tbufp = osi_GetQData(qdp);
1265                 bufferp = tbufp->datap;
1266             }
1267             else 
1268                 bufferp = NULL;
1269             /* fill nbytes of data from the pipe into the pages.
1270              * When we stop, qdp will point at the last page we're
1271              * dealing with, and bufferp will tell us where we
1272              * stopped.  We'll need this info below when we clear
1273              * the remainder of the last page out (and potentially
1274              * clear later pages out, if we fetch past EOF).
1275              */
1276             while (nbytes > 0) {
1277                 /* assert that there are still more buffers;
1278                  * our check above for nbytes being less than
1279                  * biod.length should ensure this.
1280                  */
1281                 osi_assert(bufferp != NULL);
1282
1283                 /* read rbytes of data */
1284                 rbytes = (nbytes > buf_bufferSize? buf_bufferSize : nbytes);
1285                 temp = rx_Read(callp, bufferp, rbytes);
1286                 if (temp < rbytes) {
1287                     code = (callp->error < 0) ? callp->error : -1;
1288                     break;
1289                 }
1290
1291                 /* allow read-while-fetching.
1292                  * if this is the last buffer, clear the
1293                  * PREFETCHING flag, so the reader waiting for
1294                  * this buffer will start a prefetch.
1295                  */
1296                 tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
1297                 lock_ObtainMutex(&scp->mx);
1298                 if (scp->flags & CM_SCACHEFLAG_WAITING) {
1299                     scp->flags &= ~CM_SCACHEFLAG_WAITING;
1300                     osi_Wakeup((long) &scp->flags);
1301                 }
1302                 if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
1303                     *cpffp = 1;
1304                     cm_ClearPrefetchFlag(0, scp, &biod.offset);
1305                 }
1306                 lock_ReleaseMutex(&scp->mx);
1307
1308                 /* and adjust counters */
1309                 nbytes -= temp;
1310
1311                 /* and move to the next buffer */
1312                 if (nbytes != 0) {
1313                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1314                     if (qdp) {
1315                         tbufp = osi_GetQData(qdp);
1316                         bufferp = tbufp->datap;
1317                     }
1318                     else 
1319                         bufferp = NULL;
1320                 } else 
1321                     bufferp += temp;
1322             }
1323
1324             /* zero out remainder of last pages, in case we are
1325              * fetching past EOF.  We were fetching an integral #
1326              * of pages, but stopped, potentially in the middle of
1327              * a page.  Zero the remainder of that page, and then
1328              * all of the rest of the pages.
1329              */
1330             /* bytes fetched */
1331             rbytes = bufferp - tbufp->datap;
1332             /* bytes left to zero */
1333             rbytes = buf_bufferSize - rbytes;
1334             while(qdp) {
1335                 if (rbytes != 0)
1336                     memset(bufferp, 0, rbytes);
1337                 qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1338                 if (qdp == NULL) 
1339                     break;
1340                 tbufp = osi_GetQData(qdp);
1341                 bufferp = tbufp->datap;
1342                 /* bytes to clear in this page */
1343                 rbytes = buf_bufferSize;
1344             }   
1345         }
1346
1347         if (code == 0)
1348             code = EndRXAFS_FetchData(callp, &afsStatus, &callback, &volSync);
1349         else
1350             osi_Log0(afsd_logp, "CALL EndRXAFS_FetchData skipped due to error");
1351         code = rx_EndCall(callp, code);
1352         if (code == RXKADUNKNOWNKEY)
1353             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
1354         osi_Log0(afsd_logp, "CALL FetchData DONE");
1355
1356         lock_ReleaseMutex(&connp->mx);
1357
1358     } while (cm_Analyze(connp, up, reqp, &scp->fid, &volSync, NULL, NULL, code));
1359
1360   fetchingcompleted:
1361     code = cm_MapRPCError(code, reqp);
1362
1363     lock_ObtainMutex(&scp->mx);
1364     /* we know that no one else has changed the buffer, since we still have
1365      * the fetching flag on the buffers, and we have the scp locked again.
1366      * Copy in the version # into the buffer if we got code 0 back from the
1367      * read.
1368      */
1369     if (code == 0) {
1370         for(qdp = biod.bufListp;
1371              qdp;
1372              qdp = (osi_queueData_t *) osi_QNext(&qdp->q)) {
1373             tbufp = osi_GetQData(qdp);
1374             tbufp->dataVersion = afsStatus.DataVersion;
1375
1376 #ifdef DISKCACHE95
1377             /* write buffer out to disk cache */
1378             diskcache_Update(tbufp->dcp, tbufp->datap, buf_bufferSize,
1379                               tbufp->dataVersion);
1380 #endif /* DISKCACHE95 */
1381         }
1382     }
1383
1384     /* release scatter/gather I/O structure (buffers, locks) */
1385     lock_ReleaseMutex(&scp->mx);
1386     cm_ReleaseBIOD(&biod, 0);
1387     lock_ObtainMutex(&scp->mx);
1388
1389     if (code == 0) 
1390         cm_MergeStatus(scp, &afsStatus, &volSync, up, 0);
1391     return code;
1392 }