2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX stream: the stream I/O layer for RX */
12 This file is now obsolete.
15 #include "../h/types.h"
17 #include "../rx/rx_stream.h"
19 #include "rx_stream.h"
22 void rx_stream_InitRead(sd, call)
26 queue_Init(&sd->sd.rd.rq);
27 queue_Init(&sd->sd.rd.freeTheseQ);
29 sd->sd.rd.call = call;
32 /* Normally called from the macro, rx_stream_Read */
33 int rx_stream_ReadProc(sd, buf, nbytes)
38 int totalBytes = nbytes;
40 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
44 if (queue_IsEmpty(&sd->sd.rd.rq)) {
45 if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) return totalBytes - nbytes;
46 tp = queue_First(&sd->sd.rd.rq, rx_packet);
47 sd->sd.rd.nextByte = rx_DataOf(tp);
48 sd->sd.rd.nLeft = rx_GetDataSize(tp);
50 if (nbytes < sd->sd.rd.nLeft) {
51 sd->sd.rd.nLeft -= nbytes;
52 bcopy(sd->sd.rd.nextByte, buf, nbytes);
53 sd->sd.rd.nextByte += nbytes;
56 bcopy(sd->sd.rd.nextByte, buf, sd->sd.rd.nLeft);
57 buf += sd->sd.rd.nLeft;
58 nbytes -= sd->sd.rd.nLeft;
59 tp = queue_First(&sd->sd.rd.rq, rx_packet);
62 if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
63 tp = queue_First(&sd->sd.rd.rq, rx_packet);
64 sd->sd.rd.nextByte = rx_DataOf(tp);
65 sd->sd.rd.nLeft = rx_GetDataSize(tp);
71 int rx_stream_ReadIov(sd, iovlenp, iov, nbytes)
77 int totalBytes = nbytes;
79 int maxiovlen = *iovlenp;
81 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
83 while (nbytes && iovIndex < maxiovlen) {
85 if (queue_IsEmpty(&sd->sd.rd.rq)) {
86 if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) break;
87 tp = queue_First(&sd->sd.rd.rq, rx_packet);
88 sd->sd.rd.nextByte = rx_DataOf(tp);
89 sd->sd.rd.nLeft = rx_GetDataSize(tp);
91 if (nbytes < sd->sd.rd.nLeft) {
92 sd->sd.rd.nLeft -= nbytes;
93 iov[iovIndex].iov_base = sd->sd.rd.nextByte;
94 iov[iovIndex++].iov_len = nbytes;
95 sd->sd.rd.nextByte += nbytes;
99 iov[iovIndex].iov_base = sd->sd.rd.nextByte;
100 iov[iovIndex++].iov_len = sd->sd.rd.nLeft;
101 nbytes -= sd->sd.rd.nLeft;
102 tp = queue_First(&sd->sd.rd.rq, rx_packet);
103 queue_MovePrepend(&sd->sd.rd.freeTheseQ, tp);
104 if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
105 tp = queue_First(&sd->sd.rd.rq, rx_packet);
106 sd->sd.rd.nextByte = rx_DataOf(tp);
107 sd->sd.rd.nLeft = rx_GetDataSize(tp);
111 return totalBytes - nbytes;
114 void rx_stream_FinishRead(sd)
115 struct rx_stream *sd;
117 if (queue_IsNotEmpty(&sd->sd.rd.rq)) rx_FreePackets(&sd->sd.rd.rq);
118 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
121 void rx_stream_InitWrite(sd, call)
122 struct rx_stream *sd;
123 struct rx_call *call;
125 sd->sd.wd.freePtr = 0;
127 sd->sd.wd.call = call;
128 queue_Init(&sd->sd.wd.wq);
129 sd->sd.wd.packetSize = rx_MaxDataSize(rx_ConnectionOf(call));
132 /* The real write procedure (rx_stream_Write is a macro) */
133 int rx_stream_WriteProc(sd, buf, nbytes)
134 struct rx_stream *sd;
138 int totalBytes = nbytes;
140 if (queue_IsEmpty(&sd->sd.wd.wq)) {
141 if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, 1, RX_WAIT)) break;
142 sd->sd.wd.nFree = sd->sd.wd.packetSize;
143 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
145 if (nbytes < sd->sd.wd.nFree) {
146 if (buf) bcopy(buf, sd->sd.wd.freePtr, nbytes), buf += nbytes;
147 sd->sd.wd.nFree -= nbytes;
148 sd->sd.wd.freePtr += nbytes;
152 if (buf) bcopy(buf, sd->sd.wd.freePtr, sd->sd.wd.nFree), buf += sd->sd.wd.nFree;
153 nbytes -= sd->sd.wd.nFree;
155 if (rx_stream_FlushWrite(sd)) break;
157 return totalBytes - nbytes;
160 /* Returns nbytes allocated */
161 int rx_stream_AllocIov(sd, iovlenp, iovp, nbytes)
162 struct rx_stream *sd;
167 struct rx_packet *p, *nxp;
168 int maxiovlen = *iovlenp;
174 for (nFree = 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet)) {
175 nFree += sd->sd.wd.packetSize;
177 if (sd->sd.wd.nFree) nFree = nFree - sd->sd.wd.packetSize + sd->sd.wd.nFree;
179 /* Allocate the number of bytes requested, or an even portion */
180 for (totalBytes = nbytes; ; totalBytes >>= 1) {
181 /* Compute number of additional buffers, beyond current partial buffer, required */
183 nbytes = totalBytes - nFree;
188 niovs = nPackets = (nbytes + sd->sd.wd.packetSize - 1)/sd->sd.wd.packetSize;
190 if (niovs <= maxiovlen) {
191 if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, nPackets, 1)) break;
193 /* Since there weren't any packets allocated previously, setup new description for first packet */
194 sd->sd.wd.nFree = sd->sd.wd.packetSize;
195 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
201 /* Create an iovec to describe the set of allocated buffers */
202 for (nFree = sd->sd.wd.nFree, nbytes = totalBytes, iovIndex = 0,
203 queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet),
204 iovIndex++, nFree = sd->sd.wd.packetSize) {
205 if (iovIndex >= niovs) break;
206 iovp[iovIndex].iov_base = rx_DataOf(p) + sd->sd.wd.packetSize - nFree;
207 if (nbytes <= nFree) {
208 iovp[iovIndex].iov_len = nbytes;
213 iovp[iovIndex].iov_len = nFree;
216 return totalBytes - nbytes;
219 /* Wrong, wrong, wrong */
220 rx_stream_FlushWrite(sd)
221 struct rx_stream *sd;
225 if (queue_IsNotEmpty(&sd->sd.wd.wq) && sd->sd.wd.nFree < sd->sd.wd.packetSize) {
226 struct rx_packet *tp = queue_First(&sd->sd.wd.wq, rx_packet);
227 queue_MoveAppend(&q, tp);
228 rx_SetDataSize(queue_First(&q, rx_packet), sd->sd.wd.packetSize - sd->sd.wd.nFree);
229 if (queue_IsNotEmpty(&sd->sd.wd.wq)) {
230 sd->sd.wd.nFree = sd->sd.wd.packetSize;
231 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
232 } else sd->sd.wd.nFree = 0;
233 return (rx_SendData(sd->sd.wd.call, &q));
238 void rx_stream_FinishWrite(sd)
239 struct rx_stream *sd;
241 rx_stream_FlushWrite(sd);
242 if (queue_IsNotEmpty(&sd->sd.wd.wq)) rx_FreePackets(&sd->sd.wd.wq);