* kernel version, and may interrupt the macros rx_Read or
* rx_Write, which run at normal priority for efficiency. */
if (call->currentPacket) {
- rxi_FreePacket(call->currentPacket);
+ queue_Prepend(&call->iovq, call->currentPacket);
call->currentPacket = (struct rx_packet *)0;
- call->nLeft = call->nFree = call->curlen = 0;
- } else
- call->nLeft = call->nFree = call->curlen = 0;
+ }
+
+ call->nLeft = call->nFree = call->curlen = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ rxi_FreePackets(0, &call->iovq);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
MUTEX_EXIT(&call->lock);
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
for (queue_Scan(&call->tq, p, tp, rx_packet)) {
- if (!p)
- break;
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
}
} else {
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
- if (!p)
- break;
- queue_Remove(p);
- rxi_FreePacket(p);
- }
+ rxi_FreePackets(0, &call->tq);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_TQ_CLEARME;
}
void
rxi_ClearReceiveQueue(register struct rx_call *call)
{
- register struct rx_packet *p, *tp;
if (queue_IsNotEmpty(&call->rq)) {
- for (queue_Scan(&call->rq, p, tp, rx_packet)) {
- if (!p)
- break;
- queue_Remove(p);
- rxi_FreePacket(p);
- rx_packetReclaims++;
- }
+ rx_packetReclaims += rxi_FreePackets(0, &call->rq);
call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
}
if (call->state == RX_STATE_PRECALL) {
register struct rx_packet *p;
u_char offset;
afs_int32 templ;
+#ifdef RX_ENABLE_TSFPQ
+ struct rx_ts_info_t * rx_ts_info;
+#endif
/*
* Open the receive window once a thread starts reading packets
if (p) {
rx_computelen(p, p->length); /* reset length, you never know */
} /* where that's been... */
+#ifdef RX_ENABLE_TSFPQ
+ else {
+ RX_TS_INFO_GET(rx_ts_info);
+ if ((p = rx_ts_info->local_special_packet)) {
+ rx_computelen(p, p->length);
+ } else if ((p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
+ rx_ts_info->local_special_packet = p;
+ } else { /* We won't send the ack, but don't panic. */
+ return optionalPacket;
+ }
+ }
+#else
else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
/* We won't send the ack, but don't panic. */
return optionalPacket;
}
+#endif
templ =
rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32) -
rx_GetDataSize(p);
if (templ > 0) {
- if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
+ if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL) > 0) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
return optionalPacket;
}
templ = rx_AckDataSize(call->rwind) + 2 * sizeof(afs_int32);
if (rx_Contiguous(p) < templ) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
return optionalPacket;
}
}
for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
if (!rqp || !call->rq.next
|| (rqp->header.seq > (call->rnext + call->rwind))) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
rxi_CallError(call, RX_CALL_DEAD);
return optionalPacket;
}
ap->acks[offset++] = RX_ACK_TYPE_ACK;
if ((offset > (u_char) rx_maxReceiveWindow) || (offset > call->rwind)) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
rxi_CallError(call, RX_CALL_DEAD);
return optionalPacket;
}
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.ackPacketsSent++;
MUTEX_EXIT(&rx_stats_mutex);
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
return optionalPacket; /* Return packet for re-use by caller */
}
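
A note on the RX_ENABLE_TSFPQ path added above: each thread caches one special-class packet in rx_ts_info->local_special_packet and reuses it for every ack it sends, so the common ack path never touches the allocator. That is also why the rxi_FreePacket calls in the failure and return paths are compiled out under RX_ENABLE_TSFPQ: the cached packet must survive the call so it can be reused on the next ack.
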
/* FPQ stats */
int checkin_ops;
+ int checkin_xfer;
int checkout_ops;
+ int checkout_xfer;
int gtol_ops;
int gtol_xfer;
int ltog_ops;
int alloc_ops;
int alloc_xfer;
} _FPQ;
+ struct rx_packet * local_special_packet;
} rx_ts_info_t;
EXT struct rx_ts_info_t * rx_ts_info_init(); /* init function for thread-specific data struct */
#define RX_TS_INFO_GET(ts_info_p) \
(p)->niovecs = 2; \
(p)->length = RX_FIRSTBUFFERSIZE; \
} while(0)
+
#ifdef RX_ENABLE_LOCKS
EXT afs_kmutex_t rx_freePktQ_lock;
#endif /* RX_ENABLE_LOCKS */
RX_FPQ_MARK_USED(p); \
(rx_ts_info_p)->_FPQ.len--; \
(rx_ts_info_p)->_FPQ.checkout_ops++; \
+ (rx_ts_info_p)->_FPQ.checkout_xfer++; \
+ } while(0)
+/* checkout multiple packets from the thread-specific free packet queue */
+#define RX_TS_FPQ_CHECKOUT2(rx_ts_info_p,num_transfer,q) \
+ do { \
+ register int i; \
+ register struct rx_packet *p; \
+ for (i=0, p=queue_First(&((rx_ts_info_p)->_FPQ), rx_packet); \
+ i < (num_transfer); \
+ i++, p=queue_Next(p, rx_packet)) { \
+ RX_FPQ_MARK_USED(p); \
+ } \
+ queue_SplitBeforeAppend(&((rx_ts_info_p)->_FPQ),(q),p); \
+ (rx_ts_info_p)->_FPQ.len -= (num_transfer); \
+ (rx_ts_info_p)->_FPQ.checkout_ops++; \
+ (rx_ts_info_p)->_FPQ.checkout_xfer += (num_transfer); \
} while(0)
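
RX_TS_FPQ_CHECKOUT2 above walks the first num_transfer packets to mark them used, then detaches that whole run in O(1) with queue_SplitBeforeAppend. A self-contained model of the split operation (toy types and helper names are mine, not the real Rx queue macros):

    #include <stdio.h>

    /* Toy circular queue with a sentinel head, mimicking rx_queue. */
    struct node { struct node *prev, *next; int id; };

    static void q_init(struct node *q) { q->prev = q->next = q; }

    static void q_append(struct node *q, struct node *n)
    {
        n->prev = q->prev; n->next = q;
        q->prev->next = n; q->prev = n;
    }

    /* Move every node of src preceding 'stop' onto the tail of dst, O(1). */
    static void q_split_before_append(struct node *src, struct node *dst,
                                      struct node *stop)
    {
        struct node *first = src->next, *last = stop->prev;
        if (first == stop) return;          /* nothing to move */
        src->next = stop; stop->prev = src; /* detach [first,last] from src */
        first->prev = dst->prev; last->next = dst;
        dst->prev->next = first; dst->prev = last;
    }

    int main(void)
    {
        struct node q, out, n[5], *p;
        int i;

        q_init(&q); q_init(&out);
        for (i = 0; i < 5; i++) { n[i].id = i; q_append(&q, &n[i]); }

        /* the CHECKOUT2 marking loop would leave p on the third node */
        p = q.next->next->next;
        q_split_before_append(&q, &out, p);

        for (p = out.next; p != &out; p = p->next)
            printf("moved %d\n", p->id);    /* prints 0 then 1 */
        return 0;
    }

The point of the pattern is that the cost is one pointer-surgery splice plus the marking walk, rather than num_transfer individual queue_Remove/queue_Append pairs.
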
/* check a packet into the thread-specific free packet queue */
#define RX_TS_FPQ_CHECKIN(rx_ts_info_p,p) \
RX_FPQ_MARK_FREE(p); \
(rx_ts_info_p)->_FPQ.len++; \
(rx_ts_info_p)->_FPQ.checkin_ops++; \
+ (rx_ts_info_p)->_FPQ.checkin_xfer++; \
+ } while(0)
+/* check multiple packets into the thread-specific free packet queue */
+/* num_transfer must equal the length of (q); it is not a means of
+ * checking in only part of (q). Passing num_transfer merely saves
+ * instructions, since the caller already knows the length of (q) */
+#define RX_TS_FPQ_CHECKIN2(rx_ts_info_p,num_transfer,q) \
+ do { \
+ register struct rx_packet *p, *np; \
+ for (queue_Scan((q), p, np, rx_packet)) { \
+ RX_FPQ_MARK_FREE(p); \
+ } \
+ queue_SplicePrepend(&((rx_ts_info_p)->_FPQ),(q)); \
+ (rx_ts_info_p)->_FPQ.len += (num_transfer); \
+ (rx_ts_info_p)->_FPQ.checkin_ops++; \
+ (rx_ts_info_p)->_FPQ.checkin_xfer += (num_transfer); \
} while(0)
#endif /* AFS_PTHREAD_ENV */
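
To make the counted contract of RX_TS_FPQ_CHECKIN2 concrete, a hypothetical caller (function name and context are illustrative only; assumes the Rx internal headers) would look like:

    /* Return a batch to this thread's free packet queue in O(1).
     * 'nfree' must be exactly the number of packets on 'batch'. */
    static void
    return_batch(struct rx_queue *batch, int nfree)
    {
        struct rx_ts_info_t *rx_ts_info;

        RX_TS_INFO_GET(rx_ts_info);
        RX_TS_FPQ_CHECKIN2(rx_ts_info, nfree, batch);
    }
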
extern char cml_version_number[];
extern int (*rx_almostSent) ();
+static int AllocPacketBufs(int class, int num_pkts, struct rx_queue *q);
+
static void rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
afs_int32 ahost, short aport,
afs_int32 istack);
return (resid ? (r - resid) : r);
}
+int
+rxi_AllocPackets(int class, int num_pkts, struct rx_queue * q)
+{
+ register struct rx_packet *p, *np;
+
+ num_pkts = AllocPacketBufs(class, num_pkts, q);
+
+ for (queue_Scan(q, p, np, rx_packet)) {
+ RX_PACKET_IOV_FULLINIT(p);
+ }
+
+ return num_pkts;
+}
+
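A sketch of the intended calling pattern for the new batch API (hypothetical caller; assumes the Rx internal headers):

    static void
    batch_example(void)
    {
        struct rx_queue q;
        int n;

        queue_Init(&q);
        /* may hand back fewer than requested under quota pressure */
        n = rxi_AllocPackets(RX_PACKET_CLASS_SEND, 4, &q);
        /* ... fill and transmit the n packets queued on q ... */
        rxi_FreePackets(n, &q);    /* counted free: no extra queue walk */
    }
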
#ifdef RX_ENABLE_TSFPQ
-static struct rx_packet *
-allocCBuf(int class)
+static int
+AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
{
- struct rx_packet *c;
register struct rx_ts_info_t * rx_ts_info;
+ int transfer, alloc;
SPLVAR;
RX_TS_INFO_GET(rx_ts_info);
- if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ transfer = num_pkts - rx_ts_info->_FPQ.len;
+ if (transfer > 0) {
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
-
- if (queue_IsEmpty(&rx_freePacketQueue)) {
- rxi_MorePacketsNoLock(rx_initSendWindow);
+
+ if ((transfer + rx_TSFPQGlobSize) <= rx_nFreePackets) {
+ transfer += rx_TSFPQGlobSize;
+ } else if (transfer <= rx_nFreePackets) {
+ transfer = rx_nFreePackets;
+ } else {
+ /* alloc enough for us, plus a few globs for other threads */
+ alloc = transfer + (3 * rx_TSFPQGlobSize) - rx_nFreePackets;
+ rxi_MorePacketsNoLock(MAX(alloc, rx_initSendWindow));
+ transfer += rx_TSFPQGlobSize;
}
- RX_TS_FPQ_GTOL(rx_ts_info);
+ RX_TS_FPQ_GTOL2(rx_ts_info, transfer);
MUTEX_EXIT(&rx_freePktQ_lock);
USERPRI;
}
- RX_TS_FPQ_CHECKOUT(rx_ts_info, c);
+ RX_TS_FPQ_CHECKOUT2(rx_ts_info, num_pkts, q);
- return c;
+ return num_pkts;
}
#else /* RX_ENABLE_TSFPQ */
-static struct rx_packet *
-allocCBuf(int class)
+static int
+AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
{
struct rx_packet *c;
+ int i, overq = 0;
SPLVAR;
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
#ifdef KERNEL
- if (rxi_OverQuota(class)) {
- c = NULL;
+ for (; (num_pkts > 0) && (rxi_OverQuota2(class,num_pkts));
+ num_pkts--, overq++);
+
+ if (overq) {
rxi_NeedMorePackets = TRUE;
MUTEX_ENTER(&rx_stats_mutex);
switch (class) {
break;
}
MUTEX_EXIT(&rx_stats_mutex);
- goto done;
}
- if (queue_IsEmpty(&rx_freePacketQueue)) {
- c = NULL;
+ if (rx_nFreePackets < num_pkts)
+ num_pkts = rx_nFreePackets;
+
+ if (!num_pkts) {
rxi_NeedMorePackets = TRUE;
goto done;
}
#else /* KERNEL */
- if (queue_IsEmpty(&rx_freePacketQueue)) {
- rxi_MorePacketsNoLock(rx_initSendWindow);
+ if (rx_nFreePackets < num_pkts) {
+ rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), rx_initSendWindow));
}
#endif /* KERNEL */
- rx_nFreePackets--;
- c = queue_First(&rx_freePacketQueue, rx_packet);
- queue_Remove(c);
- if (!(c->flags & RX_PKTFLAG_FREE))
- osi_Panic("rxi_AllocPacket: packet not free\n");
- c->flags = 0; /* clear RX_PKTFLAG_FREE, initialize the rest */
- c->header.flags = 0;
+ for (i=0, c=queue_First(&rx_freePacketQueue, rx_packet);
+ i < num_pkts;
+ i++, c=queue_Next(c, rx_packet)) {
+ RX_FPQ_MARK_USED(c);
+ }
+
+ queue_SplitBeforeAppend(&rx_freePacketQueue,q,c);
+
+ rx_nFreePackets -= num_pkts;
#ifdef KERNEL
done:
#endif
MUTEX_EXIT(&rx_freePktQ_lock);
USERPRI;
- return c;
+ return num_pkts;
}
#endif /* RX_ENABLE_TSFPQ */
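
To put the TSFPQ refill policy above in numbers (the rx_TSFPQGlobSize and rx_initSendWindow values here, 64 and 32, are hypothetical): a thread holding 2 local packets that needs 10 computes transfer = 8. If the global pool holds 200 packets, then 8 + 64 <= 200, so it pulls 72 and banks the surplus locally for later checkouts. If the pool holds only 50, it drains all 50. If it holds just 5, the thread grows the pool by 8 + 3*64 - 5 = 195 packets (MAX(195, 32)) and then pulls its 72, leaving a couple of globs behind for other threads.
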
* Free a packet currently used as a continuation buffer
*/
#ifdef RX_ENABLE_TSFPQ
-void
-rxi_freeCBuf(struct rx_packet *c)
+/* num_pkts=0 means queue length is unknown */
+int
+rxi_FreePackets(int num_pkts, struct rx_queue * q)
{
register struct rx_ts_info_t * rx_ts_info;
- register int i;
+ register struct rx_packet *c, *nc;
SPLVAR;
+ if (!num_pkts) {
+ queue_Count(q, c, nc, rx_packet, num_pkts);
+ if (!num_pkts)
+ return 0;
+ }
+
RX_TS_INFO_GET(rx_ts_info);
- RX_TS_FPQ_CHECKIN(rx_ts_info,c);
+ RX_TS_FPQ_CHECKIN2(rx_ts_info, num_pkts, q);
if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
RX_TS_FPQ_LTOG(rx_ts_info);
/* Wakeup anyone waiting for packets */
rxi_PacketsUnWait();
MUTEX_EXIT(&rx_freePktQ_lock);
USERPRI;
}
+
+ return num_pkts;
}
#else /* RX_ENABLE_TSFPQ */
-void
-rxi_freeCBuf(struct rx_packet *c)
+/* num_pkts=0 means queue length is unknown */
+int
+rxi_FreePackets(int num_pkts, struct rx_queue *q)
{
+ register struct rx_packet *p, *np;
SPLVAR;
+ if (!num_pkts) {
+ for (queue_Scan(q, p, np, rx_packet), num_pkts++) {
+ RX_FPQ_MARK_FREE(p);
+ }
+ if (!num_pkts)
+ return 0;
+ } else {
+ for (queue_Scan(q, p, np, rx_packet)) {
+ RX_FPQ_MARK_FREE(p);
+ }
+ }
+
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
- rxi_FreePacketNoLock(c);
+ queue_SpliceAppend(&rx_freePacketQueue, q);
+ rx_nFreePackets += num_pkts;
+
/* Wakeup anyone waiting for packets */
rxi_PacketsUnWait();
MUTEX_EXIT(&rx_freePktQ_lock);
USERPRI;
+
+ return num_pkts;
}
#endif /* RX_ENABLE_TSFPQ */
* returns the number of bytes >0 which it failed to come up with.
* Don't need to worry about locking on packet, since only
* one thread can manipulate one at a time. Locking on continuation
- * packets is handled by allocCBuf */
+ * packets is handled by AllocPacketBufs */
/* MTUXXX don't need to go through the for loop if we can trust niovecs */
int
rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
{
- int i;
-
- for (i = p->niovecs; nb > 0 && i < RX_MAXWVECS; i++) {
- register struct rx_packet *cb;
- if ((cb = allocCBuf(class))) {
- p->wirevec[i].iov_base = (caddr_t) cb->localdata;
- p->wirevec[i].iov_len = RX_CBUFFERSIZE;
- nb -= RX_CBUFFERSIZE;
- p->length += RX_CBUFFERSIZE;
- p->niovecs++;
- } else
- break;
- }
+ int i, nv;
+ struct rx_queue q;
+ register struct rx_packet *cb, *ncb;
+
+ /* compute the number of cbufs we need */
+ nv = nb / RX_CBUFFERSIZE;
+ if ((nv * RX_CBUFFERSIZE) < nb)
+ nv++;
+ if ((nv + p->niovecs) > RX_MAXWVECS)
+ nv = RX_MAXWVECS - p->niovecs;
+ if (nv < 1)
+ return nb;
+
+ /* allocate buffers */
+ queue_Init(&q);
+ nv = AllocPacketBufs(class, nv, &q);
+
+ /* setup packet iovs */
+ for (i = p->niovecs, queue_Scan(&q, cb, ncb, rx_packet), i++) {
+ queue_Remove(cb);
+ p->wirevec[i].iov_base = (caddr_t) cb->localdata;
+ p->wirevec[i].iov_len = RX_CBUFFERSIZE;
+ }
+
+ nb -= (nv * RX_CBUFFERSIZE);
+ p->length += (nv * RX_CBUFFERSIZE);
+ p->niovecs += nv;
return nb;
}
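
As a worked example of the sizing above (treating RX_CBUFFERSIZE as 1024 purely for illustration): a request for nb = 2500 bytes rounds up to nv = 3 cbufs. If AllocPacketBufs delivers all 3, the packet grows by 3072 bytes and the function returns 2500 - 3072 = -572; any value <= 0 means success, which is why the rxi_SendAck caller above now tests the return against > 0. If only 2 cbufs were available, the return is 2500 - 2048 = 452, the shortfall the caller must cope with.
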
if (len > 0) {
osi_Panic("PrepareSendPacket 1\n"); /* MTUXXX */
} else {
+ struct rx_queue q;
+ int nb;
+
+ queue_Init(&q);
+
/* Free any extra elements in the wirevec */
- for (j = MAX(2, i); j < p->niovecs; j++) {
- rxi_freeCBuf(RX_CBUF_TO_PACKET(p->wirevec[j].iov_base, p));
+ for (j = MAX(2, i), nb = 0; j < p->niovecs; j++, nb++) {
+ queue_Append(&q,RX_CBUF_TO_PACKET(p->wirevec[j].iov_base, p));
}
+ if (nb)
+ rxi_FreePackets(nb, &q);
+
p->niovecs = i;
p->wirevec[i - 1].iov_len += len;
}
#ifdef KERNEL
#define rxi_OverQuota(packetclass) (rx_nFreePackets - 1 < rx_packetQuota[packetclass])
+#define rxi_OverQuota2(packetclass,num_alloc) (rx_nFreePackets - (num_alloc) < rx_packetQuota[packetclass])
#endif /* KERNEL */
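
For example, with rx_nFreePackets = 10 and rx_packetQuota[class] = 8, a single-packet request passes (10 - 1 >= 8) while a batch of 3 trips the quota (10 - 3 < 8); the loop at the top of the kernel AllocPacketBufs above shrinks num_pkts one at a time until the request fits under quota.
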
/* this returns an afs_int32 from byte offset o in packet p. offset must
unsigned int offset, int resid, char *out);
extern afs_int32 rx_SlowWritePacket(struct rx_packet *packet, int offset,
int resid, char *in);
-extern void rxi_freeCBuf(struct rx_packet *c);
extern int rxi_RoundUpPacket(struct rx_packet *p, unsigned int nb);
extern int rxi_AllocDataBuf(struct rx_packet *p, int nb, int cla_ss);
extern void rxi_MorePackets(int apackets);
extern void rxi_FreePacket(struct rx_packet *p);
extern struct rx_packet *rxi_AllocPacketNoLock(int cla_ss);
extern struct rx_packet *rxi_AllocPacket(int cla_ss);
+extern int rxi_AllocPackets(int cla_ss, int num_pkts, struct rx_queue *q);
+extern int rxi_FreePackets(int num_pkts, struct rx_queue *q);
extern struct rx_packet *rxi_AllocSendPacket(register struct rx_call *call,
int want);
extern int rxi_ReadPacket(int socket, register struct rx_packet *p,
{
register struct rx_packet *cp = call->currentPacket;
register struct rx_packet *rp;
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
register int requestCount;
register unsigned int t;
+
/* XXXX took out clock_NewTime from here. Was it needed? */
requestCount = nbytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- if (!queue_IsEmpty(&call->iovq)) {
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
do {
* ReadvProc/WritevProc.
*/
if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ rxi_FreePackets(0, &call->iovq);
}
/*
* ReadvProc/WritevProc.
*/
if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ rxi_FreePackets(0, &call->iovq);
}
/*
int nbytes)
{
struct rx_packet *rp;
- struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
int requestCount;
int nextio;
nextio = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode == RX_MODE_SENDING) {
{
struct rx_connection *conn = call->conn;
register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
register unsigned int t;
int requestCount = nbytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- if (!queue_IsEmpty(&call->iovq)) {
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode != RX_MODE_SENDING) {
* RX_CALL_IOVEC_WAIT is always cleared before returning from
* ReadvProc/WritevProc.
*/
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
/*
* RX_CALL_IOVEC_WAIT is always cleared before returning from
* ReadvProc/WritevProc.
*/
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
/*
{
struct rx_connection *conn = call->conn;
struct rx_packet *cp = call->currentPacket;
- struct rx_packet *tp; /* temporary packet pointer */
- struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
int requestCount;
int nextio;
/* Temporary values, real work is done in rxi_WritevProc */
nextio = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode != RX_MODE_SENDING) {
rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
{
struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
int nextio;
int requestCount;
struct rx_queue tmpq;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (call->error) {
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
if (cp) {
- rxi_FreePacket(cp);
+ queue_Prepend(&call->iovq, cp);
cp = call->currentPacket = NULL;
}
+ rxi_FreePackets(0, &call->iovq);
return 0;
}
if (queue_IsEmpty(&call->iovq)) {
call->error = RX_PROTOCOL_ERROR;
cp = call->currentPacket = NULL;
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ rxi_FreePackets(0, &tmpq);
return 0;
}
cp = queue_First(&call->iovq, rx_packet);
if (iov[nextio].iov_base != call->curpos
|| iov[nextio].iov_len > (int)call->curlen) {
call->error = RX_PROTOCOL_ERROR;
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
if (cp) {
- rxi_FreePacket(cp);
+ queue_Prepend(&tmpq, cp);
call->currentPacket = NULL;
}
+ rxi_FreePackets(0, &tmpq);
return 0;
}
nbytes -= iov[nextio].iov_len;
/* Move the packets from the temporary queue onto the transmit queue.
* We may end up with more than call->twind packets on the queue. */
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- queue_Append(&call->tq, tp);
- }
+ queue_SpliceAppend(&call->tq, &tmpq);
if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
rxi_Start(0, call, 0, 0);
rxi_FlushWrite(register struct rx_call *call)
{
register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode == RX_MODE_SENDING) {