/*
* Copyright 2000, International Business Machines Corporation and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#include <afsconfig.h>
-#ifdef KERNEL
-#include "afs/param.h"
-#else
#include <afs/param.h>
-#endif
-
-RCSID
- ("$Header$");
#ifdef KERNEL
-#if defined(UKERNEL)
-#include "afs/sysincludes.h"
-#include "afsincludes.h"
-#include "rx/rx_kcommon.h"
-#include "rx/rx_clock.h"
-#include "rx/rx_queue.h"
-#include "rx/rx_packet.h"
-#else /* defined(UKERNEL) */
-#ifdef RX_KERNEL_TRACE
-#include "../rx/rx_kcommon.h"
-#endif
-#include "h/types.h"
-#ifndef AFS_LINUX20_ENV
-#include "h/systm.h"
-#endif
-#if defined(AFS_SGI_ENV) || defined(AFS_HPUX110_ENV)
-#include "afs/sysincludes.h"
-#endif
-#if defined(AFS_OBSD_ENV)
-#include "h/proc.h"
-#endif
-#include "h/socket.h"
-#if !defined(AFS_SUN5_ENV) && !defined(AFS_LINUX20_ENV) && !defined(AFS_HPUX110_ENV)
-#if !defined(AFS_OSF_ENV) && !defined(AFS_AIX41_ENV)
-#include "sys/mount.h" /* it gets pulled in by something later anyway */
-#endif
-#include "h/mbuf.h"
-#endif
-#include "netinet/in.h"
-#include "afs/afs_osi.h"
-#include "rx_kmutex.h"
-#include "rx/rx_clock.h"
-#include "rx/rx_queue.h"
-#ifdef AFS_SUN5_ENV
-#include <sys/sysmacros.h>
-#endif
-#include "rx/rx_packet.h"
-#endif /* defined(UKERNEL) */
-#include "rx/rx_globals.h"
+# if defined(UKERNEL)
+# include "afs/sysincludes.h"
+# include "afsincludes.h"
+# include "rx_kcommon.h"
+# else /* defined(UKERNEL) */
+# ifdef RX_KERNEL_TRACE
+# include "rx_kcommon.h"
+# endif
+# include "h/types.h"
+# ifndef AFS_LINUX20_ENV
+# include "h/systm.h"
+# endif
+# if defined(AFS_SGI_ENV) || defined(AFS_HPUX110_ENV) || defined(AFS_NBSD50_ENV)
+# include "afs/sysincludes.h"
+# endif
+# if defined(AFS_OBSD_ENV)
+# include "h/proc.h"
+# endif
+# include "h/socket.h"
+# if !defined(AFS_SUN5_ENV) && !defined(AFS_LINUX20_ENV) && !defined(AFS_HPUX110_ENV)
+# if !defined(AFS_OSF_ENV) && !defined(AFS_AIX41_ENV)
+# include "sys/mount.h" /* it gets pulled in by something later anyway */
+# endif
+# include "h/mbuf.h"
+# endif
+# include "netinet/in.h"
+# include "afs/afs_osi.h"
+# include "rx_kmutex.h"
+# endif /* defined(UKERNEL) */
#else /* KERNEL */
-#include "sys/types.h"
-#include <sys/stat.h>
-#include <errno.h>
-#if defined(AFS_NT40_ENV) || defined(AFS_DJGPP_ENV)
-#ifdef AFS_NT40_ENV
-#include <winsock2.h>
-#ifndef EWOULDBLOCK
-#define EWOULDBLOCK WSAEWOULDBLOCK
-#endif
-#else
-#include <sys/socket.h>
-#include <netinet/in.h>
-#endif /* AFS_NT40_ENV */
-#include "rx_user.h"
-#include "rx_xmit_nt.h"
-#include <stdlib.h>
-#else
-#include <sys/socket.h>
-#include <netinet/in.h>
-#endif
-#include "rx_clock.h"
-#include "rx.h"
-#include "rx_queue.h"
+# include <roken.h>
+# include <assert.h>
+# include <afs/opr.h>
+# if defined(AFS_NT40_ENV)
+# ifndef EWOULDBLOCK
+# define EWOULDBLOCK WSAEWOULDBLOCK
+# endif
+# include "rx_user.h"
+# include "rx_xmit_nt.h"
+# endif
+# include <lwp.h>
+#endif /* KERNEL */
+
#ifdef AFS_SUN5_ENV
-#include <sys/sysmacros.h>
+# include <sys/sysmacros.h>
#endif
+
+#include <opr/queue.h>
+
+#include "rx.h"
+#include "rx_clock.h"
#include "rx_packet.h"
+#include "rx_atomic.h"
#include "rx_globals.h"
-#include <lwp.h>
-#include <assert.h>
-#ifdef HAVE_STRING_H
-#include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#endif /* KERNEL */
+#include "rx_internal.h"
+#include "rx_stats.h"
+
+#include "rx_peer.h"
+#include "rx_conn.h"
+#include "rx_call.h"
+
+/*!
+ * \brief structure used to keep track of allocated packets
+ */
+struct rx_mallocedPacket {
+ struct opr_queue entry; /*!< chained using opr_queue */
+ struct rx_packet *addr; /*!< address of the first element */
+ afs_uint32 size; /*!< array size in bytes */
+};
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX_PACKET;
#endif /* RX_LOCKS_DB */
-struct rx_packet *rx_mallocedP = 0;
+static struct rx_packet *rx_mallocedP = 0;
+#ifdef RXDEBUG_PACKET
+static afs_uint32 rx_packet_id = 0;
+#endif
extern char cml_version_number[];
-extern int (*rx_almostSent) ();
-static int AllocPacketBufs(int class, int num_pkts, struct rx_queue *q);
+static int AllocPacketBufs(int class, int num_pkts, struct opr_queue *q);
static void rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
- struct sockaddr_storage *saddr, int slen,
+ afs_uint32 ahost, short aport,
afs_int32 istack);
+static struct rx_packet *rxi_AllocPacketNoLock(int class);
-static int rxi_FreeDataBufsToQueue(struct rx_packet *p, int first,
- struct rx_queue * q);
+#ifndef KERNEL
+static void rxi_MorePacketsNoLock(int apackets);
+#endif
+
+#ifdef RX_ENABLE_TSFPQ
+static int rxi_FreeDataBufsTSFPQ(struct rx_packet *p, afs_uint32 first,
+ int flush_global);
+static void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local,
+ int allow_overcommit);
+#else
+static void rxi_FreePacketNoLock(struct rx_packet *p);
+static int rxi_FreeDataBufsNoLock(struct rx_packet *p, afs_uint32 first);
+static int rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first,
+ struct opr_queue * q);
+#endif
+
+extern struct opr_queue rx_idleServerQueue;
/* some rules about packets:
* 1. When a packet is allocated, the final iov_buf contains room for
*/
/* Preconditions:
- * all packet buffers (iov_base) are integral multiples of
+ * all packet buffers (iov_base) are integral multiples of
* the word size.
* offset is an integral multiple of the word size.
*/
* offset only applies to the first iovec.
*/
r = resid;
- while ((resid > 0) && (i < packet->niovecs)) {
- j = MIN(resid, packet->wirevec[i].iov_len - (offset - l));
+ while ((r > 0) && (i < packet->niovecs)) {
+ j = MIN(r, packet->wirevec[i].iov_len - (offset - l));
memcpy(out, (char *)(packet->wirevec[i].iov_base) + (offset - l), j);
- resid -= j;
+ r -= j;
out += j;
l += packet->wirevec[i].iov_len;
offset = l;
i++;
}
- return (resid ? (r - resid) : r);
+ return (r ? (resid - r) : resid);
}
afs_int32
rx_SlowWritePacket(struct rx_packet * packet, int offset, int resid, char *in)
{
- int i, j, l, r;
+ unsigned int i, j, l, o, r;
char *b;
- for (l = 0, i = 1; i < packet->niovecs; i++) {
- if (l + packet->wirevec[i].iov_len > offset) {
+ for (l = 0, i = 1, o = offset; i < packet->niovecs; i++) {
+ if (l + packet->wirevec[i].iov_len > o) {
break;
}
l += packet->wirevec[i].iov_len;
* offset only applies to the first iovec.
*/
r = resid;
- while ((resid > 0) && (i < RX_MAXWVECS)) {
+ while ((r > 0) && (i <= RX_MAXWVECS)) {
if (i >= packet->niovecs)
- if (rxi_AllocDataBuf(packet, resid, RX_PACKET_CLASS_SEND_CBUF) > 0) /* ++niovecs as a side-effect */
+ if (rxi_AllocDataBuf(packet, r, RX_PACKET_CLASS_SEND_CBUF) > 0) /* ++niovecs as a side-effect */
break;
b = (char *)(packet->wirevec[i].iov_base) + (offset - l);
- j = MIN(resid, packet->wirevec[i].iov_len - (offset - l));
+ j = MIN(r, packet->wirevec[i].iov_len - (offset - l));
memcpy(b, in, j);
- resid -= j;
+ r -= j;
in += j;
l += packet->wirevec[i].iov_len;
offset = l;
i++;
}
- return (resid ? (r - resid) : r);
+ return (r ? (resid - r) : resid);
}
int
-rxi_AllocPackets(int class, int num_pkts, struct rx_queue * q)
+rxi_AllocPackets(int class, int num_pkts, struct opr_queue * q)
{
- register struct rx_packet *p, *np;
+ struct opr_queue *c;
num_pkts = AllocPacketBufs(class, num_pkts, q);
- for (queue_Scan(q, p, np, rx_packet)) {
- RX_PACKET_IOV_FULLINIT(p);
+ for (opr_queue_Scan(q, c)) {
+ RX_PACKET_IOV_FULLINIT(opr_queue_Entry(c, struct rx_packet, entry));
}
return num_pkts;
#ifdef RX_ENABLE_TSFPQ
static int
-AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
+AllocPacketBufs(int class, int num_pkts, struct opr_queue * q)
{
- register struct rx_packet *c;
- register struct rx_ts_info_t * rx_ts_info;
- int transfer, alloc;
+ struct rx_ts_info_t * rx_ts_info;
+ int transfer;
SPLVAR;
RX_TS_INFO_GET(rx_ts_info);
if (transfer > 0) {
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
-
- if ((transfer + rx_TSFPQGlobSize) <= rx_nFreePackets) {
- transfer += rx_TSFPQGlobSize;
- } else if (transfer <= rx_nFreePackets) {
- transfer = rx_nFreePackets;
- } else {
+ transfer = MAX(transfer, rx_TSFPQGlobSize);
+ if (transfer > rx_nFreePackets) {
/* alloc enough for us, plus a few globs for other threads */
- alloc = transfer + (3 * rx_TSFPQGlobSize) - rx_nFreePackets;
- rxi_MorePacketsNoLock(MAX(alloc, rx_initSendWindow));
- transfer += rx_TSFPQGlobSize;
+ rxi_MorePacketsNoLock(transfer + 4 * rx_initSendWindow);
}
RX_TS_FPQ_GTOL2(rx_ts_info, transfer);
USERPRI;
}
- RX_TS_FPQ_CHECKOUT2(rx_ts_info, num_pkts, q);
+ RX_TS_FPQ_QCHECKOUT(rx_ts_info, num_pkts, q);
return num_pkts;
}
#else /* RX_ENABLE_TSFPQ */
static int
-AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
+AllocPacketBufs(int class, int num_pkts, struct opr_queue * q)
{
struct rx_packet *c;
- int i, overq = 0;
+ int i;
+#ifdef KERNEL
+ int overq = 0;
+#endif
SPLVAR;
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
#ifdef KERNEL
- for (; (num_pkts > 0) && (rxi_OverQuota2(class,num_pkts));
+ for (; (num_pkts > 0) && (rxi_OverQuota2(class,num_pkts));
num_pkts--, overq++);
if (overq) {
rxi_NeedMorePackets = TRUE;
- MUTEX_ENTER(&rx_stats_mutex);
- switch (class) {
- case RX_PACKET_CLASS_RECEIVE:
- rx_stats.receivePktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SEND:
- rx_stats.sendPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SPECIAL:
- rx_stats.specialPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_RECV_CBUF:
- rx_stats.receiveCbufPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SEND_CBUF:
- rx_stats.sendCbufPktAllocFailures++;
- break;
+ if (rx_stats_active) {
+ switch (class) {
+ case RX_PACKET_CLASS_RECEIVE:
+ rx_atomic_inc(&rx_stats.receivePktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_SEND:
+ rx_atomic_inc(&rx_stats.sendPktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_SPECIAL:
+ rx_atomic_inc(&rx_stats.specialPktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_RECV_CBUF:
+ rx_atomic_inc(&rx_stats.receiveCbufPktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_SEND_CBUF:
+ rx_atomic_inc(&rx_stats.sendCbufPktAllocFailures);
+ break;
+ }
}
- MUTEX_EXIT(&rx_stats_mutex);
}
if (rx_nFreePackets < num_pkts)
}
#else /* KERNEL */
if (rx_nFreePackets < num_pkts) {
- rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), rx_initSendWindow));
+ rxi_MorePacketsNoLock(MAX((num_pkts-rx_nFreePackets), 4 * rx_initSendWindow));
}
#endif /* KERNEL */
- for (i=0, c=queue_First(&rx_freePacketQueue, rx_packet);
- i < num_pkts;
- i++, c=queue_Next(c, rx_packet)) {
+ for (i=0, c=opr_queue_First(&rx_freePacketQueue, struct rx_packet, entry);
+ i < num_pkts;
+ i++, c=opr_queue_Next(&c->entry, struct rx_packet, entry)) {
RX_FPQ_MARK_USED(c);
}
- queue_SplitBeforeAppend(&rx_freePacketQueue,q,c);
+ opr_queue_SplitBeforeAppend(&rx_freePacketQueue, q, &c->entry);
rx_nFreePackets -= num_pkts;
#ifdef RX_ENABLE_TSFPQ
/* num_pkts=0 means queue length is unknown */
int
-rxi_FreePackets(int num_pkts, struct rx_queue * q)
+rxi_FreePackets(int num_pkts, struct opr_queue * q)
{
- register struct rx_ts_info_t * rx_ts_info;
- register struct rx_packet *c, *nc;
+ struct rx_ts_info_t * rx_ts_info;
+ struct opr_queue *cursor, *store;
SPLVAR;
osi_Assert(num_pkts >= 0);
RX_TS_INFO_GET(rx_ts_info);
if (!num_pkts) {
- for (queue_Scan(q, c, nc, rx_packet), num_pkts++) {
- rxi_FreeDataBufsTSFPQ(c, 1, 0);
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ num_pkts++;
+ rxi_FreeDataBufsTSFPQ(opr_queue_Entry(cursor, struct rx_packet,
+ entry), 2, 0);
}
} else {
- for (queue_Scan(q, c, nc, rx_packet)) {
- rxi_FreeDataBufsTSFPQ(c, 1, 0);
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ rxi_FreeDataBufsTSFPQ(opr_queue_Entry(cursor, struct rx_packet,
+ entry), 2, 0);
}
}
if (num_pkts) {
- RX_TS_FPQ_CHECKIN2(rx_ts_info, num_pkts, q);
+ RX_TS_FPQ_QCHECKIN(rx_ts_info, num_pkts, q);
}
if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
#else /* RX_ENABLE_TSFPQ */
/* num_pkts=0 means queue length is unknown */
int
-rxi_FreePackets(int num_pkts, struct rx_queue *q)
+rxi_FreePackets(int num_pkts, struct opr_queue *q)
{
- struct rx_queue cbs;
- register struct rx_packet *p, *np;
+ struct opr_queue cbs;
+ struct opr_queue *cursor, *store;
int qlen = 0;
SPLVAR;
osi_Assert(num_pkts >= 0);
- queue_Init(&cbs);
+ opr_queue_Init(&cbs);
if (!num_pkts) {
- for (queue_Scan(q, p, np, rx_packet), num_pkts++) {
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
if (p->niovecs > 2) {
qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
}
RX_FPQ_MARK_FREE(p);
+ num_pkts++;
}
if (!num_pkts)
return 0;
} else {
- for (queue_Scan(q, p, np, rx_packet)) {
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->niovecs > 2) {
qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
}
}
if (qlen) {
- queue_SpliceAppend(q, &cbs);
+ opr_queue_SpliceAppend(q, &cbs);
qlen += num_pkts;
} else
qlen = num_pkts;
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
- queue_SpliceAppend(&rx_freePacketQueue, q);
+ opr_queue_SpliceAppend(&rx_freePacketQueue, q);
rx_nFreePackets += qlen;
/* Wakeup anyone waiting for packets */
#endif /* RX_ENABLE_TSFPQ */
/* this one is kind of awful.
- * In rxkad, the packet has been all shortened, and everything, ready for
+ * In rxkad, the packet has been all shortened, and everything, ready for
* sending. All of a sudden, we discover we need some of that space back.
* This isn't terribly general, because it knows that the packets are only
* rounded up to the EBS (userdata + security header).
rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
{
int i, nv;
- struct rx_queue q;
- register struct rx_packet *cb, *ncb;
+ struct opr_queue q, *cursor, *store;
/* compute the number of cbuf's we need */
nv = nb / RX_CBUFFERSIZE;
return nb;
/* allocate buffers */
- queue_Init(&q);
+ opr_queue_Init(&q);
nv = AllocPacketBufs(class, nv, &q);
/* setup packet iovs */
- for (i = p->niovecs, queue_Scan(&q, cb, ncb, rx_packet), i++) {
- queue_Remove(cb);
+ i = p ->niovecs;
+ for (opr_queue_ScanSafe(&q, cursor, store)) {
+ struct rx_packet *cb
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
+ opr_queue_Remove(&cb->entry);
p->wirevec[i].iov_base = (caddr_t) cb->localdata;
p->wirevec[i].iov_len = RX_CBUFFERSIZE;
+ i++;
}
nb -= (nv * RX_CBUFFERSIZE);
return nb;
}
+/**
+ * Register a block of allocated packets so they can be freed later.
+ *
+ * Records the base address and byte size of a malloc'ed packet array on
+ * rx_mallocedPacketQueue (under rx_mallocedPktQ_lock) so that
+ * rxi_FreeAllPackets() can release every allocation at shutdown.
+ *
+ * @param[in] addr array of packets
+ * @param[in] npkt number of packets
+ *
+ * @return none
+ */
+static void
+registerPackets(struct rx_packet *addr, afs_uint32 npkt)
+{
+    struct rx_mallocedPacket *mp;
+
+    mp = osi_Alloc(sizeof(*mp));
+
+    osi_Assert(mp != NULL);
+    memset(mp, 0, sizeof(*mp));
+
+    mp->addr = addr;
+    /* Verify the byte count fits in an afs_uint32 BEFORE computing it,
+     * so mp->size can never hold a wrapped/truncated value. */
+    osi_Assert(npkt <= MAX_AFS_UINT32 / sizeof(struct rx_packet));
+    mp->size = npkt * sizeof(struct rx_packet);
+
+    MUTEX_ENTER(&rx_mallocedPktQ_lock);
+    opr_queue_Append(&rx_mallocedPacketQueue, &mp->entry);
+    MUTEX_EXIT(&rx_mallocedPktQ_lock);
+}
+
/* Add more packet buffers */
#ifdef RX_ENABLE_TSFPQ
void
rxi_MorePackets(int apackets)
{
struct rx_packet *p, *e;
- register struct rx_ts_info_t * rx_ts_info;
+ struct rx_ts_info_t * rx_ts_info;
int getme;
SPLVAR;
getme = apackets * sizeof(struct rx_packet);
- p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+ p = osi_Alloc(getme);
+ osi_Assert(p);
+ registerPackets(p, apackets);
PIN(p, getme); /* XXXXX */
- memset((char *)p, 0, getme);
+ memset(p, 0, getme);
RX_TS_INFO_GET(rx_ts_info);
+ RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
+ /* TSFPQ patch also needs to keep track of total packets */
+
+ MUTEX_ENTER(&rx_packets_mutex);
+ rx_nPackets += apackets;
+ RX_TS_FPQ_COMPUTE_LIMITS;
+ MUTEX_EXIT(&rx_packets_mutex);
+
for (e = p + apackets; p < e; p++) {
RX_PACKET_IOV_INIT(p);
p->niovecs = 2;
RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+
+ NETPRI;
+ MUTEX_ENTER(&rx_freePktQ_lock);
+#ifdef RXDEBUG_PACKET
+ p->packetId = rx_packet_id++;
+ p->allNextp = rx_mallocedP;
+#endif /* RXDEBUG_PACKET */
+ rx_mallocedP = p;
+ MUTEX_EXIT(&rx_freePktQ_lock);
+ USERPRI;
}
rx_ts_info->_FPQ.delta += apackets;
SPLVAR;
getme = apackets * sizeof(struct rx_packet);
- p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+ p = osi_Alloc(getme);
+ osi_Assert(p);
+ registerPackets(p, apackets);
PIN(p, getme); /* XXXXX */
- memset((char *)p, 0, getme);
+ memset(p, 0, getme);
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
for (e = p + apackets; p < e; p++) {
RX_PACKET_IOV_INIT(p);
+#ifdef RX_TRACK_PACKETS
p->flags |= RX_PKTFLAG_FREE;
+#endif
p->niovecs = 2;
- queue_Append(&rx_freePacketQueue, p);
+ opr_queue_Append(&rx_freePacketQueue, &p->entry);
+#ifdef RXDEBUG_PACKET
+ p->packetId = rx_packet_id++;
+ p->allNextp = rx_mallocedP;
+#endif /* RXDEBUG_PACKET */
+ rx_mallocedP = p;
}
+
+ rx_nPackets += apackets;
rx_nFreePackets += apackets;
rxi_NeedMorePackets = FALSE;
rxi_PacketsUnWait();
rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
{
struct rx_packet *p, *e;
- register struct rx_ts_info_t * rx_ts_info;
+ struct rx_ts_info_t * rx_ts_info;
int getme;
SPLVAR;
getme = apackets * sizeof(struct rx_packet);
- p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+ p = osi_Alloc(getme);
+ registerPackets(p, apackets);
PIN(p, getme); /* XXXXX */
- memset((char *)p, 0, getme);
+ memset(p, 0, getme);
RX_TS_INFO_GET(rx_ts_info);
+ RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
+ /* TSFPQ patch also needs to keep track of total packets */
+ MUTEX_ENTER(&rx_packets_mutex);
+ rx_nPackets += apackets;
+ RX_TS_FPQ_COMPUTE_LIMITS;
+ MUTEX_EXIT(&rx_packets_mutex);
+
for (e = p + apackets; p < e; p++) {
RX_PACKET_IOV_INIT(p);
p->niovecs = 2;
-
RX_TS_FPQ_CHECKIN(rx_ts_info,p);
+
+ NETPRI;
+ MUTEX_ENTER(&rx_freePktQ_lock);
+#ifdef RXDEBUG_PACKET
+ p->packetId = rx_packet_id++;
+ p->allNextp = rx_mallocedP;
+#endif /* RXDEBUG_PACKET */
+ rx_mallocedP = p;
+ MUTEX_EXIT(&rx_freePktQ_lock);
+ USERPRI;
}
rx_ts_info->_FPQ.delta += apackets;
- if (flush_global &&
+ if (flush_global &&
(num_keep_local < apackets)) {
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
#ifndef KERNEL
/* Add more packet buffers */
-void
+static void
rxi_MorePacketsNoLock(int apackets)
{
+#ifdef RX_ENABLE_TSFPQ
+ struct rx_ts_info_t * rx_ts_info;
+#endif /* RX_ENABLE_TSFPQ */
struct rx_packet *p, *e;
int getme;
* to hold maximal amounts of data */
apackets += (apackets / 4)
* ((rx_maxJumboRecvSize - RX_FIRSTBUFFERSIZE) / RX_CBUFFERSIZE);
- getme = apackets * sizeof(struct rx_packet);
- p = rx_mallocedP = (struct rx_packet *)osi_Alloc(getme);
+ do {
+ getme = apackets * sizeof(struct rx_packet);
+ p = osi_Alloc(getme);
+ if (p == NULL) {
+ apackets -= apackets / 4;
+ osi_Assert(apackets > 0);
+ }
+ } while(p == NULL);
+ memset(p, 0, getme);
+ registerPackets(p, apackets);
- memset((char *)p, 0, getme);
+#ifdef RX_ENABLE_TSFPQ
+ RX_TS_INFO_GET(rx_ts_info);
+ RX_TS_FPQ_GLOBAL_ALLOC(rx_ts_info,apackets);
+#endif /* RX_ENABLE_TSFPQ */
for (e = p + apackets; p < e; p++) {
RX_PACKET_IOV_INIT(p);
+#ifdef RX_TRACK_PACKETS
p->flags |= RX_PKTFLAG_FREE;
+#endif
p->niovecs = 2;
- queue_Append(&rx_freePacketQueue, p);
+ opr_queue_Append(&rx_freePacketQueue, &p->entry);
+#ifdef RXDEBUG_PACKET
+ p->packetId = rx_packet_id++;
+ p->allNextp = rx_mallocedP;
+#endif /* RXDEBUG_PACKET */
+ rx_mallocedP = p;
}
rx_nFreePackets += apackets;
-#ifdef RX_ENABLE_TSFPQ
- /* TSFPQ patch also needs to keep track of total packets */
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_packets_mutex);
rx_nPackets += apackets;
+#ifdef RX_ENABLE_TSFPQ
RX_TS_FPQ_COMPUTE_LIMITS;
- MUTEX_EXIT(&rx_stats_mutex);
#endif /* RX_ENABLE_TSFPQ */
+ MUTEX_EXIT(&rx_packets_mutex);
rxi_NeedMorePackets = FALSE;
rxi_PacketsUnWait();
}
void
rxi_FreeAllPackets(void)
{
- /* must be called at proper interrupt level, etcetera */
- /* MTUXXX need to free all Packets */
- osi_Free(rx_mallocedP,
- (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
- UNPIN(rx_mallocedP, (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
+ struct rx_mallocedPacket *mp;
+
+ MUTEX_ENTER(&rx_mallocedPktQ_lock);
+
+ while (!opr_queue_IsEmpty(&rx_mallocedPacketQueue)) {
+ mp = opr_queue_First(&rx_mallocedPacketQueue,
+ struct rx_mallocedPacket, entry);
+ opr_queue_Remove(&mp->entry);
+ osi_Free(mp->addr, mp->size);
+ UNPIN(mp->addr, mp->size);
+ osi_Free(mp, sizeof(*mp));
+ }
+ MUTEX_EXIT(&rx_mallocedPktQ_lock);
}
#ifdef RX_ENABLE_TSFPQ
-void
+static void
rxi_AdjustLocalPacketsTSFPQ(int num_keep_local, int allow_overcommit)
{
- register struct rx_ts_info_t * rx_ts_info;
- register int xfer;
+ struct rx_ts_info_t * rx_ts_info;
+ int xfer;
SPLVAR;
RX_TS_INFO_GET(rx_ts_info);
if ((num_keep_local > rx_TSFPQLocalMax) && !allow_overcommit)
xfer = rx_TSFPQLocalMax - rx_ts_info->_FPQ.len;
if (rx_nFreePackets < xfer) {
- rxi_MorePacketsNoLock(xfer - rx_nFreePackets);
+ rxi_MorePacketsNoLock(MAX(xfer - rx_nFreePackets, 4 * rx_initSendWindow));
}
RX_TS_FPQ_GTOL2(rx_ts_info, xfer);
}
rx_CheckPackets(void)
{
if (rxi_NeedMorePackets) {
- rxi_MorePackets(rx_initSendWindow);
+ rxi_MorePackets(rx_maxSendWindow);
}
}
In any event, we assume the former, and append the packets to the end
of the free list. */
/* This explanation is bogus. The free list doesn't remain in any kind of
- useful order for afs_int32: the packets in use get pretty much randomly scattered
+ useful order for afs_int32: the packets in use get pretty much randomly scattered
across all the pages. In order to permit unused {packets,bufs} to page out, they
- must be stored so that packets which are adjacent in memory are adjacent in the
+ must be stored so that packets which are adjacent in memory are adjacent in the
free list. An array springs rapidly to mind.
*/
/* Actually free the packet p. */
-#ifdef RX_ENABLE_TSFPQ
-void
-rxi_FreePacketNoLock(struct rx_packet *p)
-{
- register struct rx_ts_info_t * rx_ts_info;
- dpf(("Free %lx\n", (unsigned long)p));
-
- RX_TS_INFO_GET(rx_ts_info);
- RX_TS_FPQ_CHECKIN(rx_ts_info,p);
- if (rx_ts_info->_FPQ.len > rx_TSFPQLocalMax) {
- RX_TS_FPQ_LTOG(rx_ts_info);
- }
-}
-#else /* RX_ENABLE_TSFPQ */
-void
+#ifndef RX_ENABLE_TSFPQ
+static void
rxi_FreePacketNoLock(struct rx_packet *p)
{
- dpf(("Free %lx\n", (unsigned long)p));
+ dpf(("Free %"AFS_PTR_FMT"\n", p));
RX_FPQ_MARK_FREE(p);
rx_nFreePackets++;
- queue_Append(&rx_freePacketQueue, p);
+ opr_queue_Append(&rx_freePacketQueue, &p->entry);
}
#endif /* RX_ENABLE_TSFPQ */
#ifdef RX_ENABLE_TSFPQ
-void
+static void
rxi_FreePacketTSFPQ(struct rx_packet *p, int flush_global)
{
- register struct rx_ts_info_t * rx_ts_info;
- dpf(("Free %lx\n", (unsigned long)p));
+ struct rx_ts_info_t * rx_ts_info;
+ dpf(("Free %"AFS_PTR_FMT"\n", p));
RX_TS_INFO_GET(rx_ts_info);
RX_TS_FPQ_CHECKIN(rx_ts_info,p);
}
#endif /* RX_ENABLE_TSFPQ */
-/* free continuation buffers off a packet into a queue of buffers */
+/*
+ * free continuation buffers off a packet into a queue
+ *
+ * [IN] p -- packet from which continuation buffers will be freed
+ * [IN] first -- iovec offset of first continuation buffer to free
+ * [IN] q -- queue into which continuation buffers will be chained
+ *
+ * returns:
+ * number of continuation buffers freed
+ */
+#ifndef RX_ENABLE_TSFPQ
static int
-rxi_FreeDataBufsToQueue(struct rx_packet *p, int first, struct rx_queue * q)
+rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first, struct opr_queue * q)
{
struct iovec *iov;
struct rx_packet * cb;
int count = 0;
- if (first < 2)
- first = 2;
- for (; first < p->niovecs; first++, count++) {
+ for (first = MAX(2, first); first < p->niovecs; first++, count++) {
iov = &p->wirevec[first];
if (!iov->iov_base)
- osi_Panic("rxi_PacketIOVToQueue: unexpected NULL iov");
+ osi_Panic("rxi_FreeDataBufsToQueue: unexpected NULL iov");
cb = RX_CBUF_TO_PACKET(iov->iov_base, p);
RX_FPQ_MARK_FREE(cb);
- queue_Append(q, cb);
+ opr_queue_Append(q, &cb->entry);
}
p->length = 0;
p->niovecs = 0;
return count;
}
-int
-rxi_FreeDataBufsNoLock(struct rx_packet *p, int first)
+/*
+ * free packet continuation buffers into the global free packet pool
+ *
+ * [IN] p -- packet from which to free continuation buffers
+ * [IN] first -- iovec offset of first continuation buffer to free
+ *
+ * returns:
+ * zero always
+ */
+static int
+rxi_FreeDataBufsNoLock(struct rx_packet *p, afs_uint32 first)
{
- struct iovec *iov, *end;
+ struct iovec *iov;
- if (first != 1) /* MTUXXX */
- osi_Panic("FreeDataBufs 1: first must be 1");
- iov = &p->wirevec[1];
- end = iov + (p->niovecs - 1);
- if (iov->iov_base != (caddr_t) p->localdata) /* MTUXXX */
- osi_Panic("FreeDataBufs 2: vec 1 must be localdata");
- for (iov++; iov < end; iov++) {
+ for (first = MAX(2, first); first < p->niovecs; first++) {
+ iov = &p->wirevec[first];
if (!iov->iov_base)
- osi_Panic("FreeDataBufs 3: vecs 2-niovecs must not be NULL");
+ osi_Panic("rxi_FreeDataBufsNoLock: unexpected NULL iov");
rxi_FreePacketNoLock(RX_CBUF_TO_PACKET(iov->iov_base, p));
}
p->length = 0;
return 0;
}
-#ifdef RX_ENABLE_TSFPQ
-int
-rxi_FreeDataBufsTSFPQ(struct rx_packet *p, int first, int flush_global)
+#else
+
+/*
+ * free packet continuation buffers into the thread-local free pool
+ *
+ * [IN] p -- packet from which continuation buffers will be freed
+ * [IN] first -- iovec offset of first continuation buffer to free
+ * any value less than 2, the min number of iovecs,
+ * is treated as if it is 2.
+ * [IN] flush_global -- if nonzero, we will flush overquota packets to the
+ * global free pool before returning
+ *
+ * returns:
+ * zero always
+ */
+static int
+rxi_FreeDataBufsTSFPQ(struct rx_packet *p, afs_uint32 first, int flush_global)
{
- struct iovec *iov, *end;
- register struct rx_ts_info_t * rx_ts_info;
+ struct iovec *iov;
+ struct rx_ts_info_t * rx_ts_info;
RX_TS_INFO_GET(rx_ts_info);
- if (first != 1) /* MTUXXX */
- osi_Panic("FreeDataBufs 1: first must be 1");
- iov = &p->wirevec[1];
- end = iov + (p->niovecs - 1);
- if (iov->iov_base != (caddr_t) p->localdata) /* MTUXXX */
- osi_Panic("FreeDataBufs 2: vec 1 must be localdata");
- for (iov++; iov < end; iov++) {
+ for (first = MAX(2, first); first < p->niovecs; first++) {
+ iov = &p->wirevec[first];
if (!iov->iov_base)
- osi_Panic("FreeDataBufs 3: vecs 2-niovecs must not be NULL");
+ osi_Panic("rxi_FreeDataBufsTSFPQ: unexpected NULL iov");
RX_TS_FPQ_CHECKIN(rx_ts_info,RX_CBUF_TO_PACKET(iov->iov_base, p));
}
p->length = 0;
int rxi_nBadIovecs = 0;
-/* rxi_RestoreDataBufs
+/* rxi_RestoreDataBufs
*
* Restore the correct sizes to the iovecs. Called when reusing a packet
* for reading off the wire.
void
rxi_RestoreDataBufs(struct rx_packet *p)
{
- int i;
- struct iovec *iov = &p->wirevec[2];
+ unsigned int i;
+ struct iovec *iov;
RX_PACKET_IOV_INIT(p);
{
int length;
struct iovec *iov, *end;
- register struct rx_ts_info_t * rx_ts_info;
+ struct rx_ts_info_t * rx_ts_info;
SPLVAR;
if (first != 1)
void
rxi_FreePacket(struct rx_packet *p)
{
- rxi_FreeDataBufsTSFPQ(p, 1, 0);
+ rxi_FreeDataBufsTSFPQ(p, 2, 0);
rxi_FreePacketTSFPQ(p, RX_TS_FPQ_FLUSH_GLOBAL);
}
#else /* RX_ENABLE_TSFPQ */
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
- rxi_FreeDataBufsNoLock(p, 1);
+ rxi_FreeDataBufsNoLock(p, 2);
rxi_FreePacketNoLock(p);
/* Wakeup anyone waiting for packets */
rxi_PacketsUnWait();
}
#endif /* RX_ENABLE_TSFPQ */
-/* rxi_AllocPacket sets up p->length so it reflects the number of
+/* rxi_AllocPacket sets up p->length so it reflects the number of
* bytes in the packet at this point, **not including** the header.
* The header is absolutely necessary, besides, this is the way the
* length field is usually used */
#ifdef RX_ENABLE_TSFPQ
-struct rx_packet *
+static struct rx_packet *
rxi_AllocPacketNoLock(int class)
{
- register struct rx_packet *p;
- register struct rx_ts_info_t * rx_ts_info;
+ struct rx_packet *p;
+ struct rx_ts_info_t * rx_ts_info;
RX_TS_INFO_GET(rx_ts_info);
-#ifdef KERNEL
- if (rxi_OverQuota(class)) {
- rxi_NeedMorePackets = TRUE;
- MUTEX_ENTER(&rx_stats_mutex);
- switch (class) {
- case RX_PACKET_CLASS_RECEIVE:
- rx_stats.receivePktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SEND:
- rx_stats.sendPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SPECIAL:
- rx_stats.specialPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_RECV_CBUF:
- rx_stats.receiveCbufPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SEND_CBUF:
- rx_stats.sendCbufPktAllocFailures++;
- break;
- }
- MUTEX_EXIT(&rx_stats_mutex);
- return (struct rx_packet *)0;
- }
-#endif /* KERNEL */
-
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.packetRequests++;
- MUTEX_EXIT(&rx_stats_mutex);
-
- if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.packetRequests);
+ if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
#ifdef KERNEL
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
osi_Panic("rxi_AllocPacket error");
#else /* KERNEL */
- if (queue_IsEmpty(&rx_freePacketQueue))
- rxi_MorePacketsNoLock(rx_initSendWindow);
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
+ rxi_MorePacketsNoLock(rx_maxSendWindow);
#endif /* KERNEL */
RX_TS_FPQ_CHECKOUT(rx_ts_info,p);
- dpf(("Alloc %lx, class %d\n", (unsigned long)p, class));
+ dpf(("Alloc %"AFS_PTR_FMT", class %d\n", p, class));
/* have to do this here because rx_FlushWrite fiddles with the iovs in
- * order to truncate outbound packets. In the near future, may need
+ * order to truncate outbound packets. In the near future, may need
* to allocate bufs from a static pool here, and/or in AllocSendPacket
*/
RX_PACKET_IOV_FULLINIT(p);
return p;
}
#else /* RX_ENABLE_TSFPQ */
-struct rx_packet *
+static struct rx_packet *
rxi_AllocPacketNoLock(int class)
{
- register struct rx_packet *p;
+ struct rx_packet *p;
#ifdef KERNEL
if (rxi_OverQuota(class)) {
rxi_NeedMorePackets = TRUE;
- MUTEX_ENTER(&rx_stats_mutex);
- switch (class) {
- case RX_PACKET_CLASS_RECEIVE:
- rx_stats.receivePktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SEND:
- rx_stats.sendPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SPECIAL:
- rx_stats.specialPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_RECV_CBUF:
- rx_stats.receiveCbufPktAllocFailures++;
- break;
- case RX_PACKET_CLASS_SEND_CBUF:
- rx_stats.sendCbufPktAllocFailures++;
- break;
- }
- MUTEX_EXIT(&rx_stats_mutex);
+ if (rx_stats_active) {
+ switch (class) {
+ case RX_PACKET_CLASS_RECEIVE:
+ rx_atomic_inc(&rx_stats.receivePktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_SEND:
+ rx_atomic_inc(&rx_stats.sendPktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_SPECIAL:
+ rx_atomic_inc(&rx_stats.specialPktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_RECV_CBUF:
+ rx_atomic_inc(&rx_stats.receiveCbufPktAllocFailures);
+ break;
+ case RX_PACKET_CLASS_SEND_CBUF:
+ rx_atomic_inc(&rx_stats.sendCbufPktAllocFailures);
+ break;
+ }
+ }
return (struct rx_packet *)0;
}
#endif /* KERNEL */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.packetRequests++;
- MUTEX_EXIT(&rx_stats_mutex);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.packetRequests);
#ifdef KERNEL
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
osi_Panic("rxi_AllocPacket error");
#else /* KERNEL */
- if (queue_IsEmpty(&rx_freePacketQueue))
- rxi_MorePacketsNoLock(rx_initSendWindow);
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
+ rxi_MorePacketsNoLock(rx_maxSendWindow);
#endif /* KERNEL */
rx_nFreePackets--;
- p = queue_First(&rx_freePacketQueue, rx_packet);
- queue_Remove(p);
+ p = opr_queue_First(&rx_freePacketQueue, struct rx_packet, entry);
+ opr_queue_Remove(&p->entry);
RX_FPQ_MARK_USED(p);
- dpf(("Alloc %lx, class %d\n", (unsigned long)p, class));
+ dpf(("Alloc %"AFS_PTR_FMT", class %d\n", p, class));
/* have to do this here because rx_FlushWrite fiddles with the iovs in
- * order to truncate outbound packets. In the near future, may need
+ * order to truncate outbound packets. In the near future, may need
* to allocate bufs from a static pool here, and/or in AllocSendPacket
*/
RX_PACKET_IOV_FULLINIT(p);
#endif /* RX_ENABLE_TSFPQ */
#ifdef RX_ENABLE_TSFPQ
-struct rx_packet *
+static struct rx_packet *
rxi_AllocPacketTSFPQ(int class, int pull_global)
{
- register struct rx_packet *p;
- register struct rx_ts_info_t * rx_ts_info;
+ struct rx_packet *p;
+ struct rx_ts_info_t * rx_ts_info;
RX_TS_INFO_GET(rx_ts_info);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.packetRequests++;
- MUTEX_EXIT(&rx_stats_mutex);
-
- if (pull_global && queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.packetRequests);
+ if (pull_global && opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
MUTEX_ENTER(&rx_freePktQ_lock);
- if (queue_IsEmpty(&rx_freePacketQueue))
- rxi_MorePacketsNoLock(rx_initSendWindow);
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
+ rxi_MorePacketsNoLock(rx_maxSendWindow);
RX_TS_FPQ_GTOL(rx_ts_info);
MUTEX_EXIT(&rx_freePktQ_lock);
- } else if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ } else if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
return NULL;
}
RX_TS_FPQ_CHECKOUT(rx_ts_info,p);
- dpf(("Alloc %lx, class %d\n", (unsigned long)p, class));
+ dpf(("Alloc %"AFS_PTR_FMT", class %d\n", p, class));
/* have to do this here because rx_FlushWrite fiddles with the iovs in
- * order to truncate outbound packets. In the near future, may need
+ * order to truncate outbound packets. In the near future, may need
* to allocate bufs from a static pool here, and/or in AllocSendPacket
*/
RX_PACKET_IOV_FULLINIT(p);
struct rx_packet *
rxi_AllocPacket(int class)
{
- register struct rx_packet *p;
+ struct rx_packet *p;
p = rxi_AllocPacketTSFPQ(class, RX_TS_FPQ_PULL_GLOBAL);
return p;
struct rx_packet *
rxi_AllocPacket(int class)
{
- register struct rx_packet *p;
+ struct rx_packet *p;
MUTEX_ENTER(&rx_freePktQ_lock);
p = rxi_AllocPacketNoLock(class);
* Called with call locked.
*/
struct rx_packet *
-rxi_AllocSendPacket(register struct rx_call *call, int want)
+rxi_AllocSendPacket(struct rx_call *call, int want)
{
- register struct rx_packet *p = (struct rx_packet *)0;
- register int mud;
- register unsigned delta;
+ struct rx_packet *p = (struct rx_packet *)0;
+ int mud;
+ unsigned delta;
SPLVAR;
mud = call->MTU - RX_HEADER_SIZE;
(void)rxi_AllocDataBuf(p, (want - p->length),
RX_PACKET_CLASS_SEND_CBUF);
- if ((unsigned)p->length > mud)
+ if (p->length > mud)
p->length = mud;
if (delta >= p->length) {
(void)rxi_AllocDataBuf(p, (want - p->length),
RX_PACKET_CLASS_SEND_CBUF);
- if ((unsigned)p->length > mud)
+ if (p->length > mud)
p->length = mud;
if (delta >= p->length) {
}
#ifndef KERNEL
-#ifdef AFS_NT40_ENV
+#ifdef AFS_NT40_ENV
/* Windows does not use file descriptors. */
#define CountFDs(amax) 0
#else
/* count the number of used FDs */
static int
-CountFDs(register int amax)
+CountFDs(int amax)
{
struct stat tstat;
- register int i, code;
- register int count;
+ int i, code;
+ int count;
count = 0;
for (i = 0; i < amax; i++) {
* the data length of the packet is stored in the packet structure.
* The header is decoded. */
int
-rxi_ReadPacket(osi_socket socket, register struct rx_packet *p,
- struct sockaddr_storage *saddr, int *slen)
+rxi_ReadPacket(osi_socket socket, struct rx_packet *p, afs_uint32 * host,
+ u_short * port)
{
+ struct sockaddr_in from;
int nbytes;
afs_int32 rlen;
- register afs_int32 tlen, savelen;
+ afs_uint32 tlen, savelen;
struct msghdr msg;
rx_computelen(p, tlen);
rx_SetDataSize(p, tlen); /* this is the size of the user data area */
} else
tlen = rlen;
- /* Extend the last iovec for padding, it's just to make sure that the
+ /* Extend the last iovec for padding, it's just to make sure that the
* read doesn't return more data than we expect, and is done to get around
* our problems caused by the lack of a length field in the rx header.
* Use the extra buffer that follows the localdata in each packet
savelen = p->wirevec[p->niovecs - 1].iov_len;
p->wirevec[p->niovecs - 1].iov_len += RX_EXTRABUFFERSIZE;
- memset((char *)&msg, 0, sizeof(msg));
- msg.msg_name = (char *)saddr;
- msg.msg_namelen = *slen;
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = (char *)&from;
+ msg.msg_namelen = sizeof(struct sockaddr_in);
msg.msg_iov = p->wirevec;
msg.msg_iovlen = p->niovecs;
nbytes = rxi_Recvmsg(socket, &msg, 0);
- *slen = msg.msg_namelen;
/* restore the vec to its correct state */
p->wirevec[p->niovecs - 1].iov_len = savelen;
- p->length = (nbytes - RX_HEADER_SIZE);
- if ((nbytes > tlen) || (p->length & 0x8000)) { /* Bogus packet */
+ p->length = (u_short)(nbytes - RX_HEADER_SIZE);
+ if (nbytes < 0 || (nbytes > tlen) || (p->length & 0x8000)) { /* Bogus packet */
if (nbytes < 0 && errno == EWOULDBLOCK) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.noPacketOnRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.noPacketOnRead);
} else if (nbytes <= 0) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.bogusPacketOnRead++;
- switch (rx_ssfamily(saddr)) {
- case AF_INET:
- rx_stats.bogusHost = rx_ss2sin(saddr)->sin_addr.s_addr;
- break;
- default:
-#ifdef AF_INET6
- case AF_INET6:
-#endif /* AF_INET6 */
- rx_stats.bogusHost = 0xffffffff;
- break;
- }
- MUTEX_EXIT(&rx_stats_mutex);
- dpf(("B: bogus packet from [%x,%d] nb=%d",
- ntohl(rx_ss2v4addr(saddr)), ntohs(rx_ss2pn(saddr)), nbytes));
+ if (rx_stats_active) {
+ rx_atomic_inc(&rx_stats.bogusPacketOnRead);
+ rx_stats.bogusHost = from.sin_addr.s_addr;
+ }
+ dpf(("B: bogus packet from [%x,%d] nb=%d\n", ntohl(from.sin_addr.s_addr),
+ ntohs(from.sin_port), nbytes));
}
return 0;
- }
+ }
#ifdef RXDEBUG
else if ((rx_intentionallyDroppedOnReadPer100 > 0)
&& (random() % 100 < rx_intentionallyDroppedOnReadPer100)) {
rxi_DecodePacketHeader(p);
- dpf(("Dropped %d %s: %x.%u.%u.%u.%u.%u.%u flags %d len %d",
- p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(rx_ss2v4addr(saddr)), ntohs(rx_ss2pn(saddr)), p->header.serial,
- p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.flags,
+ *host = from.sin_addr.s_addr;
+ *port = from.sin_port;
+
+ dpf(("Dropped %d %s: %x.%u.%u.%u.%u.%u.%u flags %d len %d\n",
+ p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(*host), ntohs(*port), p->header.serial,
+ p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.flags,
p->length));
+#ifdef RX_TRIMDATABUFS
rxi_TrimDataBufs(p, 1);
+#endif
return 0;
- }
+ }
#endif
else {
/* Extract packet header. */
rxi_DecodePacketHeader(p);
- if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) {
- struct rx_peer *peer;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.packetsRead[p->header.type - 1]++;
- MUTEX_EXIT(&rx_stats_mutex);
- /*
- * Try to look up this peer structure. If it doesn't exist,
- * don't create a new one -
- * we don't keep count of the bytes sent/received if a peer
- * structure doesn't already exist.
- *
- * The peer/connection cleanup code assumes that there is 1 peer
- * per connection. If we actually created a peer structure here
- * and this packet was an rxdebug packet, the peer structure would
- * never be cleaned up.
- */
- peer = rxi_FindPeer(saddr, *slen, SOCK_DGRAM, 0, 0);
- /* Since this may not be associated with a connection,
- * it may have no refCount, meaning we could race with
- * ReapConnections
- */
- if (peer && (peer->refCount > 0)) {
- MUTEX_ENTER(&peer->peer_lock);
- hadd32(peer->bytesReceived, p->length);
- MUTEX_EXIT(&peer->peer_lock);
- }
+ *host = from.sin_addr.s_addr;
+ *port = from.sin_port;
+ if (rx_stats_active
+ && p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) {
+
+ rx_atomic_inc(&rx_stats.packetsRead[p->header.type - 1]);
}
+#ifdef RX_TRIMDATABUFS
/* Free any empty packet buffers at the end of this packet */
rxi_TrimDataBufs(p, 1);
-
+#endif
return 1;
}
}
* last two pad bytes. */
struct rx_packet *
-rxi_SplitJumboPacket(register struct rx_packet *p,
- struct sockaddr_storage *saddr, int slen, int first)
+rxi_SplitJumboPacket(struct rx_packet *p, afs_uint32 host, short port,
+ int first)
{
struct rx_packet *np;
struct rx_jumboHeader *jp;
np->header = p->header;
np->header.serial = p->header.serial + 1;
np->header.seq = p->header.seq + 1;
+ np->header.userStatus = 0;
np->header.flags = jp->flags;
np->header.spare = jp->cksum;
#ifndef KERNEL
/* Send a udp datagram */
int
-osi_NetSend(osi_socket socket, void *addr, int addrlen, struct iovec *dvec,
- int nvecs, int length, int istack)
+osi_NetSend(osi_socket socket, void *addr, struct iovec *dvec, int nvecs,
+ int length, int istack)
{
struct msghdr msg;
int ret;
msg.msg_iov = dvec;
msg.msg_iovlen = nvecs;
msg.msg_name = addr;
- msg.msg_namelen = addrlen;
+ msg.msg_namelen = sizeof(struct sockaddr_in);
ret = rxi_Sendmsg(socket, &msg, 0);
* The message is NOT changed.
*/
static int
-cpytoc(mblk_t * mp, register int off, register int len, register char *cp)
+cpytoc(mblk_t * mp, int off, int len, char *cp)
{
- register int n;
+ int n;
for (; mp && len > 0; mp = mp->b_cont) {
if (mp->b_datap->db_type != M_DATA) {
}
/* MTUXXX Supposed to skip <off> bytes and copy <len> bytes,
- * but it doesn't really.
- * This sucks, anyway, do it like m_cpy.... below
+ * but it doesn't really.
+ * This sucks, anyway, do it like m_cpy.... below
*/
static int
-cpytoiovec(mblk_t * mp, int off, int len, register struct iovec *iovs,
+cpytoiovec(mblk_t * mp, int off, int len, struct iovec *iovs,
int niovs)
{
- register int m, n, o, t, i;
+ int m, n, o, t, i;
for (i = -1, t = 0; i < niovs && mp && len > 0; mp = mp->b_cont) {
if (mp->b_datap->db_type != M_DATA) {
#endif /* AFS_SUN5_ENV */
#if !defined(AFS_LINUX20_ENV) && !defined(AFS_DARWIN80_ENV)
+#if defined(AFS_NBSD_ENV)
+int
+rx_mb_to_packet(struct mbuf *amb, void (*free) (struct mbuf *), int hdr_len, int data_len, struct rx_packet *phandle)
+#else
int
rx_mb_to_packet(amb, free, hdr_len, data_len, phandle)
#if defined(AFS_SUN5_ENV) || defined(AFS_HPUX110_ENV)
void (*free) ();
struct rx_packet *phandle;
int hdr_len, data_len;
+#endif /* AFS_NBSD_ENV */
{
- register int code;
+ int code;
code =
m_cpytoiovec(amb, hdr_len, data_len, phandle->wirevec,
/* send a response to a debug packet */
struct rx_packet *
-rxi_ReceiveDebugPacket(register struct rx_packet *ap, osi_socket asocket,
- struct sockaddr_storage *saddr, int slen, int istack)
+rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
+ afs_uint32 ahost, short aport, int istack)
{
struct rx_debugIn tin;
afs_int32 tl;
- struct rx_serverQueueEntry *np, *nqe;
/*
* Only respond to client-initiated Rx debug packets,
}
rx_packetread(ap, 0, sizeof(struct rx_debugIn), (char *)&tin);
- /* all done with packet, now set length to the truth, so we can
+ /* all done with packet, now set length to the truth, so we can
* reuse this packet */
rx_computelen(ap, ap->length);
struct rx_debugStats tstat;
/* get basic stats */
- memset((char *)&tstat, 0, sizeof(tstat)); /* make sure spares are zero */
+ memset(&tstat, 0, sizeof(tstat)); /* make sure spares are zero */
tstat.version = RX_DEBUGI_VERSION;
#ifndef RX_ENABLE_LOCKS
tstat.waitingForPackets = rx_waitingForPackets;
#endif
MUTEX_ENTER(&rx_serverPool_lock);
tstat.nFreePackets = htonl(rx_nFreePackets);
+ tstat.nPackets = htonl(rx_nPackets);
tstat.callsExecuted = htonl(rxi_nCalls);
tstat.packetReclaims = htonl(rx_packetReclaims);
tstat.usedFDs = CountFDs(64);
- tstat.nWaiting = htonl(rx_nWaiting);
- tstat.nWaited = htonl(rx_nWaited);
- queue_Count(&rx_idleServerQueue, np, nqe, rx_serverQueueEntry,
- tstat.idleThreads);
+ tstat.nWaiting = htonl(rx_atomic_read(&rx_nWaiting));
+ tstat.nWaited = htonl(rx_atomic_read(&rx_nWaited));
+ tstat.idleThreads = opr_queue_Count(&rx_idleServerQueue);
MUTEX_EXIT(&rx_serverPool_lock);
tstat.idleThreads = htonl(tstat.idleThreads);
tl = sizeof(struct rx_debugStats) - ap->length;
rx_packetwrite(ap, 0, sizeof(struct rx_debugStats),
(char *)&tstat);
ap->length = sizeof(struct rx_debugStats);
- rxi_SendDebugPacket(ap, asocket, saddr, slen, istack);
+ rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
rx_computelen(ap, ap->length);
}
break;
case RX_DEBUGI_GETALLCONN:
case RX_DEBUGI_GETCONN:{
- int i, j;
- register struct rx_connection *tc;
+ unsigned int i, j;
+ struct rx_connection *tc;
struct rx_call *tcall;
struct rx_debugConn tconn;
int all = (tin.type == RX_DEBUGI_GETALLCONN);
if (tl > 0)
return ap;
- memset((char *)&tconn, 0, sizeof(tconn)); /* make sure spares are zero */
+ memset(&tconn, 0, sizeof(tconn)); /* make sure spares are zero */
/* get N'th (maybe) "interesting" connection info */
for (i = 0; i < rx_hashTableSize; i++) {
#if !defined(KERNEL)
#endif
#endif
MUTEX_ENTER(&rx_connHashTable_lock);
- /* We might be slightly out of step since we are not
+ /* We might be slightly out of step since we are not
* locking each call, but this is only debugging output.
*/
for (tc = rx_connHashTable[i]; tc; tc = tc->next) {
if ((all || rxi_IsConnInteresting(tc))
&& tin.index-- <= 0) {
- switch (rx_ssfamily(&tc->peer->saddr)) {
- case AF_INET:
- tconn.host = rx_ss2sin(&tc->peer->saddr)->sin_addr.s_addr;
- break;
- default:
-#ifdef AF_INET6
- case AF_INET6:
-#endif /* AF_INET6 */
- tconn.host = 0xffffffff;
- break;
- }
- tconn.port = rx_ss2pn(&tc->peer->saddr);
+ tconn.host = tc->peer->host;
+ tconn.port = tc->peer->port;
tconn.cid = htonl(tc->cid);
tconn.epoch = htonl(tc->epoch);
tconn.serial = htonl(tc->serial);
tconn.callNumber[j] = htonl(tc->callNumber[j]);
if ((tcall = tc->call[j])) {
tconn.callState[j] = tcall->state;
- tconn.callMode[j] = tcall->mode;
+ tconn.callMode[j] = tcall->app.mode;
tconn.callFlags[j] = tcall->flags;
- if (queue_IsNotEmpty(&tcall->rq))
+ if (!opr_queue_IsEmpty(&tcall->rq))
tconn.callOther[j] |= RX_OTHER_IN;
- if (queue_IsNotEmpty(&tcall->tq))
+ if (!opr_queue_IsEmpty(&tcall->tq))
tconn.callOther[j] |= RX_OTHER_OUT;
} else
tconn.callState[j] = RX_STATE_NOTINIT;
(char *)&tconn);
tl = ap->length;
ap->length = sizeof(struct rx_debugConn);
- rxi_SendDebugPacket(ap, asocket, saddr, slen,
+ rxi_SendDebugPacket(ap, asocket, ahost, aport,
istack);
ap->length = tl;
return ap;
(char *)&tconn);
tl = ap->length;
ap->length = sizeof(struct rx_debugConn);
- rxi_SendDebugPacket(ap, asocket, saddr, slen, istack);
+ rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
ap->length = tl;
break;
}
*/
case RX_DEBUGI_GETPEER:{
- int i;
- register struct rx_peer *tp;
+ unsigned int i;
+ struct rx_peer *tp;
struct rx_debugPeer tpeer;
if (tl > 0)
return ap;
- memset((char *)&tpeer, 0, sizeof(tpeer));
+ memset(&tpeer, 0, sizeof(tpeer));
for (i = 0; i < rx_hashTableSize; i++) {
#if !defined(KERNEL)
/* the time complexity of the algorithm used here
MUTEX_ENTER(&rx_peerHashTable_lock);
for (tp = rx_peerHashTable[i]; tp; tp = tp->next) {
if (tin.index-- <= 0) {
- switch (rx_ssfamily(&tp->saddr)) {
- case AF_INET:
- tpeer.host = rx_ss2sin(&tp->saddr)->sin_addr.s_addr;
- break;
- default:
-#ifdef AF_INET6
- case AF_INET6:
-#endif /* AF_INET6 */
- tpeer.host = 0xffffffff;
- break;
- }
- tpeer.port = rx_ss2pn(&tp->saddr);
+ tp->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ MUTEX_ENTER(&tp->peer_lock);
+ tpeer.host = tp->host;
+ tpeer.port = tp->port;
tpeer.ifMTU = htons(tp->ifMTU);
tpeer.idleWhen = htonl(tp->idleWhen);
tpeer.refCount = htons(tp->refCount);
- tpeer.burstSize = tp->burstSize;
- tpeer.burst = tp->burst;
- tpeer.burstWait.sec = htonl(tp->burstWait.sec);
- tpeer.burstWait.usec = htonl(tp->burstWait.usec);
+ tpeer.burstSize = 0;
+ tpeer.burst = 0;
+ tpeer.burstWait.sec = 0;
+ tpeer.burstWait.usec = 0;
tpeer.rtt = htonl(tp->rtt);
tpeer.rtt_dev = htonl(tp->rtt_dev);
- tpeer.timeout.sec = htonl(tp->timeout.sec);
- tpeer.timeout.usec = htonl(tp->timeout.usec);
tpeer.nSent = htonl(tp->nSent);
tpeer.reSends = htonl(tp->reSends);
- tpeer.inPacketSkew = htonl(tp->inPacketSkew);
- tpeer.outPacketSkew = htonl(tp->outPacketSkew);
- tpeer.rateFlag = htonl(tp->rateFlag);
tpeer.natMTU = htons(tp->natMTU);
tpeer.maxMTU = htons(tp->maxMTU);
tpeer.maxDgramPackets = htons(tp->maxDgramPackets);
tpeer.cwind = htons(tp->cwind);
tpeer.nDgramPackets = htons(tp->nDgramPackets);
tpeer.congestSeq = htons(tp->congestSeq);
- tpeer.bytesSent.high = htonl(tp->bytesSent.high);
- tpeer.bytesSent.low = htonl(tp->bytesSent.low);
+ tpeer.bytesSent.high =
+ htonl(tp->bytesSent >> 32);
+ tpeer.bytesSent.low =
+ htonl(tp->bytesSent & MAX_AFS_UINT32);
tpeer.bytesReceived.high =
- htonl(tp->bytesReceived.high);
+ htonl(tp->bytesReceived >> 32);
tpeer.bytesReceived.low =
- htonl(tp->bytesReceived.low);
+ htonl(tp->bytesReceived & MAX_AFS_UINT32);
+ MUTEX_EXIT(&tp->peer_lock);
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ tp->refCount--;
MUTEX_EXIT(&rx_peerHashTable_lock);
+
rx_packetwrite(ap, 0, sizeof(struct rx_debugPeer),
(char *)&tpeer);
tl = ap->length;
ap->length = sizeof(struct rx_debugPeer);
- rxi_SendDebugPacket(ap, asocket, saddr, slen,
+ rxi_SendDebugPacket(ap, asocket, ahost, aport,
istack);
ap->length = tl;
return ap;
(char *)&tpeer);
tl = ap->length;
ap->length = sizeof(struct rx_debugPeer);
- rxi_SendDebugPacket(ap, asocket, saddr, slen, istack);
+ rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
ap->length = tl;
break;
}
return ap;
/* Since its all int32s convert to network order with a loop. */
- MUTEX_ENTER(&rx_stats_mutex);
+ if (rx_stats_active)
+ MUTEX_ENTER(&rx_stats_mutex);
s = (afs_int32 *) & rx_stats;
for (i = 0; i < sizeof(rx_stats) / sizeof(afs_int32); i++, s++)
rx_PutInt32(ap, i * sizeof(afs_int32), htonl(*s));
tl = ap->length;
ap->length = sizeof(rx_stats);
- MUTEX_EXIT(&rx_stats_mutex);
- rxi_SendDebugPacket(ap, asocket, saddr, slen, istack);
+ if (rx_stats_active)
+ MUTEX_EXIT(&rx_stats_mutex);
+ rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
ap->length = tl;
break;
}
rx_packetwrite(ap, 0, sizeof(struct rx_debugIn), (char *)&tin);
tl = ap->length;
ap->length = sizeof(struct rx_debugIn);
- rxi_SendDebugPacket(ap, asocket, saddr, slen, istack);
+ rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
ap->length = tl;
break;
}
}
struct rx_packet *
-rxi_ReceiveVersionPacket(register struct rx_packet *ap, osi_socket asocket,
- struct sockaddr_storage *saddr, int slen, int istack)
+rxi_ReceiveVersionPacket(struct rx_packet *ap, osi_socket asocket,
+ afs_uint32 ahost, short aport, int istack)
{
afs_int32 tl;
rx_packetwrite(ap, 0, 65, buf);
tl = ap->length;
ap->length = 65;
- rxi_SendDebugPacket(ap, asocket, saddr, slen, istack);
+ rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
ap->length = tl;
}
/* send a debug packet back to the sender */
static void
rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
- struct sockaddr_storage *saddr, int slen, afs_int32 istack)
+ afs_uint32 ahost, short aport, afs_int32 istack)
{
- int i;
- int nbytes;
+ struct sockaddr_in taddr;
+ unsigned int i, nbytes, savelen = 0;
int saven = 0;
- size_t savelen = 0;
#ifdef KERNEL
int waslocked = ISAFS_GLOCK();
#endif
+ taddr.sin_family = AF_INET;
+ taddr.sin_port = aport;
+ taddr.sin_addr.s_addr = ahost;
+ memset(&taddr.sin_zero, 0, sizeof(taddr.sin_zero));
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+ taddr.sin_len = sizeof(struct sockaddr_in);
+#endif
+
/* We need to trim the niovecs. */
nbytes = apacket->length;
for (i = 1; i < apacket->niovecs; i++) {
afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
"before osi_NetSend()");
AFS_GUNLOCK();
- } else
+ }
#else
if (waslocked)
AFS_GUNLOCK();
#endif
#endif
/* debug packets are not reliably delivered, hence the cast below. */
- (void)osi_NetSend(asocket, saddr, slen, apacket->wirevec, apacket->niovecs,
+ (void)osi_NetSend(asocket, &taddr, apacket->wirevec, apacket->niovecs,
apacket->length + RX_HEADER_SIZE, istack);
#ifdef KERNEL
#ifdef RX_KERNEL_TRACE
"after osi_NetSend()");
if (!waslocked)
AFS_GUNLOCK();
- } else
+ }
#else
if (waslocked)
AFS_GLOCK();
}
+/*
+ * rxi_NetSendError -- note a send failure that indicates an unreachable peer.
+ *
+ * Called (with 'call' non-NULL) after osi_NetSend() fails, with 'code' being
+ * the error value osi_NetSend() returned.  On platforms that report
+ * host/network unreachability synchronously (Windows: WSAEHOSTUNREACH,
+ * Linux: -ENETUNREACH, Darwin: EHOSTUNREACH), zero call->lastReceiveTime so
+ * the peer is treated as "down" immediately rather than waiting for the call
+ * to time out later.  On other platforms this is a no-op.
+ */
+static void
+rxi_NetSendError(struct rx_call *call, int code)
+{
+    int down = 0;
+#ifdef AFS_NT40_ENV
+    if (code == -1 && WSAGetLastError() == WSAEHOSTUNREACH) {
+	down = 1;
+    }
+    if (code == -WSAEHOSTUNREACH) {
+	down = 1;
+    }
+#elif defined(AFS_LINUX20_ENV)
+    if (code == -ENETUNREACH) {
+	down = 1;
+    }
+#elif defined(AFS_DARWIN_ENV)
+    if (code == EHOSTUNREACH) {
+	down = 1;
+    }
+#endif
+    if (down) {
+	call->lastReceiveTime = 0;
+    }
+}
+
+
/* Send the packet to appropriate destination for the specified
* call. The header is first encoded and placed in the packet.
*/
int waslocked;
#endif
int code;
- register struct rx_peer *peer = conn->peer;
+ struct sockaddr_in addr;
+ struct rx_peer *peer = conn->peer;
osi_socket socket;
#ifdef RXDEBUG
char deliveryType = 'S';
#endif
+ /* The address we're sending the packet to */
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_port = peer->port;
+ addr.sin_addr.s_addr = peer->host;
+ memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
+
/* This stuff should be revamped, I think, so that most, if not
* all, of the header stuff is always added here. We could
* probably do away with the encode/decode routines. XXXXX */
* serial number means the packet was never sent. */
MUTEX_ENTER(&conn->conn_data_lock);
p->header.serial = ++conn->serial;
+ if (p->length > conn->peer->maxPacketSize) {
+ if ((p->header.type == RX_PACKET_TYPE_ACK) &&
+ (p->header.flags & RX_REQUEST_ACK)) {
+ conn->lastPingSize = p->length;
+ conn->lastPingSizeSer = p->header.serial;
+ } else if (p->header.seq != 0) {
+ conn->lastPacketSize = p->length;
+ conn->lastPacketSizeSeq = p->header.seq;
+ }
+ }
MUTEX_EXIT(&conn->conn_data_lock);
- /* This is so we can adjust retransmit time-outs better in the face of
+ /* This is so we can adjust retransmit time-outs better in the face of
* rapidly changing round-trip times. RTO estimation is not a la Karn.
*/
if (p->firstSerial == 0) {
/* If an output tracer function is defined, call it with the packet and
* network address. Note this function may modify its arguments. */
if (rx_almostSent) {
- int drop = (*rx_almostSent) (p, &peer->saddr);
+ int drop = (*rx_almostSent) (p, &addr);
/* drop packet if return value is non-zero? */
if (drop)
deliveryType = 'D'; /* Drop the packet */
#endif
/* Get network byte order header */
- rxi_EncodePacketHeader(p); /* XXX in the event of rexmit, etc, don't need to
+ rxi_EncodePacketHeader(p); /* XXX in the event of rexmit, etc, don't need to
* touch ALL the fields */
/* Send the packet out on the same socket that related packets are being
afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
"before osi_NetSend()");
AFS_GUNLOCK();
- } else
+ }
#else
if (waslocked)
AFS_GUNLOCK();
#endif
#endif
if ((code =
- osi_NetSend(socket, &peer->saddr, peer->saddrlen, p->wirevec,
- p->niovecs, p->length + RX_HEADER_SIZE,
- istack)) != 0) {
+ osi_NetSend(socket, &addr, p->wirevec, p->niovecs,
+ p->length + RX_HEADER_SIZE, istack)) != 0) {
/* send failed, so let's hurry up the resend, eh? */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.netSendFailures++;
- MUTEX_EXIT(&rx_stats_mutex);
- p->retryTime = p->timeSent; /* resend it very soon */
- clock_Addmsec(&(p->retryTime),
- 10 + (((afs_uint32) p->backoff) << 8));
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.netSendFailures);
+ p->flags &= ~RX_PKTFLAG_SENT; /* resend it very soon */
-#ifdef AFS_NT40_ENV
- /* Windows is nice -- it can tell us right away that we cannot
- * reach this recipient by returning an WSAEHOSTUNREACH error
- * code. So, when this happens let's "down" the host NOW so
- * we don't sit around waiting for this host to timeout later.
- */
- if (call && code == -1 && errno == WSAEHOSTUNREACH)
- call->lastReceiveTime = 0;
-#endif
-#if defined(KERNEL) && defined(AFS_LINUX20_ENV)
- /* Linux is nice -- it can tell us right away that we cannot
- * reach this recipient by returning an ENETUNREACH error
- * code. So, when this happens let's "down" the host NOW so
+ /* Some systems are nice and tell us right away that we cannot
+ * reach this recipient by returning an error code.
+ * So, when this happens let's "down" the host NOW so
* we don't sit around waiting for this host to timeout later.
*/
- if (call && code == -ENETUNREACH)
- call->lastReceiveTime = 0;
-#endif
+ if (call) {
+ rxi_NetSendError(call, code);
+ }
}
#ifdef KERNEL
#ifdef RX_KERNEL_TRACE
"after osi_NetSend()");
if (!waslocked)
AFS_GUNLOCK();
- } else
+ }
#else
if (waslocked)
AFS_GLOCK();
#endif
#ifdef RXDEBUG
}
- dpf(("%c %d %s: %s.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], rx_AddrStringOf(peer), ntohs(rx_PortOf(peer)), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.flags, (unsigned long)p, p->retryTime.sec, p->retryTime.usec / 1000, p->length));
+ dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
+ deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(peer->host),
+ ntohs(peer->port), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber,
+ p->header.seq, p->header.flags, p, p->length));
#endif
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.packetsSent[p->header.type - 1]++;
- MUTEX_EXIT(&rx_stats_mutex);
- MUTEX_ENTER(&peer->peer_lock);
- hadd32(peer->bytesSent, p->length);
- MUTEX_EXIT(&peer->peer_lock);
+ if (rx_stats_active) {
+ rx_atomic_inc(&rx_stats.packetsSent[p->header.type - 1]);
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->bytesSent += p->length;
+ MUTEX_EXIT(&peer->peer_lock);
+ }
}
/* Send a list of packets to appropriate destination for the specified
#if defined(AFS_SUN5_ENV) && defined(KERNEL)
int waslocked;
#endif
- register struct rx_peer *peer = conn->peer;
+ struct sockaddr_in addr;
+ struct rx_peer *peer = conn->peer;
osi_socket socket;
struct rx_packet *p = NULL;
struct iovec wirevec[RX_MAXIOVECS];
#ifdef RXDEBUG
char deliveryType = 'S';
#endif
+ /* The address we're sending the packet to */
+ addr.sin_family = AF_INET;
+ addr.sin_port = peer->port;
+ addr.sin_addr.s_addr = peer->host;
+ memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
if (len + 1 > RX_MAXIOVECS) {
osi_Panic("rxi_SendPacketList, len > RX_MAXIOVECS\n");
MUTEX_ENTER(&conn->conn_data_lock);
serial = conn->serial;
conn->serial += len;
+ for (i = 0; i < len; i++) {
+ p = list[i];
+ /* a ping *or* a sequenced packet can count */
+ if (p->length > conn->peer->maxPacketSize) {
+ if (((p->header.type == RX_PACKET_TYPE_ACK) &&
+ (p->header.flags & RX_REQUEST_ACK)) &&
+ ((i == 0) || (p->length >= conn->lastPingSize))) {
+ conn->lastPingSize = p->length;
+ conn->lastPingSizeSer = serial + i;
+ } else if ((p->header.seq != 0) &&
+ ((i == 0) || (p->length >= conn->lastPacketSize))) {
+ conn->lastPacketSize = p->length;
+ conn->lastPacketSizeSeq = p->header.seq;
+ }
+ }
+ }
MUTEX_EXIT(&conn->conn_data_lock);
/* Pre-increment, to guarantee no zero serial number; a zero
* serial number means the packet was never sent. */
p->header.serial = ++serial;
- /* This is so we can adjust retransmit time-outs better in the face of
+ /* This is so we can adjust retransmit time-outs better in the face of
* rapidly changing round-trip times. RTO estimation is not a la Karn.
*/
if (p->firstSerial == 0) {
/* If an output tracer function is defined, call it with the packet and
* network address. Note this function may modify its arguments. */
if (rx_almostSent) {
- int drop = (*rx_almostSent) (p, &peer->saddr);
+ int drop = (*rx_almostSent) (p, &addr);
/* drop packet if return value is non-zero? */
if (drop)
deliveryType = 'D'; /* Drop the packet */
#endif
/* Get network byte order header */
- rxi_EncodePacketHeader(p); /* XXX in the event of rexmit, etc, don't need to
+ rxi_EncodePacketHeader(p); /* XXX in the event of rexmit, etc, don't need to
* touch ALL the fields */
}
AFS_GUNLOCK();
#endif
if ((code =
- osi_NetSend(socket, &peer->saddr, peer->saddrlen, &wirevec[0],
- len + 1, length, istack)) != 0) {
+ osi_NetSend(socket, &addr, &wirevec[0], len + 1, length,
+ istack)) != 0) {
/* send failed, so let's hurry up the resend, eh? */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.netSendFailures++;
- MUTEX_EXIT(&rx_stats_mutex);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.netSendFailures);
for (i = 0; i < len; i++) {
p = list[i];
- p->retryTime = p->timeSent; /* resend it very soon */
- clock_Addmsec(&(p->retryTime),
- 10 + (((afs_uint32) p->backoff) << 8));
+ p->flags &= ~RX_PKTFLAG_SENT; /* resend it very soon */
}
-#ifdef AFS_NT40_ENV
- /* Windows is nice -- it can tell us right away that we cannot
- * reach this recipient by returning an WSAEHOSTUNREACH error
- * code. So, when this happens let's "down" the host NOW so
- * we don't sit around waiting for this host to timeout later.
- */
- if (call && code == -1 && errno == WSAEHOSTUNREACH)
- call->lastReceiveTime = 0;
-#endif
-#if defined(KERNEL) && defined(AFS_LINUX20_ENV)
- /* Linux is nice -- it can tell us right away that we cannot
- * reach this recipient by returning an ENETUNREACH error
- * code. So, when this happens let's "down" the host NOW so
+ /* Some systems are nice and tell us right away that we cannot
+ * reach this recipient by returning an error code.
+ * So, when this happens let's "down" the host NOW so
* we don't sit around waiting for this host to timeout later.
*/
- if (call && code == -ENETUNREACH)
- call->lastReceiveTime = 0;
-#endif
+ if (call) {
+ rxi_NetSendError(call, code);
+ }
}
#if defined(AFS_SUN5_ENV) && defined(KERNEL)
if (!istack && waslocked)
#ifdef RXDEBUG
}
- assert(p != NULL);
+ osi_Assert(p != NULL);
- dpf(("%c %d %s: %s.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], rx_AddrStringOf(peer), ntohs(rx_PortOf(peer)), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.flags, (unsigned long)p, p->retryTime.sec, p->retryTime.usec / 1000, p->length));
+ dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
+ deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(peer->host),
+ ntohs(peer->port), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber,
+ p->header.seq, p->header.flags, p, p->length));
#endif
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.packetsSent[p->header.type - 1]++;
- MUTEX_EXIT(&rx_stats_mutex);
- MUTEX_ENTER(&peer->peer_lock);
-
- hadd32(peer->bytesSent, p->length);
- MUTEX_EXIT(&peer->peer_lock);
+ if (rx_stats_active) {
+ rx_atomic_inc(&rx_stats.packetsSent[p->header.type - 1]);
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->bytesSent += p->length;
+ MUTEX_EXIT(&peer->peer_lock);
+ }
}
+/* Send a raw abort packet, without any call or connection structures */
+/*
+ * The abort's identifying header fields (epoch, cid, callNumber, serviceId,
+ * securityIndex) are copied from 'source' -- the packet being responded to --
+ * so the receiver can associate the abort with the offending call.  'serial'
+ * is the serial number to stamp on the abort, and 'error' is the abort code,
+ * sent in network byte order as the 4-byte payload.  If 'source' was sent by
+ * a server, RX_CLIENT_INITIATED is set on the abort so the receiver does not
+ * match it against a connection in the opposite direction.  Delivery is
+ * best-effort: the osi_NetSend() return value is deliberately ignored.
+ */
+void
+rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
+		 afs_uint32 serial, afs_int32 error,
+		 struct rx_packet *source, int istack)
+{
+    struct rx_header theader;
+    struct sockaddr_in addr;
+    struct iovec iov[2];
+
+    memset(&theader, 0, sizeof(theader));
+    theader.epoch = htonl(source->header.epoch);
+    theader.callNumber = htonl(source->header.callNumber);
+    theader.serial = htonl(serial);
+    theader.type = RX_PACKET_TYPE_ABORT;
+    theader.serviceId = htons(source->header.serviceId);
+    theader.securityIndex = source->header.securityIndex;
+    theader.cid = htonl(source->header.cid);
+
+    /*
+     * If the abort is being sent in response to a server initiated packet,
+     * set client_initiated in the abort to ensure it is not associated by
+     * the receiver with a connection in the opposite direction.
+     */
+    if ((source->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
+	theader.flags |= RX_CLIENT_INITIATED;
+
+    error = htonl(error);
+
+    iov[0].iov_base = &theader;
+    iov[0].iov_len = sizeof(struct rx_header);
+    iov[1].iov_base = &error;
+    iov[1].iov_len = sizeof(error);
+
+    addr.sin_family = AF_INET;
+    addr.sin_addr.s_addr = host;
+    addr.sin_port = port;
+    memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+    addr.sin_len = sizeof(struct sockaddr_in);
+#endif
+
+    osi_NetSend(socket, &addr, iov, 2,
+		sizeof(struct rx_header) + sizeof(error), istack);
+}
/* Send a "special" packet to the peer connection. If call is
* specified, then the packet is directed to a specific call channel
* in rx.h. Bug: there's a lot of duplication between this and other
* routines. This needs to be cleaned up. */
struct rx_packet *
-rxi_SendSpecial(register struct rx_call *call,
- register struct rx_connection *conn,
+rxi_SendSpecial(struct rx_call *call,
+ struct rx_connection *conn,
struct rx_packet *optionalPacket, int type, char *data,
int nbytes, int istack)
{
/* Some of the following stuff should be common code for all
* packet sends (it's repeated elsewhere) */
- register struct rx_packet *p;
+ struct rx_packet *p;
unsigned int i = 0;
int savelen = 0, saven = 0;
int channel, callNumber;
p->header.seq = 0;
p->header.epoch = conn->epoch;
p->header.type = type;
+ p->header.userStatus = 0;
p->header.flags = 0;
if (conn->type == RX_CLIENT_CONNECTION)
p->header.flags |= RX_CLIENT_INITIATED;
* the net byte order representation in the wire representation of the
* packet, which is what is actually sent out on the wire) */
void
-rxi_EncodePacketHeader(register struct rx_packet *p)
+rxi_EncodePacketHeader(struct rx_packet *p)
{
- register afs_uint32 *buf = (afs_uint32 *) (p->wirevec[0].iov_base); /* MTUXXX */
+ afs_uint32 *buf = (afs_uint32 *) (p->wirevec[0].iov_base); /* MTUXXX */
- memset((char *)buf, 0, RX_HEADER_SIZE);
+ memset(buf, 0, RX_HEADER_SIZE);
*buf++ = htonl(p->header.epoch);
*buf++ = htonl(p->header.cid);
*buf++ = htonl(p->header.callNumber);
/* Decode the packet's header (from net byte order to a struct header) */
void
-rxi_DecodePacketHeader(register struct rx_packet *p)
+rxi_DecodePacketHeader(struct rx_packet *p)
{
- register afs_uint32 *buf = (afs_uint32 *) (p->wirevec[0].iov_base); /* MTUXXX */
+ afs_uint32 *buf = (afs_uint32 *) (p->wirevec[0].iov_base); /* MTUXXX */
afs_uint32 temp;
p->header.epoch = ntohl(*buf);
/* Note: top 16 bits of this last word are the security checksum */
}
+/*
+ * LOCKS HELD: called with call->lock held.
+ *
+ * PrepareSendPacket is the only place in the code that
+ * can increment call->tnext. This could become an atomic
+ * in the future. Beyond that there is nothing in this
+ * function that requires the call being locked. This
+ * function can only be called by the application thread.
+ */
void
-rxi_PrepareSendPacket(register struct rx_call *call,
- register struct rx_packet *p, register int last)
+rxi_PrepareSendPacket(struct rx_call *call,
+ struct rx_packet *p, int last)
{
- register struct rx_connection *conn = call->conn;
- int i, j;
- ssize_t len; /* len must be a signed type; it can go negative */
-
- p->flags &= ~RX_PKTFLAG_ACKED;
- p->header.cid = (conn->cid | call->channel);
- p->header.serviceId = conn->serviceId;
- p->header.securityIndex = conn->securityIndex;
+ struct rx_connection *conn = call->conn;
+ afs_uint32 seq = call->tnext++;
+ unsigned int i;
+ afs_int32 len; /* len must be a signed type; it can go negative */
+ int code;
/* No data packets on call 0. Where do these come from? */
if (*call->callNumber == 0)
*call->callNumber = 1;
+ MUTEX_EXIT(&call->lock);
+ p->flags &= ~(RX_PKTFLAG_ACKED | RX_PKTFLAG_SENT);
+
+ p->header.cid = (conn->cid | call->channel);
+ p->header.serviceId = conn->serviceId;
+ p->header.securityIndex = conn->securityIndex;
+
p->header.callNumber = *call->callNumber;
- p->header.seq = call->tnext++;
+ p->header.seq = seq;
p->header.epoch = conn->epoch;
p->header.type = RX_PACKET_TYPE_DATA;
+ p->header.userStatus = 0;
p->header.flags = 0;
p->header.spare = 0;
if (conn->type == RX_CLIENT_CONNECTION)
if (last)
p->header.flags |= RX_LAST_PACKET;
- clock_Zero(&p->retryTime); /* Never yet transmitted */
clock_Zero(&p->firstSent); /* Never yet transmitted */
p->header.serial = 0; /* Another way of saying never transmitted... */
- p->backoff = 0;
/* Now that we're sure this is the last data on the call, make sure
* that the "length" and the sum of the iov_lens matches. */
}
if (len > 0) {
osi_Panic("PrepareSendPacket 1\n"); /* MTUXXX */
- } else {
- struct rx_queue q;
- int nb;
-
- queue_Init(&q);
-
+ } else if (i < p->niovecs) {
/* Free any extra elements in the wirevec */
- for (j = MAX(2, i), nb = p->niovecs - j; j < p->niovecs; j++) {
- queue_Append(&q,RX_CBUF_TO_PACKET(p->wirevec[j].iov_base, p));
- }
- if (nb)
- rxi_FreePackets(nb, &q);
+#if defined(RX_ENABLE_TSFPQ)
+ rxi_FreeDataBufsTSFPQ(p, i, 1 /* allow global pool flush if overquota */);
+#else /* !RX_ENABLE_TSFPQ */
+ MUTEX_ENTER(&rx_freePktQ_lock);
+ rxi_FreeDataBufsNoLock(p, i);
+ MUTEX_EXIT(&rx_freePktQ_lock);
+#endif /* !RX_ENABLE_TSFPQ */
- p->niovecs = i;
- p->wirevec[i - 1].iov_len += len;
+ p->niovecs = i;
+ }
+ if (len)
+ p->wirevec[i - 1].iov_len += len;
+ MUTEX_ENTER(&call->lock);
+ code = RXS_PreparePacket(conn->securityObject, call, p);
+ if (code) {
+ MUTEX_EXIT(&call->lock);
+ rxi_ConnectionError(conn, code);
+ MUTEX_ENTER(&conn->conn_data_lock);
+ p = rxi_SendConnectionAbort(conn, p, 0, 0);
+ MUTEX_EXIT(&conn->conn_data_lock);
+ MUTEX_ENTER(&call->lock);
+ /* setting a connection error means all calls for that conn are also
+ * error'd. if this call does not have an error by now, something is
+ * very wrong, and we risk sending data in the clear that is supposed
+ * to be encrypted. */
+ osi_Assert(call->error);
}
- RXS_PreparePacket(conn->securityObject, call, p);
}
/* Given an interface MTU size, calculate an adjusted MTU size that
int adjMTU;
int frags;
+ if (rxi_nRecvFrags == 1 && rxi_nSendFrags == 1)
+ return mtu;
adjMTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE + RX_JUMBOHEADERSIZE;
if (mtu <= adjMTU) {
return mtu;
}
return (2 + (maxMTU / (RX_JUMBOBUFFERSIZE + RX_JUMBOHEADERSIZE)));
}
+
+#ifndef KERNEL
+/*
+ * This function can be used by the Windows Cache Manager
+ * to dump the list of all rx packets so that we can determine
+ * where the packet leakage is.
+ */
+/* Dump every allocated rx packet (the rx_mallocedP list) to outputFile,
+ * tagging each line with the caller-supplied cookie string. Compiled to a
+ * no-op (returns 0) unless RXDEBUG_PACKET is defined. */
+int rx_DumpPackets(FILE *outputFile, char *cookie)
+{
+#ifdef RXDEBUG_PACKET
+ struct rx_packet *p;
+#ifdef AFS_NT40_ENV
+ int zilch;
+ char output[2048];
+/* On Windows, format into a local buffer with sprintf and emit it via
+ * WriteFile; elsewhere fprintf straight to the stream. */
+#define RXDPRINTF sprintf
+#define RXDPRINTOUT output
+#else
+#define RXDPRINTF fprintf
+#define RXDPRINTOUT outputFile
+#endif
+
+ /* Hold rx_freePktQ_lock for the whole walk so the allNextp chain is
+ * stable while we iterate. */
+ NETPRI;
+ MUTEX_ENTER(&rx_freePktQ_lock);
+ RXDPRINTF(RXDPRINTOUT, "%s - Start dumping all Rx Packets - count=%u\r\n", cookie, rx_packet_id);
+#ifdef AFS_NT40_ENV
+ WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
+#endif
+
+ /* One line per packet: identity, timing, buffer layout, and the full
+ * decoded header. */
+ for (p = rx_mallocedP; p; p = p->allNextp) {
+ RXDPRINTF(RXDPRINTOUT, "%s - packet=0x%p, id=%u, firstSent=%u.%08u, timeSent=%u.%08u, firstSerial=%u, niovecs=%u, flags=0x%x, length=%u header: epoch=%u, cid=%u, callNum=%u, seq=%u, serial=%u, type=%u, flags=0x%x, userStatus=%u, securityIndex=%u, serviceId=%u\r\n",
+ cookie, p, p->packetId, p->firstSent.sec, p->firstSent.usec, p->timeSent.sec, p->timeSent.usec,
+ p->firstSerial, p->niovecs, (afs_uint32)p->flags, (afs_uint32)p->length,
+ p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.serial,
+ (afs_uint32)p->header.type, (afs_uint32)p->header.flags, (afs_uint32)p->header.userStatus,
+ (afs_uint32)p->header.securityIndex, (afs_uint32)p->header.serviceId);
+#ifdef AFS_NT40_ENV
+ WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
+#endif
+ }
+
+ RXDPRINTF(RXDPRINTOUT, "%s - End dumping all Rx Packets\r\n", cookie);
+#ifdef AFS_NT40_ENV
+ WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);
+#endif
+
+ MUTEX_EXIT(&rx_freePktQ_lock);
+ USERPRI;
+#endif /* RXDEBUG_PACKET */
+ return 0;
+}
+#endif