windows-more-cleanups-20060201
[openafs.git] / src / WINNT / afsd / cm_dcache.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <afs/stds.h>
12
13 #ifndef DJGPP
14 #include <windows.h>
15 #include <winsock2.h>
16 #include <nb30.h>
17 #endif /* !DJGPP */
18 #ifdef COMMENT
19 #include <malloc.h>
20 #endif
21 #include <string.h>
22 #include <stdlib.h>
23 #include <osi.h>
24
25 #include "afsd.h"
26
27 #ifdef DEBUG
28 extern void afsi_log(char *pattern, ...);
29 #endif
30
31 osi_mutex_t cm_bufGetMutex;
32 #ifdef AFS_FREELANCE_CLIENT
33 extern osi_mutex_t cm_Freelance_Lock;
34 #endif
35
36 /* functions called back from the buffer package when reading or writing data,
37  * or when holding or releasing a vnode pointer.
38  */
39 long cm_BufWrite(void *vfidp, osi_hyper_t *offsetp, long length, long flags,
40                  cm_user_t *userp, cm_req_t *reqp)
41 {
42     /* store the data back from this buffer; the buffer is locked and held,
43      * but the vnode involved isn't locked, yet.  It is held by its
44      * reference from the buffer, which won't change until the buffer is
45      * released by our caller.  Thus, we don't have to worry about holding
46      * bufp->scp.
47      */
48     long code;
49     cm_fid_t *fidp = vfidp;
50     cm_scache_t *scp;
51     long nbytes;
52     long temp;
53     AFSFetchStatus outStatus;
54     AFSStoreStatus inStatus;
55     osi_hyper_t thyper;
56     AFSVolSync volSync;
57     AFSFid tfid;
58     struct rx_call *callp;
59     struct rx_connection *rxconnp;
60     osi_queueData_t *qdp;
61     cm_buf_t *bufp;
62     long wbytes;
63     char *bufferp;
64     cm_conn_t *connp;
65     long truncPos;
66     cm_bulkIO_t biod;           /* bulk IO descriptor */
67
68     osi_assert(userp != NULL);
69
70     /* now, the buffer may or may not be filled with good data (buf_GetNew
71      * drops lots of locks, and may indeed return a properly initialized
72      * buffer, although more likely it will just return a new, empty, buffer.
73      */
74     scp = cm_FindSCache(fidp);
75     if (scp == NULL)
76         return CM_ERROR_NOSUCHFILE;     /* shouldn't happen */
77
78     cm_AFSFidFromFid(&tfid, fidp);
79
80     lock_ObtainMutex(&scp->mx);
81         
82     code = cm_SetupStoreBIOD(scp, offsetp, length, &biod, userp, reqp);
83     if (code) {
84         osi_Log1(afsd_logp, "cm_SetupStoreBIOD code %x", code);
85         lock_ReleaseMutex(&scp->mx);
86         cm_ReleaseSCache(scp);
87         return code;
88     }
89
90     if (biod.length == 0) {
91         osi_Log0(afsd_logp, "cm_SetupStoreBIOD length 0");
92         lock_ReleaseMutex(&scp->mx);
93         cm_ReleaseBIOD(&biod, 1);       /* should be a NOOP */
94         cm_ReleaseSCache(scp);
95         return 0;
96     }   
97
98     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
99     (void) cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL);
100
101     /* prepare the output status for the store */
102     scp->mask |= CM_SCACHEMASK_CLIENTMODTIME;
103     cm_StatusFromAttr(&inStatus, scp, NULL);
104     truncPos = scp->length.LowPart;
105     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
106          && scp->truncPos.LowPart < (unsigned long) truncPos)
107         truncPos = scp->truncPos.LowPart;
108         scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
109                 
110     /* compute how many bytes to write from this buffer */
111     thyper = LargeIntegerSubtract(scp->length, biod.offset);
112     if (LargeIntegerLessThanZero(thyper)) {
113         /* entire buffer is past EOF */
114         nbytes = 0;
115     }
116     else {
117         /* otherwise write out part of buffer before EOF, but not
118          * more than bufferSize bytes.
119          */
120         nbytes = thyper.LowPart;
121         if (nbytes > biod.length) 
122             nbytes = biod.length;
123     }
124
125     lock_ReleaseMutex(&scp->mx);
126         
127     /* now we're ready to do the store operation */
128     do {
129         code = cm_Conn(&scp->fid, userp, reqp, &connp);
130         if (code) 
131             continue;
132                 
133         rxconnp = cm_GetRxConn(connp);
134         callp = rx_NewCall(rxconnp);
135         rx_PutConnection(rxconnp);
136
137         osi_Log4(afsd_logp, "CALL StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
138                  scp, biod.offset.HighPart, biod.offset.LowPart, nbytes);
139
140         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
141                                     biod.offset.LowPart, nbytes, truncPos);
142
143         if (code == 0) {
144             /* write the data from the the list of buffers */
145             qdp = NULL;
146             while(nbytes > 0) {
147                 if (qdp == NULL)
148                     qdp = biod.bufListEndp;
149                 else
150                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
151                 osi_assert(qdp != NULL);
152                 bufp = osi_GetQData(qdp);
153                 bufferp = bufp->datap;
154                 wbytes = nbytes;
155                 if (wbytes > cm_data.buf_blockSize) 
156                     wbytes = cm_data.buf_blockSize;
157
158                 /* write out wbytes of data from bufferp */
159                 temp = rx_Write(callp, bufferp, wbytes);
160                 if (temp != wbytes) {
161                     osi_Log3(afsd_logp, "rx_Write failed bp 0x%p, %d != %d",bufp,temp,wbytes);
162                     code = -1;
163                     break;
164                 } else {
165                     osi_Log2(afsd_logp, "rx_Write succeeded bp 0x%p, %d",bufp,temp);
166                 }       
167                 nbytes -= wbytes;
168             }   /* while more bytes to write */
169         }               /* if RPC started successfully */
170         else {
171             osi_Log2(afsd_logp, "StartRXAFS_StoreData scp 0x%p failed (%lX)",scp,code);
172         }
173         if (code == 0) {
174             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
175             if (code)
176                 osi_Log2(afsd_logp, "EndRXAFS_StoreData scp 0x%p failed (%lX)",scp,code);
177         }
178         code = rx_EndCall(callp, code);
179                 
180     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
181     code = cm_MapRPCError(code, reqp);
182         
183     if (code)
184         osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
185     else
186         osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
187
188     /* now, clean up our state */
189     lock_ObtainMutex(&scp->mx);
190
191     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
192
193     if (code == 0) {
194         /* now, here's something a little tricky: in AFS 3, a dirty
195          * length can't be directly stored, instead, a dirty chunk is
196          * stored that sets the file's size (by writing and by using
197          * the truncate-first option in the store call).
198          *
199          * At this point, we've just finished a store, and so the trunc
200          * pos field is clean.  If the file's size at the server is at
201          * least as big as we think it should be, then we turn off the
202          * length dirty bit, since all the other dirty buffers must
203          * precede this one in the file.
204          *
205          * The file's desired size shouldn't be smaller than what's
206          * stored at the server now, since we just did the trunc pos
207          * store.
208          *
209          * We have to turn off the length dirty bit as soon as we can,
210          * so that we see updates made by other machines.
211          */
212         if (outStatus.Length >= scp->length.LowPart)
213             scp->mask &= ~CM_SCACHEMASK_LENGTH;
214         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
215     } else {
216         if (code == CM_ERROR_SPACE)
217             scp->flags |= CM_SCACHEFLAG_OUTOFSPACE;
218         else if (code == CM_ERROR_QUOTA)
219             scp->flags |= CM_SCACHEFLAG_OVERQUOTA;
220     }
221     lock_ReleaseMutex(&scp->mx);
222     cm_ReleaseBIOD(&biod, 1);
223     cm_ReleaseSCache(scp);
224
225     return code;
226 }
227
228 /*
229  * Truncate the file, by sending a StoreData RPC with zero length.
230  *
231  * Called with scp locked.  Releases and re-obtains the lock.
232  */
233 long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
234 {
235     AFSFetchStatus outStatus;
236     AFSStoreStatus inStatus;
237     AFSVolSync volSync;
238     AFSFid tfid;
239     long code;
240     long truncPos;
241     cm_conn_t *connp;
242     struct rx_call *callp;
243     struct rx_connection *rxconnp;
244
245     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
246     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
247                      CM_SCACHESYNC_STOREDATA_EXCL);
248
249     /* prepare the output status for the store */
250     inStatus.Mask = AFS_SETMODTIME;
251     inStatus.ClientModTime = scp->clientModTime;
252     scp->mask &= ~CM_SCACHEMASK_CLIENTMODTIME;
253
254     /* calculate truncation position */
255     truncPos = scp->length.LowPart;
256     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
257          && scp->truncPos.LowPart < (unsigned long) truncPos)
258         truncPos = scp->truncPos.LowPart;
259     scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
260
261     lock_ReleaseMutex(&scp->mx);
262
263     cm_AFSFidFromFid(&tfid, &scp->fid);
264
265     /* now we're ready to do the store operation */
266     do {
267         code = cm_Conn(&scp->fid, userp, reqp, &connp);
268         if (code) 
269             continue;
270                 
271         rxconnp = cm_GetRxConn(connp);
272         callp = rx_NewCall(rxconnp);
273         rx_PutConnection(rxconnp);
274
275         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
276                                     0, 0, truncPos);
277
278         if (code == 0)
279             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
280         code = rx_EndCall(callp, code);
281
282     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
283     code = cm_MapRPCError(code, reqp);
284         
285     /* now, clean up our state */
286     lock_ObtainMutex(&scp->mx);
287
288     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
289
290     if (code == 0) {
291         /*
292          * For explanation of handling of CM_SCACHEMASK_LENGTH,
293          * see cm_BufWrite().
294          */
295         if (outStatus.Length >= scp->length.LowPart)
296             scp->mask &= ~CM_SCACHEMASK_LENGTH;
297         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
298     }
299
300     return code;
301 }
302
303 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
304 {
305     *bytesReadp = cm_data.buf_blockSize;
306
307     /* now return a code that means that I/O is done */
308     return 0;
309 }
310
311 /* stabilize scache entry, and return with it locked so 
312  * it stays stable.
313  */
314 long cm_BufStabilize(void *parmp, cm_user_t *userp, cm_req_t *reqp)
315 {
316     cm_scache_t *scp;
317     long code;
318
319     scp = parmp;
320         
321     lock_ObtainMutex(&scp->mx);
322     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
323                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
324     if (code) {
325         lock_ReleaseMutex(&scp->mx);
326         return code;
327     }
328         
329     return 0;
330 }
331
332 /* undoes the work that cm_BufStabilize does: releases lock so things can change again */
333 long cm_BufUnstabilize(void *parmp, cm_user_t *userp)
334 {
335     cm_scache_t *scp;
336         
337     scp = parmp;
338         
339     lock_ReleaseMutex(&scp->mx);
340         
341     /* always succeeds */
342     return 0;
343 }
344
345 cm_buf_ops_t cm_bufOps = {
346     cm_BufWrite,
347     cm_BufRead,
348     cm_BufStabilize,
349     cm_BufUnstabilize
350 };
351
352 long cm_ValidateDCache(void)
353 {
354     return buf_ValidateBuffers();
355 }
356
357 long cm_ShutdownDCache(void)
358 {
359     return 0;
360 }
361
362 int cm_InitDCache(int newFile, long chunkSize, afs_uint64 nbuffers)
363 {
364     lock_InitializeMutex(&cm_bufGetMutex, "buf_Get mutex");
365     return buf_Init(newFile, &cm_bufOps, nbuffers);
366 }
367
368 /* check to see if we have an up-to-date buffer.  The buffer must have
369  * previously been obtained by calling buf_Get.
370  *
371  * Make sure we have a callback, and that the dataversion matches.
372  *
373  * Scp must be locked.
374  *
375  * Bufp *may* be locked.
376  */
377 int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
378 {
379     int code;
380     if (!cm_HaveCallback(scp))
381         return 0;
382     if ((bufp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED)) == (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
383         return 1;
384     if (bufp->dataVersion == scp->dataVersion)
385         return 1;
386     if (!isBufLocked) {
387         code = lock_TryMutex(&bufp->mx);
388         if (code == 0) {
389             /* don't have the lock, and can't lock it, then
390              * return failure.
391              */
392             return 0;
393         }
394     }
395
396     /* remember dirty flag for later */
397     code = bufp->flags & CM_BUF_DIRTY;
398
399     /* release lock if we obtained it here */
400     if (!isBufLocked) 
401         lock_ReleaseMutex(&bufp->mx);
402
403     /* if buffer was dirty, buffer is acceptable for use */
404     if (code) 
405         return 1;
406     else 
407         return 0;
408 }
409
410 /* used when deciding whether to do a prefetch or not */
411 long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, long length,
412                         cm_user_t *up, cm_req_t *reqp, osi_hyper_t *realBasep)
413 {
414     osi_hyper_t toffset;
415     osi_hyper_t tbase;
416     long code;
417     cm_buf_t *bp;
418     int stop;
419         
420     /* now scan all buffers in the range, looking for any that look like
421      * they need work.
422      */
423     tbase = *startBasep;
424     stop = 0;
425     lock_ObtainMutex(&scp->mx);
426     while(length > 0) {
427         /* get callback so we can do a meaningful dataVersion comparison */
428         code = cm_SyncOp(scp, NULL, up, reqp, 0,
429                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
430         if (code) {
431             scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
432             lock_ReleaseMutex(&scp->mx);
433             return code;
434         }
435                 
436         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
437             /* we're past the end of file */
438             break;
439         }
440
441         bp = buf_Find(scp, &tbase);
442         /* We cheat slightly by not locking the bp mutex. */
443         if (bp) {
444             if ((bp->cmFlags & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
445                  && bp->dataVersion != scp->dataVersion)
446                 stop = 1;
447             buf_Release(bp);
448         }
449         else 
450             stop = 1;
451
452         /* if this buffer is essentially guaranteed to require a fetch,
453          * break out here and return this position.
454          */
455         if (stop) 
456             break;
457                 
458         toffset.LowPart = cm_data.buf_blockSize;
459         toffset.HighPart = 0;
460         tbase = LargeIntegerAdd(toffset, tbase);
461         length -= cm_data.buf_blockSize;
462     }
463         
464     /* if we get here, either everything is fine or stop stopped us at a
465      * particular buffer in the range that definitely needs to be fetched.
466      */
467     if (stop == 0) {
468         /* return non-zero code since realBasep won't be valid */
469         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
470         code = -1;
471     }   
472     else {
473         /* successfully found a page that will need fetching */
474         *realBasep = tbase;
475         code = 0;
476     }
477     lock_ReleaseMutex(&scp->mx);
478     return code;
479 }
480
481 void cm_BkgStore(cm_scache_t *scp, long p1, long p2, long p3, long p4,
482                  cm_user_t *userp)
483 {
484     osi_hyper_t toffset;
485     long length;
486     cm_req_t req;
487
488     cm_InitReq(&req);
489     req.flags |= CM_REQ_NORETRY;
490
491     toffset.LowPart = p1;
492     toffset.HighPart = p2;
493     length = p3;
494
495     osi_Log4(afsd_logp, "Starting BKG store scp 0x%p, offset 0x%x:%08x, length 0x%x", scp, p2, p1, p3);
496
497     cm_BufWrite(&scp->fid, &toffset, length, /* flags */ 0, userp, &req);
498
499     lock_ObtainMutex(&scp->mx);
500     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
501     lock_ReleaseMutex(&scp->mx);
502 }
503
504 /* Called with scp locked */
505 void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base)
506 {
507     osi_hyper_t thyper;
508
509     if (code == 0) {
510         thyper.LowPart = cm_chunkSize;
511         thyper.HighPart = 0;
512         thyper =  LargeIntegerAdd(*base, thyper);
513         thyper.LowPart &= (-cm_chunkSize);
514         if (LargeIntegerGreaterThan(*base, scp->prefetch.base))
515             scp->prefetch.base = *base;
516         if (LargeIntegerGreaterThan(thyper, scp->prefetch.end))
517             scp->prefetch.end = thyper;
518     }
519     scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
520 }
521
522 /* do the prefetch */
523 void cm_BkgPrefetch(cm_scache_t *scp, long p1, long p2, long p3, long p4,
524                     cm_user_t *userp)
525 {
526     long length;
527     osi_hyper_t base;
528     long code;
529     cm_buf_t *bp;
530     int cpff = 0;                       /* cleared prefetch flag */
531     cm_req_t req;
532
533     cm_InitReq(&req);
534     req.flags |= CM_REQ_NORETRY;
535         
536     base.LowPart = p1;
537     base.HighPart = p2;
538     length = p3;
539         
540     osi_Log2(afsd_logp, "Starting BKG prefetch scp 0x%p, base 0x%x", scp, p1);
541
542     code = buf_Get(scp, &base, &bp);
543
544     lock_ObtainMutex(&scp->mx);
545
546     if (code || (bp->cmFlags & CM_BUF_CMFETCHING)) {
547         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
548         lock_ReleaseMutex(&scp->mx);
549         return;
550     }
551
552     code = cm_GetBuffer(scp, bp, &cpff, userp, &req);
553     if (!cpff) 
554         cm_ClearPrefetchFlag(code, scp, &base);
555     lock_ReleaseMutex(&scp->mx);
556     buf_Release(bp);
557     return;
558 }
559
560 /* a read was issued to offsetp, and we have to determine whether we should
561  * do a prefetch.
562  */
563 void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp,
564                          cm_user_t *userp, cm_req_t *reqp)
565 {
566     long code;
567     osi_hyper_t realBase;
568     osi_hyper_t readBase;
569         
570     readBase = *offsetp;
571     /* round up to chunk boundary */
572     readBase.LowPart += (cm_chunkSize-1);
573     readBase.LowPart &= (-cm_chunkSize);
574
575     lock_ObtainMutex(&scp->mx);
576     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
577          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
578         lock_ReleaseMutex(&scp->mx);
579         return;
580     }
581     scp->flags |= CM_SCACHEFLAG_PREFETCHING;
582
583     /* start the scan at the latter of the end of this read or
584      * the end of the last fetched region.
585      */
586     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
587         readBase = scp->prefetch.end;
588
589     lock_ReleaseMutex(&scp->mx);
590
591     code = cm_CheckFetchRange(scp, &readBase, cm_chunkSize, userp, reqp,
592                               &realBase);
593     if (code) 
594         return; /* can't find something to prefetch */
595
596     osi_Log2(afsd_logp, "BKG Prefetch request scp 0x%p, base 0x%x",
597              scp, realBase.LowPart);
598
599     cm_QueueBKGRequest(scp, cm_BkgPrefetch, realBase.LowPart,
600                        realBase.HighPart, cm_chunkSize, 0, userp);
601 }
602
603 /* scp must be locked; temporarily unlocked during processing.
604  * If returns 0, returns buffers held in biop, and with
605  * CM_BUF_CMSTORING set.
606  *
607  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
608  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
609  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
610  * must be woken, and the event must be set when the I/O is done.  All of this
611  * is required so that buf_WaitIO synchronizes properly with the buffer as it
612  * is being written out.
613  */
614 long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
615                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
616 {
617     cm_buf_t *bufp;
618     osi_queueData_t *qdp;
619     osi_hyper_t thyper;
620     osi_hyper_t tbase;
621     osi_hyper_t scanStart;              /* where to start scan for dirty pages */
622     osi_hyper_t scanEnd;                /* where to stop scan for dirty pages */
623     osi_hyper_t firstModOffset; /* offset of first modified page in range */
624     long temp;
625     long code;
626     long flags;                 /* flags to cm_SyncOp */
627         
628     /* clear things out */
629     biop->scp = scp;            /* don't hold */
630     biop->offset = *inOffsetp;
631     biop->length = 0;
632     biop->bufListp = NULL;
633     biop->bufListEndp = NULL;
634     biop->reserved = 0;
635
636     /* reserve a chunk's worth of buffers */
637     lock_ReleaseMutex(&scp->mx);
638     buf_ReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
639     lock_ObtainMutex(&scp->mx);
640
641     bufp = NULL;
642     for (temp = 0; temp < inSize; temp += cm_data.buf_blockSize, bufp = NULL) {
643         thyper.HighPart = 0;
644         thyper.LowPart = temp;
645         tbase = LargeIntegerAdd(*inOffsetp, thyper);
646
647         bufp = buf_Find(scp, &tbase);
648         if (bufp) {
649             /* get buffer mutex and scp mutex safely */
650             lock_ReleaseMutex(&scp->mx);
651             lock_ObtainMutex(&bufp->mx);
652             lock_ObtainMutex(&scp->mx);
653
654             flags = CM_SCACHESYNC_NEEDCALLBACK
655                 | CM_SCACHESYNC_GETSTATUS
656                     | CM_SCACHESYNC_STOREDATA
657                         | CM_SCACHESYNC_BUFLOCKED;
658             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
659             if (code) {
660                 lock_ReleaseMutex(&bufp->mx);
661                 buf_Release(bufp);
662                 buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
663                 return code;
664             }   
665                         
666             /* if the buffer is dirty, we're done */
667             if (bufp->flags & CM_BUF_DIRTY) {
668                 osi_assertx(!(bufp->flags & CM_BUF_WRITING),
669                             "WRITING w/o CMSTORING in SetupStoreBIOD");
670                 bufp->flags |= CM_BUF_WRITING;
671                 break;
672             }
673
674             /* this buffer is clean, so there's no reason to process it */
675             cm_SyncOpDone(scp, bufp, flags);
676             lock_ReleaseMutex(&bufp->mx);
677             buf_Release(bufp);
678         }       
679     }
680
681     biop->reserved = 1;
682         
683     /* if we get here, if bufp is null, we didn't find any dirty buffers
684      * that weren't already being stored back, so we just quit now.
685      */
686     if (!bufp) {
687         return 0;
688     }
689
690     /* don't need buffer mutex any more */
691     lock_ReleaseMutex(&bufp->mx);
692         
693     /* put this element in the list */
694     qdp = osi_QDAlloc();
695     osi_SetQData(qdp, bufp);
696     /* don't have to hold bufp, since held by buf_Find above */
697     osi_QAddH((osi_queue_t **) &biop->bufListp,
698               (osi_queue_t **) &biop->bufListEndp,
699               &qdp->q);
700     biop->length = cm_data.buf_blockSize;
701     firstModOffset = bufp->offset;
702     biop->offset = firstModOffset;
703
704     /* compute the window surrounding *inOffsetp of size cm_chunkSize */
705     scanStart = *inOffsetp;
706     scanStart.LowPart &= (-cm_chunkSize);
707     thyper.LowPart = cm_chunkSize;
708     thyper.HighPart = 0;
709     scanEnd = LargeIntegerAdd(scanStart, thyper);
710
711     flags = CM_SCACHESYNC_NEEDCALLBACK
712         | CM_SCACHESYNC_GETSTATUS
713         | CM_SCACHESYNC_STOREDATA
714         | CM_SCACHESYNC_BUFLOCKED
715         | CM_SCACHESYNC_NOWAIT;
716
717     /* start by looking backwards until scanStart */
718     thyper.HighPart = 0;                /* hyper version of cm_data.buf_blockSize */
719     thyper.LowPart = cm_data.buf_blockSize;
720     tbase = LargeIntegerSubtract(firstModOffset, thyper);
721     while(LargeIntegerGreaterThanOrEqualTo(tbase, scanStart)) {
722         /* see if we can find the buffer */
723         bufp = buf_Find(scp, &tbase);
724         if (!bufp) 
725             break;
726
727         /* try to lock it, and quit if we can't (simplifies locking) */
728         lock_ReleaseMutex(&scp->mx);
729         code = lock_TryMutex(&bufp->mx);
730         lock_ObtainMutex(&scp->mx);
731         if (code == 0) {
732             buf_Release(bufp);
733             break;
734         }
735                 
736         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
737         if (code) {
738             lock_ReleaseMutex(&bufp->mx);
739             buf_Release(bufp);
740             break;
741         }
742                 
743         if (!(bufp->flags & CM_BUF_DIRTY)) {
744             /* buffer is clean, so we shouldn't add it */
745             cm_SyncOpDone(scp, bufp, flags);
746             lock_ReleaseMutex(&bufp->mx);
747             buf_Release(bufp);
748             break;
749         }
750
751         /* don't need buffer mutex any more */
752         lock_ReleaseMutex(&bufp->mx);
753
754         /* we have a dirty buffer ready for storing.  Add it to the tail
755          * of the list, since it immediately precedes all of the disk
756          * addresses we've already collected.
757          */
758         qdp = osi_QDAlloc();
759         osi_SetQData(qdp, bufp);
760         /* no buf_hold necessary, since we have it held from buf_Find */
761         osi_QAddT((osi_queue_t **) &biop->bufListp,
762                   (osi_queue_t **) &biop->bufListEndp,
763                   &qdp->q);
764
765         /* update biod info describing the transfer */
766         biop->offset = LargeIntegerSubtract(biop->offset, thyper);
767         biop->length += cm_data.buf_blockSize;
768
769         /* update loop pointer */
770         tbase = LargeIntegerSubtract(tbase, thyper);
771     }   /* while loop looking for pages preceding the one we found */
772
773     /* now, find later dirty, contiguous pages, and add them to the list */
774     thyper.HighPart = 0;                /* hyper version of cm_data.buf_blockSize */
775     thyper.LowPart = cm_data.buf_blockSize;
776     tbase = LargeIntegerAdd(firstModOffset, thyper);
777     while(LargeIntegerLessThan(tbase, scanEnd)) {
778         /* see if we can find the buffer */
779         bufp = buf_Find(scp, &tbase);
780         if (!bufp) 
781             break;
782
783         /* try to lock it, and quit if we can't (simplifies locking) */
784         lock_ReleaseMutex(&scp->mx);
785         code = lock_TryMutex(&bufp->mx);
786         lock_ObtainMutex(&scp->mx);
787         if (code == 0) {
788             buf_Release(bufp);
789             break;
790         }
791
792         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
793         if (code) {
794             lock_ReleaseMutex(&bufp->mx);
795             buf_Release(bufp);
796             break;
797         }
798                 
799         if (!(bufp->flags & CM_BUF_DIRTY)) {
800             /* buffer is clean, so we shouldn't add it */
801             cm_SyncOpDone(scp, bufp, flags);
802             lock_ReleaseMutex(&bufp->mx);
803             buf_Release(bufp);
804             break;
805         }
806
807         /* don't need buffer mutex any more */
808         lock_ReleaseMutex(&bufp->mx);
809
810         /* we have a dirty buffer ready for storing.  Add it to the head
811          * of the list, since it immediately follows all of the disk
812          * addresses we've already collected.
813          */
814         qdp = osi_QDAlloc();
815         osi_SetQData(qdp, bufp);
816         /* no buf_hold necessary, since we have it held from buf_Find */
817         osi_QAddH((osi_queue_t **) &biop->bufListp,
818                   (osi_queue_t **) &biop->bufListEndp,
819                   &qdp->q);
820
821         /* update biod info describing the transfer */
822         biop->length += cm_data.buf_blockSize;
823                 
824         /* update loop pointer */
825         tbase = LargeIntegerAdd(tbase, thyper);
826     }   /* while loop looking for pages following the first page we found */
827         
828     /* finally, we're done */
829     return 0;
830 }
831
832 /* scp must be locked; temporarily unlocked during processing.
833  * If returns 0, returns buffers held in biop, and with
834  * CM_BUF_CMFETCHING flags set.
835  * If an error is returned, we don't return any buffers.
836  */
837 long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
838                         cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp)
839 {
840     long code;
841     cm_buf_t *tbp;
842     osi_hyper_t toffset;                /* a long long temp variable */
843     osi_hyper_t pageBase;               /* base offset we're looking at */
844     osi_queueData_t *qdp;               /* one temp queue structure */
845     osi_queueData_t *tqdp;              /* another temp queue structure */
846     long collected;                     /* how many bytes have been collected */
847     int isFirst;
848     long flags;
849     osi_hyper_t fileSize;               /* the # of bytes in the file */
850     osi_queueData_t *heldBufListp;      /* we hold all buffers in this list */
851     osi_queueData_t *heldBufListEndp;   /* first one */
852     int reserving;
853
854     biop->scp = scp;
855     biop->offset = *offsetp;
856     /* null out the list of buffers */
857     biop->bufListp = biop->bufListEndp = NULL;
858     biop->reserved = 0;
859
860     /* first lookup the file's length, so we know when to stop */
861     code = cm_SyncOp(scp, NULL, up, reqp, 0, 
862                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
863     if (code) 
864         return code;
865         
866     /* copy out size, since it may change */
867     fileSize = scp->serverLength;
868         
869     lock_ReleaseMutex(&scp->mx);
870
871     pageBase = *offsetp;
872     collected = pageBase.LowPart & (cm_chunkSize - 1);
873     heldBufListp = NULL;
874     heldBufListEndp = NULL;
875
876     /*
877      * Obtaining buffers can cause dirty buffers to be recycled, which
878      * can cause a storeback, so cannot be done while we have buffers
879      * reserved.
880      *
881      * To get around this, we get buffers twice.  Before reserving buffers,
882      * we obtain and release each one individually.  After reserving
883      * buffers, we try to obtain them again, but only by lookup, not by
884      * recycling.  If a buffer has gone away while we were waiting for
885      * the others, we just use whatever buffers we already have.
886      *
887      * On entry to this function, we are already holding a buffer, so we
888      * can't wait for reservation.  So we call buf_TryReserveBuffers()
889      * instead.  Not only that, we can't really even call buf_Get(), for
890      * the same reason.  We can't avoid that, though.  To avoid deadlock
891      * we allow only one thread to be executing the buf_Get()-buf_Release()
892      * sequence at a time.
893      */
894
895     // lock_ObtainMutex(&cm_bufGetMutex);
896     /* first hold all buffers, since we can't hold any locks in buf_Get */
897     while (1) {
898         /* stop at chunk boundary */
899         if (collected >= cm_chunkSize) 
900             break;
901                 
902         /* see if the next page would be past EOF */
903         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
904             break;
905
906         code = buf_Get(scp, &pageBase, &tbp);
907         if (code) {
908             //lock_ReleaseMutex(&cm_bufGetMutex);
909             lock_ObtainMutex(&scp->mx);
910             cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
911             return code;
912         }
913                 
914         buf_Release(tbp);
915
916         toffset.HighPart = 0;
917         toffset.LowPart = cm_data.buf_blockSize;
918         pageBase = LargeIntegerAdd(toffset, pageBase);
919         collected += cm_data.buf_blockSize;
920     }
921
922     /* reserve a chunk's worth of buffers if possible */
923     reserving = buf_TryReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
924
925     // lock_ReleaseMutex(&cm_bufGetMutex);
926
927     pageBase = *offsetp;
928     collected = pageBase.LowPart & (cm_chunkSize - 1);
929
930     /* now hold all buffers, if they are still there */
931     while (1) {
932         /* stop at chunk boundary */
933         if (collected >= cm_chunkSize) 
934             break;
935                 
936         /* see if the next page would be past EOF */
937         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
938             break;
939
940         tbp = buf_Find(scp, &pageBase);
941         if (!tbp) 
942             break;
943
944         /* add the buffer to the list */
945         qdp = osi_QDAlloc();
946         osi_SetQData(qdp, tbp);
947         osi_QAdd((osi_queue_t **)&heldBufListp, &qdp->q);
948         if (!heldBufListEndp) heldBufListEndp = qdp;
949         /* leave tbp held (from buf_Get) */
950
951         if (!reserving) 
952             break;
953
954         collected += cm_data.buf_blockSize;
955         toffset.HighPart = 0;
956         toffset.LowPart = cm_data.buf_blockSize;
957         pageBase = LargeIntegerAdd(toffset, pageBase);
958     }
959
960     /* look at each buffer, adding it into the list if it looks idle and
961      * filled with old data.  One special case: wait for idle if it is the
962      * first buffer since we really need that one for our caller to make
963      * any progress.
964      */
965     isFirst = 1;
966     collected = 0;              /* now count how many we'll really use */
967     for (tqdp = heldBufListEndp;
968         tqdp;
969           tqdp = (osi_queueData_t *) osi_QPrev(&tqdp->q)) {
970         /* get a ptr to the held buffer */
971         tbp = osi_GetQData(tqdp);
972         pageBase = tbp->offset;
973
974         /* now lock the buffer lock */
975         lock_ObtainMutex(&tbp->mx);
976         lock_ObtainMutex(&scp->mx);
977
978         /* don't bother fetching over data that is already current */
979         if (tbp->dataVersion == scp->dataVersion) {
980             /* we don't need this buffer, since it is current */
981             lock_ReleaseMutex(&scp->mx);
982             lock_ReleaseMutex(&tbp->mx);
983             break;
984         }
985
986         flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_FETCHDATA
987             | CM_SCACHESYNC_BUFLOCKED;
988         if (!isFirst) 
989             flags |= CM_SCACHESYNC_NOWAIT;
990
991         /* wait for the buffer to serialize, if required.  Doesn't
992          * release the scp or buffer lock(s) if NOWAIT is specified.
993          */
994         code = cm_SyncOp(scp, tbp, up, reqp, 0, flags);
995         if (code) {
996             lock_ReleaseMutex(&scp->mx);
997             lock_ReleaseMutex(&tbp->mx);
998             break;
999         }
1000                 
1001         /* don't fetch over dirty buffers */
1002         if (tbp->flags & CM_BUF_DIRTY) {
1003             cm_SyncOpDone(scp, tbp, flags);
1004             lock_ReleaseMutex(&scp->mx);
1005             lock_ReleaseMutex(&tbp->mx);
1006             break;
1007         }
1008
1009         /* Release locks */
1010         lock_ReleaseMutex(&scp->mx);
1011         lock_ReleaseMutex(&tbp->mx);
1012
1013         /* add the buffer to the list */
1014         qdp = osi_QDAlloc();
1015         osi_SetQData(qdp, tbp);
1016         osi_QAdd((osi_queue_t **)&biop->bufListp, &qdp->q);
1017         if (!biop->bufListEndp) 
1018             biop->bufListEndp = qdp;
1019         buf_Hold(tbp);
1020
1021         /* from now on, a failure just stops our collection process, but
1022          * we still do the I/O to whatever we've already managed to collect.
1023          */
1024         isFirst = 0;
1025         collected += cm_data.buf_blockSize;
1026     }
1027         
1028     /* now, we've held in biop->bufListp all the buffer's we're really
1029      * interested in.  We also have holds left from heldBufListp, and we
1030      * now release those holds on the buffers.
1031      */
1032     for (qdp = heldBufListp; qdp; qdp = tqdp) {
1033         tqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1034         tbp = osi_GetQData(qdp);
1035         osi_QDFree(qdp);
1036         buf_Release(tbp);
1037     }
1038
1039     /* Caller expects this */
1040     lock_ObtainMutex(&scp->mx);
1041  
1042     /* if we got a failure setting up the first buffer, then we don't have
1043      * any side effects yet, and we also have failed an operation that the
1044      * caller requires to make any progress.  Give up now.
1045      */
1046     if (code && isFirst) {
1047         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1048         return code;
1049     }
1050         
1051     /* otherwise, we're still OK, and should just return the I/O setup we've
1052      * got.
1053      */
1054     biop->length = collected;
1055     biop->reserved = reserving;
1056     return 0;
1057 }
1058
1059 /* release a bulk I/O structure that was setup by cm_SetupFetchBIOD or by
1060  * cm_SetupStoreBIOD
1061  */
1062 void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore)
1063 {
1064     cm_scache_t *scp;
1065     cm_buf_t *bufp;
1066     osi_queueData_t *qdp;
1067     osi_queueData_t *nqdp;
1068     int flags;
1069
1070     /* Give back reserved buffers */
1071     if (biop->reserved)
1072         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1073         
1074     flags = CM_SCACHESYNC_NEEDCALLBACK;
1075     if (isStore)
1076         flags |= CM_SCACHESYNC_STOREDATA;
1077     else
1078         flags |= CM_SCACHESYNC_FETCHDATA;
1079
1080     scp = biop->scp;
1081     for(qdp = biop->bufListp; qdp; qdp = nqdp) {
1082         /* lookup next guy first, since we're going to free this one */
1083         nqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1084                 
1085         /* extract buffer and free queue data */
1086         bufp = osi_GetQData(qdp);
1087         osi_QDFree(qdp);
1088
1089         /* now, mark I/O as done, unlock the buffer and release it */
1090         lock_ObtainMutex(&bufp->mx);
1091         lock_ObtainMutex(&scp->mx);
1092         cm_SyncOpDone(scp, bufp, flags);
1093                 
1094         /* turn off writing and wakeup users */
1095         if (isStore) {
1096             if (bufp->flags & CM_BUF_WAITING) {
1097                 osi_Log2(afsd_logp, "cm_ReleaseBIOD Waking [scp 0x%p] bp 0x%p", scp, bufp);
1098                 osi_Wakeup((LONG_PTR) bufp);
1099             }
1100             bufp->flags &= ~(CM_BUF_WRITING | CM_BUF_DIRTY);
1101         }
1102
1103         lock_ReleaseMutex(&scp->mx);
1104         lock_ReleaseMutex(&bufp->mx);
1105         buf_Release(bufp);
1106     }
1107
1108     /* clean things out */
1109     biop->bufListp = NULL;
1110     biop->bufListEndp = NULL;
1111 }   
1112
1113 /* Fetch a buffer.  Called with scp locked.
1114  * The scp is locked on return.
1115  */
1116 long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *up,
1117                   cm_req_t *reqp)
1118 {
1119     long code;
1120     long nbytes;                        /* bytes in transfer */
1121     long rbytes;                        /* bytes in rx_Read call */
1122     long temp;
1123     AFSFetchStatus afsStatus;
1124     AFSCallBack callback;
1125     AFSVolSync volSync;
1126     char *bufferp;
1127     cm_buf_t *tbufp;                    /* buf we're filling */
1128     osi_queueData_t *qdp;               /* q element we're scanning */
1129     AFSFid tfid;
1130     struct rx_call *callp;
1131     struct rx_connection *rxconnp;
1132     cm_bulkIO_t biod;           /* bulk IO descriptor */
1133     cm_conn_t *connp;
1134     int getroot;
1135     long t1, t2;
1136
1137     /* now, the buffer may or may not be filled with good data (buf_GetNew
1138      * drops lots of locks, and may indeed return a properly initialized
1139      * buffer, although more likely it will just return a new, empty, buffer.
1140      */
1141
1142 #ifdef AFS_FREELANCE_CLIENT
1143
1144     // yj: if they're trying to get the /afs directory, we need to
1145     // handle it differently, since it's local rather than on any
1146     // server
1147
1148     getroot = (scp==cm_data.rootSCachep);
1149     if (getroot)
1150         osi_Log1(afsd_logp,"GetBuffer returns cm_data.rootSCachep=%x",cm_data.rootSCachep);
1151 #endif
1152
1153     cm_AFSFidFromFid(&tfid, &scp->fid);
1154
1155     code = cm_SetupFetchBIOD(scp, &bufp->offset, &biod, up, reqp);
1156     if (code) {
1157         /* couldn't even get the first page setup properly */
1158         osi_Log1(afsd_logp, "SetupFetchBIOD failure code %d", code);
1159         return code;
1160     }
1161
1162     /* once we get here, we have the callback in place, we know that no one
1163      * is fetching the data now.  Check one last time that we still have
1164      * the wrong data, and then fetch it if we're still wrong.
1165      *
1166      * We can lose a race condition and end up with biod.length zero, in
1167      * which case we just retry.
1168      */
1169     if (bufp->dataVersion == scp->dataVersion || biod.length == 0) {
1170         osi_Log3(afsd_logp, "Bad DVs %d, %d or length 0x%x",
1171                  bufp->dataVersion, scp->dataVersion, biod.length);
1172         if ((bufp->dataVersion == -1
1173              || bufp->dataVersion < scp->dataVersion)
1174              && LargeIntegerGreaterThanOrEqualTo(bufp->offset,
1175                                                  scp->serverLength)) {
1176             if (bufp->dataVersion == -1)
1177                 memset(bufp->datap, 0, cm_data.buf_blockSize);
1178             bufp->dataVersion = scp->dataVersion;
1179         }
1180         lock_ReleaseMutex(&scp->mx);
1181         cm_ReleaseBIOD(&biod, 0);
1182         lock_ObtainMutex(&scp->mx);
1183         return 0;
1184     }
1185         
1186     lock_ReleaseMutex(&scp->mx);
1187
1188 #ifdef DISKCACHE95
1189     DPRINTF("cm_GetBuffer: fetching data scpDV=%d bufDV=%d scp=%x bp=%x dcp=%x\n",
1190             scp->dataVersion, bufp->dataVersion, scp, bufp, bufp->dcp);
1191 #endif /* DISKCACHE95 */
1192
1193 #ifdef AFS_FREELANCE_CLIENT
1194
1195     // yj code
1196     // if getroot then we don't need to make any calls
1197     // just return fake data
1198         
1199     if (cm_freelanceEnabled && getroot) {
1200         // setup the fake status                        
1201         afsStatus.InterfaceVersion = 0x1;
1202         afsStatus.FileType = 0x2;
1203         afsStatus.LinkCount = scp->linkCount;
1204         afsStatus.Length = cm_fakeDirSize;
1205         afsStatus.DataVersion = cm_data.fakeDirVersion;
1206         afsStatus.Author = 0x1;
1207         afsStatus.Owner = 0x0;
1208         afsStatus.CallerAccess = 0x9;
1209         afsStatus.AnonymousAccess = 0x9;
1210         afsStatus.UnixModeBits = 0x1ff;
1211         afsStatus.ParentVnode = 0x1;
1212         afsStatus.ParentUnique = 0x1;
1213         afsStatus.ResidencyMask = 0;
1214         afsStatus.ClientModTime = (afs_uint32)FakeFreelanceModTime;
1215         afsStatus.ServerModTime = (afs_uint32)FakeFreelanceModTime;
1216         afsStatus.Group = 0;
1217         afsStatus.SyncCounter = 0;
1218         afsStatus.dataVersionHigh = 0;
1219         afsStatus.lockCount = 0;
1220         afsStatus.Length_hi = 0;
1221         afsStatus.errorCode = 0;
1222         
1223         // once we're done setting up the status info,
1224         // we just fill the buffer pages with fakedata
1225         // from cm_FakeRootDir. Extra pages are set to
1226         // 0. 
1227                 
1228         lock_ObtainMutex(&cm_Freelance_Lock);
1229         t1 = bufp->offset.LowPart;
1230         qdp = biod.bufListEndp;
1231         while (qdp) {
1232             tbufp = osi_GetQData(qdp);
1233             bufferp=tbufp->datap;
1234             memset(bufferp, 0, cm_data.buf_blockSize);
1235             t2 = cm_fakeDirSize - t1;
1236             if (t2>cm_data.buf_blockSize) t2=cm_data.buf_blockSize;
1237             if (t2 > 0) {
1238                 memcpy(bufferp, cm_FakeRootDir+t1, t2);
1239             } else {
1240                 t2 = 0;
1241             }
1242             t1+=t2;
1243             qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1244
1245         }
1246         lock_ReleaseMutex(&cm_Freelance_Lock);
1247         
1248         // once we're done, we skip over the part of the
1249         // code that does the ACTUAL fetching of data for
1250         // real files
1251
1252         goto fetchingcompleted;
1253     }
1254
1255 #endif /* AFS_FREELANCE_CLIENT */
1256
1257         /* now make the call */
1258     do {
1259         code = cm_Conn(&scp->fid, up, reqp, &connp);
1260         if (code) 
1261             continue;
1262         
1263         rxconnp = cm_GetRxConn(connp);
1264         callp = rx_NewCall(rxconnp);
1265         rx_PutConnection(rxconnp);
1266
1267         osi_Log3(afsd_logp, "CALL FetchData scp 0x%p, off 0x%x, size 0x%x",
1268                  scp, biod.offset.LowPart, biod.length);
1269
1270         code = StartRXAFS_FetchData(callp, &tfid, biod.offset.LowPart,
1271                                     biod.length);
1272
1273         /* now copy the data out of the pipe and put it in the buffer */
1274         temp  = rx_Read(callp, (char *)&nbytes, 4);
1275         if (temp == 4) {
1276             nbytes = ntohl(nbytes);
1277             if (nbytes > biod.length) 
1278                 code = (callp->error < 0) ? callp->error : -1;
1279         }
1280         else 
1281             code = (callp->error < 0) ? callp->error : -1;
1282
1283         if (code == 0) {
1284             qdp = biod.bufListEndp;
1285             if (qdp) {
1286                 tbufp = osi_GetQData(qdp);
1287                 bufferp = tbufp->datap;
1288             }
1289             else 
1290                 bufferp = NULL;
1291             /* fill nbytes of data from the pipe into the pages.
1292              * When we stop, qdp will point at the last page we're
1293              * dealing with, and bufferp will tell us where we
1294              * stopped.  We'll need this info below when we clear
1295              * the remainder of the last page out (and potentially
1296              * clear later pages out, if we fetch past EOF).
1297              */
1298             while (nbytes > 0) {
1299                 /* assert that there are still more buffers;
1300                  * our check above for nbytes being less than
1301                  * biod.length should ensure this.
1302                  */
1303                 osi_assert(bufferp != NULL);
1304
1305                 /* read rbytes of data */
1306                 rbytes = (nbytes > cm_data.buf_blockSize? cm_data.buf_blockSize : nbytes);
1307                 temp = rx_Read(callp, bufferp, rbytes);
1308                 if (temp < rbytes) {
1309                     code = (callp->error < 0) ? callp->error : -1;
1310                     break;
1311                 }
1312
1313                 /* allow read-while-fetching.
1314                  * if this is the last buffer, clear the
1315                  * PREFETCHING flag, so the reader waiting for
1316                  * this buffer will start a prefetch.
1317                  */
1318                 tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
1319                 lock_ObtainMutex(&scp->mx);
1320                 if (scp->flags & CM_SCACHEFLAG_WAITING) {
1321                     osi_Log1(afsd_logp, "CM GetBuffer Waking scp 0x%p", scp);
1322                     osi_Wakeup((LONG_PTR) &scp->flags);
1323                 }
1324                 if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
1325                     *cpffp = 1;
1326                     cm_ClearPrefetchFlag(0, scp, &biod.offset);
1327                 }
1328                 lock_ReleaseMutex(&scp->mx);
1329
1330                 /* and adjust counters */
1331                 nbytes -= temp;
1332
1333                 /* and move to the next buffer */
1334                 if (nbytes != 0) {
1335                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1336                     if (qdp) {
1337                         tbufp = osi_GetQData(qdp);
1338                         bufferp = tbufp->datap;
1339                     }
1340                     else 
1341                         bufferp = NULL;
1342                 } else 
1343                     bufferp += temp;
1344             }
1345
1346             /* zero out remainder of last pages, in case we are
1347              * fetching past EOF.  We were fetching an integral #
1348              * of pages, but stopped, potentially in the middle of
1349              * a page.  Zero the remainder of that page, and then
1350              * all of the rest of the pages.
1351              */
1352             /* bytes fetched */
1353             osi_assert((bufferp - tbufp->datap) < LONG_MAX);
1354             rbytes = (long) (bufferp - tbufp->datap);
1355
1356             /* bytes left to zero */
1357             rbytes = cm_data.buf_blockSize - rbytes;
1358             while(qdp) {
1359                 if (rbytes != 0)
1360                     memset(bufferp, 0, rbytes);
1361                 qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1362                 if (qdp == NULL) 
1363                     break;
1364                 tbufp = osi_GetQData(qdp);
1365                 bufferp = tbufp->datap;
1366                 /* bytes to clear in this page */
1367                 rbytes = cm_data.buf_blockSize;
1368             }   
1369         }
1370
1371         if (code == 0)
1372             code = EndRXAFS_FetchData(callp, &afsStatus, &callback, &volSync);
1373         else
1374             osi_Log1(afsd_logp, "CALL EndRXAFS_FetchData skipped due to error %d", code);
1375         code = rx_EndCall(callp, code);
1376         if (code == RXKADUNKNOWNKEY)
1377             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
1378         osi_Log0(afsd_logp, "CALL FetchData DONE");
1379
1380     } while (cm_Analyze(connp, up, reqp, &scp->fid, &volSync, NULL, NULL, code));
1381
1382   fetchingcompleted:
1383     code = cm_MapRPCError(code, reqp);
1384
1385     lock_ObtainMutex(&scp->mx);
1386     
1387     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_FETCHSTATUS);
1388
1389     /* we know that no one else has changed the buffer, since we still have
1390      * the fetching flag on the buffers, and we have the scp locked again.
1391      * Copy in the version # into the buffer if we got code 0 back from the
1392      * read.
1393      */
1394     if (code == 0) {
1395         for(qdp = biod.bufListp;
1396              qdp;
1397              qdp = (osi_queueData_t *) osi_QNext(&qdp->q)) {
1398             tbufp = osi_GetQData(qdp);
1399             tbufp->dataVersion = afsStatus.DataVersion;
1400
1401 #ifdef DISKCACHE95
1402             /* write buffer out to disk cache */
1403             diskcache_Update(tbufp->dcp, tbufp->datap, cm_data.buf_blockSize,
1404                               tbufp->dataVersion);
1405 #endif /* DISKCACHE95 */
1406         }
1407     }
1408
1409     /* release scatter/gather I/O structure (buffers, locks) */
1410     lock_ReleaseMutex(&scp->mx);
1411     cm_ReleaseBIOD(&biod, 0);
1412     lock_ObtainMutex(&scp->mx);
1413
1414     if (code == 0) 
1415         cm_MergeStatus(scp, &afsStatus, &volSync, up, 0);
1416     
1417     return code;
1418 }