2 * Copyright (c) 2012 Your File System, Inc.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
9 * - Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - Neither the name of Your File System, Inc nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission from
17 * Your File System, Inc.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
23 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 #include <afsconfig.h>
33 #include <afs/param.h>
48 * cm_DirectWrite is used to write the contents of one contiguous
49 * buffer to the file server. The input buffer must not be a
50 * cm_buf_t.data field. The data is written to the file server without
51 * locking any buffers. The cm_scache object is protected
52 * by cm_SyncOp( CM_SCACHESYNC_STOREDATA_EXCL) and the resulting
53 * AFSFetchStatus is merged.
/*
 * int_DirectWrite
 *
 * Perform a synchronous StoreData RPC that writes one contiguous
 * caller-supplied memory region to the file server, bypassing the buffer
 * cache.  The cm_scache_t is protected by the caller's
 * cm_SyncOp(CM_SCACHESYNC_STOREDATA_EXCL); the AFSFetchStatus returned by
 * the server is merged into the scp before returning.
 *
 * NOTE(review): this listing is an excerpt with elided lines (the fused
 * numeric prefixes show gaps), so some declarations, braces and error
 * paths are not visible; comments below describe only what is shown.
 */
57 int_DirectWrite( IN cm_scache_t *scp,
58 IN cm_bulkIO_t *biodp,
59 IN osi_hyper_t *offsetp,
64 IN void *memoryRegionp,
65 OUT afs_uint32 *bytesWritten)
69 AFSFetchStatus outStatus;
70 AFSStoreStatus inStatus;
73 struct rx_call *rxcallp;
74 struct rx_connection *rxconnp;
77 int require_64bit_ops = 0;
78 int call_was_64bit = 0;
79 int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
80 afs_uint32 written = 0;
/* Hard preconditions: a user, a bulk-I/O descriptor, and agreement
 * between the descriptor and the target scache object. */
82 osi_assertx(userp != NULL, "null cm_user_t");
83 osi_assertx(biodp != NULL, "null cm_bulkIO_t");
84 osi_assertx(biodp->scp == scp, "cm_bulkIO_t.scp != scp");
86 memset(&volSync, 0, sizeof(volSync));
/* Build the wire-format FID from the cache-manager FID. */
90 cm_AFSFidFromFid(&tfid, &scp->fid);
93 lock_ObtainWrite(&scp->rw);
95 /* prepare the output status for the store */
96 _InterlockedOr(&scp->mask, CM_SCACHEMASK_CLIENTMODTIME);
97 cm_StatusFromAttr(&inStatus, scp, NULL);
/* The StoreData RPC carries a truncate position; use the smaller of the
 * current length and any pending truncation, then clear the pending
 * truncation since this store will apply it. */
98 truncPos = scp->length;
99 if ((scp->mask & CM_SCACHEMASK_TRUNCPOS)
100 && LargeIntegerLessThan(scp->truncPos, truncPos)) {
101 truncPos = scp->truncPos;
102 _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_TRUNCPOS);
/* Count this RPC as active on the scp; decremented after merge below.
 * scp->rw is dropped for the duration of the network operation. */
105 InterlockedIncrement(&scp->activeRPCs);
106 lock_ReleaseWrite(&scp->rw);
108 /* now we're ready to do the store operation */
/* Retry loop (the do/while tail is at cm_Analyze below): obtain a
 * connection for the FID and start an Rx call on it. */
110 code = cm_ConnFromFID(&scp->fid, userp, reqp, &connp);
115 rxconnp = cm_GetRxConn(connp);
116 rxcallp = rx_NewCall(rxconnp);
117 rx_PutConnection(rxconnp);
/* Prefer the 64-bit StoreData variant when the server supports it. */
119 if (SERVERHAS64BIT(connp)) {
122 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
123 scp, offsetp->HighPart, offsetp->LowPart, length);
124 osi_Log2(afsd_logp, "... truncPos 0x%x:%08x", truncPos.HighPart, truncPos.LowPart);
126 code = StartRXAFS_StoreData64(rxcallp, &tfid, &inStatus,
131 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
133 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
/* 32-bit fallback path: refuse if the offset/length cannot be expressed
 * in 32 bits (require_64bit_ops — set in elided lines, presumably from
 * the offset/length; TODO confirm). */
137 if (require_64bit_ops) {
138 osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData. The operation requires large file support in the server.");
139 code = CM_ERROR_TOOBIG;
141 osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
142 scp, offsetp->HighPart, offsetp->LowPart, length);
143 osi_Log1(afsd_logp, "... truncPos 0x%08x", truncPos.LowPart);
145 code = StartRXAFS_StoreData(rxcallp, &tfid, &inStatus,
146 offsetp->LowPart, length, truncPos.LowPart);
148 osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
150 osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
/* Stream the caller's memory region directly into the call.  A short
 * write means the call failed; report the Rx error if one is set,
 * otherwise a generic protocol error. */
156 temp = rx_Write(rxcallp, memoryRegionp, length);
157 if (temp != length) {
158 osi_Log2(afsd_logp, "rx_Write failed %d != %d", temp, length);
159 code = (rx_Error(rxcallp) < 0) ? rx_Error(rxcallp) : RX_PROTOCOL_ERROR;
162 osi_Log1(afsd_logp, "rx_Write succeeded written %d", temp);
/* Finish the RPC with whichever End variant matches the Start used. */
169 if (call_was_64bit) {
170 code = EndRXAFS_StoreData64(rxcallp, &outStatus, &volSync);
172 osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
174 osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
176 code = EndRXAFS_StoreData(rxcallp, &outStatus, &volSync);
178 osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
180 osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
184 code1 = rx_EndCall(rxcallp, code);
/* RXGEN_OPCODE from a 64-bit call means the server doesn't actually
 * implement it: remember that and (in elided lines) retry 32-bit. */
186 if ((code == RXGEN_OPCODE || code1 == RXGEN_OPCODE) && SERVERHAS64BIT(connp)) {
187 SET_SERVERHASNO64BIT(connp);
191 /* Prefer StoreData error over rx_EndCall error */
/* cm_Analyze decides whether the error is retryable (server restart,
 * busy volume, etc.); loop back to the connection setup if so. */
194 } while (cm_Analyze(connp, userp, reqp, &scp->fid, NULL, 1, &outStatus, &volSync, NULL, NULL, code));
196 code = cm_MapRPCError(code, reqp);
199 osi_Log2(afsd_logp, "CALL StoreData FAILURE scp 0x%p, code 0x%x", scp, code);
201 osi_Log1(afsd_logp, "CALL StoreData SUCCESS scp 0x%p", scp);
203 /* now, clean up our state */
204 lock_ObtainWrite(&scp->rw);
209 /* now, here's something a little tricky: in AFS 3, a dirty
210 * length can't be directly stored, instead, a dirty chunk is
211 * stored that sets the file's size (by writing and by using
212 * the truncate-first option in the store call).
214 * At this point, we've just finished a store, and so the trunc
215 * pos field is clean. If the file's size at the server is at
216 * least as big as we think it should be, then we turn off the
217 * length dirty bit, since all the other dirty buffers must
218 * precede this one in the file.
220 * The file's desired size shouldn't be smaller than what's
221 * stored at the server now, since we just did the trunc pos
224 * We have to turn off the length dirty bit as soon as we can,
225 * so that we see updates made by other machines.
/* Reassemble the server-reported length (split across two 32-bit fields
 * in the 64-bit reply) into an osi_hyper_t for comparison. */
228 if (call_was_64bit) {
229 t.LowPart = outStatus.Length;
230 t.HighPart = outStatus.Length_hi;
232 t = ConvertLongToLargeInteger(outStatus.Length);
235 if (LargeIntegerGreaterThanOrEqualTo(t, scp->length))
236 _InterlockedAnd(&scp->mask, ~CM_SCACHEMASK_LENGTH);
/* Merge the returned status into the scp; CACHE_BYPASS marks that no
 * cache buffers back this data. */
238 cm_MergeStatus(NULL, scp, &outStatus, &volSync, userp, reqp,
239 CM_MERGEFLAG_STOREDATA | CM_MERGEFLAG_CACHE_BYPASS);
241 InterlockedDecrement(&scp->activeRPCs);
/* Record persistent space/quota conditions on the scache flags. */
242 if (code == CM_ERROR_SPACE)
243 _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OUTOFSPACE);
244 else if (code == CM_ERROR_QUOTA)
245 _InterlockedOr(&scp->flags, CM_SCACHEFLAG_OVERQUOTA);
/* Release the exclusive store-data synchronization taken by the caller
 * and report the byte count actually written. */
247 cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL);
250 *bytesWritten = written;
253 lock_ReleaseWrite(&scp->rw);
/*
 * cm_DirectWrite
 *
 * Public entry point for a cache-bypass write.  Copies the caller's
 * memory region into a heap-allocated rock and queues a background
 * request (cm_BkgDirectWrite) to perform the actual StoreData RPC, then
 * reports the full length as written ("must lie") because the caller
 * cannot wait for the asynchronous store.
 *
 * NOTE(review): elided lines (gaps in the fused numbering) hide parts of
 * the error paths; e.g. whether rockp itself is freed when cm_SyncOp
 * fails (only rockp->memoryRegion's free is visible at line 301) —
 * verify against the full source.
 */
259 cm_DirectWrite( IN cm_scache_t *scp,
260 IN osi_hyper_t *offsetp,
261 IN afs_uint32 length,
265 IN void *memoryRegionp,
266 OUT afs_uint32 *bytesWritten)
268 rock_BkgDirectWrite_t *rockp = NULL;
269 int scp_locked = !!(flags & CM_DIRECT_SCP_LOCKED);
273 lock_ObtainWrite(&scp->rw);
/* Writes to a deleted file are refused outright. */
275 if (scp->flags & CM_SCACHEFLAG_DELETED) {
277 lock_ReleaseWrite(&scp->rw);
278 return CM_ERROR_BADFD;
/* Allocate the background-request rock and a private copy of the data
 * region (the caller's buffer is not valid after we return). */
281 rockp = malloc(sizeof(*rockp));
284 lock_ReleaseWrite(&scp->rw);
288 rockp->memoryRegion = malloc(length);
289 if (rockp->memoryRegion == NULL) {
291 lock_ReleaseWrite(&scp->rw);
296 /* Serialize StoreData RPC's; for rationale see cm_scache.c */
297 code = cm_SyncOp(scp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
300 lock_ReleaseWrite(&scp->rw);
301 free(rockp->memoryRegion);
306 /* cannot hold scp->rw when calling cm_QueueBkGRequest. */
307 lock_ReleaseWrite(&scp->rw);
/* Fill in the rock: snapshot of the data, offset, and length; mark it
 * as a cache-bypass operation. */
308 memcpy(rockp->memoryRegion, memoryRegionp, length);
309 rockp->offset = *offsetp;
310 rockp->length = length;
311 rockp->bypass_cache = TRUE;
/* Ownership of rockp transfers to the background daemon here. */
313 cm_QueueBKGRequest(scp, cm_BkgDirectWrite, rockp, userp, reqp);
315 *bytesWritten = length; /* must lie */
317 lock_ObtainWrite(&scp->rw);
/*
 * cm_BkgDirectWriteDone
 *
 * Completion callback for a background direct write: releases the BIOD
 * (and its buffer references/reservations), clears the store-data
 * synchronization bits taken by cm_DirectWrite, and frees the private
 * data copy held in the rock.  NOTE(review): surrounding lines (braces,
 * any freeing of the rock itself) are elided in this listing.
 */
323 cm_BkgDirectWriteDone( cm_scache_t *scp, void *vrockp, afs_int32 code)
325 rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
327 lock_ObtainWrite(&scp->rw);
328 cm_ReleaseBIOD(&rockp->biod, 1, code, 1);
329 cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_STOREDATA_EXCL | CM_SCACHESYNC_ASYNCSTORE);
330 lock_ReleaseWrite(&scp->rw);
/* NULL after free so a stale rock pointer cannot be double-freed. */
331 free(rockp->memoryRegion);
332 rockp->memoryRegion = NULL;
/*
 * cm_BkgDirectWrite
 *
 * Background-daemon worker that performs the StoreData RPC queued by
 * cm_DirectWrite, via int_DirectWrite.  Transient errors (timeouts,
 * busy/offline servers, partial writes) are left for cm_BkgDaemon to
 * retry; all other outcomes release the async-store synchronization and
 * free the private data copy.  NOTE(review): the switch's enclosing
 * braces and possibly more cases are elided in this listing.
 */
336 cm_BkgDirectWrite( cm_scache_t *scp, void *vrockp, struct cm_user *userp, cm_req_t *reqp)
338 rock_BkgDirectWrite_t *rockp = ((rock_BkgDirectWrite_t *)vrockp);
339 afs_uint32 flags = 0;
340 afs_uint32 bytesWritten;
344 * Must fixup biod->reqp value since we are no longer running with the
345 * same stack as when the BIOD was created.
347 rockp->biod.reqp = reqp;
349 osi_assertx(rockp->memoryRegion, "memoryRegion is NULL");
/* Do the actual store of the copied region at the recorded offset. */
351 code = int_DirectWrite(scp, &rockp->biod, &rockp->offset, rockp->length,
353 rockp->memoryRegion, &bytesWritten);
/* Retryable conditions: leave the rock intact for the daemon. */
356 case CM_ERROR_TIMEDOUT: /* or server restarting */
358 case CM_ERROR_WOULDBLOCK:
359 case CM_ERROR_ALLBUSY:
360 case CM_ERROR_ALLDOWN:
361 case CM_ERROR_ALLOFFLINE:
362 case CM_ERROR_PARTIALWRITE:
363 /* do nothing; cm_BkgDaemon will retry the request */
/* Terminal outcome: drop the async-store sync state and the data copy. */
366 lock_ObtainWrite(&scp->rw);
367 cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
368 lock_ReleaseWrite(&scp->rw);
369 free(rockp->memoryRegion);
370 rockp->memoryRegion = NULL;
378 * cm_SetupDirectStoreBIOD differs from cm_SetupStoreBIOD in that it
379 * doesn't worry about whether or not the cm_buf_t is dirty or not. Nor
380 * does it concern itself with chunk size. All of the cm_buf_t objects
381 * that overlap the requested range must be held.
383 * scp must be locked; temporarily unlocked during processing.
384 * If returns 0, returns buffers held in biop, and with
385 * CM_BUF_CMSTORING set.
387 * Caller *must* set CM_BUF_WRITING and reset the over.hEvent field if the
388 * buffer is ever unlocked before CM_BUF_DIRTY is cleared. And if
389 * CM_BUF_WRITING is ever viewed by anyone, then it must be cleared, sleepers
390 * must be woken, and the event must be set when the I/O is done. All of this
391 * is required so that buf_WaitIO synchronizes properly with the buffer as it
392 * is being written out.
/*
 * NOTE(review): this listing is an elided excerpt (gaps in the fused
 * numbering); declarations, braces and several error paths are missing.
 * Comments added below only describe visible code.
 */
396 cm_SetupDirectStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp, afs_uint32 inSize,
397 cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp)
400 osi_queueData_t *qdp;
403 osi_hyper_t scanStart; /* where to start scan for dirty pages */
404 osi_hyper_t scanEnd; /* where to stop scan for dirty pages */
406 long flags; /* flags to cm_SyncOp */
408 /* clear things out */
409 biop->scp = scp; /* do not hold; held by caller */
410 biop->userp = userp; /* do not hold; held by caller */
412 biop->offset = *inOffsetp;
414 biop->bufListp = NULL;
415 biop->bufListEndp = NULL;
419 * reserve enough buffers to cover the full range.
420 * drop the cm_scache.rw lock because buf_ReserveBuffers()
421 * can sleep if there is insufficient room.
423 lock_ReleaseWrite(&scp->rw);
/* Worst case: the range straddles block boundaries, so one extra block
 * beyond inSize/blockSize may be needed. */
424 biop->reserved = 1 + inSize / cm_data.buf_blockSize;
425 buf_ReserveBuffers(biop->reserved);
428 * This pass is intended to ensure that a cm_buf_t object
429 * is allocated for each block of the direct store operation.
430 * No effort is going to be made to ensure that the blocks are
431 * populated with current data. Blocks that are not current and
432 * are not fully overwritten by the direct store data will not
436 lock_ObtainWrite(&scp->bufCreateLock);
439 * Compute the offset of the first buffer.
/* Round the start offset down to a buffer-block boundary. */
442 tbase.LowPart -= tbase.LowPart % cm_data.buf_blockSize;
445 * If the first buffer cannot be obtained, return an error
446 * immediately. There is no clean up to be performed.
448 code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
/* NOTE(review): bufCreateLock was obtained for Write at line 436 but is
 * released for Read here — looks like a lock-mode mismatch, unless an
 * elided line downgrades the lock; confirm against the full source. */
450 lock_ReleaseRead(&scp->bufCreateLock);
451 buf_UnreserveBuffers(biop->reserved);
452 lock_ObtainWrite(&scp->rw);
456 /* get buffer mutex and scp mutex safely */
457 lock_ObtainMutex(&bufp->mx);
460 * if the buffer is actively involved in I/O
461 * we wait for the I/O to complete.
463 if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
464 buf_WaitIO(scp, bufp);
/* Synchronize the first buffer for store under scp->rw; on failure the
 * reservation is returned (error-path lines partly elided). */
466 lock_ObtainWrite(&scp->rw);
467 flags = CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS |
468 CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
469 code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
471 lock_ReleaseMutex(&bufp->mx);
473 buf_UnreserveBuffers(biop->reserved);
476 cm_SyncOpDone(scp, bufp, flags);
477 lock_ReleaseMutex(&bufp->mx);
480 * Add the first buffer into the BIOD list.
485 buf_UnreserveBuffers(1 + inSize / cm_data.buf_blockSize);
488 osi_SetQData(qdp, bufp);
/* Checksum is recorded so later code can detect buffer modification. */
491 buf_ComputeCheckSum(bufp);
493 /* don't have to hold bufp, since held by buf_Get above */
494 osi_QAddH((osi_queue_t **) &biop->bufListp,
495 (osi_queue_t **) &biop->bufListEndp,
/* biop->length starts as the portion of the first block that lies at or
 * after the requested offset. */
497 biop->length = cm_data.buf_blockSize - (afs_uint32)(inOffsetp->QuadPart % cm_data.buf_blockSize);
499 if (biop->length < inSize) {
500 /* scan for the rest of the buffers */
501 thyper = ConvertLongToLargeInteger(biop->length);
502 scanStart = LargeIntegerAdd(bufp->offset, thyper);
503 thyper = ConvertLongToLargeInteger(inSize);
504 scanEnd = LargeIntegerAdd(*inOffsetp, thyper);
506 flags = CM_SCACHESYNC_GETSTATUS | CM_SCACHESYNC_STOREDATA | CM_SCACHESYNC_BUFLOCKED;
507 lock_ReleaseWrite(&scp->rw);
/* Walk the remaining range one buffer-block at a time, obtaining and
 * syncing each cm_buf_t exactly as for the first buffer. */
509 for ( tbase = scanStart, thyper = ConvertLongToLargeInteger(cm_data.buf_blockSize);
510 LargeIntegerLessThan(tbase, scanEnd);
511 tbase = LargeIntegerAdd(tbase, thyper))
513 code = buf_Get(scp, &tbase, reqp, BUF_GET_FLAG_BUFCREATE_LOCKED, &bufp);
515 /* Must tear down biod */
519 lock_ObtainMutex(&bufp->mx);
521 * if the buffer is actively involved in I/O
522 * we wait for the I/O to complete.
524 if (bufp->flags & (CM_BUF_WRITING|CM_BUF_READING))
525 buf_WaitIO(scp, bufp);
527 lock_ObtainWrite(&scp->rw);
528 code = cm_SyncOp(scp, bufp, userp, reqp, 0, flags);
529 lock_ReleaseWrite(&scp->rw);
530 lock_ReleaseMutex(&bufp->mx);
537 * Add the buffer into the BIOD list.
545 osi_SetQData(qdp, bufp);
548 buf_ComputeCheckSum(bufp);
550 /* don't have to hold bufp, since held by buf_Get above */
551 osi_QAddH( (osi_queue_t **) &biop->bufListp,
552 (osi_queue_t **) &biop->bufListEndp,
554 biop->length += cm_data.buf_blockSize;
555 bufp = NULL; /* this buffer and reference added to the queue */
558 /* update biod info describing the transfer */
/* Clamp: the last block may extend past the requested size. */
559 if (biop->length > inSize)
560 biop->length = inSize;
562 lock_ObtainWrite(&scp->rw);
565 /* finally, we're done */
566 lock_ReleaseWrite(&scp->bufCreateLock);
/* Error epilogue: drop bufCreateLock, tear down the partially-built
 * BIOD (returns buffers and reservations), and re-take scp->rw so the
 * caller's lock expectation is met. */
570 lock_ReleaseWrite(&scp->bufCreateLock);
571 /* tear down biod and clear buffer reservation */
572 cm_ReleaseBIOD(biop, TRUE, code, FALSE);
573 lock_ObtainWrite(&scp->rw);