2 * Copyright (c) 2012 Your File System, Inc.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
9 * - Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - Neither the name of Your File System, Inc nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission from
17 * Your File System, Inc.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
23 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 #include <afsconfig.h>
33 #include <afs/param.h>
48 * cm_DirectWrite is used to write the contents of one contiguous
49 * buffer to the file server. The input buffer must not be a
50 * cm_buf_t.data field. The data is written to the file server without
51 * locking any buffers. The cm_scache object is protected
52 * by cm_SyncOp( CM_SCACHESYNC_STOREDATA_EXCL) and the resulting
53 * AFSFetchStatus is merged.
/*
 * int_DirectWrite sends `length` bytes from memoryRegionp to the file
 * server with one StoreData (or StoreData64) RPC starting at *offsetp,
 * merges the returned status into scp, and reports the local `written`
 * counter through *bytesWritten.
 *
 * NOTE(review): several lines of this function (some parameters, local
 * declarations, branch headers and returns) are not visible in this
 * copy; the comments below describe only the statements that are.
 */
57 int_DirectWrite( IN cm_scache_t *scp,
58 IN osi_hyper_t *offsetp,
63 IN void *memoryRegionp,
64 OUT afs_uint32 *bytesWritten)
68 AFSFetchStatus outStatus;
69 AFSStoreStatus inStatus;
72 struct rx_call *rxcallp;
73 struct rx_connection *rxconnp;
76 int require_64bit_ops = 0;
77 int call_was_64bit = 0;
/* Non-zero when the caller passed CM_DIRECT_SCP_LOCKED in flags; the
 * places where scp_locked is consulted are not visible in this copy. */
78 int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
79 afs_uint32 written = 0;
81 osi_assertx(userp != NULL, "null cm_user_t");
83 memset(&volSync, 0, sizeof(volSync));
87 cm_AFSFidFromFid(&tfid, &scp->fid);
90 lock_ObtainWrite(&scp->rw);
92 /* prepare the output status for the store */
93 _InterlockedOr(&scp->mask, CM_SCACHEMASK_CLIENTMODTIME);
94 cm_StatusFromAttr(&inStatus, scp, NULL);
/* The truncation position sent to the server is scp->length, unless a
 * pending scp->truncPos is smaller; in that case the pending value is
 * used and its mask bit is cleared. */
95 truncPos = scp->length;
96 if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
97 && LargeIntegerLessThan(scp->truncPos, truncPos)) {
98 truncPos = scp->truncPos;
99 _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_TRUNCPOS);
/* Count this RPC on the scache entry while scp->rw is dropped; the
 * matching InterlockedDecrement follows cm_MergeStatus further down. */
102 InterlockedIncrement(&scp->activeRPCs);
103 lock_ReleaseWrite(&scp->rw);
105 /* now we're ready to do the store operation */
107 code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
112 rxconnp = cm_GetRxConn(connp);
113 rxcallp = rx_NewCall(rxconnp);
114 rx_PutConnection(rxconnp);
/* Use the 64-bit StoreData variant when SERVERHAS64BIT(connp) holds. */
116 if (SERVERHAS64BIT(connp)) {
119 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
120 scp, offsetp->HighPart, offsetp->LowPart, length);
121 osi_Log2(afsd_logp, "... truncPos 0x%x:%08x", truncPos.HighPart, truncPos.LowPart);
123 code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
128 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
130 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
/* Otherwise: a request flagged require_64bit_ops fails with
 * CM_ERROR_TOOBIG, and any other request uses the 32-bit RPC, which is
 * passed only the LowPart of the offset and of truncPos. */
134 if (require_64bit_ops) {
135 osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData. The operation requires large file support in the server.");
136 code = CM_ERROR_TOOBIG;
138 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
139 scp, offsetp->HighPart, offsetp->LowPart, length);
140 osi_Log1(afsd_logp, "... truncPos 0x%08x", truncPos.LowPart);
142 code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
143 offsetp->LowPart, length, truncPos.LowPart);
145 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
147 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
/* The whole region is handed to rx_Write in one call.  A short write
 * becomes an error: the negative rx_Error() value when there is one,
 * RX_PROTOCOL_ERROR otherwise. */
153 temp = rx_Write(rxcallp, memoryRegionp, length);
154 if (temp != length) {
155 osi_Log2(afsd_logp, "rx_Write failed %d != %d", temp, length);
156 code = (rx_Error(rxcallp) < 0) ? rx_Error(rxcallp) : RX_PROTOCOL_ERROR;
159 osi_Log1(afsd_logp, "rx_Write succeeded written %d", temp);
/* Finish the RPC with the End routine matching the Start routine used. */
166 if (call_was_64bit) {
167 code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
169 osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
171 osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
173 code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
175 osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
177 osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
181 code1 = rx_EndCall(rxcallp, code);
/* If either result is RXGEN_OPCODE while the connection is still marked
 * 64-bit capable, SET_SERVERHASNO64BIT is applied to the connection. */
183 if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
184 SET_SERVERHASNO64BIT(connp);
188 /* Prefer StoreData error over rx_EndCall error */
/* The attempt is repeated for as long as cm_Analyze returns non-zero. */
191 } while (cm_Analyze(connp, userp, reqp, &scp->fid, NULL, 1, &outStatus, &volSync, NULL, NULL, code));
193 code = cm_MapRPCError(code, reqp);
196 osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
198 osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
200 /* now, clean up our state */
201 lock_ObtainWrite(&scp->rw);
206 /* now, here's something a little tricky: in AFS 3, a dirty
207 * length can't be directly stored, instead, a dirty chunk is
208 * stored that sets the file's size (by writing and by using
209 * the truncate-first option in the store call).
211 * At this point, we've just finished a store, and so the trunc
212 * pos field is clean. If the file's size at the server is at
213 * least as big as we think it should be, then we turn off the
214 * length dirty bit, since all the other dirty buffers must
215 * precede this one in the file.
217 * The file's desired size shouldn't be smaller than what's
218 * stored at the server now, since we just did the trunc pos
221 * We have to turn off the length dirty bit as soon as we can,
222 * so that we see updates made by other machines.
225 if (call_was_64bit) {
226 t.LowPart = outStatus.Length;
227 t.HighPart = outStatus.Length_hi;
229 t = ConvertLongToLargeInteger(outStatus.Length);
232 if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
233 _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_LENGTH);
235 cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp,
236 CM_MERGEFLAG_STOREDATA | CM_MERGEFLAG_CACHE_BYPASS);
238 InterlockedDecrement(&scp->activeRPCs);
239 if (code == CM_ERROR_SPACE)
240 _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OUTOFSPACE);
241 else if (code == CM_ERROR_QUOTA)
242 _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OVERQUOTA);
244 cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
247 *bytesWritten = written;
250 lock_ReleaseWrite(&scp->rw);
/*
 * cm_DirectWrite queues a direct write for background processing.
 * It copies `length` bytes from memoryRegionp into a freshly allocated
 * buffer owned by a rock_BkgDirectWrite_t, takes the
 * CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE sync op on
 * scp, and hands the rock to cm_QueueBKGRequest with cm_BkgDirectWrite
 * as the handler.  *bytesWritten is set to `length` as soon as the
 * request has been queued.
 *
 * NOTE(review): the allocation-failure handling and the return
 * statements are only partly visible in this copy.
 */
256 cm_DirectWrite( IN cm_scache_t *scp,
257 IN osi_hyper_t *offsetp,
258 IN afs_uint32 length,
262 IN void *memoryRegionp,
263 OUT afs_uint32 *bytesWritten)
265 rock_BkgDirectWrite_t *rockp = NULL;
/* Non-zero when the caller passed CM_DIRECT_SCP_LOCKED in flags; the
 * places where scp_locked is consulted are not visible in this copy. */
266 int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
270 lock_ObtainWrite(&scp->rw);
/* A write against an scache entry flagged as deleted is refused. */
272 if (scp->flags & CM_SCACHEFLAG_DELETED) {
274 lock_ReleaseWrite(&scp->rw);
275 return CM_ERROR_BADFD;
278 rockp = malloc(sizeof(*rockp));
281 lock_ReleaseWrite(&scp->rw);
/* Private buffer that will hold a copy of the caller's data. */
285 rockp->memoryRegion = malloc(length);
286 if (rockp->memoryRegion == NULL) {
288 lock_ReleaseWrite(&scp->rw);
293 /* Serialize StoreData RPC's; for rationale see cm_scache.c */
294 code = cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
297 lock_ReleaseWrite(&scp->rw);
298 free(rockp->memoryRegion);
303 /* cannot hold scp->rw when calling cm_QueueBKGRequest. */
304 lock_ReleaseWrite(&scp->rw);
/* Fill in the rock: data copy, file offset, byte count. */
305 memcpy(rockp->memoryRegion, memoryRegionp, length);
306 rockp->offset = *offsetp;
307 rockp->length = length;
308 rockp->bypass_cache = TRUE;
310 cm_QueueBKGRequest(scp, cm_BkgDirectWrite, rockp, userp, reqp);
312 *bytesWritten = length; /* must lie: the store has only been queued, yet the full length is reported */
314 lock_ObtainWrite(&scp->rw);
/*
 * cm_BkgDirectWriteDone releases what a direct-write rock holds: under
 * scp->rw it ends the CM_SCACHESYNC_STOREDATA_EXCL |
 * CM_SCACHESYNC_ASYNCSTORE sync op on scp, then frees the rock's data
 * buffer and clears the pointer.  The `code` argument is not used by
 * any of the statements visible here.
 *
 * NOTE(review): presumably invoked by the background-request machinery
 * when a queued request is finished or dropped - confirm against the
 * caller.
 */
320 cm_BkgDirectWriteDone( cm_scache_t *scp, void *vrockp, afs_int32 code)
322 rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
324 lock_ObtainWrite(&scp->rw);
325 cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
326 lock_ReleaseWrite(&scp->rw);
327 free(rockp->memoryRegion);
/* Clear the pointer so the freed buffer cannot be reused by mistake. */
328 rockp->memoryRegion = NULL;
/*
 * cm_BkgDirectWrite is the handler passed to cm_QueueBKGRequest by
 * cm_DirectWrite.  It performs the store described by the rock by
 * calling int_DirectWrite and then acts on the result code.
 *
 * NOTE(review): the switch header, the label of the final branch and
 * the return statement are not visible in this copy.
 */
332 cm_BkgDirectWrite( cm_scache_t *scp, void *vrockp, struct cm_user *userp, cm_req_t *reqp)
334 rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
335 afs_uint32 flags = 0;
336 afs_uint32 bytesWritten;
339 osi_assertx(rockp->memoryRegion, "memoryRegion is NULL");
341 code = int_DirectWrite(scp, &rockp->offset, rockp->length,
343 rockp->memoryRegion, &bytesWritten);
/* For the result codes listed below the rock and its buffer are left
 * untouched. */
346 case CM_ERROR_TIMEDOUT: /* or server restarting */
348 case CM_ERROR_WOULDBLOCK:
349 case CM_ERROR_ALLBUSY:
350 case CM_ERROR_ALLDOWN:
351 case CM_ERROR_ALLOFFLINE:
352 case CM_ERROR_PARTIALWRITE:
353 /* do nothing; cm_BkgDaemon will retry the request */
/* Other branch: the request is finished.  Only CM_SCACHESYNC_ASYNCSTORE
 * is ended here; int_DirectWrite already ends
 * CM_SCACHESYNC_STOREDATA_EXCL itself.  The data buffer is then freed. */
356 lock_ObtainWrite(&scp->rw);
357 cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
358 lock_ReleaseWrite(&scp->rw);
359 free(rockp->memoryRegion);
360 rockp->memoryRegion = NULL;
367 * cm_SetupDirectStoreBIOD differs from cm_SetupStoreBIOD in that it
368 * does not care whether the cm_buf_t objects are dirty, nor
369 * does it concern itself with chunk size. All of the cm_buf_t objects
370 * that overlap the requested range must be held.
372 * scp must be locked; it is temporarily unlocked during processing.
373 * On a return of 0, the buffers are returned held in biop, with
374 * CM_BUF_CMSTORING set.
376 * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
377 * buffer is ever unlocked before CM_BUF_DIRTY is cleared. And if
378 * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
379 * must be woken, and the event must be set when the I/O is done. All of this
380 * is required so that buf_WaitIO synchronizes properly with the buffer as it
381 * is being written out.
383 * Not currently used but want to make sure the code does not rot.
/*
 * cm_SetupDirectStoreBIOD fills in biop with the list of cm_buf_t
 * objects covering inSize bytes starting at *inOffsetp.
 *
 * Locking, as far as the visible statements show: scp->rw is held on
 * entry, is dropped while buffers are reserved and obtained, and is
 * re-obtained before each visible exit.  scp->bufCreateLock is taken
 * for write for the duration of the buffer lookups and is released with
 * lock_ReleaseWrite on every visible exit path.
 *
 * NOTE(review): a number of lines (local declarations, branch headers,
 * returns, queue-element allocation) are not visible in this copy; the
 * comments describe only the statements that are.
 */
386 cm_SetupDirectStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, afs_uint32 inSize,
387 cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
390 osi_queueData_t *qdp;
393 osi_hyper_t scanStart; /* where to start scan for dirty pages */
394 osi_hyper_t scanEnd; /* where to stop scan for dirty pages */
396 long flags; /* flags to cm_SyncOp */
398 /* clear things out */
399 biop->scp = scp; /* do not hold; held by caller */
400 biop->userp = userp; /* do not hold; held by caller */
402 biop->offset = *inOffsetp;
404 biop->bufListp = NULL;
405 biop->bufListEndp = NULL;
409 * reserve enough buffers to cover the full range.
410 * drop the cm_scache.rw lock because buf_ReserveBuffers()
411 * can sleep if there is insufficient room.
413 lock_ReleaseWrite(&scp->rw);
414 biop->reserved = 1 + inSize / cm_data.buf_blockSize;
415 buf_ReserveBuffers(biop->reserved);
418 * This pass is intended to ensure that a cm_buf_t object
419 * is allocated for each block of the direct store operation.
420 * No effort is going to be made to ensure that the blocks are
421 * populated with current data. Blocks that are not current and
422 * are not fully overwritten by the direct store data will not
426 lock_ObtainWrite(&scp->bufCreateLock);
429 * Compute the offset of the first buffer.
432 tbase.LowPart -= tbase.LowPart % cm_data.buf_blockSize;
435 * If the first buffer cannot be obtained, return an error
436 * immediately. There is no clean up to be performed.
438 code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
/* bufCreateLock was taken with lock_ObtainWrite above, so it must be
 * released with lock_ReleaseWrite (not lock_ReleaseRead), matching the
 * other exit paths of this function. */
440 lock_ReleaseWrite(&scp->bufCreateLock);
441 buf_UnreserveBuffers(biop->reserved);
442 lock_ObtainWrite(&scp->rw);
446 /* get buffer mutex and scp mutex safely */
447 lock_ObtainMutex(&bufp->mx);
450 * if the buffer is actively involved in I/O
451 * we wait for the I/O to complete.
453 if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
454 buf_WaitIO(scp, bufp);
456 lock_ObtainWrite(&scp->rw);
457 flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS |
458 CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
459 code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
/* NOTE(review): no release of scp->bufCreateLock is visible on this
 * cm_SyncOp failure path - confirm it is not leaked. */
461 lock_ReleaseMutex(&bufp->mx);
463 buf_UnreserveBuffers(biop->reserved);
466 cm_SyncOpDone(scp, bufp, flags);
467 lock_ReleaseMutex(&bufp->mx);
470 * Add the first buffer into the BIOD list.
/* NOTE(review): same count as biop->reserved; no release of
 * scp->bufCreateLock is visible on this path either - confirm. */
475 buf_UnreserveBuffers(1 + inSize / cm_data.buf_blockSize);
478 osi_SetQData(qdp, bufp);
481 buf_ComputeCheckSum(bufp);
483 /* don't have to hold bufp, since held by buf_Get above */
484 osi_QAddH((osi_queue_t **) &biop->bufListp,
485 (osi_queue_t **) &biop->bufListEndp,
/* Bytes of the request that fall inside the first block. */
487 biop->length = cm_data.buf_blockSize - (afs_uint32)(inOffsetp->QuadPart % cm_data.buf_blockSize);
489 if (biop->length < inSize) {
490 /* scan for the rest of the buffers */
/* NOTE(review): assuming bufp->offset is the block-aligned base, this
 * sum lands on the next block boundary only when *inOffsetp is itself
 * block-aligned - confirm the intended start for unaligned requests. */
491 thyper = ConvertLongToLargeInteger(biop->length);
492 scanStart = LargeIntegerAdd(bufp->offset, thyper);
493 thyper = ConvertLongToLargeInteger(inSize);
494 scanEnd = LargeIntegerAdd(*inOffsetp, thyper);
496 flags = CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
497 lock_ReleaseWrite(&scp->rw);
/* Step one block at a time from scanStart up to (not including)
 * scanEnd, obtaining a buffer for each position. */
499 for ( tbase = scanStart, thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
500 LargeIntegerLessThan(tbase, scanEnd);
501 tbase = LargeIntegerAdd(tbase, thyper))
503 code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
505 /* Must tear down biod */
509 lock_ObtainMutex(&bufp->mx);
511 * if the buffer is actively involved in I/O
512 * we wait for the I/O to complete.
514 if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
515 buf_WaitIO(scp, bufp);
517 lock_ObtainWrite(&scp->rw);
518 code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
519 lock_ReleaseWrite(&scp->rw);
520 lock_ReleaseMutex(&bufp->mx);
527 * Add the buffer into the BIOD list.
535 osi_SetQData(qdp, bufp);
538 buf_ComputeCheckSum(bufp);
540 /* don't have to hold bufp, since held by buf_Get above */
541 osi_QAddH( (osi_queue_t **) &biop->bufListp,
542 (osi_queue_t **) &biop->bufListEndp,
544 biop->length += cm_data.buf_blockSize;
545 bufp = NULL; /* this buffer and reference added to the queue */
548 /* update biod info describing the transfer */
549 if (biop->length > inSize)
550 biop->length = inSize;
552 lock_ObtainWrite(&scp->rw);
555 /* finally, we're done */
556 lock_ReleaseWrite(&scp->bufCreateLock);
/* Error exit: drop bufCreateLock, release the BIOD, retake scp->rw. */
560 lock_ReleaseWrite(&scp->bufCreateLock);
561 /* tear down biod and clear buffer reservation */
562 cm_ReleaseBIOD(biop, TRUE, code, FALSE);
563 lock_ObtainWrite(&scp->rw);