From 559ea99b1e6e6e82ec6a77541ef9844ccc764de8 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Thu, 25 Sep 2008 19:59:33 +0000 Subject: [PATCH] rx-flag-all-packets-20080925 LICENSE IPL10 flag packets for all queues, and when they are currentPacket somewhere --- src/rx/rx.c | 16 +++++++++++++++- src/rx/rx_globals.h | 1 + src/rx/rx_packet.h | 5 ++++- src/rx/rx_rdwr.c | 37 ++++++++++++++++++++++++++++++++----- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 1ad46eb..e887680 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -2065,7 +2065,8 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc) * kernel version, and may interrupt the macros rx_Read or * rx_Write, which run at normal priority for efficiency. */ if (call->currentPacket) { - queue_Prepend(&call->iovq, call->currentPacket); + call->currentPacket->flags &= ~RX_PKTFLAG_CP; + rxi_FreePacket(call->currentPacket); call->currentPacket = (struct rx_packet *)0; } @@ -3335,6 +3336,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call, /* It's the next packet. Stick it on the receive queue * for this call. Set newPackets to make sure we wake * the reader once all packets have been processed */ + np->flags |= RX_PKTFLAG_RQ; queue_Prepend(&call->rq, np); call->nSoftAcks++; np = NULL; /* We can't use this anymore */ @@ -3781,6 +3783,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np, #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ { queue_Remove(tp); + tp->flags &= ~RX_PKTFLAG_TQ; rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */ } } @@ -4661,6 +4664,16 @@ rxi_ResetCall(register struct rx_call *call, register int newcall) rxi_ClearReceiveQueue(call); /* why init the queue if you just emptied it? queue_Init(&call->rq); */ + + if (call->currentPacket) { + call->currentPacket->flags &= ~RX_PKTFLAG_CP; + rxi_FreePacket(call->currentPacket); + call->currentPacket = (struct rx_packet *)0; + } + call->curlen = call->nLeft = call->nFree = 0; + + rxi_FreePackets(0, &call->iovq); + call->error = 0; call->twind = call->conn->twind[call->channel]; call->rwind = call->conn->rwind[call->channel]; @@ -5429,6 +5442,7 @@ rxi_Start(struct rxevent *event, if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) { queue_Remove(p); + p->flags &= ~RX_PKTFLAG_TQ; rxi_FreePacket(p); } else missing = 1; diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index 684fea2..101c172 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -203,6 +203,7 @@ EXT struct rx_queue rx_freePacketQueue; if ((p)->flags & RX_PKTFLAG_FREE) \ osi_Panic("rx packet already free\n"); \ (p)->flags |= RX_PKTFLAG_FREE; \ + (p)->flags &= ~(RX_PKTFLAG_TQ|RX_PKTFLAG_IOVQ|RX_PKTFLAG_RQ|RX_PKTFLAG_CP); \ (p)->length = 0; \ (p)->niovecs = 0; \ } while(0) diff --git a/src/rx/rx_packet.h b/src/rx/rx_packet.h index 9559c19..d04b3fe 100644 --- a/src/rx/rx_packet.h +++ b/src/rx/rx_packet.h @@ -173,7 +173,10 @@ */ #define RX_PKTFLAG_ACKED 0x01 #define RX_PKTFLAG_FREE 0x02 - +#define RX_PKTFLAG_TQ 0x04 +#define RX_PKTFLAG_RQ 0x08 +#define RX_PKTFLAG_IOVQ 0x10 +#define RX_PKTFLAG_CP 0x20 /* The rx part of the header of a packet, in host form */ struct rx_header { diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index 5baff8a..1417144 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -137,6 +137,7 @@ rxi_ReadProc(register struct rx_call *call, register char *buf, afs_int32 error; register struct rx_connection *conn = call->conn; queue_Remove(rp); + rp->flags &= ~RX_PKTFLAG_RQ; /* RXS_CheckPacket called to undo RXS_PreparePacket's * work. It may reduce the length of the packet by up @@ -161,6 +162,7 @@ rxi_ReadProc(register struct rx_call *call, register char *buf, } call->rnext++; cp = call->currentPacket = rp; + call->currentPacket->flags |= RX_PKTFLAG_CP; call->curvec = 1; /* 0th vec is always header */ /* begin at the beginning [ more or less ], continue * on until the end, then stop. */ @@ -263,6 +265,7 @@ MTUXXX doesn't there need to be an "else" here ??? if (!call->nLeft) { /* out of packet. Get another one. */ + call->currentPacket->flags &= ~RX_PKTFLAG_CP; rxi_FreePacket(cp); cp = call->currentPacket = (struct rx_packet *)0; } else if (!call->curlen) { @@ -270,6 +273,7 @@ MTUXXX doesn't there need to be an "else" here ??? if (++call->curvec >= cp->niovecs) { /* current packet is exhausted, get ready for another */ /* don't worry about curvec and stuff, they get set somewhere else */ + call->currentPacket->flags &= ~RX_PKTFLAG_CP; rxi_FreePacket(cp); cp = call->currentPacket = (struct rx_packet *)0; call->nLeft = 0; @@ -436,6 +440,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) afs_int32 error; register struct rx_connection *conn = call->conn; queue_Remove(rp); + rp->flags &= ~RX_PKTFLAG_RQ; /* RXS_CheckPacket called to undo RXS_PreparePacket's * work. It may reduce the length of the packet by up @@ -459,6 +464,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) } call->rnext++; curp = call->currentPacket = rp; + call->currentPacket->flags |= RX_PKTFLAG_CP; call->curvec = 1; /* 0th vec is always header */ cur_iov = &curp->wirevec[1]; /* begin at the beginning [ more or less ], continue @@ -513,6 +519,8 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) if (!call->nLeft) { /* out of packet. Get another one. */ + curp->flags &= ~RX_PKTFLAG_CP; + curp->flags |= RX_PKTFLAG_IOVQ; queue_Append(&call->iovq, curp); curp = call->currentPacket = (struct rx_packet *)0; } else if (!call->curlen) { @@ -520,6 +528,8 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) if (++call->curvec >= curp->niovecs) { /* current packet is exhausted, get ready for another */ /* don't worry about curvec and stuff, they get set somewhere else */ + curp->flags &= ~RX_PKTFLAG_CP; + curp->flags |= RX_PKTFLAG_IOVQ; queue_Append(&call->iovq, curp); curp = call->currentPacket = (struct rx_packet *)0; call->nLeft = 0; @@ -670,6 +680,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf, && (call->mode == RX_MODE_RECEIVING)) { call->mode = RX_MODE_SENDING; if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; rxi_FreePacket(cp); cp = call->currentPacket = (struct rx_packet *)0; call->nLeft = 0; @@ -707,14 +718,20 @@ rxi_WriteProc(register struct rx_call *call, register char *buf, * conn->securityMaxTrailerSize */ hadd32(call->bytesSent, cp->length); rxi_PrepareSendPacket(call, cp, 0); + cp->flags &= ~RX_PKTFLAG_CP; + cp->flags |= RX_PKTFLAG_TQ; queue_Append(&call->tq, cp); - cp = call->currentPacket = NULL; + cp = call->currentPacket = (struct rx_packet *)0; if (! (call-> flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) { rxi_Start(0, call, 0, 0); } + } else else if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; + rxi_FreePacket(cp); + cp = call->currentPacket = (struct rx_packet *)0; } /* Wait for transmit window to open up */ while (!call->error @@ -737,6 +754,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf, #endif /* RX_ENABLE_LOCKS */ } if ((cp = rxi_AllocSendPacket(call, nbytes))) { + cp->flags |= RX_PKTFLAG_CP; call->currentPacket = cp; call->nFree = cp->length; call->curvec = 1; /* 0th vec is always header */ @@ -750,6 +768,7 @@ rxi_WriteProc(register struct rx_call *call, register char *buf, } if (call->error) { if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; rxi_FreePacket(cp); call->currentPacket = NULL; } @@ -944,6 +963,7 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, && (call->mode == RX_MODE_RECEIVING)) { call->mode = RX_MODE_SENDING; if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; rxi_FreePacket(cp); cp = call->currentPacket = (struct rx_packet *)0; call->nLeft = 0; @@ -970,6 +990,7 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio, *nio = nextio; return requestCount - nbytes; } + cp->flags |= RX_PKTFLAG_IOVQ; queue_Append(&call->iovq, cp); tnFree = cp->length; tcurvec = 1; @@ -1073,8 +1094,10 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) if (call->error) { if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; + cp->flags |= RX_PKTFLAG_IOVQ; queue_Prepend(&call->iovq, cp); - cp = call->currentPacket = NULL; + cp = call->currentPacket = (struct rx_packet *)0; } rxi_FreePackets(0, &call->iovq); return 0; @@ -1096,18 +1119,20 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) * conn->securityMaxTrailerSize */ hadd32(call->bytesSent, cp->length); rxi_PrepareSendPacket(call, cp, 0); + cp->flags |= RX_PKTFLAG_TQ; queue_Append(&tmpq, cp); /* The head of the iovq is now the current packet */ if (nbytes) { if (queue_IsEmpty(&call->iovq)) { call->error = RX_PROTOCOL_ERROR; - cp = call->currentPacket = NULL; rxi_FreePackets(0, &tmpq); return 0; } cp = queue_First(&call->iovq, rx_packet); queue_Remove(cp); + cp->flags &= ~RX_PKTFLAG_IOVQ; + cp->flags |= RX_PKTFLAG_CP; call->currentPacket = cp; call->nFree = cp->length; call->curvec = 1; @@ -1125,8 +1150,8 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) || iov[nextio].iov_len > (int)call->curlen) { call->error = RX_PROTOCOL_ERROR; if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; queue_Prepend(&tmpq, cp); - call->currentPacket = NULL; } rxi_FreePackets(0, &tmpq); return 0; @@ -1170,8 +1195,8 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes) if (call->error) { if (cp) { + cp->flags &= ~RX_PKTFLAG_CP; rxi_FreePacket(cp); - cp = call->currentPacket = NULL; } return 0; } @@ -1242,6 +1267,7 @@ rxi_FlushWrite(register struct rx_call *call) /* cp->length is only supposed to be the user's data */ /* cp->length was already set to (then-current) * MaxUserDataSize or less. */ + cp->flags &= ~RX_PKTFLAG_CP; cp->length -= call->nFree; call->currentPacket = (struct rx_packet *)0; call->nFree = 0; @@ -1259,6 +1285,7 @@ rxi_FlushWrite(register struct rx_call *call) /* The 1 specifies that this is the last packet */ hadd32(call->bytesSent, cp->length); rxi_PrepareSendPacket(call, cp, 1); + cp->flags |= RX_PKTFLAG_TQ; queue_Append(&call->tq, cp); if (! (call-> -- 1.9.4