From c10f5296d26eac9ac00018199ef579e8f6095c07 Mon Sep 17 00:00:00 2001 From: Derrick Brashear Date: Wed, 16 Mar 2011 01:06:43 -0400 Subject: [PATCH] macos: kernel socket upcall support kernel socket upcall instead of listener env Change-Id: I1b66ce0877053700cd894f47017596fbe07e7384 Reviewed-on: http://gerrit.openafs.org/4239 Tested-by: Derrick Brashear Tested-by: BuildBot Reviewed-by: Derrick Brashear --- src/afs/afs_call.c | 28 +++++++- src/afsd/afsd.c | 11 +-- src/config/param.ppc_darwin_80.h | 1 + src/config/param.ppc_darwin_90.h | 1 + src/config/param.x86_darwin_100.h | 3 +- src/config/param.x86_darwin_80.h | 1 + src/config/param.x86_darwin_90.h | 1 + src/rx/DARWIN/rx_knet.c | 138 +++++++++++++++++++++++++++++++++++++- src/rx/rx.c | 2 + src/rx/rx_kcommon.c | 20 +++--- src/rx/rx_kernel.h | 2 +- src/rx/rx_prototypes.h | 8 ++- 12 files changed, 195 insertions(+), 21 deletions(-) diff --git a/src/afs/afs_call.c b/src/afs/afs_call.c index 01704ad..49f253b 100644 --- a/src/afs/afs_call.c +++ b/src/afs/afs_call.c @@ -144,8 +144,13 @@ afsd_thread(int *rock) AFS_GLOCK(); wakeup(arg); afs_CB_Running = 1; +#ifndef RXK_LISTENER_ENV + afs_initState = AFSOP_START_AFS; + afs_osi_Wakeup(&afs_initState); +#else while (afs_RX_Running != 2) afs_osi_Sleep(&afs_RX_Running); +#endif afs_RXCallBackServer(); AFS_GUNLOCK(); thread_terminate(current_thread()); @@ -191,6 +196,7 @@ afsd_thread(int *rock) AFS_GUNLOCK(); thread_terminate(current_thread()); break; +#ifdef RXK_LISTENER_ENV case AFSOP_RXLISTENER_DAEMON: AFS_GLOCK(); wakeup(arg); @@ -203,6 +209,7 @@ afsd_thread(int *rock) AFS_GUNLOCK(); thread_terminate(current_thread()); break; +#endif default: afs_warn("Unknown op %ld in StartDaemon()\n", (long)parm); break; @@ -219,10 +226,12 @@ afs_DaemonOp(long parm, long parm2, long parm3, long parm4, long parm5, if (parm == AFSOP_START_RXCALLBACK) { if (afs_CB_Running) return; +#ifdef RXK_LISTENER_ENV } else if (parm == AFSOP_RXLISTENER_DAEMON) { if (afs_RX_Running) return; afs_RX_Running = 1; +#endif code = afs_InitSetup(parm2); if (parm3) { rx_enablePeerRPCStats(); @@ -282,8 +291,13 @@ afsd_thread(void *rock) AFS_GLOCK(); complete(arg->complete); afs_CB_Running = 1; +#if !defined(RXK_LISTENER_ENV) + afs_initState = AFSOP_START_AFS; + afs_osi_Wakeup(&afs_initState); +#else while (afs_RX_Running != 2) afs_osi_Sleep(&afs_RX_Running); +#endif sprintf(current->comm, "afs_callback"); afs_RXCallBackServer(); AFS_GUNLOCK(); @@ -355,6 +369,7 @@ afsd_thread(void *rock) AFS_GUNLOCK(); complete_and_exit(0, 0); break; +#ifdef RXK_LISTENER_ENV case AFSOP_RXLISTENER_DAEMON: sprintf(current->comm, "afs_lsnstart"); #ifdef SYS_SETPRIORITY_EXPORTED @@ -376,6 +391,7 @@ afsd_thread(void *rock) AFS_GUNLOCK(); complete_and_exit(0, 0); break; +#endif default: afs_warn("Unknown op %ld in StartDaemon()\n", (long)parm); break; @@ -415,10 +431,12 @@ afs_DaemonOp(long parm, long parm2, long parm3, long parm4, long parm5, if (parm == AFSOP_START_RXCALLBACK) { if (afs_CB_Running) return; +#ifdef RXK_LISTENER_ENV } else if (parm == AFSOP_RXLISTENER_DAEMON) { if (afs_RX_Running) return; afs_RX_Running = 1; +#endif code = afs_InitSetup(parm2); if (parm3) { rx_enablePeerRPCStats(); @@ -584,9 +602,15 @@ afs_syscall_call(long parm, long parm2, long parm3, #endif /* !RXK_LISTENER_ENV */ { #ifdef RXK_LISTENER_ENV - while (afs_RX_Running != 2) + while (afs_RX_Running != 2) afs_osi_Sleep(&afs_RX_Running); #else /* !RXK_LISTENER_ENV */ + if (parm3) { + rx_enablePeerRPCStats(); + } + if (parm4) { + rx_enableProcessRPCStats(); + } afs_initState = AFSOP_START_AFS; afs_osi_Wakeup(&afs_initState); #endif /* RXK_LISTENER_ENV */ @@ -1301,7 +1325,7 @@ afs_shutdown(void) #endif #endif -#ifdef AFS_SUN510_ENV +#if defined(AFS_SUN510_ENV) || defined(RXK_UPCALL_ENV) afs_warn("NetIfPoller... "); osi_StopNetIfPoller(); #endif diff --git a/src/afsd/afsd.c b/src/afsd/afsd.c index ee6d33b..b951a22 100644 --- a/src/afsd/afsd.c +++ b/src/afsd/afsd.c @@ -334,9 +334,7 @@ int PartSizeOverflow(char *path, int cs); #if defined(AFS_SUN510_ENV) && defined(RXK_LISTENER_ENV) static void fork_rx_syscall_wait(); #endif -#if defined(AFS_SUN510_ENV) || defined(RXK_LISTENER_ENV) static void fork_rx_syscall(); -#endif static void fork_syscall(); #if defined(AFS_DARWIN_ENV) && !defined(AFS_ARM_DARWIN_ENV) @@ -2206,8 +2204,13 @@ afsd_run(void) #endif if (afsd_verbose) printf("%s: Forking rx callback listener.\n", rn); +#ifndef RXK_LISTENER_ENV + fork_rx_syscall(rn, AFSOP_START_RXCALLBACK, preallocs, enable_peer_stats, + enable_process_stats); +#else fork_syscall(rn, AFSOP_START_RXCALLBACK, preallocs); -#if defined(AFS_SUN5_ENV) || defined(RXK_LISTENER_ENV) +#endif +#if defined(AFS_SUN5_ENV) || defined(RXK_LISTENER_ENV) || defined(RXK_UPCALL_ENV) if (afsd_verbose) printf("%s: Forking rxevent daemon.\n", rn); fork_rx_syscall(rn, AFSOP_RXEVENT_DAEMON); @@ -3160,7 +3163,6 @@ fork_syscall(const char *rn, long syscall, long param1, long param2, fork_syscall_impl(0, 0, rn, syscall, param1, param2, param3, param4, param5); } -#if defined(AFS_SUN510_ENV) || defined(RXK_LISTENER_ENV) /** * call a syscall in another process or thread, and give it RX priority. */ @@ -3170,7 +3172,6 @@ fork_rx_syscall(const char *rn, long syscall, long param1, long param2, { fork_syscall_impl(1, 0, rn, syscall, param1, param2, param3, param4, param5); } -#endif /* AFS_SUN510_ENV || RXK_LISTENER_ENV */ #if defined(AFS_SUN510_ENV) && defined(RXK_LISTENER_ENV) /** diff --git a/src/config/param.ppc_darwin_80.h b/src/config/param.ppc_darwin_80.h index f4ab2f6..3812713 100644 --- a/src/config/param.ppc_darwin_80.h +++ b/src/config/param.ppc_darwin_80.h @@ -61,6 +61,7 @@ #define AFS_GCPAGS 0 #define RXK_LISTENER_ENV 1 +#define RXK_TIMEDSLEEP_ENV 1 #ifdef KERNEL #undef MACRO_BEGIN diff --git a/src/config/param.ppc_darwin_90.h b/src/config/param.ppc_darwin_90.h index 7804cfd..e5429a6 100644 --- a/src/config/param.ppc_darwin_90.h +++ b/src/config/param.ppc_darwin_90.h @@ -63,6 +63,7 @@ #define AFS_GCPAGS 0 #define RXK_LISTENER_ENV 1 +#define RXK_TIMEDSLEEP_ENV 1 #ifdef KERNEL #undef MACRO_BEGIN diff --git a/src/config/param.x86_darwin_100.h b/src/config/param.x86_darwin_100.h index 2275b0b..63f8bfc 100644 --- a/src/config/param.x86_darwin_100.h +++ b/src/config/param.x86_darwin_100.h @@ -95,7 +95,8 @@ #define AFS_HAVE_FFS 1 /* Use system's ffs. */ #define AFS_GCPAGS 0 -#define RXK_LISTENER_ENV 1 +#define RXK_UPCALL_ENV 1 +#define RXK_TIMEDSLEEP_ENV 1 #ifdef KERNEL #undef MACRO_BEGIN diff --git a/src/config/param.x86_darwin_80.h b/src/config/param.x86_darwin_80.h index f2fa162..29751d3 100644 --- a/src/config/param.x86_darwin_80.h +++ b/src/config/param.x86_darwin_80.h @@ -59,6 +59,7 @@ #define AFS_GCPAGS 0 #define RXK_LISTENER_ENV 1 +#define RXK_TIMEDSLEEP_ENV 1 #ifdef KERNEL #undef MACRO_BEGIN diff --git a/src/config/param.x86_darwin_90.h b/src/config/param.x86_darwin_90.h index b7ec741..f1e02a6 100644 --- a/src/config/param.x86_darwin_90.h +++ b/src/config/param.x86_darwin_90.h @@ -63,6 +63,7 @@ #define AFS_GCPAGS 0 #define RXK_LISTENER_ENV 1 +#define RXK_TIMEDSLEEP_ENV 1 #ifdef KERNEL #undef MACRO_BEGIN diff --git a/src/rx/DARWIN/rx_knet.c b/src/rx/DARWIN/rx_knet.c index e6b48b7..c70e171 100644 --- a/src/rx/DARWIN/rx_knet.c +++ b/src/rx/DARWIN/rx_knet.c @@ -16,7 +16,140 @@ #ifdef AFS_DARWIN80_ENV #define soclose sock_close #endif - + +#ifdef RXK_UPCALL_ENV +void +rx_upcall(socket_t so, void *arg, __unused int waitflag) +{ + mbuf_t m; + int error = 0; + int i, flags = 0; + struct msghdr msg; + struct sockaddr_storage ss; + struct sockaddr *sa = NULL; + struct sockaddr_in from; + struct rx_packet *p; + afs_int32 rlen; + afs_int32 tlen; + afs_int32 savelen; /* was using rlen but had aliasing problems */ + size_t nbytes, resid, noffset; + + p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE); + rx_computelen(p, tlen); + rx_SetDataSize(p, tlen); /* this is the size of the user data area */ + tlen += RX_HEADER_SIZE; /* now this is the size of the entire packet */ + rlen = rx_maxJumboRecvSize; /* this is what I am advertising. Only check + * it once in order to avoid races. */ + tlen = rlen - tlen; + if (tlen > 0) { + tlen = rxi_AllocDataBuf(p, tlen, RX_PACKET_CLASS_RECV_CBUF); + if (tlen > 0) { + tlen = rlen - tlen; + } else + tlen = rlen; + } else + tlen = rlen; + /* add some padding to the last iovec, it's just to make sure that the + * read doesn't return more data than we expect, and is done to get around + * our problems caused by the lack of a length field in the rx header. */ + savelen = p->wirevec[p->niovecs - 1].iov_len; + p->wirevec[p->niovecs - 1].iov_len = savelen + RX_EXTRABUFFERSIZE; + + resid = nbytes = tlen + sizeof(afs_int32); + + memset(&msg, 0, sizeof(struct msghdr)); + msg.msg_name = &ss; + msg.msg_namelen = sizeof(struct sockaddr_storage); + sa =(struct sockaddr *) &ss; + + do { + m = NULL; + error = sock_receivembuf(so, &msg, &m, MSG_DONTWAIT, &nbytes); + if (!error) { + size_t sz, offset = 0; + noffset = 0; + resid = nbytes; + for (i=0;iniovecs && resid;i++) { + sz=MIN(resid, p->wirevec[i].iov_len); + error = mbuf_copydata(m, offset, sz, p->wirevec[i].iov_base); + if (error) + break; + resid-=sz; + offset+=sz; + noffset += sz; + } + } + } while (0); + + mbuf_freem(m); + + /* restore the vec to its correct state */ + p->wirevec[p->niovecs - 1].iov_len = savelen; + + if (error == EWOULDBLOCK && noffset > 0) + error = 0; + + if (!error) { + int host, port; + + nbytes -= resid; + + if (sa->sa_family == AF_INET) + from = *(struct sockaddr_in *)sa; + + p->length = nbytes - RX_HEADER_SIZE;; + if ((nbytes > tlen) || (p->length & 0x8000)) { /* Bogus packet */ + if (nbytes <= 0) { + if (rx_stats_active) { + MUTEX_ENTER(&rx_stats_mutex); + rx_stats.bogusPacketOnRead++; + rx_stats.bogusHost = from.sin_addr.s_addr; + MUTEX_EXIT(&rx_stats_mutex); + } + dpf(("B: bogus packet from [%x,%d] nb=%d", + from.sin_addr.s_addr, from.sin_port, nbytes)); + } + return; + } else { + /* Extract packet header. */ + rxi_DecodePacketHeader(p); + + host = from.sin_addr.s_addr; + port = from.sin_port; + if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) { + if (rx_stats_active) { + MUTEX_ENTER(&rx_stats_mutex); + rx_stats.packetsRead[p->header.type - 1]++; + MUTEX_EXIT(&rx_stats_mutex); + } + } + +#ifdef RX_TRIMDATABUFS + /* Free any empty packet buffers at the end of this packet */ + rxi_TrimDataBufs(p, 1); +#endif + /* receive pcket */ + p = rxi_ReceivePacket(p, so, host, port, 0, 0); + } + } + /* free packet? */ + if (p) + rxi_FreePacket(p); + + return; +} + +/* in listener env, the listener shutdown does this. we have no listener */ +void +osi_StopNetIfPoller(void) +{ + soclose(rx_socket); + if (afs_termState == AFSOP_STOP_NETIF) { + afs_termState = AFSOP_STOP_COMPLETE; + osi_rxWakeup(&afs_termState); + } +} +#elif defined(RXK_LISTENER_ENV) int osi_NetReceive(osi_socket so, struct sockaddr_in *addr, struct iovec *dvec, int nvecs, int *alength) @@ -129,6 +262,9 @@ osi_StopListener(void) psignal(p, SIGUSR1); #endif } +#else +#error need upcall or listener +#endif int osi_NetSend(osi_socket so, struct sockaddr_in *addr, struct iovec *dvec, diff --git a/src/rx/rx.c b/src/rx/rx.c index 9fc9718..6c80715 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -594,9 +594,11 @@ rx_InitHost(u_int host, u_int port) rx_GetIFInfo(); #endif +#if defined(RXK_LISTENER_ENV) || !defined(KERNEL) /* Start listener process (exact function is dependent on the * implementation environment--kernel or user space) */ rxi_StartListener(); +#endif USERPRI; tmp_status = rxinit_status = 0; diff --git a/src/rx/rx_kcommon.c b/src/rx/rx_kcommon.c index f50abeb..27649fa 100644 --- a/src/rx/rx_kcommon.c +++ b/src/rx/rx_kcommon.c @@ -114,7 +114,7 @@ rxi_GetHostUDPSocket(u_int host, u_short port) sockp = (osi_socket *)rxk_NewSocketHost(host, port); if (sockp == (osi_socket *)0) return OSI_NULLSOCKET; - rxk_AddPort(port, (char *)sockp); + rxk_AddPort(port, (char *)sockp); return (osi_socket) sockp; } @@ -347,8 +347,8 @@ MyArrivalProc(struct rx_packet *ahandle, void rxi_StartListener(void) { +#if !defined(RXK_LISTENER_ENV) && !defined(RXK_UPCALL_ENV) /* if kernel, give name of appropriate procedures */ -#ifndef RXK_LISTENER_ENV rxk_GetPacketProc = MyPacketProc; rxk_PacketArrivalProc = MyArrivalProc; rxk_init(); @@ -861,9 +861,13 @@ rxk_NewSocketHost(afs_uint32 ahost, short aport) code = socreate(AF_INET, &newSocket, SOCK_DGRAM, IPPROTO_UDP, afs_osi_credp, curthread); #elif defined(AFS_DARWIN80_ENV) +#ifdef RXK_LISTENER_ENV code = sock_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, NULL, &newSocket); +#else + code = sock_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, rx_upcall, NULL, &newSocket); +#endif #elif defined(AFS_NBSD50_ENV) - code = socreate(AF_INET, &newSocket, SOCK_DGRAM, 0, osi_curproc(), NULL); + code = socreate(AF_INET, &newSocket, SOCK_DGRAM, 0, osi_curproc(), NULL); #elif defined(AFS_NBSD40_ENV) code = socreate(AF_INET, &newSocket, SOCK_DGRAM, 0, osi_curproc()); #else @@ -1021,8 +1025,8 @@ rxk_FreeSocket(struct socket *asocket) } #endif /* !SUN5 && !LINUX20 */ -#if defined(RXK_LISTENER_ENV) || defined(AFS_SUN5_ENV) -#ifdef AFS_DARWIN80_ENV +#if defined(RXK_LISTENER_ENV) || defined(AFS_SUN5_ENV) || defined(RXK_UPCALL_ENV) +#ifdef RXK_TIMEDSLEEP_ENV /* Shutting down should wake us up, as should an earlier event. */ void rxi_ReScheduleEvents(void) @@ -1059,7 +1063,7 @@ afs_rxevent_daemon(void) afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING, "before afs_osi_Wait()"); #endif -#ifdef AFS_DARWIN80_ENV +#ifdef RXK_TIMEDSLEEP_ENV afs_osi_TimedSleep(&afs_termState, MAX(500, ((temp.sec * 1000) + (temp.usec / 1000))), 0); #else @@ -1072,13 +1076,11 @@ afs_rxevent_daemon(void) if (afs_termState == AFSOP_STOP_RXEVENT) { #ifdef RXK_LISTENER_ENV afs_termState = AFSOP_STOP_RXK_LISTENER; -#else -#ifdef AFS_SUN510_ENV +#elif defined(AFS_SUN510_ENV) || defined(RXK_UPCALL_ENV) afs_termState = AFSOP_STOP_NETIF; #else afs_termState = AFSOP_STOP_COMPLETE; #endif -#endif osi_rxWakeup(&afs_termState); return; } diff --git a/src/rx/rx_kernel.h b/src/rx/rx_kernel.h index d5223ac..1351d17 100644 --- a/src/rx/rx_kernel.h +++ b/src/rx/rx_kernel.h @@ -15,7 +15,7 @@ #define osi_Alloc afs_osi_Alloc #define osi_Free afs_osi_Free -#ifndef AFS_DARWIN80_ENV +#ifndef RXK_TIMEDSLEEP_ENV #define rxi_ReScheduleEvents 0 /* Not needed by kernel */ #endif diff --git a/src/rx/rx_prototypes.h b/src/rx/rx_prototypes.h index b4b3b8c..6e39b0b 100644 --- a/src/rx/rx_prototypes.h +++ b/src/rx/rx_prototypes.h @@ -434,8 +434,12 @@ extern int osi_NetSend(osi_socket asocket, struct sockaddr_in *addr, struct iovec *dvec, int nvecs, afs_int32 asize, int istack); # endif +# ifdef RXK_UPCALL_ENV +extern void rx_upcall(socket_t so, void *arg, __unused int waitflag); +# else extern int osi_NetReceive(osi_socket so, struct sockaddr_in *addr, struct iovec *dvec, int nvecs, int *lengthp); +# endif # if defined(AFS_SUN510_ENV) extern void osi_StartNetIfPoller(void); extern void osi_NetIfPoller(void); @@ -449,8 +453,8 @@ extern void rxi_ListenerProc(osi_socket usockp, int *tnop, struct rx_call **newcallp); # endif -# ifndef RXK_LISTENER_ENV -extern void rxk_init(); +# if !defined(RXK_LISTENER_ENV) && !defined(RXK_UPCALL_ENV) +extern void rxk_init(void); # endif /* UKERNEL/rx_knet.c */ -- 1.9.4