windows-buf-waiting-20050605
[openafs.git] / src / WINNT / afsd / cm_dcache.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <afs/stds.h>
12
13 #ifndef DJGPP
14 #include <windows.h>
15 #include <winsock2.h>
16 #include <nb30.h>
17 #endif /* !DJGPP */
18 #ifdef COMMENT
19 #include <malloc.h>
20 #endif
21 #include <string.h>
22 #include <stdlib.h>
23 #include <osi.h>
24
25 #include "afsd.h"
26
27 #ifdef DEBUG
28 extern void afsi_log(char *pattern, ...);
29 #endif
30
31 osi_mutex_t cm_bufGetMutex;
32 #ifdef AFS_FREELANCE_CLIENT
33 extern osi_mutex_t cm_Freelance_Lock;
34 #endif
35
36 /* functions called back from the buffer package when reading or writing data,
37  * or when holding or releasing a vnode pointer.
38  */
39 long cm_BufWrite(void *vfidp, osi_hyper_t *offsetp, long length, long flags,
40                  cm_user_t *userp, cm_req_t *reqp)
41 {
42     /* store the data back from this buffer; the buffer is locked and held,
43      * but the vnode involved isn't locked, yet.  It is held by its
44      * reference from the buffer, which won't change until the buffer is
45      * released by our caller.  Thus, we don't have to worry about holding
46      * bufp->scp.
47      */
48     long code;
49     cm_fid_t *fidp = vfidp;
50     cm_scache_t *scp;
51     long nbytes;
52     long temp;
53     AFSFetchStatus outStatus;
54     AFSStoreStatus inStatus;
55     osi_hyper_t thyper;
56     AFSVolSync volSync;
57     AFSFid tfid;
58     struct rx_call *callp;
59     struct rx_connection *rxconnp;
60     osi_queueData_t *qdp;
61     cm_buf_t *bufp;
62     long wbytes;
63     char *bufferp;
64     cm_conn_t *connp;
65     long truncPos;
66     cm_bulkIO_t biod;           /* bulk IO descriptor */
67
68     osi_assert(userp != NULL);
69
70     /* now, the buffer may or may not be filled with good data (buf_GetNew
71      * drops lots of locks, and may indeed return a properly initialized
72      * buffer, although more likely it will just return a new, empty, buffer.
73      */
74     scp = cm_FindSCache(fidp);
75     if (scp == NULL)
76         return CM_ERROR_NOSUCHFILE;     /* shouldn't happen */
77
78     cm_AFSFidFromFid(&tfid, fidp);
79
80     lock_ObtainMutex(&scp->mx);
81         
82     code = cm_SetupStoreBIOD(scp, offsetp, length, &biod, userp, reqp);
83     if (code) {
84         osi_Log1(afsd_logp, "cm_SetupStoreBIOD code %x", code);
85         lock_ReleaseMutex(&scp->mx);
86         cm_ReleaseSCache(scp);
87         return code;
88     }
89
90     if (biod.length == 0) {
91         osi_Log0(afsd_logp, "cm_SetupStoreBIOD length 0");
92         lock_ReleaseMutex(&scp->mx);
93         cm_ReleaseBIOD(&biod, 1);       /* should be a NOOP */
94         cm_ReleaseSCache(scp);
95         return 0;
96     }   
97
98     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
99     (void) cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL);
100
101     /* prepare the output status for the store */
102     scp->mask |= CM_SCACHEMASK_CLIENTMODTIME;
103     cm_StatusFromAttr(&inStatus, scp, NULL);
104     truncPos = scp->length.LowPart;
105     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
106          && scp->truncPos.LowPart < (unsigned long) truncPos)
107         truncPos = scp->truncPos.LowPart;
108         scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
109                 
110     /* compute how many bytes to write from this buffer */
111     thyper = LargeIntegerSubtract(scp->length, biod.offset);
112     if (LargeIntegerLessThanZero(thyper)) {
113         /* entire buffer is past EOF */
114         nbytes = 0;
115     }
116     else {
117         /* otherwise write out part of buffer before EOF, but not
118          * more than bufferSize bytes.
119          */
120         nbytes = thyper.LowPart;
121         if (nbytes > biod.length) 
122             nbytes = biod.length;
123     }
124
125     lock_ReleaseMutex(&scp->mx);
126         
127     /* now we're ready to do the store operation */
128     do {
129         code = cm_Conn(&scp->fid, userp, reqp, &connp);
130         if (code) 
131             continue;
132                 
133         rxconnp = cm_GetRxConn(connp);
134         callp = rx_NewCall(rxconnp);
135         rx_PutConnection(rxconnp);
136
137         osi_Log3(afsd_logp, "CALL StoreData scp 0x%x, off 0x%x, size 0x%x",
138                  (long) scp, biod.offset.LowPart, nbytes);
139
140         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
141                                     biod.offset.LowPart, nbytes, truncPos);
142
143         if (code == 0) {
144             /* write the data from the the list of buffers */
145             qdp = NULL;
146             while(nbytes > 0) {
147                 if (qdp == NULL)
148                     qdp = biod.bufListEndp;
149                 else
150                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
151                 osi_assert(qdp != NULL);
152                 bufp = osi_GetQData(qdp);
153                 bufferp = bufp->datap;
154                 wbytes = nbytes;
155                 if (wbytes > cm_data.buf_blockSize) 
156                     wbytes = cm_data.buf_blockSize;
157
158                 /* write out wbytes of data from bufferp */
159                 temp = rx_Write(callp, bufferp, wbytes);
160                 if (temp != wbytes) {
161                     osi_Log2(afsd_logp, "rx_Write failed %d != %d",temp,wbytes);
162                     code = -1;
163                     break;
164                 } else {
165                     osi_Log1(afsd_logp, "rx_Write succeeded %d",temp);
166                 }       
167                 nbytes -= wbytes;
168             }   /* while more bytes to write */
169         }               /* if RPC started successfully */
170         else {
171             osi_Log1(afsd_logp, "StartRXAFS_StoreData failed (%lX)",code);
172         }
173         if (code == 0) {
174             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
175             if (code)
176                 osi_Log1(afsd_logp, "EndRXAFS_StoreData failed (%lX)",code);
177         }
178         code = rx_EndCall(callp, code);
179                 
180     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
181     code = cm_MapRPCError(code, reqp);
182         
183     if (code)
184         osi_Log1(afsd_logp, "CALL StoreData FAILURE, code 0x%x", code);
185     else
186         osi_Log0(afsd_logp, "CALL StoreData SUCCESS");
187
188     /* now, clean up our state */
189     lock_ObtainMutex(&scp->mx);
190
191     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
192
193     if (code == 0) {
194         /* now, here's something a little tricky: in AFS 3, a dirty
195          * length can't be directly stored, instead, a dirty chunk is
196          * stored that sets the file's size (by writing and by using
197          * the truncate-first option in the store call).
198          *
199          * At this point, we've just finished a store, and so the trunc
200          * pos field is clean.  If the file's size at the server is at
201          * least as big as we think it should be, then we turn off the
202          * length dirty bit, since all the other dirty buffers must
203          * precede this one in the file.
204          *
205          * The file's desired size shouldn't be smaller than what's
206          * stored at the server now, since we just did the trunc pos
207          * store.
208          *
209          * We have to turn off the length dirty bit as soon as we can,
210          * so that we see updates made by other machines.
211          */
212         if (outStatus.Length >= scp->length.LowPart)
213             scp->mask &= ~CM_SCACHEMASK_LENGTH;
214         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
215     } else {
216         if (code == CM_ERROR_SPACE)
217             scp->flags |= CM_SCACHEFLAG_OUTOFSPACE;
218         else if (code == CM_ERROR_QUOTA)
219             scp->flags |= CM_SCACHEFLAG_OVERQUOTA;
220     }
221     lock_ReleaseMutex(&scp->mx);
222     cm_ReleaseBIOD(&biod, 1);
223     cm_ReleaseSCache(scp);
224
225     return code;
226 }
227
228 /*
229  * Truncate the file, by sending a StoreData RPC with zero length.
230  *
231  * Called with scp locked.  Releases and re-obtains the lock.
232  */
233 long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
234 {
235     AFSFetchStatus outStatus;
236     AFSStoreStatus inStatus;
237     AFSVolSync volSync;
238     AFSFid tfid;
239     long code;
240     long truncPos;
241     cm_conn_t *connp;
242     struct rx_call *callp;
243     struct rx_connection *rxconnp;
244
245     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
246     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
247                      CM_SCACHESYNC_STOREDATA_EXCL);
248
249     /* prepare the output status for the store */
250     inStatus.Mask = AFS_SETMODTIME;
251     inStatus.ClientModTime = scp->clientModTime;
252     scp->mask &= ~CM_SCACHEMASK_CLIENTMODTIME;
253
254     /* calculate truncation position */
255     truncPos = scp->length.LowPart;
256     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
257          && scp->truncPos.LowPart < (unsigned long) truncPos)
258         truncPos = scp->truncPos.LowPart;
259     scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
260
261     lock_ReleaseMutex(&scp->mx);
262
263     cm_AFSFidFromFid(&tfid, &scp->fid);
264
265     /* now we're ready to do the store operation */
266     do {
267         code = cm_Conn(&scp->fid, userp, reqp, &connp);
268         if (code) 
269             continue;
270                 
271         rxconnp = cm_GetRxConn(connp);
272         callp = rx_NewCall(rxconnp);
273         rx_PutConnection(rxconnp);
274
275         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
276                                     0, 0, truncPos);
277
278         if (code == 0)
279             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
280         code = rx_EndCall(callp, code);
281
282     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
283     code = cm_MapRPCError(code, reqp);
284         
285     /* now, clean up our state */
286     lock_ObtainMutex(&scp->mx);
287
288     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
289
290     if (code == 0) {
291         /*
292          * For explanation of handling of CM_SCACHEMASK_LENGTH,
293          * see cm_BufWrite().
294          */
295         if (outStatus.Length >= scp->length.LowPart)
296             scp->mask &= ~CM_SCACHEMASK_LENGTH;
297         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
298     }
299
300     return code;
301 }
302
303 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
304 {
305     *bytesReadp = cm_data.buf_blockSize;
306
307     /* now return a code that means that I/O is done */
308     return 0;
309 }
310
311 /* stabilize scache entry, and return with it locked so 
312  * it stays stable.
313  */
314 long cm_BufStabilize(void *parmp, cm_user_t *userp, cm_req_t *reqp)
315 {
316     cm_scache_t *scp;
317     long code;
318
319     scp = parmp;
320         
321     lock_ObtainMutex(&scp->mx);
322     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
323                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
324     if (code) {
325         lock_ReleaseMutex(&scp->mx);
326         return code;
327     }
328         
329     return 0;
330 }
331
332 /* undoes the work that cm_BufStabilize does: releases lock so things can change again */
333 long cm_BufUnstabilize(void *parmp, cm_user_t *userp)
334 {
335     cm_scache_t *scp;
336         
337     scp = parmp;
338         
339     lock_ReleaseMutex(&scp->mx);
340         
341     /* always succeeds */
342     return 0;
343 }
344
345 cm_buf_ops_t cm_bufOps = {
346     cm_BufWrite,
347     cm_BufRead,
348     cm_BufStabilize,
349     cm_BufUnstabilize
350 };
351
352 long cm_ValidateDCache(void)
353 {
354     return buf_ValidateBuffers();
355 }
356
357 long cm_ShutdownDCache(void)
358 {
359     return 0;
360 }
361
362 int cm_InitDCache(int newFile, long chunkSize, long nbuffers)
363 {
364     lock_InitializeMutex(&cm_bufGetMutex, "buf_Get mutex");
365     return buf_Init(newFile, &cm_bufOps, nbuffers);
366 }
367
368 /* check to see if we have an up-to-date buffer.  The buffer must have
369  * previously been obtained by calling buf_Get.
370  *
371  * Make sure we have a callback, and that the dataversion matches.
372  *
373  * Scp must be locked.
374  *
375  * Bufp *may* be locked.
376  */
377 int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
378 {
379     int code;
380     if (!cm_HaveCallback(scp))
381         return 0;
382     if ((bufp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED)) == (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
383         return 1;
384     if (bufp->dataVersion == scp->dataVersion)
385         return 1;
386     if (!isBufLocked) {
387         code = lock_TryMutex(&bufp->mx);
388         if (code == 0) {
389             /* don't have the lock, and can't lock it, then
390              * return failure.
391              */
392             return 0;
393         }
394     }
395
396     /* remember dirty flag for later */
397     code = bufp->flags & CM_BUF_DIRTY;
398
399     /* release lock if we obtained it here */
400     if (!isBufLocked) 
401         lock_ReleaseMutex(&bufp->mx);
402
403     /* if buffer was dirty, buffer is acceptable for use */
404     if (code) 
405         return 1;
406     else 
407         return 0;
408 }
409
410 /* used when deciding whether to do a prefetch or not */
411 long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, long length,
412                         cm_user_t *up, cm_req_t *reqp, osi_hyper_t *realBasep)
413 {
414     osi_hyper_t toffset;
415     osi_hyper_t tbase;
416     long code;
417     cm_buf_t *bp;
418     int stop;
419         
420     /* now scan all buffers in the range, looking for any that look like
421      * they need work.
422      */
423     tbase = *startBasep;
424     stop = 0;
425     lock_ObtainMutex(&scp->mx);
426     while(length > 0) {
427         /* get callback so we can do a meaningful dataVersion comparison */
428         code = cm_SyncOp(scp, NULL, up, reqp, 0,
429                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
430         if (code) {
431             scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
432             lock_ReleaseMutex(&scp->mx);
433             return code;
434         }
435                 
436         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
437             /* we're past the end of file */
438             break;
439         }
440
441         bp = buf_Find(scp, &tbase);
442         /* We cheat slightly by not locking the bp mutex. */
443         if (bp) {
444             if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
445                  && bp->dataVersion != scp->dataVersion)
446                 stop = 1;
447             buf_Release(bp);
448         }
449         else 
450             stop = 1;
451
452         /* if this buffer is essentially guaranteed to require a fetch,
453          * break out here and return this position.
454          */
455         if (stop) 
456             break;
457                 
458         toffset.LowPart = cm_data.buf_blockSize;
459         toffset.HighPart = 0;
460         tbase = LargeIntegerAdd(toffset, tbase);
461         length -= cm_data.buf_blockSize;
462     }
463         
464     /* if we get here, either everything is fine or stop stopped us at a
465      * particular buffer in the range that definitely needs to be fetched.
466      */
467     if (stop == 0) {
468         /* return non-zero code since realBasep won't be valid */
469         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
470         code = -1;
471     }   
472     else {
473         /* successfully found a page that will need fetching */
474         *realBasep = tbase;
475         code = 0;
476     }
477     lock_ReleaseMutex(&scp->mx);
478     return code;
479 }
480
481 void cm_BkgStore(cm_scache_t *scp, long p1, long p2, long p3, long p4,
482                  cm_user_t *userp)
483 {
484     osi_hyper_t toffset;
485     long length;
486     cm_req_t req;
487
488     cm_InitReq(&req);
489     req.flags |= CM_REQ_NORETRY;
490
491     toffset.LowPart = p1;
492     toffset.HighPart = p2;
493     length = p3;
494
495     osi_Log2(afsd_logp, "Starting BKG store vp 0x%x, base 0x%x", scp, p1);
496
497     cm_BufWrite(&scp->fid, &toffset, length, /* flags */ 0, userp, &req);
498
499     lock_ObtainMutex(&scp->mx);
500     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
501     lock_ReleaseMutex(&scp->mx);
502 }
503
504 void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base)
505 {
506     osi_hyper_t thyper;
507
508     if (code == 0) {
509         thyper.LowPart = cm_chunkSize;
510         thyper.HighPart = 0;
511         thyper =  LargeIntegerAdd(*base, thyper);
512         thyper.LowPart &= (-cm_chunkSize);
513         if (LargeIntegerGreaterThan(*base, scp->prefetch.base))
514             scp->prefetch.base = *base;
515         if (LargeIntegerGreaterThan(thyper, scp->prefetch.end))
516             scp->prefetch.end = thyper;
517     }
518     scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
519 }
520
521 /* do the prefetch */
522 void cm_BkgPrefetch(cm_scache_t *scp, long p1, long p2, long p3, long p4,
523                     cm_user_t *userp)
524 {
525     long length;
526     osi_hyper_t base;
527     long code;
528     cm_buf_t *bp;
529     int cpff = 0;                       /* cleared prefetch flag */
530     cm_req_t req;
531
532     cm_InitReq(&req);
533     req.flags |= CM_REQ_NORETRY;
534         
535     base.LowPart = p1;
536     base.HighPart = p2;
537     length = p3;
538         
539     osi_Log2(afsd_logp, "Starting BKG prefetch vp 0x%x, base 0x%x", scp, p1);
540
541     code = buf_Get(scp, &base, &bp);
542
543     lock_ObtainMutex(&scp->mx);
544
545     if (code || (bp->cmFlags & CM_BUF_CMFETCHING)) {
546         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
547         lock_ReleaseMutex(&scp->mx);
548         return;
549     }
550
551     code = cm_GetBuffer(scp, bp, &cpff, userp, &req);
552     if (!cpff) 
553         cm_ClearPrefetchFlag(code, scp, &base);
554     lock_ReleaseMutex(&scp->mx);
555     buf_Release(bp);
556     return;
557 }
558
559 /* a read was issued to offsetp, and we have to determine whether we should
560  * do a prefetch.
561  */
562 void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp,
563                          cm_user_t *userp, cm_req_t *reqp)
564 {
565     long code;
566     osi_hyper_t realBase;
567     osi_hyper_t readBase;
568         
569     readBase = *offsetp;
570     /* round up to chunk boundary */
571     readBase.LowPart += (cm_chunkSize-1);
572     readBase.LowPart &= (-cm_chunkSize);
573
574     lock_ObtainMutex(&scp->mx);
575     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
576          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
577         lock_ReleaseMutex(&scp->mx);
578         return;
579     }
580     scp->flags |= CM_SCACHEFLAG_PREFETCHING;
581
582     /* start the scan at the latter of the end of this read or
583      * the end of the last fetched region.
584      */
585     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
586         readBase = scp->prefetch.end;
587
588     lock_ReleaseMutex(&scp->mx);
589
590     code = cm_CheckFetchRange(scp, &readBase, cm_chunkSize, userp, reqp,
591                               &realBase);
592     if (code) 
593         return; /* can't find something to prefetch */
594
595     osi_Log2(afsd_logp, "BKG Prefetch request vp 0x%x, base 0x%x",
596              scp, realBase.LowPart);
597
598     cm_QueueBKGRequest(scp, cm_BkgPrefetch, realBase.LowPart,
599                        realBase.HighPart, cm_chunkSize, 0, userp);
600 }
601
602 /* scp must be locked; temporarily unlocked during processing.
603  * If returns 0, returns buffers held in biop, and with
604  * CM_BUF_CMSTORING set.
605  *
606  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
607  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
608  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
609  * must be woken, and the event must be set when the I/O is done.  All of this
610  * is required so that buf_WaitIO synchronizes properly with the buffer as it
611  * is being written out.
612  */
613 long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
614                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
615 {
616     cm_buf_t *bufp;
617     osi_queueData_t *qdp;
618     osi_hyper_t thyper;
619     osi_hyper_t tbase;
620     osi_hyper_t scanStart;              /* where to start scan for dirty pages */
621     osi_hyper_t scanEnd;                /* where to stop scan for dirty pages */
622     osi_hyper_t firstModOffset; /* offset of first modified page in range */
623     long temp;
624     long code;
625     long flags;                 /* flags to cm_SyncOp */
626         
627     /* clear things out */
628     biop->scp = scp;            /* don't hold */
629     biop->offset = *inOffsetp;
630     biop->length = 0;
631     biop->bufListp = NULL;
632     biop->bufListEndp = NULL;
633     biop->reserved = 0;
634
635     /* reserve a chunk's worth of buffers */
636     lock_ReleaseMutex(&scp->mx);
637     buf_ReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
638     lock_ObtainMutex(&scp->mx);
639
640     bufp = NULL;
641     for (temp = 0; temp < inSize; temp += cm_data.buf_blockSize, bufp = NULL) {
642         thyper.HighPart = 0;
643         thyper.LowPart = temp;
644         tbase = LargeIntegerAdd(*inOffsetp, thyper);
645
646         bufp = buf_Find(scp, &tbase);
647         if (bufp) {
648             /* get buffer mutex and scp mutex safely */
649             lock_ReleaseMutex(&scp->mx);
650             lock_ObtainMutex(&bufp->mx);
651             lock_ObtainMutex(&scp->mx);
652
653             flags = CM_SCACHESYNC_NEEDCALLBACK
654                 | CM_SCACHESYNC_GETSTATUS
655                     | CM_SCACHESYNC_STOREDATA
656                         | CM_SCACHESYNC_BUFLOCKED;
657             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
658             if (code) {
659                 lock_ReleaseMutex(&bufp->mx);
660                 buf_Release(bufp);
661                 buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
662                 return code;
663             }   
664                         
665             /* if the buffer is dirty, we're done */
666             if (bufp->flags & CM_BUF_DIRTY) {
667                 osi_assertx(!(bufp->flags & CM_BUF_WRITING),
668                             "WRITING w/o CMSTORING in SetupStoreBIOD");
669                 bufp->flags |= CM_BUF_WRITING;
670                 break;
671             }
672
673             /* this buffer is clean, so there's no reason to process it */
674             cm_SyncOpDone(scp, bufp, flags);
675             lock_ReleaseMutex(&bufp->mx);
676             buf_Release(bufp);
677         }       
678     }
679
680     biop->reserved = 1;
681         
682     /* if we get here, if bufp is null, we didn't find any dirty buffers
683      * that weren't already being stored back, so we just quit now.
684      */
685     if (!bufp) {
686         return 0;
687     }
688
689     /* don't need buffer mutex any more */
690     lock_ReleaseMutex(&bufp->mx);
691         
692     /* put this element in the list */
693     qdp = osi_QDAlloc();
694     osi_SetQData(qdp, bufp);
695     /* don't have to hold bufp, since held by buf_Find above */
696     osi_QAddH((osi_queue_t **) &biop->bufListp,
697               (osi_queue_t **) &biop->bufListEndp,
698               &qdp->q);
699     biop->length = cm_data.buf_blockSize;
700     firstModOffset = bufp->offset;
701     biop->offset = firstModOffset;
702
703     /* compute the window surrounding *inOffsetp of size cm_chunkSize */
704     scanStart = *inOffsetp;
705     scanStart.LowPart &= (-cm_chunkSize);
706     thyper.LowPart = cm_chunkSize;
707     thyper.HighPart = 0;
708     scanEnd = LargeIntegerAdd(scanStart, thyper);
709
710     flags = CM_SCACHESYNC_NEEDCALLBACK
711         | CM_SCACHESYNC_GETSTATUS
712         | CM_SCACHESYNC_STOREDATA
713         | CM_SCACHESYNC_BUFLOCKED
714         | CM_SCACHESYNC_NOWAIT;
715
716     /* start by looking backwards until scanStart */
717     thyper.HighPart = 0;                /* hyper version of cm_data.buf_blockSize */
718     thyper.LowPart = cm_data.buf_blockSize;
719     tbase = LargeIntegerSubtract(firstModOffset, thyper);
720     while(LargeIntegerGreaterThanOrEqualTo(tbase, scanStart)) {
721         /* see if we can find the buffer */
722         bufp = buf_Find(scp, &tbase);
723         if (!bufp) 
724             break;
725
726         /* try to lock it, and quit if we can't (simplifies locking) */
727         lock_ReleaseMutex(&scp->mx);
728         code = lock_TryMutex(&bufp->mx);
729         lock_ObtainMutex(&scp->mx);
730         if (code == 0) {
731             buf_Release(bufp);
732             break;
733         }
734                 
735         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
736         if (code) {
737             lock_ReleaseMutex(&bufp->mx);
738             buf_Release(bufp);
739             break;
740         }
741                 
742         if (!(bufp->flags & CM_BUF_DIRTY)) {
743             /* buffer is clean, so we shouldn't add it */
744             cm_SyncOpDone(scp, bufp, flags);
745             lock_ReleaseMutex(&bufp->mx);
746             buf_Release(bufp);
747             break;
748         }
749
750         /* don't need buffer mutex any more */
751         lock_ReleaseMutex(&bufp->mx);
752
753         /* we have a dirty buffer ready for storing.  Add it to the tail
754          * of the list, since it immediately precedes all of the disk
755          * addresses we've already collected.
756          */
757         qdp = osi_QDAlloc();
758         osi_SetQData(qdp, bufp);
759         /* no buf_hold necessary, since we have it held from buf_Find */
760         osi_QAddT((osi_queue_t **) &biop->bufListp,
761                   (osi_queue_t **) &biop->bufListEndp,
762                   &qdp->q);
763
764         /* update biod info describing the transfer */
765         biop->offset = LargeIntegerSubtract(biop->offset, thyper);
766         biop->length += cm_data.buf_blockSize;
767
768         /* update loop pointer */
769         tbase = LargeIntegerSubtract(tbase, thyper);
770     }   /* while loop looking for pages preceding the one we found */
771
772     /* now, find later dirty, contiguous pages, and add them to the list */
773     thyper.HighPart = 0;                /* hyper version of cm_data.buf_blockSize */
774     thyper.LowPart = cm_data.buf_blockSize;
775     tbase = LargeIntegerAdd(firstModOffset, thyper);
776     while(LargeIntegerLessThan(tbase, scanEnd)) {
777         /* see if we can find the buffer */
778         bufp = buf_Find(scp, &tbase);
779         if (!bufp) 
780             break;
781
782         /* try to lock it, and quit if we can't (simplifies locking) */
783         lock_ReleaseMutex(&scp->mx);
784         code = lock_TryMutex(&bufp->mx);
785         lock_ObtainMutex(&scp->mx);
786         if (code == 0) {
787             buf_Release(bufp);
788             break;
789         }
790
791         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
792         if (code) {
793             lock_ReleaseMutex(&bufp->mx);
794             buf_Release(bufp);
795             break;
796         }
797                 
798         if (!(bufp->flags & CM_BUF_DIRTY)) {
799             /* buffer is clean, so we shouldn't add it */
800             cm_SyncOpDone(scp, bufp, flags);
801             lock_ReleaseMutex(&bufp->mx);
802             buf_Release(bufp);
803             break;
804         }
805
806         /* don't need buffer mutex any more */
807         lock_ReleaseMutex(&bufp->mx);
808
809         /* we have a dirty buffer ready for storing.  Add it to the head
810          * of the list, since it immediately follows all of the disk
811          * addresses we've already collected.
812          */
813         qdp = osi_QDAlloc();
814         osi_SetQData(qdp, bufp);
815         /* no buf_hold necessary, since we have it held from buf_Find */
816         osi_QAddH((osi_queue_t **) &biop->bufListp,
817                   (osi_queue_t **) &biop->bufListEndp,
818                   &qdp->q);
819
820         /* update biod info describing the transfer */
821         biop->length += cm_data.buf_blockSize;
822                 
823         /* update loop pointer */
824         tbase = LargeIntegerAdd(tbase, thyper);
825     }   /* while loop looking for pages following the first page we found */
826         
827     /* finally, we're done */
828     return 0;
829 }
830
831 /* scp must be locked; temporarily unlocked during processing.
832  * If returns 0, returns buffers held in biop, and with
833  * CM_BUF_CMFETCHING flags set.
834  * If an error is returned, we don't return any buffers.
835  */
836 long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
837                         cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp)
838 {
839     long code;
840     cm_buf_t *tbp;
841     osi_hyper_t toffset;                /* a long long temp variable */
842     osi_hyper_t pageBase;               /* base offset we're looking at */
843     osi_queueData_t *qdp;               /* one temp queue structure */
844     osi_queueData_t *tqdp;              /* another temp queue structure */
845     long collected;                     /* how many bytes have been collected */
846     int isFirst;
847     long flags;
848     osi_hyper_t fileSize;               /* the # of bytes in the file */
849     osi_queueData_t *heldBufListp;      /* we hold all buffers in this list */
850     osi_queueData_t *heldBufListEndp;   /* first one */
851     int reserving;
852
853     biop->scp = scp;
854     biop->offset = *offsetp;
855     /* null out the list of buffers */
856     biop->bufListp = biop->bufListEndp = NULL;
857     biop->reserved = 0;
858
859     /* first lookup the file's length, so we know when to stop */
860     code = cm_SyncOp(scp, NULL, up, reqp, 0, 
861                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
862     if (code) 
863         return code;
864         
865     /* copy out size, since it may change */
866     fileSize = scp->serverLength;
867         
868     lock_ReleaseMutex(&scp->mx);
869
870     pageBase = *offsetp;
871     collected = pageBase.LowPart & (cm_chunkSize - 1);
872     heldBufListp = NULL;
873     heldBufListEndp = NULL;
874
875     /*
876      * Obtaining buffers can cause dirty buffers to be recycled, which
877      * can cause a storeback, so cannot be done while we have buffers
878      * reserved.
879      *
880      * To get around this, we get buffers twice.  Before reserving buffers,
881      * we obtain and release each one individually.  After reserving
882      * buffers, we try to obtain them again, but only by lookup, not by
883      * recycling.  If a buffer has gone away while we were waiting for
884      * the others, we just use whatever buffers we already have.
885      *
886      * On entry to this function, we are already holding a buffer, so we
887      * can't wait for reservation.  So we call buf_TryReserveBuffers()
888      * instead.  Not only that, we can't really even call buf_Get(), for
889      * the same reason.  We can't avoid that, though.  To avoid deadlock
890      * we allow only one thread to be executing the buf_Get()-buf_Release()
891      * sequence at a time.
892      */
893
894     // lock_ObtainMutex(&cm_bufGetMutex);
895     /* first hold all buffers, since we can't hold any locks in buf_Get */
896     while (1) {
897         /* stop at chunk boundary */
898         if (collected >= cm_chunkSize) 
899             break;
900                 
901         /* see if the next page would be past EOF */
902         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
903             break;
904
905         code = buf_Get(scp, &pageBase, &tbp);
906         if (code) {
907             //lock_ReleaseMutex(&cm_bufGetMutex);
908             lock_ObtainMutex(&scp->mx);
909             cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
910             return code;
911         }
912                 
913         buf_Release(tbp);
914
915         toffset.HighPart = 0;
916         toffset.LowPart = cm_data.buf_blockSize;
917         pageBase = LargeIntegerAdd(toffset, pageBase);
918         collected += cm_data.buf_blockSize;
919     }
920
921     /* reserve a chunk's worth of buffers if possible */
922     reserving = buf_TryReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
923
924     // lock_ReleaseMutex(&cm_bufGetMutex);
925
926     pageBase = *offsetp;
927     collected = pageBase.LowPart & (cm_chunkSize - 1);
928
929     /* now hold all buffers, if they are still there */
930     while (1) {
931         /* stop at chunk boundary */
932         if (collected >= cm_chunkSize) 
933             break;
934                 
935         /* see if the next page would be past EOF */
936         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
937             break;
938
939         tbp = buf_Find(scp, &pageBase);
940         if (!tbp) 
941             break;
942
943         /* add the buffer to the list */
944         qdp = osi_QDAlloc();
945         osi_SetQData(qdp, tbp);
946         osi_QAdd((osi_queue_t **)&heldBufListp, &qdp->q);
947         if (!heldBufListEndp) heldBufListEndp = qdp;
948         /* leave tbp held (from buf_Get) */
949
950         if (!reserving) 
951             break;
952
953         collected += cm_data.buf_blockSize;
954         toffset.HighPart = 0;
955         toffset.LowPart = cm_data.buf_blockSize;
956         pageBase = LargeIntegerAdd(toffset, pageBase);
957     }
958
959     /* look at each buffer, adding it into the list if it looks idle and
960      * filled with old data.  One special case: wait for idle if it is the
961      * first buffer since we really need that one for our caller to make
962      * any progress.
963      */
964     isFirst = 1;
965     collected = 0;              /* now count how many we'll really use */
966     for (tqdp = heldBufListEndp;
967         tqdp;
968           tqdp = (osi_queueData_t *) osi_QPrev(&tqdp->q)) {
969         /* get a ptr to the held buffer */
970         tbp = osi_GetQData(tqdp);
971         pageBase = tbp->offset;
972
973         /* now lock the buffer lock */
974         lock_ObtainMutex(&tbp->mx);
975         lock_ObtainMutex(&scp->mx);
976
977         /* don't bother fetching over data that is already current */
978         if (tbp->dataVersion == scp->dataVersion) {
979             /* we don't need this buffer, since it is current */
980             lock_ReleaseMutex(&scp->mx);
981             lock_ReleaseMutex(&tbp->mx);
982             break;
983         }
984
985         flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_FETCHDATA
986             | CM_SCACHESYNC_BUFLOCKED;
987         if (!isFirst) 
988             flags |= CM_SCACHESYNC_NOWAIT;
989
990         /* wait for the buffer to serialize, if required.  Doesn't
991          * release the scp or buffer lock(s) if NOWAIT is specified.
992          */
993         code = cm_SyncOp(scp, tbp, up, reqp, 0, flags);
994         if (code) {
995             lock_ReleaseMutex(&scp->mx);
996             lock_ReleaseMutex(&tbp->mx);
997             break;
998         }
999                 
1000         /* don't fetch over dirty buffers */
1001         if (tbp->flags & CM_BUF_DIRTY) {
1002             cm_SyncOpDone(scp, tbp, flags);
1003             lock_ReleaseMutex(&scp->mx);
1004             lock_ReleaseMutex(&tbp->mx);
1005             break;
1006         }
1007
1008         /* Release locks */
1009         lock_ReleaseMutex(&scp->mx);
1010         lock_ReleaseMutex(&tbp->mx);
1011
1012         /* add the buffer to the list */
1013         qdp = osi_QDAlloc();
1014         osi_SetQData(qdp, tbp);
1015         osi_QAdd((osi_queue_t **)&biop->bufListp, &qdp->q);
1016         if (!biop->bufListEndp) 
1017             biop->bufListEndp = qdp;
1018         buf_Hold(tbp);
1019
1020         /* from now on, a failure just stops our collection process, but
1021          * we still do the I/O to whatever we've already managed to collect.
1022          */
1023         isFirst = 0;
1024         collected += cm_data.buf_blockSize;
1025     }
1026         
1027     /* now, we've held in biop->bufListp all the buffer's we're really
1028      * interested in.  We also have holds left from heldBufListp, and we
1029      * now release those holds on the buffers.
1030      */
1031     for (qdp = heldBufListp; qdp; qdp = tqdp) {
1032         tqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1033         tbp = osi_GetQData(qdp);
1034         osi_QDFree(qdp);
1035         buf_Release(tbp);
1036     }
1037
1038     /* Caller expects this */
1039     lock_ObtainMutex(&scp->mx);
1040  
1041     /* if we got a failure setting up the first buffer, then we don't have
1042      * any side effects yet, and we also have failed an operation that the
1043      * caller requires to make any progress.  Give up now.
1044      */
1045     if (code && isFirst) {
1046         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1047         return code;
1048     }
1049         
1050     /* otherwise, we're still OK, and should just return the I/O setup we've
1051      * got.
1052      */
1053     biop->length = collected;
1054     biop->reserved = reserving;
1055     return 0;
1056 }
1057
1058 /* release a bulk I/O structure that was setup by cm_SetupFetchBIOD or by
1059  * cm_SetupStoreBIOD
1060  */
1061 void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore)
1062 {
1063     cm_scache_t *scp;
1064     cm_buf_t *bufp;
1065     osi_queueData_t *qdp;
1066     osi_queueData_t *nqdp;
1067     int flags;
1068
1069     /* Give back reserved buffers */
1070     if (biop->reserved)
1071         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1072         
1073     flags = CM_SCACHESYNC_NEEDCALLBACK;
1074     if (isStore)
1075         flags |= CM_SCACHESYNC_STOREDATA;
1076     else
1077         flags |= CM_SCACHESYNC_FETCHDATA;
1078
1079     scp = biop->scp;
1080     for(qdp = biop->bufListp; qdp; qdp = nqdp) {
1081         /* lookup next guy first, since we're going to free this one */
1082         nqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1083                 
1084         /* extract buffer and free queue data */
1085         bufp = osi_GetQData(qdp);
1086         osi_QDFree(qdp);
1087
1088         /* now, mark I/O as done, unlock the buffer and release it */
1089         lock_ObtainMutex(&bufp->mx);
1090         lock_ObtainMutex(&scp->mx);
1091         cm_SyncOpDone(scp, bufp, flags);
1092         lock_ReleaseMutex(&scp->mx);
1093                 
1094         /* turn off writing and wakeup users */
1095         if (isStore) {
1096             if (bufp->flags & CM_BUF_WAITING) {
1097                 osi_Log1(afsd_logp, "cm_ReleaseBIOD Waking bp 0x%x", bufp);
1098                 osi_Wakeup((long) bufp);
1099             }
1100             bufp->flags &= ~(CM_BUF_WRITING | CM_BUF_DIRTY);
1101         }
1102
1103         lock_ReleaseMutex(&bufp->mx);
1104         buf_Release(bufp);
1105     }
1106
1107     /* clean things out */
1108     biop->bufListp = NULL;
1109     biop->bufListEndp = NULL;
1110 }   
1111
1112 /* Fetch a buffer.  Called with scp locked.
1113  * The scp is locked on return.
1114  */
1115 long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *up,
1116                   cm_req_t *reqp)
1117 {
1118     long code;
1119     long nbytes;                        /* bytes in transfer */
1120     long rbytes;                        /* bytes in rx_Read call */
1121     long temp;
1122     AFSFetchStatus afsStatus;
1123     AFSCallBack callback;
1124     AFSVolSync volSync;
1125     char *bufferp;
1126     cm_buf_t *tbufp;                    /* buf we're filling */
1127     osi_queueData_t *qdp;               /* q element we're scanning */
1128     AFSFid tfid;
1129     struct rx_call *callp;
1130     struct rx_connection *rxconnp;
1131     cm_bulkIO_t biod;           /* bulk IO descriptor */
1132     cm_conn_t *connp;
1133     int getroot;
1134     long t1, t2;
1135
1136     /* now, the buffer may or may not be filled with good data (buf_GetNew
1137      * drops lots of locks, and may indeed return a properly initialized
1138      * buffer, although more likely it will just return a new, empty, buffer.
1139      */
1140
1141 #ifdef AFS_FREELANCE_CLIENT
1142
1143     // yj: if they're trying to get the /afs directory, we need to
1144     // handle it differently, since it's local rather than on any
1145     // server
1146
1147     getroot = (scp==cm_data.rootSCachep);
1148     if (getroot)
1149         osi_Log1(afsd_logp,"GetBuffer returns cm_data.rootSCachep=%x",cm_data.rootSCachep);
1150 #endif
1151
1152     cm_AFSFidFromFid(&tfid, &scp->fid);
1153
1154     code = cm_SetupFetchBIOD(scp, &bufp->offset, &biod, up, reqp);
1155     if (code) {
1156         /* couldn't even get the first page setup properly */
1157         osi_Log1(afsd_logp, "SetupFetchBIOD failure code %d", code);
1158         return code;
1159     }
1160
1161     /* once we get here, we have the callback in place, we know that no one
1162      * is fetching the data now.  Check one last time that we still have
1163      * the wrong data, and then fetch it if we're still wrong.
1164      *
1165      * We can lose a race condition and end up with biod.length zero, in
1166      * which case we just retry.
1167      */
1168     if (bufp->dataVersion == scp->dataVersion || biod.length == 0) {
1169         osi_Log3(afsd_logp, "Bad DVs %d, %d or length 0x%x",
1170                  bufp->dataVersion, scp->dataVersion, biod.length);
1171         if ((bufp->dataVersion == -1
1172              || bufp->dataVersion < scp->dataVersion)
1173              && LargeIntegerGreaterThanOrEqualTo(bufp->offset,
1174                                                  scp->serverLength)) {
1175             if (bufp->dataVersion == -1)
1176                 memset(bufp->datap, 0, cm_data.buf_blockSize);
1177             bufp->dataVersion = scp->dataVersion;
1178         }
1179         lock_ReleaseMutex(&scp->mx);
1180         cm_ReleaseBIOD(&biod, 0);
1181         lock_ObtainMutex(&scp->mx);
1182         return 0;
1183     }
1184         
1185     lock_ReleaseMutex(&scp->mx);
1186
1187 #ifdef DISKCACHE95
1188     DPRINTF("cm_GetBuffer: fetching data scpDV=%d bufDV=%d scp=%x bp=%x dcp=%x\n",
1189             scp->dataVersion, bufp->dataVersion, scp, bufp, bufp->dcp);
1190 #endif /* DISKCACHE95 */
1191
1192 #ifdef AFS_FREELANCE_CLIENT
1193
1194     // yj code
1195     // if getroot then we don't need to make any calls
1196     // just return fake data
1197         
1198     if (cm_freelanceEnabled && getroot) {
1199         // setup the fake status                        
1200         afsStatus.InterfaceVersion = 0x1;
1201         afsStatus.FileType = 0x2;
1202         afsStatus.LinkCount = scp->linkCount;
1203         afsStatus.Length = cm_fakeDirSize;
1204         afsStatus.DataVersion = cm_data.fakeDirVersion;
1205         afsStatus.Author = 0x1;
1206         afsStatus.Owner = 0x0;
1207         afsStatus.CallerAccess = 0x9;
1208         afsStatus.AnonymousAccess = 0x9;
1209         afsStatus.UnixModeBits = 0x1ff;
1210         afsStatus.ParentVnode = 0x1;
1211         afsStatus.ParentUnique = 0x1;
1212         afsStatus.ResidencyMask = 0;
1213         afsStatus.ClientModTime = (afs_uint32)FakeFreelanceModTime;
1214         afsStatus.ServerModTime = (afs_uint32)FakeFreelanceModTime;
1215         afsStatus.Group = 0;
1216         afsStatus.SyncCounter = 0;
1217         afsStatus.dataVersionHigh = 0;
1218         afsStatus.lockCount = 0;
1219         afsStatus.Length_hi = 0;
1220         afsStatus.errorCode = 0;
1221         
1222         // once we're done setting up the status info,
1223         // we just fill the buffer pages with fakedata
1224         // from cm_FakeRootDir. Extra pages are set to
1225         // 0. 
1226                 
1227         lock_ObtainMutex(&cm_Freelance_Lock);
1228         t1 = bufp->offset.LowPart;
1229         qdp = biod.bufListEndp;
1230         while (qdp) {
1231             tbufp = osi_GetQData(qdp);
1232             bufferp=tbufp->datap;
1233             memset(bufferp, 0, cm_data.buf_blockSize);
1234             t2 = cm_fakeDirSize - t1;
1235             if (t2>cm_data.buf_blockSize) t2=cm_data.buf_blockSize;
1236             if (t2 > 0) {
1237                 memcpy(bufferp, cm_FakeRootDir+t1, t2);
1238             } else {
1239                 t2 = 0;
1240             }
1241             t1+=t2;
1242             qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1243
1244         }
1245         lock_ReleaseMutex(&cm_Freelance_Lock);
1246         
1247         // once we're done, we skip over the part of the
1248         // code that does the ACTUAL fetching of data for
1249         // real files
1250
1251         goto fetchingcompleted;
1252     }
1253
1254 #endif /* AFS_FREELANCE_CLIENT */
1255
1256         /* now make the call */
1257     do {
1258         code = cm_Conn(&scp->fid, up, reqp, &connp);
1259         if (code) 
1260             continue;
1261         
1262         rxconnp = cm_GetRxConn(connp);
1263         callp = rx_NewCall(rxconnp);
1264         rx_PutConnection(rxconnp);
1265
1266         osi_Log3(afsd_logp, "CALL FetchData vp %x, off 0x%x, size 0x%x",
1267                   (long) scp, biod.offset.LowPart, biod.length);
1268
1269         code = StartRXAFS_FetchData(callp, &tfid, biod.offset.LowPart,
1270                                     biod.length);
1271
1272         /* now copy the data out of the pipe and put it in the buffer */
1273         temp  = rx_Read(callp, (char *)&nbytes, 4);
1274         if (temp == 4) {
1275             nbytes = ntohl(nbytes);
1276             if (nbytes > biod.length) 
1277                 code = (callp->error < 0) ? callp->error : -1;
1278         }
1279         else 
1280             code = (callp->error < 0) ? callp->error : -1;
1281
1282         if (code == 0) {
1283             qdp = biod.bufListEndp;
1284             if (qdp) {
1285                 tbufp = osi_GetQData(qdp);
1286                 bufferp = tbufp->datap;
1287             }
1288             else 
1289                 bufferp = NULL;
1290             /* fill nbytes of data from the pipe into the pages.
1291              * When we stop, qdp will point at the last page we're
1292              * dealing with, and bufferp will tell us where we
1293              * stopped.  We'll need this info below when we clear
1294              * the remainder of the last page out (and potentially
1295              * clear later pages out, if we fetch past EOF).
1296              */
1297             while (nbytes > 0) {
1298                 /* assert that there are still more buffers;
1299                  * our check above for nbytes being less than
1300                  * biod.length should ensure this.
1301                  */
1302                 osi_assert(bufferp != NULL);
1303
1304                 /* read rbytes of data */
1305                 rbytes = (nbytes > cm_data.buf_blockSize? cm_data.buf_blockSize : nbytes);
1306                 temp = rx_Read(callp, bufferp, rbytes);
1307                 if (temp < rbytes) {
1308                     code = (callp->error < 0) ? callp->error : -1;
1309                     break;
1310                 }
1311
1312                 /* allow read-while-fetching.
1313                  * if this is the last buffer, clear the
1314                  * PREFETCHING flag, so the reader waiting for
1315                  * this buffer will start a prefetch.
1316                  */
1317                 tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
1318                 lock_ObtainMutex(&scp->mx);
1319                 if (scp->flags & CM_SCACHEFLAG_WAITING) {
1320                     osi_Log1(afsd_logp, "CM GetBuffer Waking scp 0x%x", scp);
1321                     osi_Wakeup((long) &scp->flags);
1322                 }
1323                 if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
1324                     *cpffp = 1;
1325                     cm_ClearPrefetchFlag(0, scp, &biod.offset);
1326                 }
1327                 lock_ReleaseMutex(&scp->mx);
1328
1329                 /* and adjust counters */
1330                 nbytes -= temp;
1331
1332                 /* and move to the next buffer */
1333                 if (nbytes != 0) {
1334                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1335                     if (qdp) {
1336                         tbufp = osi_GetQData(qdp);
1337                         bufferp = tbufp->datap;
1338                     }
1339                     else 
1340                         bufferp = NULL;
1341                 } else 
1342                     bufferp += temp;
1343             }
1344
1345             /* zero out remainder of last pages, in case we are
1346              * fetching past EOF.  We were fetching an integral #
1347              * of pages, but stopped, potentially in the middle of
1348              * a page.  Zero the remainder of that page, and then
1349              * all of the rest of the pages.
1350              */
1351             /* bytes fetched */
1352             rbytes = bufferp - tbufp->datap;
1353             /* bytes left to zero */
1354             rbytes = cm_data.buf_blockSize - rbytes;
1355             while(qdp) {
1356                 if (rbytes != 0)
1357                     memset(bufferp, 0, rbytes);
1358                 qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1359                 if (qdp == NULL) 
1360                     break;
1361                 tbufp = osi_GetQData(qdp);
1362                 bufferp = tbufp->datap;
1363                 /* bytes to clear in this page */
1364                 rbytes = cm_data.buf_blockSize;
1365             }   
1366         }
1367
1368         if (code == 0)
1369             code = EndRXAFS_FetchData(callp, &afsStatus, &callback, &volSync);
1370         else
1371             osi_Log0(afsd_logp, "CALL EndRXAFS_FetchData skipped due to error");
1372         code = rx_EndCall(callp, code);
1373         if (code == RXKADUNKNOWNKEY)
1374             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
1375         osi_Log0(afsd_logp, "CALL FetchData DONE");
1376
1377     } while (cm_Analyze(connp, up, reqp, &scp->fid, &volSync, NULL, NULL, code));
1378
1379   fetchingcompleted:
1380     code = cm_MapRPCError(code, reqp);
1381
1382     lock_ObtainMutex(&scp->mx);
1383     
1384     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_FETCHSTATUS);
1385
1386     /* we know that no one else has changed the buffer, since we still have
1387      * the fetching flag on the buffers, and we have the scp locked again.
1388      * Copy in the version # into the buffer if we got code 0 back from the
1389      * read.
1390      */
1391     if (code == 0) {
1392         for(qdp = biod.bufListp;
1393              qdp;
1394              qdp = (osi_queueData_t *) osi_QNext(&qdp->q)) {
1395             tbufp = osi_GetQData(qdp);
1396             tbufp->dataVersion = afsStatus.DataVersion;
1397
1398 #ifdef DISKCACHE95
1399             /* write buffer out to disk cache */
1400             diskcache_Update(tbufp->dcp, tbufp->datap, cm_data.buf_blockSize,
1401                               tbufp->dataVersion);
1402 #endif /* DISKCACHE95 */
1403         }
1404     }
1405
1406     /* release scatter/gather I/O structure (buffers, locks) */
1407     lock_ReleaseMutex(&scp->mx);
1408     cm_ReleaseBIOD(&biod, 0);
1409     lock_ObtainMutex(&scp->mx);
1410
1411     if (code == 0) 
1412         cm_MergeStatus(scp, &afsStatus, &volSync, up, 0);
1413     
1414     return code;
1415 }