macos: kernel socket upcall
authorDerrick Brashear <shadow@dementia.org>
Wed, 16 Mar 2011 05:06:43 +0000 (01:06 -0400)
committerDerrick Brashear <shadow@dementia.org>
Thu, 17 Mar 2011 20:56:38 +0000 (13:56 -0700)
support kernel socket upcall instead of listener env

Change-Id: I1b66ce0877053700cd894f47017596fbe07e7384
Reviewed-on: http://gerrit.openafs.org/4239
Tested-by: Derrick Brashear <shadow@dementia.org>
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementia.org>

12 files changed:
src/afs/afs_call.c
src/afsd/afsd.c
src/config/param.ppc_darwin_80.h
src/config/param.ppc_darwin_90.h
src/config/param.x86_darwin_100.h
src/config/param.x86_darwin_80.h
src/config/param.x86_darwin_90.h
src/rx/DARWIN/rx_knet.c
src/rx/rx.c
src/rx/rx_kcommon.c
src/rx/rx_kernel.h
src/rx/rx_prototypes.h

index 01704ad..49f253b 100644 (file)
@@ -144,8 +144,13 @@ afsd_thread(int *rock)
        AFS_GLOCK();
        wakeup(arg);
        afs_CB_Running = 1;
+#ifndef RXK_LISTENER_ENV
+       afs_initState = AFSOP_START_AFS;
+       afs_osi_Wakeup(&afs_initState);
+#else
        while (afs_RX_Running != 2)
            afs_osi_Sleep(&afs_RX_Running);
+#endif
        afs_RXCallBackServer();
        AFS_GUNLOCK();
        thread_terminate(current_thread());
@@ -191,6 +196,7 @@ afsd_thread(int *rock)
        AFS_GUNLOCK();
        thread_terminate(current_thread());
        break;
+#ifdef RXK_LISTENER_ENV
     case AFSOP_RXLISTENER_DAEMON:
        AFS_GLOCK();
        wakeup(arg);
@@ -203,6 +209,7 @@ afsd_thread(int *rock)
        AFS_GUNLOCK();
        thread_terminate(current_thread());
        break;
+#endif
     default:
        afs_warn("Unknown op %ld in StartDaemon()\n", (long)parm);
        break;
@@ -219,10 +226,12 @@ afs_DaemonOp(long parm, long parm2, long parm3, long parm4, long parm5,
     if (parm == AFSOP_START_RXCALLBACK) {
        if (afs_CB_Running)
            return;
+#ifdef RXK_LISTENER_ENV
     } else if (parm == AFSOP_RXLISTENER_DAEMON) {
        if (afs_RX_Running)
            return;
        afs_RX_Running = 1;
+#endif
        code = afs_InitSetup(parm2);
        if (parm3) {
            rx_enablePeerRPCStats();
@@ -282,8 +291,13 @@ afsd_thread(void *rock)
        AFS_GLOCK();
        complete(arg->complete);
        afs_CB_Running = 1;
+#if !defined(RXK_LISTENER_ENV)
+       afs_initState = AFSOP_START_AFS;
+       afs_osi_Wakeup(&afs_initState);
+#else
        while (afs_RX_Running != 2)
            afs_osi_Sleep(&afs_RX_Running);
+#endif
        sprintf(current->comm, "afs_callback");
        afs_RXCallBackServer();
        AFS_GUNLOCK();
@@ -355,6 +369,7 @@ afsd_thread(void *rock)
        AFS_GUNLOCK();
        complete_and_exit(0, 0);
        break;
+#ifdef RXK_LISTENER_ENV
     case AFSOP_RXLISTENER_DAEMON:
        sprintf(current->comm, "afs_lsnstart");
 #ifdef SYS_SETPRIORITY_EXPORTED
@@ -376,6 +391,7 @@ afsd_thread(void *rock)
        AFS_GUNLOCK();
        complete_and_exit(0, 0);
        break;
+#endif
     default:
        afs_warn("Unknown op %ld in StartDaemon()\n", (long)parm);
        break;
@@ -415,10 +431,12 @@ afs_DaemonOp(long parm, long parm2, long parm3, long parm4, long parm5,
     if (parm == AFSOP_START_RXCALLBACK) {
        if (afs_CB_Running)
            return;
+#ifdef RXK_LISTENER_ENV
     } else if (parm == AFSOP_RXLISTENER_DAEMON) {
        if (afs_RX_Running)
            return;
        afs_RX_Running = 1;
+#endif
        code = afs_InitSetup(parm2);
        if (parm3) {
            rx_enablePeerRPCStats();
@@ -584,9 +602,15 @@ afs_syscall_call(long parm, long parm2, long parm3,
 #endif /* !RXK_LISTENER_ENV */
        {
 #ifdef RXK_LISTENER_ENV
-               while (afs_RX_Running != 2)
+           while (afs_RX_Running != 2)
                afs_osi_Sleep(&afs_RX_Running);
 #else /* !RXK_LISTENER_ENV */
+           if (parm3) {
+               rx_enablePeerRPCStats();
+           }
+           if (parm4) {
+               rx_enableProcessRPCStats();
+           }
            afs_initState = AFSOP_START_AFS;
            afs_osi_Wakeup(&afs_initState);
 #endif /* RXK_LISTENER_ENV */
@@ -1301,7 +1325,7 @@ afs_shutdown(void)
 #endif
 #endif
 
-#ifdef AFS_SUN510_ENV
+#if defined(AFS_SUN510_ENV) || defined(RXK_UPCALL_ENV)
     afs_warn("NetIfPoller... ");
     osi_StopNetIfPoller();
 #endif
index ee6d33b..b951a22 100644 (file)
@@ -334,9 +334,7 @@ int PartSizeOverflow(char *path, int cs);
 #if defined(AFS_SUN510_ENV) && defined(RXK_LISTENER_ENV)
 static void fork_rx_syscall_wait();
 #endif
-#if defined(AFS_SUN510_ENV) || defined(RXK_LISTENER_ENV)
 static void fork_rx_syscall();
-#endif
 static void fork_syscall();
 
 #if defined(AFS_DARWIN_ENV) && !defined(AFS_ARM_DARWIN_ENV)
@@ -2206,8 +2204,13 @@ afsd_run(void)
 #endif
     if (afsd_verbose)
        printf("%s: Forking rx callback listener.\n", rn);
+#ifndef RXK_LISTENER_ENV
+    fork_rx_syscall(rn, AFSOP_START_RXCALLBACK, preallocs, enable_peer_stats,
+                    enable_process_stats);
+#else
     fork_syscall(rn, AFSOP_START_RXCALLBACK, preallocs);
-#if defined(AFS_SUN5_ENV) || defined(RXK_LISTENER_ENV)
+#endif
+#if defined(AFS_SUN5_ENV) || defined(RXK_LISTENER_ENV) || defined(RXK_UPCALL_ENV)
     if (afsd_verbose)
        printf("%s: Forking rxevent daemon.\n", rn);
     fork_rx_syscall(rn, AFSOP_RXEVENT_DAEMON);
@@ -3160,7 +3163,6 @@ fork_syscall(const char *rn, long syscall, long param1, long param2,
     fork_syscall_impl(0, 0, rn, syscall, param1, param2, param3, param4, param5);
 }
 
-#if defined(AFS_SUN510_ENV) || defined(RXK_LISTENER_ENV)
 /**
  * call a syscall in another process or thread, and give it RX priority.
  */
@@ -3170,7 +3172,6 @@ fork_rx_syscall(const char *rn, long syscall, long param1, long param2,
 {
     fork_syscall_impl(1, 0, rn, syscall, param1, param2, param3, param4, param5);
 }
-#endif /* AFS_SUN510_ENV || RXK_LISTENER_ENV */
 
 #if defined(AFS_SUN510_ENV) && defined(RXK_LISTENER_ENV)
 /**
index f4ab2f6..3812713 100644 (file)
@@ -61,6 +61,7 @@
 
 #define AFS_GCPAGS               0
 #define RXK_LISTENER_ENV         1
+#define RXK_TIMEDSLEEP_ENV       1
 
 #ifdef KERNEL
 #undef MACRO_BEGIN
index 7804cfd..e5429a6 100644 (file)
@@ -63,6 +63,7 @@
 
 #define AFS_GCPAGS               0
 #define RXK_LISTENER_ENV         1
+#define RXK_TIMEDSLEEP_ENV       1
 
 #ifdef KERNEL
 #undef MACRO_BEGIN
index 2275b0b..63f8bfc 100644 (file)
@@ -95,7 +95,8 @@
 #define AFS_HAVE_FFS    1      /* Use system's ffs. */
 
 #define AFS_GCPAGS               0
-#define RXK_LISTENER_ENV         1
+#define RXK_UPCALL_ENV         1
+#define RXK_TIMEDSLEEP_ENV       1
 
 #ifdef KERNEL
 #undef MACRO_BEGIN
index f2fa162..29751d3 100644 (file)
@@ -59,6 +59,7 @@
 
 #define AFS_GCPAGS               0
 #define RXK_LISTENER_ENV         1
+#define RXK_TIMEDSLEEP_ENV       1
 
 #ifdef KERNEL
 #undef MACRO_BEGIN
index b7ec741..f1e02a6 100644 (file)
@@ -63,6 +63,7 @@
 
 #define AFS_GCPAGS               0
 #define RXK_LISTENER_ENV         1
+#define RXK_TIMEDSLEEP_ENV       1
 
 #ifdef KERNEL
 #undef MACRO_BEGIN
index e6b48b7..c70e171 100644 (file)
 #ifdef AFS_DARWIN80_ENV
 #define soclose sock_close
 #endif
+
+#ifdef RXK_UPCALL_ENV
+void
+rx_upcall(socket_t so, void *arg, __unused int waitflag)
+{
+    mbuf_t m;
+    int error = 0;
+    int i, flags = 0;
+    struct msghdr msg;
+    struct sockaddr_storage ss;
+    struct sockaddr *sa = NULL;
+    struct sockaddr_in from;
+    struct rx_packet *p;
+    afs_int32 rlen;
+    afs_int32 tlen;
+    afs_int32 savelen;          /* was using rlen but had aliasing problems */
+    size_t nbytes, resid, noffset;
+
+    p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE);
+    rx_computelen(p, tlen);
+    rx_SetDataSize(p, tlen);    /* this is the size of the user data area */
+    tlen += RX_HEADER_SIZE;     /* now this is the size of the entire packet */
+    rlen = rx_maxJumboRecvSize; /* this is what I am advertising.  Only check
+                                * it once in order to avoid races.  */
+    tlen = rlen - tlen;
+    if (tlen > 0) {
+       tlen = rxi_AllocDataBuf(p, tlen, RX_PACKET_CLASS_RECV_CBUF);
+       if (tlen > 0) {
+           tlen = rlen - tlen;
+       } else
+           tlen = rlen;
+    } else
+       tlen = rlen;
+    /* add some padding to the last iovec, it's just to make sure that the
+     * read doesn't return more data than we expect, and is done to get around
+     * our problems caused by the lack of a length field in the rx header. */
+    savelen = p->wirevec[p->niovecs - 1].iov_len;
+    p->wirevec[p->niovecs - 1].iov_len = savelen + RX_EXTRABUFFERSIZE;
+
+    resid = nbytes = tlen + sizeof(afs_int32);
+
+    memset(&msg, 0, sizeof(struct msghdr));
+    msg.msg_name = &ss;
+    msg.msg_namelen = sizeof(struct sockaddr_storage);
+    sa =(struct sockaddr *) &ss;
+
+    do {
+       m = NULL;
+       error = sock_receivembuf(so, &msg, &m, MSG_DONTWAIT, &nbytes);
+       if (!error) {
+           size_t sz, offset = 0;
+           noffset = 0;
+           resid = nbytes;
+           for (i=0;i<p->niovecs && resid;i++) {
+               sz=MIN(resid, p->wirevec[i].iov_len);
+               error = mbuf_copydata(m, offset, sz, p->wirevec[i].iov_base);
+               if (error)
+                   break;
+               resid-=sz;
+               offset+=sz;
+               noffset += sz;
+           }
+       }
+    } while (0);
+
+    mbuf_freem(m);
+
+    /* restore the vec to its correct state */
+    p->wirevec[p->niovecs - 1].iov_len = savelen;
+
+    if (error == EWOULDBLOCK && noffset > 0)
+       error = 0;
+
+    if (!error) {
+       int host, port;
+
+       nbytes -= resid;
+
+       if (sa->sa_family == AF_INET)
+           from = *(struct sockaddr_in *)sa;
+
+       p->length = nbytes - RX_HEADER_SIZE;;
+       if ((nbytes > tlen) || (p->length & 0x8000)) {  /* Bogus packet */
+           if (nbytes <= 0) {
+               if (rx_stats_active) {
+                   MUTEX_ENTER(&rx_stats_mutex);
+                   rx_stats.bogusPacketOnRead++;
+                   rx_stats.bogusHost = from.sin_addr.s_addr;
+                   MUTEX_EXIT(&rx_stats_mutex);
+               }
+               dpf(("B: bogus packet from [%x,%d] nb=%d",
+                    from.sin_addr.s_addr, from.sin_port, nbytes));
+           }
+           return;
+       } else {
+           /* Extract packet header. */
+           rxi_DecodePacketHeader(p);
+
+           host = from.sin_addr.s_addr;
+           port = from.sin_port;
+           if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) {
+               if (rx_stats_active) {
+                   MUTEX_ENTER(&rx_stats_mutex);
+                   rx_stats.packetsRead[p->header.type - 1]++;
+                   MUTEX_EXIT(&rx_stats_mutex);
+               }
+           }
+
+#ifdef RX_TRIMDATABUFS
+           /* Free any empty packet buffers at the end of this packet */
+           rxi_TrimDataBufs(p, 1);
+#endif
+           /* receive pcket */
+           p = rxi_ReceivePacket(p, so, host, port, 0, 0);
+       }
+    }
+    /* free packet? */
+    if (p)
+       rxi_FreePacket(p);
+
+    return;
+}
+
+/* in listener env, the listener shutdown does this. we have no listener */
+void
+osi_StopNetIfPoller(void)
+{
+    soclose(rx_socket);
+    if (afs_termState == AFSOP_STOP_NETIF) {
+       afs_termState = AFSOP_STOP_COMPLETE;
+       osi_rxWakeup(&afs_termState);
+    }
+}
+#elif defined(RXK_LISTENER_ENV)
 int
 osi_NetReceive(osi_socket so, struct sockaddr_in *addr, struct iovec *dvec,
               int nvecs, int *alength)
@@ -129,6 +262,9 @@ osi_StopListener(void)
        psignal(p, SIGUSR1);
 #endif
 }
+#else
+#error need upcall or listener
+#endif
 
 int
 osi_NetSend(osi_socket so, struct sockaddr_in *addr, struct iovec *dvec,
index 9fc9718..6c80715 100644 (file)
@@ -594,9 +594,11 @@ rx_InitHost(u_int host, u_int port)
     rx_GetIFInfo();
 #endif
 
+#if defined(RXK_LISTENER_ENV) || !defined(KERNEL)
     /* Start listener process (exact function is dependent on the
      * implementation environment--kernel or user space) */
     rxi_StartListener();
+#endif
 
     USERPRI;
     tmp_status = rxinit_status = 0;
index f50abeb..27649fa 100644 (file)
@@ -114,7 +114,7 @@ rxi_GetHostUDPSocket(u_int host, u_short port)
     sockp = (osi_socket *)rxk_NewSocketHost(host, port);
     if (sockp == (osi_socket *)0)
        return OSI_NULLSOCKET;
-       rxk_AddPort(port, (char *)sockp);
+    rxk_AddPort(port, (char *)sockp);
     return (osi_socket) sockp;
 }
 
@@ -347,8 +347,8 @@ MyArrivalProc(struct rx_packet *ahandle,
 void
 rxi_StartListener(void)
 {
+#if !defined(RXK_LISTENER_ENV) && !defined(RXK_UPCALL_ENV)
     /* if kernel, give name of appropriate procedures */
-#ifndef RXK_LISTENER_ENV
     rxk_GetPacketProc = MyPacketProc;
     rxk_PacketArrivalProc = MyArrivalProc;
     rxk_init();
@@ -861,9 +861,13 @@ rxk_NewSocketHost(afs_uint32 ahost, short aport)
     code = socreate(AF_INET, &newSocket, SOCK_DGRAM, IPPROTO_UDP,
                    afs_osi_credp, curthread);
 #elif defined(AFS_DARWIN80_ENV)
+#ifdef RXK_LISTENER_ENV
     code = sock_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, NULL, &newSocket);
+#else
+    code = sock_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, rx_upcall, NULL, &newSocket);
+#endif
 #elif defined(AFS_NBSD50_ENV)
-       code = socreate(AF_INET, &newSocket, SOCK_DGRAM, 0, osi_curproc(), NULL);
+    code = socreate(AF_INET, &newSocket, SOCK_DGRAM, 0, osi_curproc(), NULL);
 #elif defined(AFS_NBSD40_ENV)
     code = socreate(AF_INET, &newSocket, SOCK_DGRAM, 0, osi_curproc());
 #else
@@ -1021,8 +1025,8 @@ rxk_FreeSocket(struct socket *asocket)
 }
 #endif /* !SUN5 && !LINUX20 */
 
-#if defined(RXK_LISTENER_ENV) || defined(AFS_SUN5_ENV)
-#ifdef AFS_DARWIN80_ENV
+#if defined(RXK_LISTENER_ENV) || defined(AFS_SUN5_ENV) || defined(RXK_UPCALL_ENV)
+#ifdef RXK_TIMEDSLEEP_ENV
 /* Shutting down should wake us up, as should an earlier event. */
 void
 rxi_ReScheduleEvents(void)
@@ -1059,7 +1063,7 @@ afs_rxevent_daemon(void)
        afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
                   "before afs_osi_Wait()");
 #endif
-#ifdef AFS_DARWIN80_ENV
+#ifdef RXK_TIMEDSLEEP_ENV
        afs_osi_TimedSleep(&afs_termState, MAX(500, ((temp.sec * 1000) +
                                                     (temp.usec / 1000))), 0);
 #else
@@ -1072,13 +1076,11 @@ afs_rxevent_daemon(void)
        if (afs_termState == AFSOP_STOP_RXEVENT) {
 #ifdef RXK_LISTENER_ENV
            afs_termState = AFSOP_STOP_RXK_LISTENER;
-#else
-#ifdef AFS_SUN510_ENV
+#elif defined(AFS_SUN510_ENV) || defined(RXK_UPCALL_ENV)
            afs_termState = AFSOP_STOP_NETIF;
 #else
            afs_termState = AFSOP_STOP_COMPLETE;
 #endif
-#endif
            osi_rxWakeup(&afs_termState);
            return;
        }
index d5223ac..1351d17 100644 (file)
@@ -15,7 +15,7 @@
 #define osi_Alloc afs_osi_Alloc
 #define osi_Free  afs_osi_Free
 
-#ifndef AFS_DARWIN80_ENV
+#ifndef RXK_TIMEDSLEEP_ENV
 #define rxi_ReScheduleEvents    0      /* Not needed by kernel */
 #endif
 
index b4b3b8c..6e39b0b 100644 (file)
@@ -434,8 +434,12 @@ extern int osi_NetSend(osi_socket asocket, struct sockaddr_in *addr,
                       struct iovec *dvec, int nvecs, afs_int32 asize,
                       int istack);
 # endif
+# ifdef RXK_UPCALL_ENV
+extern void rx_upcall(socket_t so, void *arg, __unused int waitflag);
+# else
 extern int osi_NetReceive(osi_socket so, struct sockaddr_in *addr,
                          struct iovec *dvec, int nvecs, int *lengthp);
+# endif
 # if defined(AFS_SUN510_ENV)
 extern void osi_StartNetIfPoller(void);
 extern void osi_NetIfPoller(void);
@@ -449,8 +453,8 @@ extern void rxi_ListenerProc(osi_socket usockp, int *tnop,
                             struct rx_call **newcallp);
 # endif
 
-# ifndef RXK_LISTENER_ENV
-extern void rxk_init();
+# if !defined(RXK_LISTENER_ENV) && !defined(RXK_UPCALL_ENV)
+extern void rxk_init(void);
 # endif
 
 /* UKERNEL/rx_knet.c */