#endif /* RX_LOCKS_DB */
/* rxi_ReadProc -- internal version.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held.
+ * LOCKS USED -- called at netpri
*/
int
rxi_ReadProc(struct rx_call *call, char *buf,
do {
if (call->nLeft == 0) {
/* Get next packet */
+ MUTEX_ENTER(&call->lock);
for (;;) {
if (call->error || (call->mode != RX_MODE_RECEIVING)) {
if (call->error) {
+ call->mode = RX_MODE_ERROR;
+ MUTEX_EXIT(&call->lock);
return 0;
}
if (call->mode == RX_MODE_SENDING) {
+ MUTEX_EXIT(&call->lock);
rxi_FlushWrite(call);
+ MUTEX_ENTER(&call->lock);
continue;
}
}
rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
MUTEX_EXIT(&conn->conn_data_lock);
rxi_FreePacket(rp);
- MUTEX_ENTER(&call->lock);
return 0;
}
/* Are there ever going to be any more packets? */
if (call->flags & RX_CALL_RECEIVE_DONE) {
+ MUTEX_EXIT(&call->lock);
return requestCount - nbytes;
}
/* Wait for in-sequence packet */
call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
if (call->error) {
+ MUTEX_EXIT(&call->lock);
return 0;
}
#endif /* RX_ENABLE_LOCKS */
}
+ MUTEX_EXIT(&call->lock);
} else
/* assert(cp); */
/* MTUXXX this should be replaced by some error-recovery code before shipping */
}
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_ReadProc(call, buf, nbytes);
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
}
}
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
* except the last packet (new current packet) are moved to the iovq
* while the application is processing the data.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held.
+ * LOCKS USED -- called at netpri.
*/
int
rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
int nbytes)
{
+ int bytes;
+
/* Free any packets from the last call to ReadvProc/WritevProc */
if (queue_IsNotEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
rxi_FlushWrite(call);
}
- if (call->error) {
- return 0;
- }
+ MUTEX_ENTER(&call->lock);
+ if (call->error)
+ goto error;
/* Get whatever data is currently available in the receive queue.
* If rxi_FillReadVec sends an ack packet then it is possible
call->startWait = 0;
}
call->flags &= ~RX_CALL_IOVEC_WAIT;
-#ifdef RX_ENABLE_LOCKS
- if (call->error) {
- return 0;
- }
-#endif /* RX_ENABLE_LOCKS */
+
+ if (call->error)
+ goto error;
call->iov = NULL;
*nio = call->iovNext;
- return nbytes - call->iovNBytes;
+ bytes = nbytes - call->iovNBytes;
+ MUTEX_EXIT(&call->lock);
+ return bytes;
+
+ error:
+ call->mode = RX_MODE_ERROR;
+ MUTEX_EXIT(&call->lock);
+ return 0;
}
int
SPLVAR;
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
}
/* rxi_WriteProc -- internal version.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
+ * LOCKS USED -- called at netpri
+ */
int
rxi_WriteProc(struct rx_call *call, char *buf,
* anyway. */
do {
if (call->nFree == 0) {
+ MUTEX_ENTER(&call->lock);
+ if (call->error)
+ call->mode = RX_MODE_ERROR;
if (!call->error && cp) {
/* Clear the current packet now so that if
* we are forced to wait and drop the lock
call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
if (call->error) {
+ call->mode = RX_MODE_ERROR;
+ MUTEX_EXIT(&call->lock);
return 0;
}
#endif /* RX_ENABLE_LOCKS */
cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
}
if (call->error) {
+ call->mode = RX_MODE_ERROR;
if (cp) {
#ifdef RX_TRACK_PACKETS
cp->flags &= ~RX_PKTFLAG_CP;
rxi_FreePacket(cp);
call->currentPacket = NULL;
}
+ MUTEX_EXIT(&call->lock);
return 0;
}
+ MUTEX_EXIT(&call->lock);
}
if (cp && (int)call->nFree < nbytes) {
}
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_WriteProc(call, buf, nbytes);
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
}
}
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
}
* Fill in an iovec to point to data in packet buffers. The application
* calls rxi_WritevProc when the buffers are full.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
+ * LOCKS USED -- called at netpri.
+ */
int
rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
if (tnFree == 0) {
/* current packet is full, allocate a new one */
+ MUTEX_ENTER(&call->lock);
cp = rxi_AllocSendPacket(call, nbytes);
+ MUTEX_EXIT(&call->lock);
if (cp == NULL) {
/* out of space, return what we have */
*nio = nextio;
SPLVAR;
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
}
*
* Send buffers allocated in rxi_WritevAlloc.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
-
+ * LOCKS USED -- called at netpri.
+ */
int
rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
{
requestCount = nbytes;
nextio = 0;
- if (call->mode != RX_MODE_SENDING) {
+ MUTEX_ENTER(&call->lock);
+ if (call->error) {
+ call->mode = RX_MODE_ERROR;
+ } else if (call->mode != RX_MODE_SENDING) {
call->error = RX_PROTOCOL_ERROR;
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_TQ_WAIT;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- /* cp is no longer valid since we may have given up the lock */
cp = call->currentPacket;
if (call->error) {
+ call->mode = RX_MODE_ERROR;
+ call->currentPacket = NULL;
+ MUTEX_EXIT(&call->lock);
if (cp) {
#ifdef RX_TRACK_PACKETS
cp->flags &= ~RX_PKTFLAG_CP;
#ifdef RXDEBUG_PACKET
call->iovqc++;
#endif /* RXDEBUG_PACKET */
- cp = call->currentPacket = (struct rx_packet *)0;
}
#ifdef RXDEBUG_PACKET
call->iovqc -=
/* The head of the iovq is now the current packet */
if (nbytes) {
if (queue_IsEmpty(&call->iovq)) {
call->error = RX_PROTOCOL_ERROR;
+ MUTEX_EXIT(&call->lock);
#ifdef RXDEBUG_PACKET
tmpqc -=
if (iov[nextio].iov_base != call->curpos
|| iov[nextio].iov_len > (int)call->curlen) {
call->error = RX_PROTOCOL_ERROR;
+ MUTEX_EXIT(&call->lock);
if (cp) {
#ifdef RX_TRACK_PACKETS
cp->flags &= ~RX_PKTFLAG_CP;
p->flags |= RX_PKTFLAG_TQ;
}
#endif
+
+ if (call->error)
+ call->mode = RX_MODE_ERROR;
+
queue_SpliceAppend(&call->tq, &tmpq);
if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
#endif
call->startWait = 0;
}
+
/* cp is no longer valid since we may have given up the lock */
cp = call->currentPacket;
if (call->error) {
+ call->mode = RX_MODE_ERROR;
+ call->currentPacket = NULL;
+ MUTEX_EXIT(&call->lock);
if (cp) {
#ifdef RX_TRACK_PACKETS
cp->flags &= ~RX_PKTFLAG_CP;
#endif
rxi_FreePacket(cp);
- cp = call->currentPacket = (struct rx_packet *)0;
}
return 0;
}
+ MUTEX_EXIT(&call->lock);
return requestCount - nbytes;
}
SPLVAR;
NETPRI;
- MUTEX_ENTER(&call->lock);
bytes = rxi_WritevProc(call, iov, nio, nbytes);
- MUTEX_EXIT(&call->lock);
USERPRI;
return bytes;
}
/* Flush any buffered data to the stream, switch to read mode
- * (clients) or to EOF mode (servers) */
+ * (clients) or to EOF mode (servers)
+ *
+ * LOCKS USED -- called at netpri.
+ */
void
rxi_FlushWrite(struct rx_call *call)
{
}
#endif
+ MUTEX_ENTER(&call->lock);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Wait until TQ_BUSY is reset before adding any
* packets to the transmit queue
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- /* cp is no longer valid since we may have given up the lock */
+ if (call->error)
+ call->mode = RX_MODE_ERROR;
+
cp = call->currentPacket;
if (cp) {
flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
rxi_Start(0, call, 0, 0);
}
+ MUTEX_EXIT(&call->lock);
}
}
{
SPLVAR;
NETPRI;
- MUTEX_ENTER(&call->lock);
rxi_FlushWrite(call);
- MUTEX_EXIT(&call->lock);
USERPRI;
}