rx-warning-cleanup-and-afsconfig-20010605
[openafs.git] / src / rx / rx_stream.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /* RX stream:  the stream I/O layer for RX */
11
12 This file is now obsolete.
13
14 #ifdef  KERNEL
15 #include "../h/types.h"
16 #include "../h/uio.h"
17 #include "../rx/rx_stream.h"
18 #else /* KERNEL */
19 #include "rx_stream.h"
20 #endif /* KERNEL */
21
22 void rx_stream_InitRead(sd, call)
23     struct rx_stream *sd;
24     struct rx_call *call;
25 {
26     queue_Init(&sd->sd.rd.rq);
27     queue_Init(&sd->sd.rd.freeTheseQ);
28     sd->sd.rd.nLeft = 0;
29     sd->sd.rd.call = call;
30 }
31
32 /* Normally called from the macro, rx_stream_Read */
33 int rx_stream_ReadProc(sd, buf, nbytes)
34     struct rx_stream *sd;
35     char *buf;
36     int nbytes;
37 {
38     int totalBytes = nbytes;
39
40     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
41
42     while (nbytes) {
43         struct rx_packet *tp;
44         if (queue_IsEmpty(&sd->sd.rd.rq)) {
45             if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) return totalBytes - nbytes;
46             tp = queue_First(&sd->sd.rd.rq, rx_packet);
47             sd->sd.rd.nextByte = rx_DataOf(tp);
48             sd->sd.rd.nLeft = rx_GetDataSize(tp);
49         }
50         if (nbytes < sd->sd.rd.nLeft) {
51             sd->sd.rd.nLeft -= nbytes;
52             bcopy(sd->sd.rd.nextByte, buf, nbytes);
53             sd->sd.rd.nextByte += nbytes;
54             return totalBytes;
55         }
56         bcopy(sd->sd.rd.nextByte, buf, sd->sd.rd.nLeft);
57         buf += sd->sd.rd.nLeft;
58         nbytes -= sd->sd.rd.nLeft;
59         tp = queue_First(&sd->sd.rd.rq, rx_packet);
60         queue_Remove(tp);
61         rx_FreePacket(tp);
62         if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
63             tp = queue_First(&sd->sd.rd.rq, rx_packet);
64             sd->sd.rd.nextByte = rx_DataOf(tp);
65             sd->sd.rd.nLeft = rx_GetDataSize(tp);
66         }
67     }
68     return totalBytes;
69 }
70
71 int rx_stream_ReadIov(sd, iovlenp, iov, nbytes)
72     struct rx_stream *sd;
73     int *iovlenp;
74     struct iovec *iov;
75     int nbytes;
76 {
77     int totalBytes = nbytes;
78     int iovIndex = 0;
79     int maxiovlen = *iovlenp;
80
81     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
82
83     while (nbytes && iovIndex < maxiovlen) {
84         struct rx_packet *tp;
85         if (queue_IsEmpty(&sd->sd.rd.rq)) {
86             if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) break;
87             tp = queue_First(&sd->sd.rd.rq, rx_packet);
88             sd->sd.rd.nextByte = rx_DataOf(tp);
89             sd->sd.rd.nLeft = rx_GetDataSize(tp);
90         }
91         if (nbytes < sd->sd.rd.nLeft) {
92             sd->sd.rd.nLeft -= nbytes;
93             iov[iovIndex].iov_base = sd->sd.rd.nextByte;
94             iov[iovIndex++].iov_len = nbytes;
95             sd->sd.rd.nextByte += nbytes;
96             nbytes = 0;
97             break;
98         }
99         iov[iovIndex].iov_base = sd->sd.rd.nextByte;
100         iov[iovIndex++].iov_len = sd->sd.rd.nLeft;
101         nbytes -= sd->sd.rd.nLeft;
102         tp = queue_First(&sd->sd.rd.rq, rx_packet);
103         queue_MovePrepend(&sd->sd.rd.freeTheseQ, tp);
104         if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
105             tp = queue_First(&sd->sd.rd.rq, rx_packet);
106             sd->sd.rd.nextByte = rx_DataOf(tp);
107             sd->sd.rd.nLeft = rx_GetDataSize(tp);
108         }
109     }
110     *iovlenp = iovIndex;
111     return totalBytes - nbytes;
112 }
113
114 void rx_stream_FinishRead(sd)
115     struct rx_stream *sd;
116 {
117     if (queue_IsNotEmpty(&sd->sd.rd.rq)) rx_FreePackets(&sd->sd.rd.rq);
118     if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
119 }
120
121 void rx_stream_InitWrite(sd, call)
122     struct rx_stream *sd;
123     struct rx_call *call;
124 {
125     sd->sd.wd.freePtr = 0;
126     sd->sd.wd.nFree = 0;
127     sd->sd.wd.call = call;
128     queue_Init(&sd->sd.wd.wq);
129     sd->sd.wd.packetSize = rx_MaxDataSize(rx_ConnectionOf(call));
130 }
131
132 /* The real write procedure (rx_stream_Write is a macro) */
133 int rx_stream_WriteProc(sd, buf, nbytes)
134     struct rx_stream *sd;
135     char *buf;
136     int nbytes;
137 {
138     int totalBytes = nbytes;
139     while (nbytes) {
140         if (queue_IsEmpty(&sd->sd.wd.wq)) {
141             if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, 1, RX_WAIT)) break;
142             sd->sd.wd.nFree = sd->sd.wd.packetSize;
143             sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
144         }
145         if (nbytes < sd->sd.wd.nFree) {
146             if (buf) bcopy(buf, sd->sd.wd.freePtr, nbytes), buf += nbytes;
147             sd->sd.wd.nFree -= nbytes;
148             sd->sd.wd.freePtr += nbytes;
149             nbytes = 0;
150             break;
151         }
152         if (buf) bcopy(buf, sd->sd.wd.freePtr, sd->sd.wd.nFree), buf += sd->sd.wd.nFree;
153         nbytes -= sd->sd.wd.nFree;      
154         sd->sd.wd.nFree = 0;
155         if (rx_stream_FlushWrite(sd)) break;
156     }
157     return totalBytes - nbytes;
158 }
159
160 /* Returns nbytes allocated */
161 int rx_stream_AllocIov(sd, iovlenp, iovp, nbytes)
162     struct rx_stream *sd;
163     int *iovlenp;
164     struct iovec *iovp;
165     int nbytes;
166 {
167     struct rx_packet *p, *nxp;
168     int maxiovlen = *iovlenp;
169     int iovIndex;
170     int totalBytes;
171     int nFree;
172     int niovs;
173
174     for (nFree = 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet)) {
175         nFree += sd->sd.wd.packetSize;
176     }
177     if (sd->sd.wd.nFree) nFree = nFree - sd->sd.wd.packetSize + sd->sd.wd.nFree;
178
179     /* Allocate the number of bytes requested, or an even portion */
180     for (totalBytes = nbytes; ; totalBytes >>= 1) {
181         /* Compute number of additional buffers, beyond current partial buffer, required */
182         int nPackets;
183         nbytes = totalBytes - nFree;
184         if (nbytes < 0) {
185             niovs = 1;
186             break;
187         }
188         niovs = nPackets = (nbytes + sd->sd.wd.packetSize - 1)/sd->sd.wd.packetSize;
189         if (nFree) niovs++;
190         if (niovs <= maxiovlen) {
191             if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, nPackets, 1)) break;
192             if (nFree == 0) {
193                 /* Since there weren't any packets allocated previously, setup new description for first packet */
194                 sd->sd.wd.nFree = sd->sd.wd.packetSize;
195                 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
196             }
197             break;
198         }           
199     }
200    
201     /* Create an iovec to describe the set of allocated buffers */
202     for (nFree = sd->sd.wd.nFree, nbytes = totalBytes, iovIndex = 0,
203           queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet),
204           iovIndex++, nFree = sd->sd.wd.packetSize) {
205         if (iovIndex >= niovs) break;
206         iovp[iovIndex].iov_base = rx_DataOf(p) + sd->sd.wd.packetSize - nFree;
207         if (nbytes <= nFree) {
208             iovp[iovIndex].iov_len = nbytes;
209             nbytes = 0;
210             break;
211         }
212         nbytes -= nFree;
213         iovp[iovIndex].iov_len = nFree;
214     }
215     *iovlenp = niovs;
216     return totalBytes - nbytes;
217 }
218
219 /* Wrong, wrong, wrong */
220 rx_stream_FlushWrite(sd)
221     struct rx_stream *sd;
222 {
223     struct rx_queue q;
224     queue_Init(&q);
225     if (queue_IsNotEmpty(&sd->sd.wd.wq) && sd->sd.wd.nFree < sd->sd.wd.packetSize) {
226         struct rx_packet *tp = queue_First(&sd->sd.wd.wq, rx_packet);
227         queue_MoveAppend(&q, tp);
228         rx_SetDataSize(queue_First(&q, rx_packet), sd->sd.wd.packetSize - sd->sd.wd.nFree);
229         if (queue_IsNotEmpty(&sd->sd.wd.wq)) {
230             sd->sd.wd.nFree = sd->sd.wd.packetSize;
231             sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
232         } else sd->sd.wd.nFree = 0;
233         return (rx_SendData(sd->sd.wd.call, &q));
234     }
235     return 0;
236 }
237
238 void rx_stream_FinishWrite(sd)
239     struct rx_stream *sd;
240 {
241     rx_stream_FlushWrite(sd);
242     if (queue_IsNotEmpty(&sd->sd.wd.wq)) rx_FreePackets(&sd->sd.wd.wq);
243     sd->sd.wd.nFree = 0;
244 }