include-afsconfig-before-param-h-20010712
[openafs.git] / src / rx / rx_stream.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX stream:  the stream I/O layer for RX */
11
12 This file is now obsolete.
13
14 #include <afsconfig.h>
15 #ifdef KERNEL
16 #include "../afs/param.h"
17 #else
18 #include <afs/param.h>
19 #endif
20
21 RCSID("$Header$");
22
23 #ifdef  KERNEL
24 #include "../h/types.h"
25 #include "../h/uio.h"
26 #include "../rx/rx_stream.h"
27 #else /* KERNEL */
28 #include "rx_stream.h"
29 #endif /* KERNEL */
30
31 void rx_stream_InitRead(sd, call)
32     struct rx_stream *sd;
33     struct rx_call *call;
34 {
35     queue_Init(&sd->sd.rd.rq);
36     queue_Init(&sd->sd.rd.freeTheseQ);
37     sd->sd.rd.nLeft = 0;
38     sd->sd.rd.call = call;
39 }
40
41 /* Normally called from the macro, rx_stream_Read */
42 int rx_stream_ReadProc(sd, buf, nbytes)
43     struct rx_stream *sd;
44     char *buf;
45     int nbytes;
46 {
47     int totalBytes = nbytes;
48
49     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
50
51     while (nbytes) {
52         struct rx_packet *tp;
53         if (queue_IsEmpty(&sd->sd.rd.rq)) {
54             if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) return totalBytes - nbytes;
55             tp = queue_First(&sd->sd.rd.rq, rx_packet);
56             sd->sd.rd.nextByte = rx_DataOf(tp);
57             sd->sd.rd.nLeft = rx_GetDataSize(tp);
58         }
59         if (nbytes < sd->sd.rd.nLeft) {
60             sd->sd.rd.nLeft -= nbytes;
61             bcopy(sd->sd.rd.nextByte, buf, nbytes);
62             sd->sd.rd.nextByte += nbytes;
63             return totalBytes;
64         }
65         bcopy(sd->sd.rd.nextByte, buf, sd->sd.rd.nLeft);
66         buf += sd->sd.rd.nLeft;
67         nbytes -= sd->sd.rd.nLeft;
68         tp = queue_First(&sd->sd.rd.rq, rx_packet);
69         queue_Remove(tp);
70         rx_FreePacket(tp);
71         if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
72             tp = queue_First(&sd->sd.rd.rq, rx_packet);
73             sd->sd.rd.nextByte = rx_DataOf(tp);
74             sd->sd.rd.nLeft = rx_GetDataSize(tp);
75         }
76     }
77     return totalBytes;
78 }
79
80 int rx_stream_ReadIov(sd, iovlenp, iov, nbytes)
81     struct rx_stream *sd;
82     int *iovlenp;
83     struct iovec *iov;
84     int nbytes;
85 {
86     int totalBytes = nbytes;
87     int iovIndex = 0;
88     int maxiovlen = *iovlenp;
89
90     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
91
92     while (nbytes && iovIndex < maxiovlen) {
93         struct rx_packet *tp;
94         if (queue_IsEmpty(&sd->sd.rd.rq)) {
95             if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) break;
96             tp = queue_First(&sd->sd.rd.rq, rx_packet);
97             sd->sd.rd.nextByte = rx_DataOf(tp);
98             sd->sd.rd.nLeft = rx_GetDataSize(tp);
99         }
100         if (nbytes < sd->sd.rd.nLeft) {
101             sd->sd.rd.nLeft -= nbytes;
102             iov[iovIndex].iov_base = sd->sd.rd.nextByte;
103             iov[iovIndex++].iov_len = nbytes;
104             sd->sd.rd.nextByte += nbytes;
105             nbytes = 0;
106             break;
107         }
108         iov[iovIndex].iov_base = sd->sd.rd.nextByte;
109         iov[iovIndex++].iov_len = sd->sd.rd.nLeft;
110         nbytes -= sd->sd.rd.nLeft;
111         tp = queue_First(&sd->sd.rd.rq, rx_packet);
112         queue_MovePrepend(&sd->sd.rd.freeTheseQ, tp);
113         if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
114             tp = queue_First(&sd->sd.rd.rq, rx_packet);
115             sd->sd.rd.nextByte = rx_DataOf(tp);
116             sd->sd.rd.nLeft = rx_GetDataSize(tp);
117         }
118     }
119     *iovlenp = iovIndex;
120     return totalBytes - nbytes;
121 }
122
123 void rx_stream_FinishRead(sd)
124     struct rx_stream *sd;
125 {
126     if (queue_IsNotEmpty(&sd->sd.rd.rq)) rx_FreePackets(&sd->sd.rd.rq);
127     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
128 }
129
130 void rx_stream_InitWrite(sd, call)
131     struct rx_stream *sd;
132     struct rx_call *call;
133 {
134     sd->sd.wd.freePtr = 0;
135     sd->sd.wd.nFree = 0;
136     sd->sd.wd.call = call;
137     queue_Init(&sd->sd.wd.wq);
138     sd->sd.wd.packetSize = rx_MaxDataSize(rx_ConnectionOf(call));
139 }
140
141 /* The real write procedure (rx_stream_Write is a macro) */
142 int rx_stream_WriteProc(sd, buf, nbytes)
143     struct rx_stream *sd;
144     char *buf;
145     int nbytes;
146 {
147     int totalBytes = nbytes;
148     while (nbytes) {
149         if (queue_IsEmpty(&sd->sd.wd.wq)) {
150             if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, 1, RX_WAIT)) break;
151             sd->sd.wd.nFree = sd->sd.wd.packetSize;
152             sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
153         }
154         if (nbytes < sd->sd.wd.nFree) {
155             if (buf) bcopy(buf, sd->sd.wd.freePtr, nbytes), buf += nbytes;
156             sd->sd.wd.nFree -= nbytes;
157             sd->sd.wd.freePtr += nbytes;
158             nbytes = 0;
159             break;
160         }
161         if (buf) bcopy(buf, sd->sd.wd.freePtr, sd->sd.wd.nFree), buf += sd->sd.wd.nFree;
162         nbytes -= sd->sd.wd.nFree;      
163         sd->sd.wd.nFree = 0;
164         if (rx_stream_FlushWrite(sd)) break;
165     }
166     return totalBytes - nbytes;
167 }
168
169 /* Returns nbytes allocated */
170 int rx_stream_AllocIov(sd, iovlenp, iovp, nbytes)
171     struct rx_stream *sd;
172     int *iovlenp;
173     struct iovec *iovp;
174     int nbytes;
175 {
176     struct rx_packet *p, *nxp;
177     int maxiovlen = *iovlenp;
178     int iovIndex;
179     int totalBytes;
180     int nFree;
181     int niovs;
182
183     for (nFree = 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet)) {
184         nFree += sd->sd.wd.packetSize;
185     }
186     if (sd->sd.wd.nFree) nFree = nFree - sd->sd.wd.packetSize + sd->sd.wd.nFree;
187
188     /* Allocate the number of bytes requested, or an even portion */
189     for (totalBytes = nbytes; ; totalBytes >>= 1) {
190         /* Compute number of additional buffers, beyond current partial buffer, required */
191         int nPackets;
192         nbytes = totalBytes - nFree;
193         if (nbytes < 0) {
194             niovs = 1;
195             break;
196         }
197         niovs = nPackets = (nbytes + sd->sd.wd.packetSize - 1)/sd->sd.wd.packetSize;
198         if (nFree) niovs++;
199         if (niovs <= maxiovlen) {
200             if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, nPackets, 1)) break;
201             if (nFree == 0) {
202                 /* Since there weren't any packets allocated previously, setup new description for first packet */
203                 sd->sd.wd.nFree = sd->sd.wd.packetSize;
204                 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
205             }
206             break;
207         }           
208     }
209    
210     /* Create an iovec to describe the set of allocated buffers */
211     for (nFree = sd->sd.wd.nFree, nbytes = totalBytes, iovIndex = 0,
212           queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet),
213           iovIndex++, nFree = sd->sd.wd.packetSize) {
214         if (iovIndex >= niovs) break;
215         iovp[iovIndex].iov_base = rx_DataOf(p) + sd->sd.wd.packetSize - nFree;
216         if (nbytes <= nFree) {
217             iovp[iovIndex].iov_len = nbytes;
218             nbytes = 0;
219             break;
220         }
221         nbytes -= nFree;
222         iovp[iovIndex].iov_len = nFree;
223     }
224     *iovlenp = niovs;
225     return totalBytes - nbytes;
226 }
227
228 /* Wrong, wrong, wrong */
229 rx_stream_FlushWrite(sd)
230     struct rx_stream *sd;
231 {
232     struct rx_queue q;
233     queue_Init(&q);
234     if (queue_IsNotEmpty(&sd->sd.wd.wq) && sd->sd.wd.nFree < sd->sd.wd.packetSize) {
235         struct rx_packet *tp = queue_First(&sd->sd.wd.wq, rx_packet);
236         queue_MoveAppend(&q, tp);
237         rx_SetDataSize(queue_First(&q, rx_packet), sd->sd.wd.packetSize - sd->sd.wd.nFree);
238         if (queue_IsNotEmpty(&sd->sd.wd.wq)) {
239             sd->sd.wd.nFree = sd->sd.wd.packetSize;
240             sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
241         } else sd->sd.wd.nFree = 0;
242         return (rx_SendData(sd->sd.wd.call, &q));
243     }
244     return 0;
245 }
246
247 void rx_stream_FinishWrite(sd)
248     struct rx_stream *sd;
249 {
250     rx_stream_FlushWrite(sd);
251     if (queue_IsNotEmpty(&sd->sd.wd.wq)) rx_FreePackets(&sd->sd.wd.wq);
252     sd->sd.wd.nFree = 0;
253 }