2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX stream: the stream I/O layer for RX */
12 This file is now obsolete.
13 #include <afsconfig.h>
15 #include "afs/param.h"
17 #include <afs/param.h>
26 #include "rx/rx_stream.h"
28 #include "rx_stream.h"
32 rx_stream_InitRead(sd, call)
36 queue_Init(&sd->sd.rd.rq);
37 queue_Init(&sd->sd.rd.freeTheseQ);
39 sd->sd.rd.call = call;
42 /* Normally called from the macro, rx_stream_Read */
44 rx_stream_ReadProc(sd, buf, nbytes)
49 int totalBytes = nbytes;
51 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ))
52 rx_FreePackets(&sd->sd.rd.freeTheseQ);
56 if (queue_IsEmpty(&sd->sd.rd.rq)) {
57 if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq))
58 return totalBytes - nbytes;
59 tp = queue_First(&sd->sd.rd.rq, rx_packet);
60 sd->sd.rd.nextByte = rx_DataOf(tp);
61 sd->sd.rd.nLeft = rx_GetDataSize(tp);
63 if (nbytes < sd->sd.rd.nLeft) {
64 sd->sd.rd.nLeft -= nbytes;
65 memcpy(buf, sd->sd.rd.nextByte, nbytes);
66 sd->sd.rd.nextByte += nbytes;
69 memcpy(buf, sd->sd.rd.nextByte, sd->sd.rd.nLeft);
70 buf += sd->sd.rd.nLeft;
71 nbytes -= sd->sd.rd.nLeft;
72 tp = queue_First(&sd->sd.rd.rq, rx_packet);
75 if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
76 tp = queue_First(&sd->sd.rd.rq, rx_packet);
77 sd->sd.rd.nextByte = rx_DataOf(tp);
78 sd->sd.rd.nLeft = rx_GetDataSize(tp);
85 rx_stream_ReadIov(sd, iovlenp, iov, nbytes)
91 int totalBytes = nbytes;
93 int maxiovlen = *iovlenp;
95 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ))
96 rx_FreePackets(&sd->sd.rd.freeTheseQ);
98 while (nbytes && iovIndex < maxiovlen) {
100 if (queue_IsEmpty(&sd->sd.rd.rq)) {
101 if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq))
103 tp = queue_First(&sd->sd.rd.rq, rx_packet);
104 sd->sd.rd.nextByte = rx_DataOf(tp);
105 sd->sd.rd.nLeft = rx_GetDataSize(tp);
107 if (nbytes < sd->sd.rd.nLeft) {
108 sd->sd.rd.nLeft -= nbytes;
109 iov[iovIndex].iov_base = sd->sd.rd.nextByte;
110 iov[iovIndex++].iov_len = nbytes;
111 sd->sd.rd.nextByte += nbytes;
115 iov[iovIndex].iov_base = sd->sd.rd.nextByte;
116 iov[iovIndex++].iov_len = sd->sd.rd.nLeft;
117 nbytes -= sd->sd.rd.nLeft;
118 tp = queue_First(&sd->sd.rd.rq, rx_packet);
119 queue_MovePrepend(&sd->sd.rd.freeTheseQ, tp);
120 if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
121 tp = queue_First(&sd->sd.rd.rq, rx_packet);
122 sd->sd.rd.nextByte = rx_DataOf(tp);
123 sd->sd.rd.nLeft = rx_GetDataSize(tp);
127 return totalBytes - nbytes;
131 rx_stream_FinishRead(sd)
132 struct rx_stream *sd;
134 if (queue_IsNotEmpty(&sd->sd.rd.rq))
135 rx_FreePackets(&sd->sd.rd.rq);
136 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ))
137 rx_FreePackets(&sd->sd.rd.freeTheseQ);
141 rx_stream_InitWrite(sd, call)
142 struct rx_stream *sd;
143 struct rx_call *call;
145 sd->sd.wd.freePtr = 0;
147 sd->sd.wd.call = call;
148 queue_Init(&sd->sd.wd.wq);
149 sd->sd.wd.packetSize = rx_MaxDataSize(rx_ConnectionOf(call));
152 /* The real write procedure (rx_stream_Write is a macro) */
154 rx_stream_WriteProc(sd, buf, nbytes)
155 struct rx_stream *sd;
159 int totalBytes = nbytes;
161 if (queue_IsEmpty(&sd->sd.wd.wq)) {
162 if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, 1, RX_WAIT))
164 sd->sd.wd.nFree = sd->sd.wd.packetSize;
166 rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
168 if (nbytes < sd->sd.wd.nFree) {
170 memcpy(sd->sd.wd.freePtr, buf, nbytes), buf += nbytes;
171 sd->sd.wd.nFree -= nbytes;
172 sd->sd.wd.freePtr += nbytes;
177 memcpy(sd->sd.wd.freePtr, buf, sd->sd.wd.nFree), buf +=
179 nbytes -= sd->sd.wd.nFree;
181 if (rx_stream_FlushWrite(sd))
184 return totalBytes - nbytes;
187 /* Returns nbytes allocated */
189 rx_stream_AllocIov(sd, iovlenp, iovp, nbytes)
190 struct rx_stream *sd;
195 struct rx_packet *p, *nxp;
196 int maxiovlen = *iovlenp;
202 for (nFree = 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet)) {
203 nFree += sd->sd.wd.packetSize;
206 nFree = nFree - sd->sd.wd.packetSize + sd->sd.wd.nFree;
208 /* Allocate the number of bytes requested, or an even portion */
209 for (totalBytes = nbytes;; totalBytes >>= 1) {
210 /* Compute number of additional buffers, beyond current partial buffer, required */
212 nbytes = totalBytes - nFree;
218 (nbytes + sd->sd.wd.packetSize - 1) / sd->sd.wd.packetSize;
221 if (niovs <= maxiovlen) {
222 if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, nPackets, 1))
225 /* Since there weren't any packets allocated previously, setup new description for first packet */
226 sd->sd.wd.nFree = sd->sd.wd.packetSize;
228 rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
234 /* Create an iovec to describe the set of allocated buffers */
235 for (nFree = sd->sd.wd.nFree, nbytes = totalBytes, iovIndex =
236 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet), iovIndex++, nFree =
237 sd->sd.wd.packetSize) {
238 if (iovIndex >= niovs)
240 iovp[iovIndex].iov_base = rx_DataOf(p) + sd->sd.wd.packetSize - nFree;
241 if (nbytes <= nFree) {
242 iovp[iovIndex].iov_len = nbytes;
247 iovp[iovIndex].iov_len = nFree;
250 return totalBytes - nbytes;
253 /* Wrong, wrong, wrong */
254 rx_stream_FlushWrite(sd)
255 struct rx_stream *sd;
259 if (queue_IsNotEmpty(&sd->sd.wd.wq)
260 && sd->sd.wd.nFree < sd->sd.wd.packetSize) {
261 struct rx_packet *tp = queue_First(&sd->sd.wd.wq, rx_packet);
262 queue_MoveAppend(&q, tp);
263 rx_SetDataSize(queue_First(&q, rx_packet),
264 sd->sd.wd.packetSize - sd->sd.wd.nFree);
265 if (queue_IsNotEmpty(&sd->sd.wd.wq)) {
266 sd->sd.wd.nFree = sd->sd.wd.packetSize;
268 rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
271 return (rx_SendData(sd->sd.wd.call, &q));
277 rx_stream_FinishWrite(sd)
278 struct rx_stream *sd;
280 rx_stream_FlushWrite(sd);
281 if (queue_IsNotEmpty(&sd->sd.wd.wq))
282 rx_FreePackets(&sd->sd.wd.wq);