void buf_Release(cm_buf_t *bp)
#endif
{
- afs_int32 refCount;
-
- /* ensure that we're in the LRU queue if our ref count is 0 */
- osi_assertx(bp->magic == CM_BUF_MAGIC,"incorrect cm_buf_t magic");
-
- refCount = InterlockedDecrement(&bp->refCount);
-#ifdef DEBUG_REFCOUNT
- osi_Log2(afsd_logp,"buf_Release bp 0x%p ref %d", bp, refCount);
- afsi_log("%s:%d buf_ReleaseLocked bp 0x%p, ref %d", file, line, bp, refCount);
-#endif
-#ifdef DEBUG
- if (refCount < 0)
- osi_panic("buf refcount 0",__FILE__,__LINE__);;
-#else
- osi_assertx(refCount >= 0, "cm_buf_t refCount == 0");
-#endif
- if (refCount == 0) {
- lock_ObtainWrite(&buf_globalLock);
- if (bp->refCount == 0 &&
- !(bp->qFlags & (CM_BUF_QINLRU|CM_BUF_QREDIR))) {
- osi_QAddH( (osi_queue_t **) &cm_data.buf_freeListp,
- (osi_queue_t **) &cm_data.buf_freeListEndp,
- &bp->q);
- _InterlockedOr(&bp->qFlags, CM_BUF_QINLRU);
- buf_IncrementFreeCount();
- }
- lock_ReleaseWrite(&buf_globalLock);
- }
+ lock_ObtainRead(&buf_globalLock);
+ buf_ReleaseLocked(bp, FALSE);
+ lock_ReleaseRead(&buf_globalLock);
}
long
case vl_unknown:
cm_InitReq(&req);
req.flags |= CM_REQ_NORETRY;
- buf_CleanAsyncLocked(NULL, bp, &req, 0, &dirty);
+ buf_CleanLocked(NULL, bp, &req, 0, &dirty);
wasDirty |= dirty;
}
cm_PutVolume(volp);
}
/* incremental sync daemon. Writes all dirty buffers every 5000 ms */
-void buf_IncrSyncer(long parm)
+static void *
+buf_IncrSyncer(void * parm)
{
long wasDirty = 0;
long i;
wasDirty = buf_Sync(1);
} /* whole daemon's while loop */
+
+ pthread_exit(NULL);
+ return NULL;
}
long
{
static osi_once_t once;
cm_buf_t *bp;
- thread_t phandle;
+ pthread_t phandle;
+ pthread_attr_t tattr;
+ int pstatus;
long i;
- unsigned long pid;
char *data;
if ( newFile ) {
cm_data.buf_nOrigBuffers = cm_data.buf_nbuffers;
/* lower hash size to a prime number */
- cm_data.buf_hashSize = osi_PrimeLessThan((afs_uint32)(cm_data.buf_nbuffers/7 + 1));
+ cm_data.buf_hashSize = cm_NextHighestPowerOf2((afs_uint32)(cm_data.buf_nbuffers/7));
/* create hash table */
memset((void *)cm_data.buf_scacheHashTablepp, 0, cm_data.buf_hashSize * sizeof(cm_buf_t *));
bp = cm_data.bufHeaderBaseAddress;
data = cm_data.bufDataBaseAddress;
+ lock_ObtainWrite(&buf_globalLock);
for (i=0; i<cm_data.buf_nbuffers; i++) {
lock_InitializeMutex(&bp->mx, "Buffer mutex", LOCK_HIERARCHY_BUFFER);
bp->userp = NULL;
* extent was not returned by the file system driver.
* clean up the mess.
*/
+ buf_RemoveFromRedirQueue(NULL, bp);
bp->dataVersion = CM_BUF_VERSION_BAD;
- _InterlockedAnd(&bp->qFlags, ~CM_BUF_QREDIR);
- osi_QRemoveHT( (osi_queue_t **) &cm_data.buf_redirListp,
- (osi_queue_t **) &cm_data.buf_redirListEndp,
- &bp->q);
- buf_DecrementRedirCount();
bp->redirq.nextp = bp->redirq.prevp = NULL;
bp->redirLastAccess = 0;
bp->redirReleaseRequested = 0;
- buf_Release(bp);
+ buf_ReleaseLocked(bp, TRUE);
}
bp++;
}
* extent was not returned by the file system driver.
* clean up the mess.
*/
+ buf_RemoveFromRedirQueue(NULL, bp);
bp->dataVersion = CM_BUF_VERSION_BAD;
- _InterlockedAnd(&bp->qFlags, ~CM_BUF_QREDIR);
- osi_QRemoveHT( (osi_queue_t **) &cm_data.buf_redirListp,
- (osi_queue_t **) &cm_data.buf_redirListEndp,
- &bp->q);
- buf_DecrementRedirCount();
bp->redirq.nextp = bp->redirq.prevp = NULL;
bp->redirLastAccess = 0;
bp->redirReleaseRequested = 0;
- buf_Release(bp);
+ buf_ReleaseLocked(bp, TRUE);
}
+ lock_ReleaseWrite(&buf_globalLock);
}
#ifdef TESTING
osi_EndOnce(&once);
/* and create the incr-syncer */
- phandle = thrd_Create(0, 0,
- (ThreadFunc) buf_IncrSyncer, 0, 0, &pid,
- "buf_IncrSyncer");
+ pthread_attr_init(&tattr);
+ pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
+
+ pstatus = pthread_create(&phandle, &tattr, buf_IncrSyncer, 0);
+ osi_assertx(pstatus == 0, "buf: can't create incremental sync proc");
- osi_assertx(phandle != NULL, "buf: can't create incremental sync proc");
- CloseHandle(phandle);
+ pthread_attr_destroy(&tattr);
}
#ifdef TESTING
* 'scp' may or may not be NULL. If it is not NULL, the FID for both cm_scache_t
* and cm_buf_t must match.
*/
-afs_uint32 buf_CleanAsyncLocked(cm_scache_t *scp, cm_buf_t *bp, cm_req_t *reqp,
+afs_uint32 buf_CleanLocked(cm_scache_t *scp, cm_buf_t *bp, cm_req_t *reqp,
afs_uint32 flags, afs_uint32 *pisdirty)
{
afs_uint32 code = 0;
* that the cm_scache_t was recycled out of the cache even though
* a cm_buf_t with the same FID is in the cache.
*/
- if (scp == NULL) {
- if ((scp = cm_FindSCache(&bp->fid)) ||
- (cm_GetSCache(&bp->fid, &scp,
- bp->userp ? bp->userp : cm_rootUserp,
- reqp) == 0)) {
- release_scp = 1;
- }
+ if (scp == NULL &&
+ cm_GetSCache(&bp->fid, NULL, &scp,
+ bp->userp ? bp->userp : cm_rootUserp,
+ reqp) == 0)
+ {
+ release_scp = 1;
}
while ((bp->flags & CM_BUF_DIRTY) == CM_BUF_DIRTY) {
* in fact be the case but we don't know that until we attempt
* a FetchStatus on the FID.
*/
- osi_Log1(buf_logp, "buf_CleanAsyncLocked unable to start I/O - scp not found buf 0x%p", bp);
+ osi_Log1(buf_logp, "buf_CleanLocked unable to start I/O - scp not found buf 0x%p", bp);
code = CM_ERROR_NOSUCHFILE;
} else {
- osi_Log2(buf_logp, "buf_CleanAsyncLocked starts I/O on scp 0x%p buf 0x%p", scp, bp);
+ osi_Log2(buf_logp, "buf_CleanLocked starts I/O on scp 0x%p buf 0x%p", scp, bp);
offset = bp->offset;
LargeIntegerAdd(offset, ConvertLongToLargeInteger(bp->dirty_offset));
+ /*
+ * Only specify the dirty length of the current buffer in the call
+ * to cm_BufWrite(). It is the responsibility of cm_BufWrite()
+ * to determine if it is appropriate to fill a full chunk of data
+ * when storing to the file server.
+ */
code = (*cm_buf_opsp->Writep)(scp, &offset,
-#if 1
- /* we might as well try to write all of the contiguous
- * dirty buffers in one RPC
- */
- cm_chunkSize,
-#else
bp->dirty_length,
-#endif
flags, bp->userp, reqp);
- osi_Log3(buf_logp, "buf_CleanAsyncLocked I/O on scp 0x%p buf 0x%p, done=%d", scp, bp, code);
+ osi_Log3(buf_logp, "buf_CleanLocked I/O on scp 0x%p buf 0x%p, done=%d", scp, bp, code);
}
lock_ObtainMutex(&bp->mx);
/* if the Write routine returns No Such File, clear the dirty flag
}
/* Called with a zero-ref count buffer and with the buf_globalLock write locked.
- * recycles the buffer, and leaves it ready for reuse with a ref count of 1.
+ * recycles the buffer, and leaves it ready for reuse with a ref count of 0.
* The buffer must already be clean, and no I/O should be happening to it.
*/
void buf_Recycle(cm_buf_t *bp)
* a clean buffer, we rehash it, lock it and return it.
*/
for (bp = cm_data.buf_freeListEndp; bp; bp=(cm_buf_t *) osi_QPrev(&bp->q)) {
+ int cleaned = 0;
+
n_bufs++;
+ retry_2:
/* check to see if it really has zero ref count. This
* code can bump refcounts, at least, so it may not be
* zero.
if (bp->qFlags & CM_BUF_QREDIR)
continue;
+ /* protect against cleaning the same buffer more than once. */
+ if (cleaned)
+ continue;
+
/* if the buffer is dirty, start cleaning it and
* move on to the next buffer. We do this with
* just the lock required to minimize contention
lock_ReleaseWrite(&buf_globalLock);
lock_ReleaseRead(&scp->bufCreateLock);
- /* grab required lock and clean; this only
- * starts the I/O. By the time we're back,
- * it'll still be marked dirty, but it will also
- * have the WRITING flag set, so we won't get
- * back here.
+ /*
+ * grab required lock and clean.
+ * previously the claim was that the cleaning
+ * operation was async which it is not. It would
+ * be a good idea to use an async mechanism here
+ * but there is none at the moment other than
+ * the buf_IncrSyncer() thread.
*/
if (cm_FidCmp(&scp->fid, &bp->fid) == 0)
- buf_CleanAsync(scp, bp, reqp, 0, NULL);
+ buf_Clean(scp, bp, reqp, 0, NULL);
else
- buf_CleanAsync(NULL, bp, reqp, 0, NULL);
+ buf_Clean(NULL, bp, reqp, 0, NULL);
/* now put it back and go around again */
buf_Release(bp);
lock_ReleaseRead(&scp->bufCreateLock);
return CM_BUF_EXISTS;
}
- continue;
+
+ /*
+ * We just cleaned this buffer so we need to
+ * restart the loop with this buffer so it
+ * can be retested. Set 'cleaned' so we
+ * do not attempt another call to buf_Clean()
+ * if the prior attempt failed.
+ */
+ cleaned = 1;
+ goto retry_2;
}
osi_Log3(afsd_logp, "buf_GetNewLocked: scp 0x%p examined %u buffers before recycling bufp 0x%p",
buf_DecrementFreeCount();
/* prepare to return it. Give it a refcount */
- bp->refCount = 1;
+ InterlockedIncrement(&bp->refCount);
#ifdef DEBUG_REFCOUNT
osi_Log2(afsd_logp,"buf_GetNewLocked bp 0x%p ref %d", bp, 1);
afsi_log("%s:%d buf_GetNewLocked bp 0x%p, ref %d", __FILE__, __LINE__, bp, 1);
}
/* clean a buffer synchronously */
-afs_uint32 buf_CleanAsync(cm_scache_t *scp, cm_buf_t *bp, cm_req_t *reqp, afs_uint32 flags, afs_uint32 *pisdirty)
+afs_uint32 buf_Clean(cm_scache_t *scp, cm_buf_t *bp, cm_req_t *reqp, afs_uint32 flags, afs_uint32 *pisdirty)
{
long code;
osi_assertx(bp->magic == CM_BUF_MAGIC, "invalid cm_buf_t magic");
osi_assertx(!(flags & CM_BUF_WRITE_SCP_LOCKED), "scp->rw must not be held when calling buf_CleanAsync");
lock_ObtainMutex(&bp->mx);
- code = buf_CleanAsyncLocked(scp, bp, reqp, flags, pisdirty);
+ code = buf_CleanLocked(scp, bp, reqp, flags, pisdirty);
lock_ReleaseMutex(&bp->mx);
return code;
cm_InitReq(&req);
req.flags |= CM_REQ_NORETRY;
- buf_CleanAsync(NULL, bp, &req, 0, NULL);
+ buf_Clean(NULL, bp, &req, 0, NULL);
buf_CleanWait(NULL, bp, FALSE);
/* relock and release buffer */
long code;
long bufferPos;
afs_uint32 i;
+ afs_uint32 invalidate = 0;
/* assert that cm_bufCreateLock is held in write mode */
lock_AssertWrite(&scp->bufCreateLock);
lock_ReleaseWrite(&buf_globalLock);
}
} else {
- if (RDR_Initialized)
- RDR_InvalidateObject(scp->fid.cell, scp->fid.volume, scp->fid.vnode,
- scp->fid.unique, scp->fid.hash,
- scp->fileType, AFS_INVALIDATE_SMB);
+ invalidate = 1;
}
_InterlockedAnd(&bufp->flags, ~CM_BUF_DIRTY);
bufp->error = 0;
buf_ValidateBufQueues();
#endif /* TESTING */
+ if (invalidate && RDR_Initialized)
+ RDR_InvalidateObject(scp->fid.cell, scp->fid.volume, scp->fid.vnode,
+ scp->fid.unique, scp->fid.hash,
+ scp->fileType, AFS_INVALIDATE_SMB);
+
/* done */
return code;
}
lock_ObtainMutex(&bp->mx);
/* start cleaning the buffer, and wait for it to finish */
- buf_CleanAsyncLocked(scp, bp, reqp, 0, NULL);
+ buf_CleanLocked(scp, bp, reqp, 0, NULL);
buf_WaitIO(scp, bp);
lock_ReleaseMutex(&bp->mx);
}
/* Must be called with scp->rw held */
+long buf_InvalidateBuffers(cm_scache_t * scp)
+{
+ cm_buf_t * bp;
+ afs_uint32 i;
+ int found = 0;
+
+ lock_AssertAny(&scp->rw);
+
+ i = BUF_FILEHASH(&scp->fid);
+
+ lock_ObtainRead(&buf_globalLock);
+
+ for (bp = cm_data.buf_fileHashTablepp[i]; bp; bp = bp->fileHashp) {
+ if (cm_FidCmp(&bp->fid, &scp->fid) == 0) {
+ bp->dataVersion = CM_BUF_VERSION_BAD;
+ found = 1;
+ }
+ }
+ lock_ReleaseRead(&buf_globalLock);
+
+ if (found)
+ return 0;
+ else
+ return ENOENT;
+}
+
+/* Must be called with scp->rw held */
long buf_ForceDataVersion(cm_scache_t * scp, afs_uint64 fromVersion, afs_uint64 toVersion)
{
cm_buf_t * bp;
*/
break;
default:
- code = buf_CleanAsyncLocked(scp, bp, reqp, 0, &wasDirty);
+ code = buf_CleanLocked(scp, bp, reqp, 0, &wasDirty);
if (bp->flags & CM_BUF_ERROR) {
code = bp->error;
if (code == 0)
{
lock_AssertWrite(&buf_globalLock);
+ if (!(bufp->qFlags & CM_BUF_QREDIR))
+ return;
+
lock_ObtainMutex(&scp->redirMx);
_InterlockedAnd(&bufp->qFlags, ~CM_BUF_QREDIR);
buf_MoveToHeadOfRedirQueue(cm_scache_t *scp, cm_buf_t *bufp)
{
lock_AssertWrite(&buf_globalLock);
- osi_assertx(bufp->qFlags & CM_BUF_QREDIR,
- "buf_MoveToHeadOfRedirQueue buffer not held by redirector");
+ if (!(bufp->qFlags & CM_BUF_QREDIR))
+ return;
lock_ObtainMutex(&scp->redirMx);