Windows: remove last bits of BIOD from Direct Store
[openafs.git] / src / WINNT / afsd / cm_direct.c
1 /*
2  * Copyright (c) 2012 Your File System, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright
12  *   notice, this list of conditions and the following disclaimer in the
13  *   documentation and/or other materials provided with the distribution.
14  * - Neither the name of Your File System, Inc nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission from
17  *   Your File System, Inc.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
23  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30  */
31
32 #include <afsconfig.h>
33 #include <afs/param.h>
34 #include <roken.h>
35
36 #include <afs/stds.h>
37
38 #include <windows.h>
39 #include <winsock2.h>
40 #include <nb30.h>
41 #include <string.h>
42 #include <stdlib.h>
43 #include <osi.h>
44
45 #include "afsd.h"
46
47 /*
48  * cm_DirectWrite is used to write the contents of one contiguous
49  * buffer to the file server.  The input buffer must not be a
50  * cm_buf_t.data field.  The data is written to the file server without
51  * locking any buffers.  The cm_scache object is protected
52  * by cm_SyncOp( CM_SCACHESYNC_STOREDATA_EXCL) and the resulting
53  * AFSFetchStatus is merged.
54  */
55
56 static afs_int32
57 int_DirectWrite( IN cm_scache_t *scp,
58                  IN osi_hyper_t *offsetp,
59                  IN afs_uint32   length,
60                  IN afs_uint32   flags,
61                  IN cm_user_t   *userp,
62                  IN cm_req_t    *reqp,
63                  IN void        *memoryRegionp,
64                  OUT afs_uint32 *bytesWritten)
65 {
66     long code, code1;
67     long temp;
68     AFSFetchStatus outStatus;
69     AFSStoreStatus inStatus;
70     AFSVolSync volSync;
71     AFSFid tfid;
72     struct rx_call *rxcallp;
73     struct rx_connection *rxconnp;
74     cm_conn_t *connp;
75     osi_hyper_t truncPos;
76     int require_64bit_ops = 0;
77     int call_was_64bit = 0;
78     int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
79     afs_uint32 written = 0;
80
81     osi_assertx(userp != NULL, "null cm_user_t");
82
83     memset(&volSync, 0, sizeof(volSync));
84     if (bytesWritten)
85         *bytesWritten = 0;
86
87     cm_AFSFidFromFid(&tfid, &scp->fid);
88
89     if (!scp_locked)
90         lock_ObtainWrite(&scp->rw);
91
92     /* prepare the output status for the store */
93     _InterlockedOr(&scp->mask, CM_SCACHEMASK_CLIENTMODTIME);
94     cm_StatusFromAttr(&inStatus, scp, NULL);
95     truncPos = scp->length;
96     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
97          && LargeIntegerLessThan(scp->truncPos, truncPos)) {
98         truncPos = scp->truncPos;
99         _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_TRUNCPOS);
100     }
101
102     InterlockedIncrement(&scp->activeRPCs);
103     lock_ReleaseWrite(&scp->rw);
104
105     /* now we're ready to do the store operation */
106     do {
107         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
108         if (code)
109             continue;
110
111     retry:
112         rxconnp = cm_GetRxConn(connp);
113         rxcallp = rx_NewCall(rxconnp);
114         rx_PutConnection(rxconnp);
115
116         if (SERVERHAS64BIT(connp)) {
117             call_was_64bit = 1;
118
119             osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
120                      scp, offsetp->HighPart, offsetp->LowPart, length);
121             osi_Log2(afsd_logp, "... truncPos 0x%x:%08x",  truncPos.HighPart, truncPos.LowPart);
122
123             code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
124                                           offsetp->QuadPart,
125                                           length,
126                                           truncPos.QuadPart);
127             if (code)
128                 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
129             else
130                 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
131         } else {
132             call_was_64bit = 0;
133
134             if (require_64bit_ops) {
135                 osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData.  The operation requires large file support in the server.");
136                 code = CM_ERROR_TOOBIG;
137             } else {
138                 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
139                          scp, offsetp->HighPart, offsetp->LowPart, length);
140                 osi_Log1(afsd_logp, "... truncPos 0x%08x",  truncPos.LowPart);
141
142                 code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
143                                             offsetp->LowPart, length, truncPos.LowPart);
144                 if (code)
145                     osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
146                 else
147                     osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
148             }
149         }
150
151         /* Write the data */
152         if (code == 0) {
153             temp = rx_Write(rxcallp, memoryRegionp, length);
154             if (temp != length) {
155                 osi_Log2(afsd_logp, "rx_Write failed %d != %d", temp, length);
156                 code = (rx_Error(rxcallp) < 0) ? rx_Error(rxcallp) : RX_PROTOCOL_ERROR;
157                 break;
158             } else {
159                 osi_Log1(afsd_logp, "rx_Write succeeded written %d", temp);
160                 written += temp;
161             }
162         }
163
164         /* End the call */
165         if (code == 0) {
166             if (call_was_64bit) {
167                 code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
168                 if (code)
169                     osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
170                 else
171                     osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
172             } else {
173                 code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
174                 if (code)
175                     osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
176                 else
177                     osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
178             }
179         }
180
181         code1 = rx_EndCall(rxcallp, code);
182
183         if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
184             SET_SERVERHASNO64BIT(connp);
185             goto retry;
186         }
187
188         /* Prefer StoreData error over rx_EndCall error */
189         if (code1 != 0)
190             code = code1;
191     } while (cm_Analyze(connp, userp, reqp, &scp->fid, NULL, 1, &outStatus, &volSync, NULL, NULL, code));
192
193     code = cm_MapRPCError(code, reqp);
194
195     if (code)
196         osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
197     else
198         osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
199
200     /* now, clean up our state */
201     lock_ObtainWrite(&scp->rw);
202
203     if (code == 0) {
204         osi_hyper_t t;
205
206         /* now, here's something a little tricky: in AFS 3, a dirty
207          * length can't be directly stored, instead, a dirty chunk is
208          * stored that sets the file's size (by writing and by using
209          * the truncate-first option in the store call).
210          *
211          * At this point, we've just finished a store, and so the trunc
212          * pos field is clean.  If the file's size at the server is at
213          * least as big as we think it should be, then we turn off the
214          * length dirty bit, since all the other dirty buffers must
215          * precede this one in the file.
216          *
217          * The file's desired size shouldn't be smaller than what's
218          * stored at the server now, since we just did the trunc pos
219          * store.
220          *
221          * We have to turn off the length dirty bit as soon as we can,
222          * so that we see updates made by other machines.
223          */
224
225         if (call_was_64bit) {
226             t.LowPart = outStatus.Length;
227             t.HighPart = outStatus.Length_hi;
228         } else {
229             t = ConvertLongToLargeInteger(outStatus.Length);
230         }
231
232         if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
233             _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_LENGTH);
234
235         cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp,
236                        CM_MERGEFLAG_STOREDATA | CM_MERGEFLAG_CACHE_BYPASS);
237     } else {
238         InterlockedDecrement(&scp->activeRPCs);
239         if (code == CM_ERROR_SPACE)
240             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OUTOFSPACE);
241         else if (code == CM_ERROR_QUOTA)
242             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OVERQUOTA);
243     }
244     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
245
246     if (bytesWritten)
247         *bytesWritten = written;
248
249     if (!scp_locked)
250         lock_ReleaseWrite(&scp->rw);
251
252     return code;
253 }
254
255 afs_int32
256 cm_DirectWrite( IN cm_scache_t *scp,
257                 IN osi_hyper_t *offsetp,
258                 IN afs_uint32   length,
259                 IN afs_uint32   flags,
260                 IN cm_user_t   *userp,
261                 IN cm_req_t    *reqp,
262                 IN void        *memoryRegionp,
263                 OUT afs_uint32 *bytesWritten)
264 {
265     rock_BkgDirectWrite_t *rockp = NULL;
266     int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
267     afs_int32 code;
268
269     if (!scp_locked)
270         lock_ObtainWrite(&scp->rw);
271
272     if (scp->flags & CM_SCACHEFLAG_DELETED) {
273         if (!scp_locked)
274             lock_ReleaseWrite(&scp->rw);
275         return CM_ERROR_BADFD;
276     }
277
278     rockp = malloc(sizeof(*rockp));
279     if (!rockp) {
280         if (!scp_locked)
281             lock_ReleaseWrite(&scp->rw);
282         return ENOMEM;
283     }
284
285     rockp->memoryRegion = malloc(length);
286     if (rockp->memoryRegion == NULL) {
287         if (!scp_locked)
288             lock_ReleaseWrite(&scp->rw);
289         free(rockp);
290         return ENOMEM;
291     }
292
293     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
294     code = cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
295     if (code) {
296         if (!scp_locked)
297             lock_ReleaseWrite(&scp->rw);
298         free(rockp->memoryRegion);
299         free(rockp);
300         return ENOMEM;
301     }
302
303     /* cannot hold scp->rw when calling cm_QueueBkGRequest. */
304     lock_ReleaseWrite(&scp->rw);
305     memcpy(rockp->memoryRegion, memoryRegionp, length);
306     rockp->offset = *offsetp;
307     rockp->length = length;
308     rockp->bypass_cache = TRUE;
309
310     cm_QueueBKGRequest(scp, cm_BkgDirectWrite, rockp, userp, reqp);
311
312     *bytesWritten = length;     /* must lie */
313     if (scp_locked)
314         lock_ObtainWrite(&scp->rw);
315
316     return code;
317 }
318
319 void
320 cm_BkgDirectWriteDone( cm_scache_t *scp, void *vrockp, afs_int32 code)
321 {
322     rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
323
324     lock_ObtainWrite(&scp->rw);
325     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
326     lock_ReleaseWrite(&scp->rw);
327     free(rockp->memoryRegion);
328     rockp->memoryRegion = NULL;
329 }
330
331 afs_int32
332 cm_BkgDirectWrite( cm_scache_t *scp, void *vrockp, struct cm_user *userp, cm_req_t *reqp)
333 {
334     rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
335     afs_uint32 flags = 0;
336     afs_uint32 bytesWritten;
337     afs_int32  code;
338
339     osi_assertx(rockp->memoryRegion, "memoryRegion is NULL");
340
341     code = int_DirectWrite(scp, &rockp->offset, rockp->length,
342                            flags, userp, reqp,
343                            rockp->memoryRegion, &bytesWritten);
344
345     switch ( code ) {
346     case CM_ERROR_TIMEDOUT:     /* or server restarting */
347     case CM_ERROR_RETRY:
348     case CM_ERROR_WOULDBLOCK:
349     case CM_ERROR_ALLBUSY:
350     case CM_ERROR_ALLDOWN:
351     case CM_ERROR_ALLOFFLINE:
352     case CM_ERROR_PARTIALWRITE:
353         /* do nothing; cm_BkgDaemon will retry the request */
354         break;
355     default:
356         lock_ObtainWrite(&scp->rw);
357         cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
358         lock_ReleaseWrite(&scp->rw);
359         free(rockp->memoryRegion);
360         rockp->memoryRegion = NULL;
361         break;
362     }
363     return code;
364 }
365
366 /*
367  * cm_SetupDirectStoreBIOD differs from cm_SetupStoreBIOD in that it
368  * doesn't worry about whether or not the cm_buf_t is dirty or not.  Nor
369  * does it concern itself with chunk size.  All of the cm_buf_t objects
370  * that overlap the requested range must be held.
371  *
372  * scp must be locked; temporarily unlocked during processing.
373  * If returns 0, returns buffers held in biop, and with
374  * CM_BUF_CMSTORING set.
375  *
376  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
377  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
378  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
379  * must be woken, and the event must be set when the I/O is done.  All of this
380  * is required so that buf_WaitIO synchronizes properly with the buffer as it
381  * is being written out.
382  *
383  * Not currently used but want to make sure the code does not rot.
384  */
385 afs_int32
386 cm_SetupDirectStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, afs_uint32 inSize,
387                         cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
388 {
389     cm_buf_t *bufp;
390     osi_queueData_t *qdp;
391     osi_hyper_t thyper;
392     osi_hyper_t tbase;
393     osi_hyper_t scanStart;      /* where to start scan for dirty pages */
394     osi_hyper_t scanEnd;        /* where to stop scan for dirty pages */
395     long code;
396     long flags;                 /* flags to cm_SyncOp */
397
398     /* clear things out */
399     biop->scp = scp;            /* do not hold; held by caller */
400     biop->userp = userp;        /* do not hold; held by caller */
401     biop->reqp = reqp;
402     biop->offset = *inOffsetp;
403     biop->length = 0;
404     biop->bufListp = NULL;
405     biop->bufListEndp = NULL;
406     biop->reserved = 0;
407
408     /*
409      * reserve enough buffers to cover the full range.
410      * drop the cm_scache.rw lock because buf_ReserveBuffers()
411      * can sleep if there is insufficient room.
412      */
413     lock_ReleaseWrite(&scp->rw);
414     biop->reserved = 1 + inSize / cm_data.buf_blockSize;
415     buf_ReserveBuffers(biop->reserved);
416
417     /*
418      * This pass is intended to ensure that a cm_buf_t object
419      * is allocated for each block of the direct store operation.
420      * No effort is going to be made to ensure that the blocks are
421      * populated with current data.  Blocks that are not current and
422      * are not fully overwritten by the direct store data will not
423      * be cached.
424      */
425
426     lock_ObtainWrite(&scp->bufCreateLock);
427
428     /*
429      * Compute the offset of the first buffer.
430      */
431     tbase = *inOffsetp;
432     tbase.LowPart -= tbase.LowPart % cm_data.buf_blockSize;
433
434     /*
435      * If the first buffer cannot be obtained, return an error
436      * immediately.  There is no clean up to be performed.
437      */
438     code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
439     if (code) {
440         lock_ReleaseRead(&scp->bufCreateLock);
441         buf_UnreserveBuffers(biop->reserved);
442         lock_ObtainWrite(&scp->rw);
443         return code;
444     }
445
446     /* get buffer mutex and scp mutex safely */
447     lock_ObtainMutex(&bufp->mx);
448
449     /*
450      * if the buffer is actively involved in I/O
451      * we wait for the I/O to complete.
452      */
453     if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
454         buf_WaitIO(scp, bufp);
455
456     lock_ObtainWrite(&scp->rw);
457     flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS |
458             CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
459     code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
460     if (code) {
461         lock_ReleaseMutex(&bufp->mx);
462         buf_Release(bufp);
463         buf_UnreserveBuffers(biop->reserved);
464         return code;
465     }
466     cm_SyncOpDone(scp, bufp, flags);
467     lock_ReleaseMutex(&bufp->mx);
468
469     /*
470      * Add the first buffer into the BIOD list.
471      */
472     qdp = osi_QDAlloc();
473     if (qdp == NULL) {
474         buf_Release(bufp);
475         buf_UnreserveBuffers(1 + inSize / cm_data.buf_blockSize);
476         return ENOMEM;
477     }
478     osi_SetQData(qdp, bufp);
479
480     if ( cm_verifyData )
481         buf_ComputeCheckSum(bufp);
482
483     /* don't have to hold bufp, since held by buf_Get above */
484     osi_QAddH((osi_queue_t **) &biop->bufListp,
485               (osi_queue_t **) &biop->bufListEndp,
486               &qdp->q);
487     biop->length = cm_data.buf_blockSize - (afs_uint32)(inOffsetp->QuadPart % cm_data.buf_blockSize);
488
489     if (biop->length < inSize) {
490         /* scan for the rest of the buffers */
491         thyper = ConvertLongToLargeInteger(biop->length);
492         scanStart = LargeIntegerAdd(bufp->offset, thyper);
493         thyper = ConvertLongToLargeInteger(inSize);
494         scanEnd = LargeIntegerAdd(*inOffsetp, thyper);
495
496         flags = CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
497         lock_ReleaseWrite(&scp->rw);
498
499         for ( tbase = scanStart, thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
500               LargeIntegerLessThan(tbase, scanEnd);
501               tbase = LargeIntegerAdd(tbase, thyper))
502         {
503             code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
504             if (code) {
505                 /* Must tear down biod */
506                 goto error;
507             }
508
509             lock_ObtainMutex(&bufp->mx);
510             /*
511             * if the buffer is actively involved in I/O
512             * we wait for the I/O to complete.
513             */
514             if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
515                 buf_WaitIO(scp, bufp);
516
517             lock_ObtainWrite(&scp->rw);
518             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
519             lock_ReleaseWrite(&scp->rw);
520             lock_ReleaseMutex(&bufp->mx);
521             if (code) {
522                 buf_Release(bufp);
523                 goto error;
524             }
525
526             /*
527              * Add the buffer into the BIOD list.
528              */
529             qdp = osi_QDAlloc();
530             if (qdp == NULL) {
531                 buf_Release(bufp);
532                 code = ENOMEM;
533                 goto error;
534             }
535             osi_SetQData(qdp, bufp);
536
537             if ( cm_verifyData )
538                 buf_ComputeCheckSum(bufp);
539
540             /* don't have to hold bufp, since held by buf_Get above */
541             osi_QAddH( (osi_queue_t **) &biop->bufListp,
542                        (osi_queue_t **) &biop->bufListEndp,
543                        &qdp->q);
544             biop->length += cm_data.buf_blockSize;
545             bufp = NULL;        /* this buffer and reference added to the queue */
546         }
547
548         /* update biod info describing the transfer */
549         if (biop->length > inSize)
550             biop->length = inSize;
551
552         lock_ObtainWrite(&scp->rw);
553     }
554
555     /* finally, we're done */
556     lock_ReleaseWrite(&scp->bufCreateLock);
557     return 0;
558
559   error:
560     lock_ReleaseWrite(&scp->bufCreateLock);
561     /* tear down biod and clear buffer reservation */
562     cm_ReleaseBIOD(biop, TRUE, code, FALSE);
563     lock_ObtainWrite(&scp->rw);
564     return code;
565 }