windows-undo-20050531
[openafs.git] / src / WINNT / afsd / cm_dcache.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <afs/stds.h>
12
13 #ifndef DJGPP
14 #include <windows.h>
15 #include <winsock2.h>
16 #include <nb30.h>
17 #endif /* !DJGPP */
18 #ifdef COMMENT
19 #include <malloc.h>
20 #endif
21 #include <string.h>
22 #include <stdlib.h>
23 #include <osi.h>
24
25 #include "afsd.h"
26
27 #ifdef DEBUG
28 extern void afsi_log(char *pattern, ...);
29 #endif
30
31 osi_mutex_t cm_bufGetMutex;
32 #ifdef AFS_FREELANCE_CLIENT
33 extern osi_mutex_t cm_Freelance_Lock;
34 #endif
35
36 /* functions called back from the buffer package when reading or writing data,
37  * or when holding or releasing a vnode pointer.
38  */
39 long cm_BufWrite(void *vfidp, osi_hyper_t *offsetp, long length, long flags,
40                  cm_user_t *userp, cm_req_t *reqp)
41 {
42     /* store the data back from this buffer; the buffer is locked and held,
43      * but the vnode involved isn't locked, yet.  It is held by its
44      * reference from the buffer, which won't change until the buffer is
45      * released by our caller.  Thus, we don't have to worry about holding
46      * bufp->scp.
47      */
48     long code;
49     cm_fid_t *fidp = vfidp;
50     cm_scache_t *scp;
51     long nbytes;
52     long temp;
53     AFSFetchStatus outStatus;
54     AFSStoreStatus inStatus;
55     osi_hyper_t thyper;
56     AFSVolSync volSync;
57     AFSFid tfid;
58     struct rx_call *callp;
59     struct rx_connection *rxconnp;
60     osi_queueData_t *qdp;
61     cm_buf_t *bufp;
62     long wbytes;
63     char *bufferp;
64     cm_conn_t *connp;
65     long truncPos;
66     cm_bulkIO_t biod;           /* bulk IO descriptor */
67
68     osi_assert(userp != NULL);
69
70     /* now, the buffer may or may not be filled with good data (buf_GetNew
71      * drops lots of locks, and may indeed return a properly initialized
72      * buffer, although more likely it will just return a new, empty, buffer.
73      */
74     scp = cm_FindSCache(fidp);
75     if (scp == NULL)
76         return CM_ERROR_NOSUCHFILE;     /* shouldn't happen */
77
78     cm_AFSFidFromFid(&tfid, fidp);
79
80     lock_ObtainMutex(&scp->mx);
81         
82     code = cm_SetupStoreBIOD(scp, offsetp, length, &biod, userp, reqp);
83     if (code) {
84         osi_Log1(afsd_logp, "cm_SetupStoreBIOD code %x", code);
85         lock_ReleaseMutex(&scp->mx);
86         cm_ReleaseSCache(scp);
87         return code;
88     }
89
90     if (biod.length == 0) {
91         osi_Log0(afsd_logp, "cm_SetupStoreBIOD length 0");
92         lock_ReleaseMutex(&scp->mx);
93         cm_ReleaseBIOD(&biod, 1);       /* should be a NOOP */
94         cm_ReleaseSCache(scp);
95         return 0;
96     }   
97
98     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
99     (void) cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL);
100
101     /* prepare the output status for the store */
102     scp->mask |= CM_SCACHEMASK_CLIENTMODTIME;
103     cm_StatusFromAttr(&inStatus, scp, NULL);
104     truncPos = scp->length.LowPart;
105     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
106          && scp->truncPos.LowPart < (unsigned long) truncPos)
107         truncPos = scp->truncPos.LowPart;
108         scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
109                 
110     /* compute how many bytes to write from this buffer */
111     thyper = LargeIntegerSubtract(scp->length, biod.offset);
112     if (LargeIntegerLessThanZero(thyper)) {
113         /* entire buffer is past EOF */
114         nbytes = 0;
115     }
116     else {
117         /* otherwise write out part of buffer before EOF, but not
118          * more than bufferSize bytes.
119          */
120         nbytes = thyper.LowPart;
121         if (nbytes > biod.length) 
122             nbytes = biod.length;
123     }
124
125     lock_ReleaseMutex(&scp->mx);
126         
127     /* now we're ready to do the store operation */
128     do {
129         code = cm_Conn(&scp->fid, userp, reqp, &connp);
130         if (code) 
131             continue;
132                 
133         rxconnp = cm_GetRxConn(connp);
134         callp = rx_NewCall(rxconnp);
135         rx_PutConnection(rxconnp);
136
137         osi_Log3(afsd_logp, "CALL StoreData scp 0x%x, off 0x%x, size 0x%x",
138                  (long) scp, biod.offset.LowPart, nbytes);
139
140         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
141                                     biod.offset.LowPart, nbytes, truncPos);
142
143         if (code == 0) {
144             /* write the data from the the list of buffers */
145             qdp = NULL;
146             while(nbytes > 0) {
147                 if (qdp == NULL)
148                     qdp = biod.bufListEndp;
149                 else
150                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
151                 osi_assert(qdp != NULL);
152                 bufp = osi_GetQData(qdp);
153                 bufferp = bufp->datap;
154                 wbytes = nbytes;
155                 if (wbytes > cm_data.buf_blockSize) 
156                     wbytes = cm_data.buf_blockSize;
157
158                 /* write out wbytes of data from bufferp */
159                 temp = rx_Write(callp, bufferp, wbytes);
160                 if (temp != wbytes) {
161                     osi_Log2(afsd_logp, "rx_Write failed %d != %d",temp,wbytes);
162                     code = -1;
163                     break;
164                 } else {
165                     osi_Log1(afsd_logp, "rx_Write succeeded %d",temp);
166                 }       
167                 nbytes -= wbytes;
168             }   /* while more bytes to write */
169         }               /* if RPC started successfully */
170         else {
171             osi_Log1(afsd_logp, "StartRXAFS_StoreData failed (%lX)",code);
172         }
173         if (code == 0) {
174             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
175             if (code)
176                 osi_Log1(afsd_logp, "EndRXAFS_StoreData failed (%lX)",code);
177         }
178         code = rx_EndCall(callp, code);
179                 
180     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
181     code = cm_MapRPCError(code, reqp);
182         
183     if (code)
184         osi_Log1(afsd_logp, "CALL StoreData FAILURE, code 0x%x", code);
185     else
186         osi_Log0(afsd_logp, "CALL StoreData SUCCESS");
187
188     /* now, clean up our state */
189     lock_ObtainMutex(&scp->mx);
190
191     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
192
193     if (code == 0) {
194         /* now, here's something a little tricky: in AFS 3, a dirty
195          * length can't be directly stored, instead, a dirty chunk is
196          * stored that sets the file's size (by writing and by using
197          * the truncate-first option in the store call).
198          *
199          * At this point, we've just finished a store, and so the trunc
200          * pos field is clean.  If the file's size at the server is at
201          * least as big as we think it should be, then we turn off the
202          * length dirty bit, since all the other dirty buffers must
203          * precede this one in the file.
204          *
205          * The file's desired size shouldn't be smaller than what's
206          * stored at the server now, since we just did the trunc pos
207          * store.
208          *
209          * We have to turn off the length dirty bit as soon as we can,
210          * so that we see updates made by other machines.
211          */
212         if (outStatus.Length >= scp->length.LowPart)
213             scp->mask &= ~CM_SCACHEMASK_LENGTH;
214         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
215     } else {
216         if (code == CM_ERROR_SPACE)
217             scp->flags |= CM_SCACHEFLAG_OUTOFSPACE;
218         else if (code == CM_ERROR_QUOTA)
219             scp->flags |= CM_SCACHEFLAG_OVERQUOTA;
220     }
221     lock_ReleaseMutex(&scp->mx);
222     cm_ReleaseBIOD(&biod, 1);
223     cm_ReleaseSCache(scp);
224
225     return code;
226 }
227
228 /*
229  * Truncate the file, by sending a StoreData RPC with zero length.
230  *
231  * Called with scp locked.  Releases and re-obtains the lock.
232  */
233 long cm_StoreMini(cm_scache_t *scp, cm_user_t *userp, cm_req_t *reqp)
234 {
235     AFSFetchStatus outStatus;
236     AFSStoreStatus inStatus;
237     AFSVolSync volSync;
238     AFSFid tfid;
239     long code;
240     long truncPos;
241     cm_conn_t *connp;
242     struct rx_call *callp;
243     struct rx_connection *rxconnp;
244
245     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
246     (void) cm_SyncOp(scp, NULL, userp, reqp, 0,
247                      CM_SCACHESYNC_STOREDATA_EXCL);
248
249     /* prepare the output status for the store */
250     inStatus.Mask = AFS_SETMODTIME;
251     inStatus.ClientModTime = scp->clientModTime;
252     scp->mask &= ~CM_SCACHEMASK_CLIENTMODTIME;
253
254     /* calculate truncation position */
255     truncPos = scp->length.LowPart;
256     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
257          && scp->truncPos.LowPart < (unsigned long) truncPos)
258         truncPos = scp->truncPos.LowPart;
259     scp->mask &= ~CM_SCACHEMASK_TRUNCPOS;
260
261     lock_ReleaseMutex(&scp->mx);
262
263     cm_AFSFidFromFid(&tfid, &scp->fid);
264
265     /* now we're ready to do the store operation */
266     do {
267         code = cm_Conn(&scp->fid, userp, reqp, &connp);
268         if (code) 
269             continue;
270                 
271         rxconnp = cm_GetRxConn(connp);
272         callp = rx_NewCall(rxconnp);
273         rx_PutConnection(rxconnp);
274
275         code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
276                                     0, 0, truncPos);
277
278         if (code == 0)
279             code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
280         code = rx_EndCall(callp, code);
281
282     } while (cm_Analyze(connp, userp, reqp, &scp->fid, &volSync, NULL, NULL, code));
283     code = cm_MapRPCError(code, reqp);
284         
285     /* now, clean up our state */
286     lock_ObtainMutex(&scp->mx);
287
288     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
289
290     if (code == 0) {
291         /*
292          * For explanation of handling of CM_SCACHEMASK_LENGTH,
293          * see cm_BufWrite().
294          */
295         if (outStatus.Length >= scp->length.LowPart)
296             scp->mask &= ~CM_SCACHEMASK_LENGTH;
297         cm_MergeStatus(scp, &outStatus, &volSync, userp, 0);
298     }
299
300     return code;
301 }
302
303 long cm_BufRead(cm_buf_t *bufp, long nbytes, long *bytesReadp, cm_user_t *userp)
304 {
305     *bytesReadp = cm_data.buf_blockSize;
306
307     /* now return a code that means that I/O is done */
308     return 0;
309 }
310
311 /* stabilize scache entry, and return with it locked so 
312  * it stays stable.
313  */
314 long cm_BufStabilize(void *parmp, cm_user_t *userp, cm_req_t *reqp)
315 {
316     cm_scache_t *scp;
317     long code;
318
319     scp = parmp;
320         
321     lock_ObtainMutex(&scp->mx);
322     code = cm_SyncOp(scp, NULL, userp, reqp, 0, 
323                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_SETSIZE);
324     if (code) {
325         lock_ReleaseMutex(&scp->mx);
326         return code;
327     }
328         
329     return 0;
330 }
331
332 /* undoes the work that cm_BufStabilize does: releases lock so things can change again */
333 long cm_BufUnstabilize(void *parmp, cm_user_t *userp)
334 {
335     cm_scache_t *scp;
336         
337     scp = parmp;
338         
339     lock_ReleaseMutex(&scp->mx);
340         
341     /* always succeeds */
342     return 0;
343 }
344
345 cm_buf_ops_t cm_bufOps = {
346     cm_BufWrite,
347     cm_BufRead,
348     cm_BufStabilize,
349     cm_BufUnstabilize
350 };
351
352 long cm_ValidateDCache(void)
353 {
354     return buf_ValidateBuffers();
355 }
356
357 long cm_ShutdownDCache(void)
358 {
359     return 0;
360 }
361
362 int cm_InitDCache(int newFile, long chunkSize, long nbuffers)
363 {
364     lock_InitializeMutex(&cm_bufGetMutex, "buf_Get mutex");
365     return buf_Init(newFile, &cm_bufOps, nbuffers);
366 }
367
368 /* check to see if we have an up-to-date buffer.  The buffer must have
369  * previously been obtained by calling buf_Get.
370  *
371  * Make sure we have a callback, and that the dataversion matches.
372  *
373  * Scp must be locked.
374  *
375  * Bufp *may* be locked.
376  */
377 int cm_HaveBuffer(cm_scache_t *scp, cm_buf_t *bufp, int isBufLocked)
378 {
379     int code;
380     if (!cm_HaveCallback(scp))
381         return 0;
382     if ((bufp->cmFlags
383           & (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
384          == (CM_BUF_CMFETCHING | CM_BUF_CMFULLYFETCHED))
385         return 1;
386     if (bufp->dataVersion == scp->dataVersion)
387         return 1;
388     if (!isBufLocked) {
389         code = lock_TryMutex(&bufp->mx);
390         if (code == 0) {
391             /* don't have the lock, and can't lock it, then
392              * return failure.
393              */
394             return 0;
395         }
396     }
397
398     /* remember dirty flag for later */
399     code = bufp->flags & CM_BUF_DIRTY;
400
401     /* release lock if we obtained it here */
402     if (!isBufLocked) 
403         lock_ReleaseMutex(&bufp->mx);
404
405     /* if buffer was dirty, buffer is acceptable for use */
406     if (code) 
407         return 1;
408     else 
409         return 0;
410 }
411
412 /* used when deciding whether to do a prefetch or not */
413 long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, long length,
414                         cm_user_t *up, cm_req_t *reqp, osi_hyper_t *realBasep)
415 {
416     osi_hyper_t toffset;
417     osi_hyper_t tbase;
418     long code;
419     cm_buf_t *bp;
420     int stop;
421         
422     /* now scan all buffers in the range, looking for any that look like
423      * they need work.
424      */
425     tbase = *startBasep;
426     stop = 0;
427     lock_ObtainMutex(&scp->mx);
428     while(length > 0) {
429         /* get callback so we can do a meaningful dataVersion comparison */
430         code = cm_SyncOp(scp, NULL, up, reqp, 0,
431                          CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
432         if (code) {
433             scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
434             lock_ReleaseMutex(&scp->mx);
435             return code;
436         }
437                 
438         if (LargeIntegerGreaterThanOrEqualTo(tbase, scp->length)) {
439             /* we're past the end of file */
440             break;
441         }
442
443         bp = buf_Find(scp, &tbase);
444         /* We cheat slightly by not locking the bp mutex. */
445         if (bp) {
446             if ((bp->cmFlags
447                   & (CM_BUF_CMFETCHING | CM_BUF_CMSTORING)) == 0
448                  && bp->dataVersion != scp->dataVersion)
449                 stop = 1;
450             buf_Release(bp);
451         }
452         else 
453             stop = 1;
454
455         /* if this buffer is essentially guaranteed to require a fetch,
456          * break out here and return this position.
457          */
458         if (stop) 
459             break;
460                 
461         toffset.LowPart = cm_data.buf_blockSize;
462         toffset.HighPart = 0;
463         tbase = LargeIntegerAdd(toffset, tbase);
464         length -= cm_data.buf_blockSize;
465     }
466         
467     /* if we get here, either everything is fine or stop stopped us at a
468      * particular buffer in the range that definitely needs to be fetched.
469      */
470     if (stop == 0) {
471         /* return non-zero code since realBasep won't be valid */
472         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
473         code = -1;
474     }   
475     else {
476         /* successfully found a page that will need fetching */
477         *realBasep = tbase;
478         code = 0;
479     }
480     lock_ReleaseMutex(&scp->mx);
481     return code;
482 }
483
484 void cm_BkgStore(cm_scache_t *scp, long p1, long p2, long p3, long p4,
485                  cm_user_t *userp)
486 {
487     osi_hyper_t toffset;
488     long length;
489     cm_req_t req;
490
491     cm_InitReq(&req);
492     req.flags |= CM_REQ_NORETRY;
493
494     toffset.LowPart = p1;
495     toffset.HighPart = p2;
496     length = p3;
497
498     osi_Log2(afsd_logp, "Starting BKG store vp 0x%x, base 0x%x", scp, p1);
499
500     cm_BufWrite(&scp->fid, &toffset, length, /* flags */ 0, userp, &req);
501
502     lock_ObtainMutex(&scp->mx);
503     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
504     lock_ReleaseMutex(&scp->mx);
505 }
506
507 void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base)
508 {
509     osi_hyper_t thyper;
510
511     if (code == 0) {
512         thyper.LowPart = cm_chunkSize;
513         thyper.HighPart = 0;
514         thyper =  LargeIntegerAdd(*base, thyper);
515         thyper.LowPart &= (-cm_chunkSize);
516         if (LargeIntegerGreaterThan(*base, scp->prefetch.base))
517             scp->prefetch.base = *base;
518         if (LargeIntegerGreaterThan(thyper, scp->prefetch.end))
519             scp->prefetch.end = thyper;
520     }
521     scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
522 }
523
524 /* do the prefetch */
525 void cm_BkgPrefetch(cm_scache_t *scp, long p1, long p2, long p3, long p4,
526                     cm_user_t *userp)
527 {
528     long length;
529     osi_hyper_t base;
530     long code;
531     cm_buf_t *bp;
532     int cpff = 0;                       /* cleared prefetch flag */
533     cm_req_t req;
534
535     cm_InitReq(&req);
536     req.flags |= CM_REQ_NORETRY;
537         
538     base.LowPart = p1;
539     base.HighPart = p2;
540     length = p3;
541         
542     osi_Log2(afsd_logp, "Starting BKG prefetch vp 0x%x, base 0x%x", scp, p1);
543
544     code = buf_Get(scp, &base, &bp);
545
546     lock_ObtainMutex(&scp->mx);
547
548     if (code || (bp->cmFlags & CM_BUF_CMFETCHING)) {
549         scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
550         lock_ReleaseMutex(&scp->mx);
551         return;
552     }
553
554     code = cm_GetBuffer(scp, bp, &cpff, userp, &req);
555     if (!cpff) 
556         cm_ClearPrefetchFlag(code, scp, &base);
557     lock_ReleaseMutex(&scp->mx);
558     buf_Release(bp);
559     return;
560 }
561
562 /* a read was issued to offsetp, and we have to determine whether we should
563  * do a prefetch.
564  */
565 void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp,
566                          cm_user_t *userp, cm_req_t *reqp)
567 {
568     long code;
569     osi_hyper_t realBase;
570     osi_hyper_t readBase;
571         
572     readBase = *offsetp;
573     /* round up to chunk boundary */
574     readBase.LowPart += (cm_chunkSize-1);
575     readBase.LowPart &= (-cm_chunkSize);
576
577     lock_ObtainMutex(&scp->mx);
578     if ((scp->flags & CM_SCACHEFLAG_PREFETCHING)
579          || LargeIntegerLessThanOrEqualTo(readBase, scp->prefetch.base)) {
580         lock_ReleaseMutex(&scp->mx);
581         return;
582     }
583     scp->flags |= CM_SCACHEFLAG_PREFETCHING;
584
585     /* start the scan at the latter of the end of this read or
586      * the end of the last fetched region.
587      */
588     if (LargeIntegerGreaterThan(scp->prefetch.end, readBase))
589         readBase = scp->prefetch.end;
590
591     lock_ReleaseMutex(&scp->mx);
592
593     code = cm_CheckFetchRange(scp, &readBase, cm_chunkSize, userp, reqp,
594                               &realBase);
595     if (code) 
596         return; /* can't find something to prefetch */
597
598     osi_Log2(afsd_logp, "BKG Prefetch request vp 0x%x, base 0x%x",
599              scp, realBase.LowPart);
600
601     cm_QueueBKGRequest(scp, cm_BkgPrefetch, realBase.LowPart,
602                        realBase.HighPart, cm_chunkSize, 0, userp);
603 }
604
605 /* scp must be locked; temporarily unlocked during processing.
606  * If returns 0, returns buffers held in biop, and with
607  * CM_BUF_CMSTORING set.
608  *
609  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
610  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
611  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
612  * must be woken, and the event must be set when the I/O is done.  All of this
613  * is required so that buf_WaitIO synchronizes properly with the buffer as it
614  * is being written out.
615  */
616 long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, long inSize,
617                        cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
618 {
619     cm_buf_t *bufp;
620     osi_queueData_t *qdp;
621     osi_hyper_t thyper;
622     osi_hyper_t tbase;
623     osi_hyper_t scanStart;              /* where to start scan for dirty pages */
624     osi_hyper_t scanEnd;                /* where to stop scan for dirty pages */
625     osi_hyper_t firstModOffset; /* offset of first modified page in range */
626     long temp;
627     long code;
628     long flags;                 /* flags to cm_SyncOp */
629         
630     /* clear things out */
631     biop->scp = scp;            /* don't hold */
632     biop->offset = *inOffsetp;
633     biop->length = 0;
634     biop->bufListp = NULL;
635     biop->bufListEndp = NULL;
636     biop->reserved = 0;
637
638     /* reserve a chunk's worth of buffers */
639     lock_ReleaseMutex(&scp->mx);
640     buf_ReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
641     lock_ObtainMutex(&scp->mx);
642
643     bufp = NULL;
644     for (temp = 0; temp < inSize; temp += cm_data.buf_blockSize, bufp = NULL) {
645         thyper.HighPart = 0;
646         thyper.LowPart = temp;
647         tbase = LargeIntegerAdd(*inOffsetp, thyper);
648
649         bufp = buf_Find(scp, &tbase);
650         if (bufp) {
651             /* get buffer mutex and scp mutex safely */
652             lock_ReleaseMutex(&scp->mx);
653             lock_ObtainMutex(&bufp->mx);
654             lock_ObtainMutex(&scp->mx);
655
656             flags = CM_SCACHESYNC_NEEDCALLBACK
657                 | CM_SCACHESYNC_GETSTATUS
658                     | CM_SCACHESYNC_STOREDATA
659                         | CM_SCACHESYNC_BUFLOCKED;
660             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags); 
661             if (code) {
662                 lock_ReleaseMutex(&bufp->mx);
663                 buf_Release(bufp);
664                 buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
665                 return code;
666             }   
667                         
668             /* if the buffer is dirty, we're done */
669             if (bufp->flags & CM_BUF_DIRTY) {
670                 osi_assertx(!(bufp->flags & CM_BUF_WRITING),
671                             "WRITING w/o CMSTORING in SetupStoreBIOD");
672                 bufp->flags |= CM_BUF_WRITING;
673                 break;
674             }
675
676             /* this buffer is clean, so there's no reason to process it */
677             cm_SyncOpDone(scp, bufp, flags);
678             lock_ReleaseMutex(&bufp->mx);
679             buf_Release(bufp);
680         }       
681     }
682
683     biop->reserved = 1;
684         
685     /* if we get here, if bufp is null, we didn't find any dirty buffers
686      * that weren't already being stored back, so we just quit now.
687      */
688     if (!bufp) {
689         return 0;
690     }
691
692     /* don't need buffer mutex any more */
693     lock_ReleaseMutex(&bufp->mx);
694         
695     /* put this element in the list */
696     qdp = osi_QDAlloc();
697     osi_SetQData(qdp, bufp);
698     /* don't have to hold bufp, since held by buf_Find above */
699     osi_QAddH((osi_queue_t **) &biop->bufListp,
700               (osi_queue_t **) &biop->bufListEndp,
701               &qdp->q);
702     biop->length = cm_data.buf_blockSize;
703     firstModOffset = bufp->offset;
704     biop->offset = firstModOffset;
705
706     /* compute the window surrounding *inOffsetp of size cm_chunkSize */
707     scanStart = *inOffsetp;
708     scanStart.LowPart &= (-cm_chunkSize);
709     thyper.LowPart = cm_chunkSize;
710     thyper.HighPart = 0;
711     scanEnd = LargeIntegerAdd(scanStart, thyper);
712
713     flags = CM_SCACHESYNC_NEEDCALLBACK
714         | CM_SCACHESYNC_GETSTATUS
715         | CM_SCACHESYNC_STOREDATA
716         | CM_SCACHESYNC_BUFLOCKED
717         | CM_SCACHESYNC_NOWAIT;
718
719     /* start by looking backwards until scanStart */
720     thyper.HighPart = 0;                /* hyper version of cm_data.buf_blockSize */
721     thyper.LowPart = cm_data.buf_blockSize;
722     tbase = LargeIntegerSubtract(firstModOffset, thyper);
723     while(LargeIntegerGreaterThanOrEqualTo(tbase, scanStart)) {
724         /* see if we can find the buffer */
725         bufp = buf_Find(scp, &tbase);
726         if (!bufp) 
727             break;
728
729         /* try to lock it, and quit if we can't (simplifies locking) */
730         lock_ReleaseMutex(&scp->mx);
731         code = lock_TryMutex(&bufp->mx);
732         lock_ObtainMutex(&scp->mx);
733         if (code == 0) {
734             buf_Release(bufp);
735             break;
736         }
737                 
738         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
739         if (code) {
740             lock_ReleaseMutex(&bufp->mx);
741             buf_Release(bufp);
742             break;
743         }
744                 
745         if (!(bufp->flags & CM_BUF_DIRTY)) {
746             /* buffer is clean, so we shouldn't add it */
747             cm_SyncOpDone(scp, bufp, flags);
748             lock_ReleaseMutex(&bufp->mx);
749             buf_Release(bufp);
750             break;
751         }
752
753         /* don't need buffer mutex any more */
754         lock_ReleaseMutex(&bufp->mx);
755
756         /* we have a dirty buffer ready for storing.  Add it to the tail
757          * of the list, since it immediately precedes all of the disk
758          * addresses we've already collected.
759          */
760         qdp = osi_QDAlloc();
761         osi_SetQData(qdp, bufp);
762         /* no buf_hold necessary, since we have it held from buf_Find */
763         osi_QAddT((osi_queue_t **) &biop->bufListp,
764                   (osi_queue_t **) &biop->bufListEndp,
765                   &qdp->q);
766
767         /* update biod info describing the transfer */
768         biop->offset = LargeIntegerSubtract(biop->offset, thyper);
769         biop->length += cm_data.buf_blockSize;
770
771         /* update loop pointer */
772         tbase = LargeIntegerSubtract(tbase, thyper);
773     }   /* while loop looking for pages preceding the one we found */
774
775     /* now, find later dirty, contiguous pages, and add them to the list */
776     thyper.HighPart = 0;                /* hyper version of cm_data.buf_blockSize */
777     thyper.LowPart = cm_data.buf_blockSize;
778     tbase = LargeIntegerAdd(firstModOffset, thyper);
779     while(LargeIntegerLessThan(tbase, scanEnd)) {
780         /* see if we can find the buffer */
781         bufp = buf_Find(scp, &tbase);
782         if (!bufp) 
783             break;
784
785         /* try to lock it, and quit if we can't (simplifies locking) */
786         lock_ReleaseMutex(&scp->mx);
787         code = lock_TryMutex(&bufp->mx);
788         lock_ObtainMutex(&scp->mx);
789         if (code == 0) {
790             buf_Release(bufp);
791             break;
792         }
793
794         code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
795         if (code) {
796             lock_ReleaseMutex(&bufp->mx);
797             buf_Release(bufp);
798             break;
799         }
800                 
801         if (!(bufp->flags & CM_BUF_DIRTY)) {
802             /* buffer is clean, so we shouldn't add it */
803             cm_SyncOpDone(scp, bufp, flags);
804             lock_ReleaseMutex(&bufp->mx);
805             buf_Release(bufp);
806             break;
807         }
808
809         /* don't need buffer mutex any more */
810         lock_ReleaseMutex(&bufp->mx);
811
812         /* we have a dirty buffer ready for storing.  Add it to the head
813          * of the list, since it immediately follows all of the disk
814          * addresses we've already collected.
815          */
816         qdp = osi_QDAlloc();
817         osi_SetQData(qdp, bufp);
818         /* no buf_hold necessary, since we have it held from buf_Find */
819         osi_QAddH((osi_queue_t **) &biop->bufListp,
820                   (osi_queue_t **) &biop->bufListEndp,
821                   &qdp->q);
822
823         /* update biod info describing the transfer */
824         biop->length += cm_data.buf_blockSize;
825                 
826         /* update loop pointer */
827         tbase = LargeIntegerAdd(tbase, thyper);
828     }   /* while loop looking for pages following the first page we found */
829         
830     /* finally, we're done */
831     return 0;
832 }
833
834 /* scp must be locked; temporarily unlocked during processing.
835  * If returns 0, returns buffers held in biop, and with
836  * CM_BUF_CMFETCHING flags set.
837  * If an error is returned, we don't return any buffers.
838  */
839 long cm_SetupFetchBIOD(cm_scache_t *scp, osi_hyper_t *offsetp,
840                         cm_bulkIO_t *biop, cm_user_t *up, cm_req_t *reqp)
841 {
842     long code;
843     cm_buf_t *tbp;
844     osi_hyper_t toffset;                /* a long long temp variable */
845     osi_hyper_t pageBase;               /* base offset we're looking at */
846     osi_queueData_t *qdp;               /* one temp queue structure */
847     osi_queueData_t *tqdp;              /* another temp queue structure */
848     long collected;                     /* how many bytes have been collected */
849     int isFirst;
850     long flags;
851     osi_hyper_t fileSize;               /* the # of bytes in the file */
852     osi_queueData_t *heldBufListp;      /* we hold all buffers in this list */
853     osi_queueData_t *heldBufListEndp;   /* first one */
854     int reserving;
855
856     biop->scp = scp;
857     biop->offset = *offsetp;
858     /* null out the list of buffers */
859     biop->bufListp = biop->bufListEndp = NULL;
860     biop->reserved = 0;
861
862     /* first lookup the file's length, so we know when to stop */
863     code = cm_SyncOp(scp, NULL, up, reqp, 0, 
864                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
865     if (code) 
866         return code;
867         
868     /* copy out size, since it may change */
869     fileSize = scp->serverLength;
870         
871     lock_ReleaseMutex(&scp->mx);
872
873     pageBase = *offsetp;
874     collected = pageBase.LowPart & (cm_chunkSize - 1);
875     heldBufListp = NULL;
876     heldBufListEndp = NULL;
877
878     /*
879      * Obtaining buffers can cause dirty buffers to be recycled, which
880      * can cause a storeback, so cannot be done while we have buffers
881      * reserved.
882      *
883      * To get around this, we get buffers twice.  Before reserving buffers,
884      * we obtain and release each one individually.  After reserving
885      * buffers, we try to obtain them again, but only by lookup, not by
886      * recycling.  If a buffer has gone away while we were waiting for
887      * the others, we just use whatever buffers we already have.
888      *
889      * On entry to this function, we are already holding a buffer, so we
890      * can't wait for reservation.  So we call buf_TryReserveBuffers()
891      * instead.  Not only that, we can't really even call buf_Get(), for
892      * the same reason.  We can't avoid that, though.  To avoid deadlock
893      * we allow only one thread to be executing the buf_Get()-buf_Release()
894      * sequence at a time.
895      */
896
897     // lock_ObtainMutex(&cm_bufGetMutex);
898     /* first hold all buffers, since we can't hold any locks in buf_Get */
899     while (1) {
900         /* stop at chunk boundary */
901         if (collected >= cm_chunkSize) 
902             break;
903                 
904         /* see if the next page would be past EOF */
905         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
906             break;
907
908         code = buf_Get(scp, &pageBase, &tbp);
909         if (code) {
910             lock_ReleaseMutex(&cm_bufGetMutex);
911             lock_ObtainMutex(&scp->mx);
912             cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
913             return code;
914         }
915                 
916         buf_Release(tbp);
917
918         toffset.HighPart = 0;
919         toffset.LowPart = cm_data.buf_blockSize;
920         pageBase = LargeIntegerAdd(toffset, pageBase);
921         collected += cm_data.buf_blockSize;
922     }
923
924     /* reserve a chunk's worth of buffers if possible */
925     reserving = buf_TryReserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
926
927     // lock_ReleaseMutex(&cm_bufGetMutex);
928
929     pageBase = *offsetp;
930     collected = pageBase.LowPart & (cm_chunkSize - 1);
931
932     /* now hold all buffers, if they are still there */
933     while (1) {
934         /* stop at chunk boundary */
935         if (collected >= cm_chunkSize) 
936             break;
937                 
938         /* see if the next page would be past EOF */
939         if (LargeIntegerGreaterThanOrEqualTo(pageBase, fileSize)) 
940             break;
941
942         tbp = buf_Find(scp, &pageBase);
943         if (!tbp) 
944             break;
945
946         /* add the buffer to the list */
947         qdp = osi_QDAlloc();
948         osi_SetQData(qdp, tbp);
949         osi_QAdd((osi_queue_t **)&heldBufListp, &qdp->q);
950         if (!heldBufListEndp) heldBufListEndp = qdp;
951         /* leave tbp held (from buf_Get) */
952
953         if (!reserving) 
954             break;
955
956         collected += cm_data.buf_blockSize;
957         toffset.HighPart = 0;
958         toffset.LowPart = cm_data.buf_blockSize;
959         pageBase = LargeIntegerAdd(toffset, pageBase);
960     }
961
962     /* look at each buffer, adding it into the list if it looks idle and
963      * filled with old data.  One special case: wait for idle if it is the
964      * first buffer since we really need that one for our caller to make
965      * any progress.
966      */
967     isFirst = 1;
968     collected = 0;              /* now count how many we'll really use */
969     for (tqdp = heldBufListEndp;
970         tqdp;
971           tqdp = (osi_queueData_t *) osi_QPrev(&tqdp->q)) {
972         /* get a ptr to the held buffer */
973         tbp = osi_GetQData(tqdp);
974         pageBase = tbp->offset;
975
976         /* now lock the buffer lock */
977         lock_ObtainMutex(&tbp->mx);
978         lock_ObtainMutex(&scp->mx);
979
980         /* don't bother fetching over data that is already current */
981         if (tbp->dataVersion == scp->dataVersion) {
982             /* we don't need this buffer, since it is current */
983             lock_ReleaseMutex(&scp->mx);
984             lock_ReleaseMutex(&tbp->mx);
985             break;
986         }
987
988         flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_FETCHDATA
989             | CM_SCACHESYNC_BUFLOCKED;
990         if (!isFirst) 
991             flags |= CM_SCACHESYNC_NOWAIT;
992
993         /* wait for the buffer to serialize, if required.  Doesn't
994          * release the scp or buffer lock(s) if NOWAIT is specified.
995          */
996         code = cm_SyncOp(scp, tbp, up, reqp, 0, flags);
997         if (code) {
998             lock_ReleaseMutex(&scp->mx);
999             lock_ReleaseMutex(&tbp->mx);
1000             break;
1001         }
1002                 
1003         /* don't fetch over dirty buffers */
1004         if (tbp->flags & CM_BUF_DIRTY) {
1005             cm_SyncOpDone(scp, tbp, flags);
1006             lock_ReleaseMutex(&scp->mx);
1007             lock_ReleaseMutex(&tbp->mx);
1008             break;
1009         }
1010
1011         /* Release locks */
1012         lock_ReleaseMutex(&scp->mx);
1013         lock_ReleaseMutex(&tbp->mx);
1014
1015         /* add the buffer to the list */
1016         qdp = osi_QDAlloc();
1017         osi_SetQData(qdp, tbp);
1018         osi_QAdd((osi_queue_t **)&biop->bufListp, &qdp->q);
1019         if (!biop->bufListEndp) 
1020             biop->bufListEndp = qdp;
1021         buf_Hold(tbp);
1022
1023         /* from now on, a failure just stops our collection process, but
1024          * we still do the I/O to whatever we've already managed to collect.
1025          */
1026         isFirst = 0;
1027         collected += cm_data.buf_blockSize;
1028     }
1029         
1030     /* now, we've held in biop->bufListp all the buffer's we're really
1031      * interested in.  We also have holds left from heldBufListp, and we
1032      * now release those holds on the buffers.
1033      */
1034     for (qdp = heldBufListp; qdp; qdp = tqdp) {
1035         tqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1036         tbp = osi_GetQData(qdp);
1037         osi_QDFree(qdp);
1038         buf_Release(tbp);
1039     }
1040
1041     /* Caller expects this */
1042     lock_ObtainMutex(&scp->mx);
1043  
1044     /* if we got a failure setting up the first buffer, then we don't have
1045      * any side effects yet, and we also have failed an operation that the
1046      * caller requires to make any progress.  Give up now.
1047      */
1048     if (code && isFirst) {
1049         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1050         return code;
1051     }
1052         
1053     /* otherwise, we're still OK, and should just return the I/O setup we've
1054      * got.
1055      */
1056     biop->length = collected;
1057     biop->reserved = reserving;
1058     return 0;
1059 }
1060
1061 /* release a bulk I/O structure that was setup by cm_SetupFetchBIOD or by
1062  * cm_SetupStoreBIOD
1063  */
1064 void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore)
1065 {
1066     cm_scache_t *scp;
1067     cm_buf_t *bufp;
1068     osi_queueData_t *qdp;
1069     osi_queueData_t *nqdp;
1070     int flags;
1071
1072     /* Give back reserved buffers */
1073     if (biop->reserved)
1074         buf_UnreserveBuffers(cm_chunkSize / cm_data.buf_blockSize);
1075         
1076     flags = CM_SCACHESYNC_NEEDCALLBACK;
1077     if (isStore)
1078         flags |= CM_SCACHESYNC_STOREDATA;
1079     else
1080         flags |= CM_SCACHESYNC_FETCHDATA;
1081
1082     scp = biop->scp;
1083     for(qdp = biop->bufListp; qdp; qdp = nqdp) {
1084         /* lookup next guy first, since we're going to free this one */
1085         nqdp = (osi_queueData_t *) osi_QNext(&qdp->q);
1086                 
1087         /* extract buffer and free queue data */
1088         bufp = osi_GetQData(qdp);
1089         osi_QDFree(qdp);
1090
1091         /* now, mark I/O as done, unlock the buffer and release it */
1092         lock_ObtainMutex(&bufp->mx);
1093         lock_ObtainMutex(&scp->mx);
1094         cm_SyncOpDone(scp, bufp, flags);
1095         lock_ReleaseMutex(&scp->mx);
1096                 
1097         /* turn off writing and wakeup users */
1098         if (isStore) {
1099             if (bufp->flags & CM_BUF_WAITING) {
1100                 osi_Wakeup((long) bufp);
1101             }
1102             bufp->flags &= ~(CM_BUF_WAITING | CM_BUF_WRITING | CM_BUF_DIRTY);
1103         }
1104
1105         lock_ReleaseMutex(&bufp->mx);
1106         buf_Release(bufp);
1107     }
1108
1109     /* clean things out */
1110     biop->bufListp = NULL;
1111     biop->bufListEndp = NULL;
1112 }   
1113
1114 /* Fetch a buffer.  Called with scp locked.
1115  * The scp is locked on return.
1116  */
1117 long cm_GetBuffer(cm_scache_t *scp, cm_buf_t *bufp, int *cpffp, cm_user_t *up,
1118                   cm_req_t *reqp)
1119 {
1120     long code;
1121     long nbytes;                        /* bytes in transfer */
1122     long rbytes;                        /* bytes in rx_Read call */
1123     long temp;
1124     AFSFetchStatus afsStatus;
1125     AFSCallBack callback;
1126     AFSVolSync volSync;
1127     char *bufferp;
1128     cm_buf_t *tbufp;            /* buf we're filling */
1129     osi_queueData_t *qdp;               /* q element we're scanning */
1130     AFSFid tfid;
1131     struct rx_call *callp;
1132     struct rx_connection *rxconnp;
1133     cm_bulkIO_t biod;           /* bulk IO descriptor */
1134     cm_conn_t *connp;
1135     int getroot;
1136     long t1, t2;
1137
1138     /* now, the buffer may or may not be filled with good data (buf_GetNew
1139      * drops lots of locks, and may indeed return a properly initialized
1140      * buffer, although more likely it will just return a new, empty, buffer.
1141      */
1142
1143 #ifdef AFS_FREELANCE_CLIENT
1144
1145     // yj: if they're trying to get the /afs directory, we need to
1146     // handle it differently, since it's local rather than on any
1147     // server
1148
1149     getroot = (scp==cm_data.rootSCachep);
1150     if (getroot)
1151         osi_Log1(afsd_logp,"GetBuffer returns cm_data.rootSCachep=%x",cm_data.rootSCachep);
1152 #endif
1153
1154     cm_AFSFidFromFid(&tfid, &scp->fid);
1155
1156     code = cm_SetupFetchBIOD(scp, &bufp->offset, &biod, up, reqp);
1157     if (code) {
1158         /* couldn't even get the first page setup properly */
1159         osi_Log1(afsd_logp, "SetupFetchBIOD failure code %d", code);
1160         return code;
1161     }
1162
1163     /* once we get here, we have the callback in place, we know that no one
1164      * is fetching the data now.  Check one last time that we still have
1165      * the wrong data, and then fetch it if we're still wrong.
1166      *
1167      * We can lose a race condition and end up with biod.length zero, in
1168      * which case we just retry.
1169      */
1170     if (bufp->dataVersion == scp->dataVersion || biod.length == 0) {
1171         osi_Log3(afsd_logp, "Bad DVs %d, %d or length 0x%x",
1172                  bufp->dataVersion, scp->dataVersion, biod.length);
1173         if ((bufp->dataVersion == -1
1174              || bufp->dataVersion < scp->dataVersion)
1175              && LargeIntegerGreaterThanOrEqualTo(bufp->offset,
1176                                                  scp->serverLength)) {
1177             if (bufp->dataVersion == -1)
1178                 memset(bufp->datap, 0, cm_data.buf_blockSize);
1179             bufp->dataVersion = scp->dataVersion;
1180         }
1181         lock_ReleaseMutex(&scp->mx);
1182         cm_ReleaseBIOD(&biod, 0);
1183         lock_ObtainMutex(&scp->mx);
1184         return 0;
1185     }
1186         
1187     lock_ReleaseMutex(&scp->mx);
1188
1189 #ifdef DISKCACHE95
1190     DPRINTF("cm_GetBuffer: fetching data scpDV=%d bufDV=%d scp=%x bp=%x dcp=%x\n",
1191             scp->dataVersion, bufp->dataVersion, scp, bufp, bufp->dcp);
1192 #endif /* DISKCACHE95 */
1193
1194 #ifdef AFS_FREELANCE_CLIENT
1195
1196     // yj code
1197     // if getroot then we don't need to make any calls
1198     // just return fake data
1199         
1200     if (cm_freelanceEnabled && getroot) {
1201         // setup the fake status                        
1202         afsStatus.InterfaceVersion = 0x1;
1203         afsStatus.FileType = 0x2;
1204         afsStatus.LinkCount = scp->linkCount;
1205         afsStatus.Length = cm_fakeDirSize;
1206         afsStatus.DataVersion = cm_data.fakeDirVersion;
1207         afsStatus.Author = 0x1;
1208         afsStatus.Owner = 0x0;
1209         afsStatus.CallerAccess = 0x9;
1210         afsStatus.AnonymousAccess = 0x9;
1211         afsStatus.UnixModeBits = 0x1ff;
1212         afsStatus.ParentVnode = 0x1;
1213         afsStatus.ParentUnique = 0x1;
1214         afsStatus.ResidencyMask = 0;
1215         afsStatus.ClientModTime = (afs_uint32)FakeFreelanceModTime;
1216         afsStatus.ServerModTime = (afs_uint32)FakeFreelanceModTime;
1217         afsStatus.Group = 0;
1218         afsStatus.SyncCounter = 0;
1219         afsStatus.dataVersionHigh = 0;
1220         afsStatus.lockCount = 0;
1221         afsStatus.Length_hi = 0;
1222         afsStatus.errorCode = 0;
1223         
1224         // once we're done setting up the status info,
1225         // we just fill the buffer pages with fakedata
1226         // from cm_FakeRootDir. Extra pages are set to
1227         // 0. 
1228                 
1229         lock_ObtainMutex(&cm_Freelance_Lock);
1230         t1 = bufp->offset.LowPart;
1231         qdp = biod.bufListEndp;
1232         while (qdp) {
1233             tbufp = osi_GetQData(qdp);
1234             bufferp=tbufp->datap;
1235             memset(bufferp, 0, cm_data.buf_blockSize);
1236             t2 = cm_fakeDirSize - t1;
1237             if (t2>cm_data.buf_blockSize) t2=cm_data.buf_blockSize;
1238             if (t2 > 0) {
1239                 memcpy(bufferp, cm_FakeRootDir+t1, t2);
1240             } else {
1241                 t2 = 0;
1242             }
1243             t1+=t2;
1244             qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1245
1246         }
1247         lock_ReleaseMutex(&cm_Freelance_Lock);
1248         
1249         // once we're done, we skip over the part of the
1250         // code that does the ACTUAL fetching of data for
1251         // real files
1252
1253         goto fetchingcompleted;
1254     }
1255
1256 #endif /* AFS_FREELANCE_CLIENT */
1257
1258         /* now make the call */
1259     do {
1260         code = cm_Conn(&scp->fid, up, reqp, &connp);
1261         if (code) 
1262             continue;
1263         
1264         rxconnp = cm_GetRxConn(connp);
1265         callp = rx_NewCall(rxconnp);
1266         rx_PutConnection(rxconnp);
1267
1268         osi_Log3(afsd_logp, "CALL FetchData vp %x, off 0x%x, size 0x%x",
1269                   (long) scp, biod.offset.LowPart, biod.length);
1270
1271         code = StartRXAFS_FetchData(callp, &tfid, biod.offset.LowPart,
1272                                     biod.length);
1273
1274         /* now copy the data out of the pipe and put it in the buffer */
1275         temp  = rx_Read(callp, (char *)&nbytes, 4);
1276         if (temp == 4) {
1277             nbytes = ntohl(nbytes);
1278             if (nbytes > biod.length) 
1279                 code = (callp->error < 0) ? callp->error : -1;
1280         }
1281         else 
1282             code = (callp->error < 0) ? callp->error : -1;
1283
1284         if (code == 0) {
1285             qdp = biod.bufListEndp;
1286             if (qdp) {
1287                 tbufp = osi_GetQData(qdp);
1288                 bufferp = tbufp->datap;
1289             }
1290             else 
1291                 bufferp = NULL;
1292             /* fill nbytes of data from the pipe into the pages.
1293              * When we stop, qdp will point at the last page we're
1294              * dealing with, and bufferp will tell us where we
1295              * stopped.  We'll need this info below when we clear
1296              * the remainder of the last page out (and potentially
1297              * clear later pages out, if we fetch past EOF).
1298              */
1299             while (nbytes > 0) {
1300                 /* assert that there are still more buffers;
1301                  * our check above for nbytes being less than
1302                  * biod.length should ensure this.
1303                  */
1304                 osi_assert(bufferp != NULL);
1305
1306                 /* read rbytes of data */
1307                 rbytes = (nbytes > cm_data.buf_blockSize? cm_data.buf_blockSize : nbytes);
1308                 temp = rx_Read(callp, bufferp, rbytes);
1309                 if (temp < rbytes) {
1310                     code = (callp->error < 0) ? callp->error : -1;
1311                     break;
1312                 }
1313
1314                 /* allow read-while-fetching.
1315                  * if this is the last buffer, clear the
1316                  * PREFETCHING flag, so the reader waiting for
1317                  * this buffer will start a prefetch.
1318                  */
1319                 tbufp->cmFlags |= CM_BUF_CMFULLYFETCHED;
1320                 lock_ObtainMutex(&scp->mx);
1321                 if (scp->flags & CM_SCACHEFLAG_WAITING) {
1322                     scp->flags &= ~CM_SCACHEFLAG_WAITING;
1323                     osi_Wakeup((long) &scp->flags);
1324                 }
1325                 if (cpffp && !*cpffp && !osi_QPrev(&qdp->q)) {
1326                     *cpffp = 1;
1327                     cm_ClearPrefetchFlag(0, scp, &biod.offset);
1328                 }
1329                 lock_ReleaseMutex(&scp->mx);
1330
1331                 /* and adjust counters */
1332                 nbytes -= temp;
1333
1334                 /* and move to the next buffer */
1335                 if (nbytes != 0) {
1336                     qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1337                     if (qdp) {
1338                         tbufp = osi_GetQData(qdp);
1339                         bufferp = tbufp->datap;
1340                     }
1341                     else 
1342                         bufferp = NULL;
1343                 } else 
1344                     bufferp += temp;
1345             }
1346
1347             /* zero out remainder of last pages, in case we are
1348              * fetching past EOF.  We were fetching an integral #
1349              * of pages, but stopped, potentially in the middle of
1350              * a page.  Zero the remainder of that page, and then
1351              * all of the rest of the pages.
1352              */
1353             /* bytes fetched */
1354             rbytes = bufferp - tbufp->datap;
1355             /* bytes left to zero */
1356             rbytes = cm_data.buf_blockSize - rbytes;
1357             while(qdp) {
1358                 if (rbytes != 0)
1359                     memset(bufferp, 0, rbytes);
1360                 qdp = (osi_queueData_t *) osi_QPrev(&qdp->q);
1361                 if (qdp == NULL) 
1362                     break;
1363                 tbufp = osi_GetQData(qdp);
1364                 bufferp = tbufp->datap;
1365                 /* bytes to clear in this page */
1366                 rbytes = cm_data.buf_blockSize;
1367             }   
1368         }
1369
1370         if (code == 0)
1371             code = EndRXAFS_FetchData(callp, &afsStatus, &callback, &volSync);
1372         else
1373             osi_Log0(afsd_logp, "CALL EndRXAFS_FetchData skipped due to error");
1374         code = rx_EndCall(callp, code);
1375         if (code == RXKADUNKNOWNKEY)
1376             osi_Log0(afsd_logp, "CALL EndCall returns RXKADUNKNOWNKEY");
1377         osi_Log0(afsd_logp, "CALL FetchData DONE");
1378
1379     } while (cm_Analyze(connp, up, reqp, &scp->fid, &volSync, NULL, NULL, code));
1380
1381   fetchingcompleted:
1382     code = cm_MapRPCError(code, reqp);
1383
1384     lock_ObtainMutex(&scp->mx);
1385     
1386     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_FETCHSTATUS);
1387
1388     /* we know that no one else has changed the buffer, since we still have
1389      * the fetching flag on the buffers, and we have the scp locked again.
1390      * Copy in the version # into the buffer if we got code 0 back from the
1391      * read.
1392      */
1393     if (code == 0) {
1394         for(qdp = biod.bufListp;
1395              qdp;
1396              qdp = (osi_queueData_t *) osi_QNext(&qdp->q)) {
1397             tbufp = osi_GetQData(qdp);
1398             tbufp->dataVersion = afsStatus.DataVersion;
1399
1400 #ifdef DISKCACHE95
1401             /* write buffer out to disk cache */
1402             diskcache_Update(tbufp->dcp, tbufp->datap, cm_data.buf_blockSize,
1403                               tbufp->dataVersion);
1404 #endif /* DISKCACHE95 */
1405         }
1406     }
1407
1408     /* release scatter/gather I/O structure (buffers, locks) */
1409     lock_ReleaseMutex(&scp->mx);
1410     cm_ReleaseBIOD(&biod, 0);
1411     lock_ObtainMutex(&scp->mx);
1412
1413     if (code == 0) 
1414         cm_MergeStatus(scp, &afsStatus, &volSync, up, 0);
1415     return code;
1416 }