Windows: Direct IO Support for Service
[openafs.git] / src / WINNT / afsd / cm_direct.c
1 /*
2  * Copyright (c) 2012 Your File System, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright
12  *   notice, this list of conditions and the following disclaimer in the
13  *   documentation and/or other materials provided with the distribution.
14  * - Neither the name of Your File System, Inc nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission from
17  *   Your File System, Inc.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
23  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30  */
31
32 #include <afsconfig.h>
33 #include <afs/param.h>
34 #include <roken.h>
35
36 #include <afs/stds.h>
37
38 #include <windows.h>
39 #include <winsock2.h>
40 #include <nb30.h>
41 #include <string.h>
42 #include <stdlib.h>
43 #include <osi.h>
44
45 #include "afsd.h"
46
47 /*
48  * cm_DirectWrite is used to write the contents of one contiguous
49  * buffer to the file server.  The input buffer must not be a
50  * cm_buf_t.data field.  The data is written to the file server without
51  * locking any buffers.  The cm_scache object is protected
52  * by cm_SyncOp( CM_SCACHESYNC_STOREDATA_EXCL) and the resulting
53  * AFSFetchStatus is merged.
54  */
55
56 static afs_int32
57 int_DirectWrite( IN cm_scache_t *scp,
58                  IN cm_bulkIO_t *biodp,
59                  IN osi_hyper_t *offsetp,
60                  IN afs_uint32   length,
61                  IN afs_uint32   flags,
62                  IN cm_user_t   *userp,
63                  IN cm_req_t    *reqp,
64                  IN void        *memoryRegionp,
65                  OUT afs_uint32 *bytesWritten)
66 {
67     long code, code1;
68     long temp;
69     AFSFetchStatus outStatus;
70     AFSStoreStatus inStatus;
71     AFSVolSync volSync;
72     AFSFid tfid;
73     struct rx_call *rxcallp;
74     struct rx_connection *rxconnp;
75     cm_conn_t *connp;
76     osi_hyper_t truncPos;
77     int require_64bit_ops = 0;
78     int call_was_64bit = 0;
79     int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
80     afs_uint32 written = 0;
81
82     osi_assertx(userp != NULL, "null cm_user_t");
83     osi_assertx(biodp != NULL, "null cm_bulkIO_t");
84     osi_assertx(biodp->scp == scp, "cm_bulkIO_t.scp != scp");
85
86     memset(&volSync, 0, sizeof(volSync));
87     if (bytesWritten)
88         *bytesWritten = 0;
89
90     cm_AFSFidFromFid(&tfid, &scp->fid);
91
92     if (!scp_locked)
93         lock_ObtainWrite(&scp->rw);
94
95     /* prepare the output status for the store */
96     _InterlockedOr(&scp->mask, CM_SCACHEMASK_CLIENTMODTIME);
97     cm_StatusFromAttr(&inStatus, scp, NULL);
98     truncPos = scp->length;
99     if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
100          && LargeIntegerLessThan(scp->truncPos, truncPos)) {
101         truncPos = scp->truncPos;
102         _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_TRUNCPOS);
103     }
104
105     InterlockedIncrement(&scp->activeRPCs);
106     lock_ReleaseWrite(&scp->rw);
107
108     /* now we're ready to do the store operation */
109     do {
110         code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
111         if (code)
112             continue;
113
114     retry:
115         rxconnp = cm_GetRxConn(connp);
116         rxcallp = rx_NewCall(rxconnp);
117         rx_PutConnection(rxconnp);
118
119         if (SERVERHAS64BIT(connp)) {
120             call_was_64bit = 1;
121
122             osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
123                      scp, offsetp->HighPart, offsetp->LowPart, length);
124             osi_Log2(afsd_logp, "... truncPos 0x%x:%08x",  truncPos.HighPart, truncPos.LowPart);
125
126             code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
127                                           offsetp->QuadPart,
128                                           length,
129                                           truncPos.QuadPart);
130             if (code)
131                 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
132             else
133                 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
134         } else {
135             call_was_64bit = 0;
136
137             if (require_64bit_ops) {
138                 osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData.  The operation requires large file support in the server.");
139                 code = CM_ERROR_TOOBIG;
140             } else {
141                 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
142                          scp, offsetp->HighPart, offsetp->LowPart, length);
143                 osi_Log1(afsd_logp, "... truncPos 0x%08x",  truncPos.LowPart);
144
145                 code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
146                                             offsetp->LowPart, length, truncPos.LowPart);
147                 if (code)
148                     osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
149                 else
150                     osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
151             }
152         }
153
154         /* Write the data */
155         if (code == 0) {
156             temp = rx_Write(rxcallp, memoryRegionp, length);
157             if (temp != length) {
158                 osi_Log2(afsd_logp, "rx_Write failed %d != %d", temp, length);
159                 code = (rx_Error(rxcallp) < 0) ? rx_Error(rxcallp) : RX_PROTOCOL_ERROR;
160                 break;
161             } else {
162                 osi_Log1(afsd_logp, "rx_Write succeeded written %d", temp);
163                 written += temp;
164             }
165         }
166
167         /* End the call */
168         if (code == 0) {
169             if (call_was_64bit) {
170                 code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
171                 if (code)
172                     osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
173                 else
174                     osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
175             } else {
176                 code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
177                 if (code)
178                     osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
179                 else
180                     osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
181             }
182         }
183
184         code1 = rx_EndCall(rxcallp, code);
185
186         if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
187             SET_SERVERHASNO64BIT(connp);
188             goto retry;
189         }
190
191         /* Prefer StoreData error over rx_EndCall error */
192         if (code1 != 0)
193             code = code1;
194     } while (cm_Analyze(connp, userp, reqp, &scp->fid, NULL, 1, &outStatus, &volSync, NULL, NULL, code));
195
196     code = cm_MapRPCError(code, reqp);
197
198     if (code)
199         osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
200     else
201         osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
202
203     /* now, clean up our state */
204     lock_ObtainWrite(&scp->rw);
205
206     if (code == 0) {
207         osi_hyper_t t;
208
209         /* now, here's something a little tricky: in AFS 3, a dirty
210          * length can't be directly stored, instead, a dirty chunk is
211          * stored that sets the file's size (by writing and by using
212          * the truncate-first option in the store call).
213          *
214          * At this point, we've just finished a store, and so the trunc
215          * pos field is clean.  If the file's size at the server is at
216          * least as big as we think it should be, then we turn off the
217          * length dirty bit, since all the other dirty buffers must
218          * precede this one in the file.
219          *
220          * The file's desired size shouldn't be smaller than what's
221          * stored at the server now, since we just did the trunc pos
222          * store.
223          *
224          * We have to turn off the length dirty bit as soon as we can,
225          * so that we see updates made by other machines.
226          */
227
228         if (call_was_64bit) {
229             t.LowPart = outStatus.Length;
230             t.HighPart = outStatus.Length_hi;
231         } else {
232             t = ConvertLongToLargeInteger(outStatus.Length);
233         }
234
235         if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
236             _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_LENGTH);
237
238         cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp,
239                        CM_MERGEFLAG_STOREDATA | CM_MERGEFLAG_CACHE_BYPASS);
240     } else {
241         InterlockedDecrement(&scp->activeRPCs);
242         if (code == CM_ERROR_SPACE)
243             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OUTOFSPACE);
244         else if (code == CM_ERROR_QUOTA)
245             _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OVERQUOTA);
246     }
247     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
248
249     if (bytesWritten)
250         *bytesWritten = written;
251
252     if (!scp_locked)
253         lock_ReleaseWrite(&scp->rw);
254
255     return code;
256 }
257
258 afs_int32
259 cm_DirectWrite( IN cm_scache_t *scp,
260                 IN osi_hyper_t *offsetp,
261                 IN afs_uint32   length,
262                 IN afs_uint32   flags,
263                 IN cm_user_t   *userp,
264                 IN cm_req_t    *reqp,
265                 IN void        *memoryRegionp,
266                 OUT afs_uint32 *bytesWritten)
267 {
268     rock_BkgDirectWrite_t *rockp = NULL;
269     int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
270     afs_int32 code;
271
272     if (!scp_locked)
273         lock_ObtainWrite(&scp->rw);
274
275     if (scp->flags & CM_SCACHEFLAG_DELETED) {
276         if (!scp_locked)
277             lock_ReleaseWrite(&scp->rw);
278         return CM_ERROR_BADFD;
279     }
280
281     rockp = malloc(sizeof(*rockp));
282     if (!rockp) {
283         if (!scp_locked)
284             lock_ReleaseWrite(&scp->rw);
285         return ENOMEM;
286     }
287
288     rockp->memoryRegion = malloc(length);
289     if (rockp->memoryRegion == NULL) {
290         if (!scp_locked)
291             lock_ReleaseWrite(&scp->rw);
292         free(rockp);
293         return ENOMEM;
294     }
295
296     /* Serialize StoreData RPC's; for rationale see cm_scache.c */
297     code = cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
298     if (code) {
299         if (!scp_locked)
300             lock_ReleaseWrite(&scp->rw);
301         free(rockp->memoryRegion);
302         free(rockp);
303         return ENOMEM;
304     }
305
306     /* cannot hold scp->rw when calling cm_QueueBkGRequest. */
307     lock_ReleaseWrite(&scp->rw);
308     memcpy(rockp->memoryRegion, memoryRegionp, length);
309     rockp->offset = *offsetp;
310     rockp->length = length;
311     rockp->bypass_cache = TRUE;
312
313     cm_QueueBKGRequest(scp, cm_BkgDirectWrite, rockp, userp, reqp);
314
315     *bytesWritten = length;     /* must lie */
316     if (scp_locked)
317         lock_ObtainWrite(&scp->rw);
318
319     return code;
320 }
321
322 void
323 cm_BkgDirectWriteDone( cm_scache_t *scp, void *vrockp, afs_int32 code)
324 {
325     rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
326
327     lock_ObtainWrite(&scp->rw);
328     cm_ReleaseBIOD(&rockp->biod, 1, code, 1);
329     cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
330     lock_ReleaseWrite(&scp->rw);
331     free(rockp->memoryRegion);
332     rockp->memoryRegion = NULL;
333 }
334
335 afs_int32
336 cm_BkgDirectWrite( cm_scache_t *scp, void *vrockp, struct cm_user *userp, cm_req_t *reqp)
337 {
338     rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
339     afs_uint32 flags = 0;
340     afs_uint32 bytesWritten;
341     afs_int32  code;
342
343     /*
344      * Must fixup biod->reqp value since we are no longer running with the
345      * same stack as when the BIOD was created.
346      */
347     rockp->biod.reqp = reqp;
348
349     osi_assertx(rockp->memoryRegion, "memoryRegion is NULL");
350
351     code = int_DirectWrite(scp, &rockp->biod, &rockp->offset, rockp->length,
352                            flags, userp, reqp,
353                            rockp->memoryRegion, &bytesWritten);
354
355     switch ( code ) {
356     case CM_ERROR_TIMEDOUT:     /* or server restarting */
357     case CM_ERROR_RETRY:
358     case CM_ERROR_WOULDBLOCK:
359     case CM_ERROR_ALLBUSY:
360     case CM_ERROR_ALLDOWN:
361     case CM_ERROR_ALLOFFLINE:
362     case CM_ERROR_PARTIALWRITE:
363         /* do nothing; cm_BkgDaemon will retry the request */
364         break;
365     default:
366         lock_ObtainWrite(&scp->rw);
367         cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
368         lock_ReleaseWrite(&scp->rw);
369         free(rockp->memoryRegion);
370         rockp->memoryRegion = NULL;
371         break;
372     }
373     return code;
374 }
375
376
377 /*
378  * cm_SetupDirectStoreBIOD differs from cm_SetupStoreBIOD in that it
379  * doesn't worry about whether or not the cm_buf_t is dirty or not.  Nor
380  * does it concern itself with chunk size.  All of the cm_buf_t objects
381  * that overlap the requested range must be held.
382  *
383  * scp must be locked; temporarily unlocked during processing.
384  * If returns 0, returns buffers held in biop, and with
385  * CM_BUF_CMSTORING set.
386  *
387  * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
388  * buffer is ever unlocked before CM_BUF_DIRTY is cleared.  And if
389  * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
390  * must be woken, and the event must be set when the I/O is done.  All of this
391  * is required so that buf_WaitIO synchronizes properly with the buffer as it
392  * is being written out.
393  *
394  */
395 afs_int32
396 cm_SetupDirectStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, afs_uint32 inSize,
397                         cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
398 {
399     cm_buf_t *bufp;
400     osi_queueData_t *qdp;
401     osi_hyper_t thyper;
402     osi_hyper_t tbase;
403     osi_hyper_t scanStart;      /* where to start scan for dirty pages */
404     osi_hyper_t scanEnd;        /* where to stop scan for dirty pages */
405     long code;
406     long flags;                 /* flags to cm_SyncOp */
407
408     /* clear things out */
409     biop->scp = scp;            /* do not hold; held by caller */
410     biop->userp = userp;        /* do not hold; held by caller */
411     biop->reqp = reqp;
412     biop->offset = *inOffsetp;
413     biop->length = 0;
414     biop->bufListp = NULL;
415     biop->bufListEndp = NULL;
416     biop->reserved = 0;
417
418     /*
419      * reserve enough buffers to cover the full range.
420      * drop the cm_scache.rw lock because buf_ReserveBuffers()
421      * can sleep if there is insufficient room.
422      */
423     lock_ReleaseWrite(&scp->rw);
424     biop->reserved = 1 + inSize / cm_data.buf_blockSize;
425     buf_ReserveBuffers(biop->reserved);
426
427     /*
428      * This pass is intended to ensure that a cm_buf_t object
429      * is allocated for each block of the direct store operation.
430      * No effort is going to be made to ensure that the blocks are
431      * populated with current data.  Blocks that are not current and
432      * are not fully overwritten by the direct store data will not
433      * be cached.
434      */
435
436     lock_ObtainWrite(&scp->bufCreateLock);
437
438     /*
439      * Compute the offset of the first buffer.
440      */
441     tbase = *inOffsetp;
442     tbase.LowPart -= tbase.LowPart % cm_data.buf_blockSize;
443
444     /*
445      * If the first buffer cannot be obtained, return an error
446      * immediately.  There is no clean up to be performed.
447      */
448     code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
449     if (code) {
450         lock_ReleaseRead(&scp->bufCreateLock);
451         buf_UnreserveBuffers(biop->reserved);
452         lock_ObtainWrite(&scp->rw);
453         return code;
454     }
455
456     /* get buffer mutex and scp mutex safely */
457     lock_ObtainMutex(&bufp->mx);
458
459     /*
460      * if the buffer is actively involved in I/O
461      * we wait for the I/O to complete.
462      */
463     if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
464         buf_WaitIO(scp, bufp);
465
466     lock_ObtainWrite(&scp->rw);
467     flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS |
468             CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
469     code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
470     if (code) {
471         lock_ReleaseMutex(&bufp->mx);
472         buf_Release(bufp);
473         buf_UnreserveBuffers(biop->reserved);
474         return code;
475     }
476     cm_SyncOpDone(scp, bufp, flags);
477     lock_ReleaseMutex(&bufp->mx);
478
479     /*
480      * Add the first buffer into the BIOD list.
481      */
482     qdp = osi_QDAlloc();
483     if (qdp == NULL) {
484         buf_Release(bufp);
485         buf_UnreserveBuffers(1 + inSize / cm_data.buf_blockSize);
486         return ENOMEM;
487     }
488     osi_SetQData(qdp, bufp);
489
490     if ( cm_verifyData )
491         buf_ComputeCheckSum(bufp);
492
493     /* don't have to hold bufp, since held by buf_Get above */
494     osi_QAddH((osi_queue_t **) &biop->bufListp,
495               (osi_queue_t **) &biop->bufListEndp,
496               &qdp->q);
497     biop->length = cm_data.buf_blockSize - (afs_uint32)(inOffsetp->QuadPart % cm_data.buf_blockSize);
498
499     if (biop->length < inSize) {
500         /* scan for the rest of the buffers */
501         thyper = ConvertLongToLargeInteger(biop->length);
502         scanStart = LargeIntegerAdd(bufp->offset, thyper);
503         thyper = ConvertLongToLargeInteger(inSize);
504         scanEnd = LargeIntegerAdd(*inOffsetp, thyper);
505
506         flags = CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
507         lock_ReleaseWrite(&scp->rw);
508
509         for ( tbase = scanStart, thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
510               LargeIntegerLessThan(tbase, scanEnd);
511               tbase = LargeIntegerAdd(tbase, thyper))
512         {
513             code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
514             if (code) {
515                 /* Must tear down biod */
516                 goto error;
517             }
518
519             lock_ObtainMutex(&bufp->mx);
520             /*
521             * if the buffer is actively involved in I/O
522             * we wait for the I/O to complete.
523             */
524             if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
525                 buf_WaitIO(scp, bufp);
526
527             lock_ObtainWrite(&scp->rw);
528             code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
529             lock_ReleaseWrite(&scp->rw);
530             lock_ReleaseMutex(&bufp->mx);
531             if (code) {
532                 buf_Release(bufp);
533                 goto error;
534             }
535
536             /*
537              * Add the buffer into the BIOD list.
538              */
539             qdp = osi_QDAlloc();
540             if (qdp == NULL) {
541                 buf_Release(bufp);
542                 code = ENOMEM;
543                 goto error;
544             }
545             osi_SetQData(qdp, bufp);
546
547             if ( cm_verifyData )
548                 buf_ComputeCheckSum(bufp);
549
550             /* don't have to hold bufp, since held by buf_Get above */
551             osi_QAddH( (osi_queue_t **) &biop->bufListp,
552                        (osi_queue_t **) &biop->bufListEndp,
553                        &qdp->q);
554             biop->length += cm_data.buf_blockSize;
555             bufp = NULL;        /* this buffer and reference added to the queue */
556         }
557
558         /* update biod info describing the transfer */
559         if (biop->length > inSize)
560             biop->length = inSize;
561
562         lock_ObtainWrite(&scp->rw);
563     }
564
565     /* finally, we're done */
566     lock_ReleaseWrite(&scp->bufCreateLock);
567     return 0;
568
569   error:
570     lock_ReleaseWrite(&scp->bufCreateLock);
571     /* tear down biod and clear buffer reservation */
572     cm_ReleaseBIOD(biop, TRUE, code, FALSE);
573     lock_ObtainWrite(&scp->rw);
574     return code;
575 }