reindent-again-20030808
[openafs.git] / src / rx / rx_stream.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX stream:  the stream I/O layer for RX */
11
12 This file is now obsolete.
13 #include <afsconfig.h>
14 #ifdef KERNEL
15 #include "afs/param.h"
16 #else
17 #include <afs/param.h>
18 #endif
19  
20     RCSID
21     ("$Header$");
22
23 #ifdef  KERNEL
24 #include "h/types.h"
25 #include "h/uio.h"
26 #include "rx/rx_stream.h"
27 #else /* KERNEL */
28 #include "rx_stream.h"
29 #endif /* KERNEL */
30
31 void
32 rx_stream_InitRead(sd, call)
33      struct rx_stream *sd;
34      struct rx_call *call;
35 {
36     queue_Init(&sd->sd.rd.rq);
37     queue_Init(&sd->sd.rd.freeTheseQ);
38     sd->sd.rd.nLeft = 0;
39     sd->sd.rd.call = call;
40 }
41
42 /* Normally called from the macro, rx_stream_Read */
43 int
44 rx_stream_ReadProc(sd, buf, nbytes)
45      struct rx_stream *sd;
46      char *buf;
47      int nbytes;
48 {
49     int totalBytes = nbytes;
50
51     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ))
52         rx_FreePackets(&sd->sd.rd.freeTheseQ);
53
54     while (nbytes) {
55         struct rx_packet *tp;
56         if (queue_IsEmpty(&sd->sd.rd.rq)) {
57             if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq))
58                 return totalBytes - nbytes;
59             tp = queue_First(&sd->sd.rd.rq, rx_packet);
60             sd->sd.rd.nextByte = rx_DataOf(tp);
61             sd->sd.rd.nLeft = rx_GetDataSize(tp);
62         }
63         if (nbytes < sd->sd.rd.nLeft) {
64             sd->sd.rd.nLeft -= nbytes;
65             memcpy(buf, sd->sd.rd.nextByte, nbytes);
66             sd->sd.rd.nextByte += nbytes;
67             return totalBytes;
68         }
69         memcpy(buf, sd->sd.rd.nextByte, sd->sd.rd.nLeft);
70         buf += sd->sd.rd.nLeft;
71         nbytes -= sd->sd.rd.nLeft;
72         tp = queue_First(&sd->sd.rd.rq, rx_packet);
73         queue_Remove(tp);
74         rx_FreePacket(tp);
75         if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
76             tp = queue_First(&sd->sd.rd.rq, rx_packet);
77             sd->sd.rd.nextByte = rx_DataOf(tp);
78             sd->sd.rd.nLeft = rx_GetDataSize(tp);
79         }
80     }
81     return totalBytes;
82 }
83
84 int
85 rx_stream_ReadIov(sd, iovlenp, iov, nbytes)
86      struct rx_stream *sd;
87      int *iovlenp;
88      struct iovec *iov;
89      int nbytes;
90 {
91     int totalBytes = nbytes;
92     int iovIndex = 0;
93     int maxiovlen = *iovlenp;
94
95     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ))
96         rx_FreePackets(&sd->sd.rd.freeTheseQ);
97
98     while (nbytes && iovIndex < maxiovlen) {
99         struct rx_packet *tp;
100         if (queue_IsEmpty(&sd->sd.rd.rq)) {
101             if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq))
102                 break;
103             tp = queue_First(&sd->sd.rd.rq, rx_packet);
104             sd->sd.rd.nextByte = rx_DataOf(tp);
105             sd->sd.rd.nLeft = rx_GetDataSize(tp);
106         }
107         if (nbytes < sd->sd.rd.nLeft) {
108             sd->sd.rd.nLeft -= nbytes;
109             iov[iovIndex].iov_base = sd->sd.rd.nextByte;
110             iov[iovIndex++].iov_len = nbytes;
111             sd->sd.rd.nextByte += nbytes;
112             nbytes = 0;
113             break;
114         }
115         iov[iovIndex].iov_base = sd->sd.rd.nextByte;
116         iov[iovIndex++].iov_len = sd->sd.rd.nLeft;
117         nbytes -= sd->sd.rd.nLeft;
118         tp = queue_First(&sd->sd.rd.rq, rx_packet);
119         queue_MovePrepend(&sd->sd.rd.freeTheseQ, tp);
120         if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
121             tp = queue_First(&sd->sd.rd.rq, rx_packet);
122             sd->sd.rd.nextByte = rx_DataOf(tp);
123             sd->sd.rd.nLeft = rx_GetDataSize(tp);
124         }
125     }
126     *iovlenp = iovIndex;
127     return totalBytes - nbytes;
128 }
129
130 void
131 rx_stream_FinishRead(sd)
132      struct rx_stream *sd;
133 {
134     if (queue_IsNotEmpty(&sd->sd.rd.rq))
135         rx_FreePackets(&sd->sd.rd.rq);
136     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ))
137         rx_FreePackets(&sd->sd.rd.freeTheseQ);
138 }
139
140 void
141 rx_stream_InitWrite(sd, call)
142      struct rx_stream *sd;
143      struct rx_call *call;
144 {
145     sd->sd.wd.freePtr = 0;
146     sd->sd.wd.nFree = 0;
147     sd->sd.wd.call = call;
148     queue_Init(&sd->sd.wd.wq);
149     sd->sd.wd.packetSize = rx_MaxDataSize(rx_ConnectionOf(call));
150 }
151
152 /* The real write procedure (rx_stream_Write is a macro) */
153 int
154 rx_stream_WriteProc(sd, buf, nbytes)
155      struct rx_stream *sd;
156      char *buf;
157      int nbytes;
158 {
159     int totalBytes = nbytes;
160     while (nbytes) {
161         if (queue_IsEmpty(&sd->sd.wd.wq)) {
162             if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, 1, RX_WAIT))
163                 break;
164             sd->sd.wd.nFree = sd->sd.wd.packetSize;
165             sd->sd.wd.freePtr =
166                 rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
167         }
168         if (nbytes < sd->sd.wd.nFree) {
169             if (buf)
170                 memcpy(sd->sd.wd.freePtr, buf, nbytes), buf += nbytes;
171             sd->sd.wd.nFree -= nbytes;
172             sd->sd.wd.freePtr += nbytes;
173             nbytes = 0;
174             break;
175         }
176         if (buf)
177             memcpy(sd->sd.wd.freePtr, buf, sd->sd.wd.nFree), buf +=
178                 sd->sd.wd.nFree;
179         nbytes -= sd->sd.wd.nFree;
180         sd->sd.wd.nFree = 0;
181         if (rx_stream_FlushWrite(sd))
182             break;
183     }
184     return totalBytes - nbytes;
185 }
186
187 /* Returns nbytes allocated */
188 int
189 rx_stream_AllocIov(sd, iovlenp, iovp, nbytes)
190      struct rx_stream *sd;
191      int *iovlenp;
192      struct iovec *iovp;
193      int nbytes;
194 {
195     struct rx_packet *p, *nxp;
196     int maxiovlen = *iovlenp;
197     int iovIndex;
198     int totalBytes;
199     int nFree;
200     int niovs;
201
202     for (nFree = 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet)) {
203         nFree += sd->sd.wd.packetSize;
204     }
205     if (sd->sd.wd.nFree)
206         nFree = nFree - sd->sd.wd.packetSize + sd->sd.wd.nFree;
207
208     /* Allocate the number of bytes requested, or an even portion */
209     for (totalBytes = nbytes;; totalBytes >>= 1) {
210         /* Compute number of additional buffers, beyond current partial buffer, required */
211         int nPackets;
212         nbytes = totalBytes - nFree;
213         if (nbytes < 0) {
214             niovs = 1;
215             break;
216         }
217         niovs = nPackets =
218             (nbytes + sd->sd.wd.packetSize - 1) / sd->sd.wd.packetSize;
219         if (nFree)
220             niovs++;
221         if (niovs <= maxiovlen) {
222             if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, nPackets, 1))
223                 break;
224             if (nFree == 0) {
225                 /* Since there weren't any packets allocated previously, setup new description for first packet */
226                 sd->sd.wd.nFree = sd->sd.wd.packetSize;
227                 sd->sd.wd.freePtr =
228                     rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
229             }
230             break;
231         }
232     }
233
234     /* Create an iovec to describe the set of allocated buffers */
235     for (nFree = sd->sd.wd.nFree, nbytes = totalBytes, iovIndex =
236          0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet), iovIndex++, nFree =
237          sd->sd.wd.packetSize) {
238         if (iovIndex >= niovs)
239             break;
240         iovp[iovIndex].iov_base = rx_DataOf(p) + sd->sd.wd.packetSize - nFree;
241         if (nbytes <= nFree) {
242             iovp[iovIndex].iov_len = nbytes;
243             nbytes = 0;
244             break;
245         }
246         nbytes -= nFree;
247         iovp[iovIndex].iov_len = nFree;
248     }
249     *iovlenp = niovs;
250     return totalBytes - nbytes;
251 }
252
253 /* Wrong, wrong, wrong */
254 rx_stream_FlushWrite(sd)
255      struct rx_stream *sd;
256 {
257     struct rx_queue q;
258     queue_Init(&q);
259     if (queue_IsNotEmpty(&sd->sd.wd.wq)
260         && sd->sd.wd.nFree < sd->sd.wd.packetSize) {
261         struct rx_packet *tp = queue_First(&sd->sd.wd.wq, rx_packet);
262         queue_MoveAppend(&q, tp);
263         rx_SetDataSize(queue_First(&q, rx_packet),
264                        sd->sd.wd.packetSize - sd->sd.wd.nFree);
265         if (queue_IsNotEmpty(&sd->sd.wd.wq)) {
266             sd->sd.wd.nFree = sd->sd.wd.packetSize;
267             sd->sd.wd.freePtr =
268                 rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
269         } else
270             sd->sd.wd.nFree = 0;
271         return (rx_SendData(sd->sd.wd.call, &q));
272     }
273     return 0;
274 }
275
276 void
277 rx_stream_FinishWrite(sd)
278      struct rx_stream *sd;
279 {
280     rx_stream_FlushWrite(sd);
281     if (queue_IsNotEmpty(&sd->sd.wd.wq))
282         rx_FreePackets(&sd->sd.wd.wq);
283     sd->sd.wd.nFree = 0;
284 }