2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 /* RX stream: the stream I/O layer for RX */
12 This file is now obsolete.
14 #include <afsconfig.h>
16 #include "../afs/param.h"
18 #include <afs/param.h>
24 #include "../h/types.h"
26 #include "../rx/rx_stream.h"
28 #include "rx_stream.h"
/*
 * rx_stream_InitRead: set up the read side of stream descriptor `sd`
 * for the call `call`.
 *
 * Initialises the two read-side queues (sd->sd.rd.rq and
 * sd->sd.rd.freeTheseQ) and records `call` in sd->sd.rd.call.
 *
 * NOTE(review): the K&R parameter declarations, the braces and at least
 * one statement between the queue setup and the call assignment are not
 * visible in this excerpt -- confirm against the full file.
 */
31 void rx_stream_InitRead(sd, call)
35 queue_Init(&sd->sd.rd.rq);
36 queue_Init(&sd->sd.rd.freeTheseQ);
38 sd->sd.rd.call = call;
41 /* Normally called from the macro, rx_stream_Read */
/*
 * rx_stream_ReadProc: copy up to `nbytes` bytes of received data from the
 * stream's packet queue into `buf`.
 *
 * Returns the number of bytes consumed so far (totalBytes - nbytes) when
 * rx_ReadData reports a non-zero result.
 *
 * NOTE(review): the loop header, the `else` separating the two copy paths,
 * the statement that disposes of `tp` after the full-packet copy and the
 * final return are not visible in this excerpt -- confirm before relying
 * on this description.
 */
42 int rx_stream_ReadProc(sd, buf, nbytes)
/* Remember the original request so the amount transferred can be derived. */
47 int totalBytes = nbytes;
/* Release any packets parked on freeTheseQ before doing more work. */
49 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
/* Read queue drained: ask rx for more data; a non-zero result ends the read. */
53 if (queue_IsEmpty(&sd->sd.rd.rq)) {
54 if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) return totalBytes - nbytes;
/* Point the read cursor at the data of the first queued packet. */
55 tp = queue_First(&sd->sd.rd.rq, rx_packet);
56 sd->sd.rd.nextByte = rx_DataOf(tp);
57 sd->sd.rd.nLeft = rx_GetDataSize(tp);
/* Request fits inside the current packet: copy it and advance the cursor. */
59 if (nbytes < sd->sd.rd.nLeft) {
60 sd->sd.rd.nLeft -= nbytes;
61 bcopy(sd->sd.rd.nextByte, buf, nbytes);
62 sd->sd.rd.nextByte += nbytes;
/* Otherwise take everything left in the current packet. */
65 bcopy(sd->sd.rd.nextByte, buf, sd->sd.rd.nLeft);
66 buf += sd->sd.rd.nLeft;
67 nbytes -= sd->sd.rd.nLeft;
/* NOTE(review): what happens to tp next is on a line not shown here. */
68 tp = queue_First(&sd->sd.rd.rq, rx_packet);
/* More packets already queued: move the cursor to the new first packet. */
71 if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
72 tp = queue_First(&sd->sd.rd.rq, rx_packet);
73 sd->sd.rd.nextByte = rx_DataOf(tp);
74 sd->sd.rd.nLeft = rx_GetDataSize(tp);
/*
 * rx_stream_ReadIov: describe up to `nbytes` bytes of received data with
 * entries of `iov` instead of copying the data out.
 *
 * Each entry's iov_base points directly at packet data held by the stream.
 * At most *iovlenp entries are filled. Returns the number of bytes
 * described (totalBytes - nbytes).
 *
 * NOTE(review): the parameter declarations, the initialisation of iovIndex,
 * the `else` between the two paths and any write-back through iovlenp are
 * not visible in this excerpt -- confirm against the full file.
 */
80 int rx_stream_ReadIov(sd, iovlenp, iov, nbytes)
86 int totalBytes = nbytes;
/* Upper bound on the number of iov entries that may be filled. */
88 int maxiovlen = *iovlenp;
/* Release packets parked on freeTheseQ before handing out new pointers. */
90 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
/* Continue while bytes are still wanted and iov slots remain. */
92 while (nbytes && iovIndex < maxiovlen) {
/* Read queue drained: fetch more data; a non-zero result stops the loop. */
94 if (queue_IsEmpty(&sd->sd.rd.rq)) {
95 if (rx_ReadData(sd->sd.rd.call, &sd->sd.rd.rq)) break;
96 tp = queue_First(&sd->sd.rd.rq, rx_packet);
97 sd->sd.rd.nextByte = rx_DataOf(tp);
98 sd->sd.rd.nLeft = rx_GetDataSize(tp);
/* Request ends inside the current packet: describe just that part. */
100 if (nbytes < sd->sd.rd.nLeft) {
101 sd->sd.rd.nLeft -= nbytes;
102 iov[iovIndex].iov_base = sd->sd.rd.nextByte;
103 iov[iovIndex++].iov_len = nbytes;
104 sd->sd.rd.nextByte += nbytes;
/* Otherwise describe the whole remainder of the current packet. */
108 iov[iovIndex].iov_base = sd->sd.rd.nextByte;
109 iov[iovIndex++].iov_len = sd->sd.rd.nLeft;
110 nbytes -= sd->sd.rd.nLeft;
/*
 * Move the first packet onto freeTheseQ rather than freeing it here;
 * the iov entry above still points into its data.
 */
111 tp = queue_First(&sd->sd.rd.rq, rx_packet);
112 queue_MovePrepend(&sd->sd.rd.freeTheseQ, tp);
/* More packets already queued: move the cursor to the new first packet. */
113 if (queue_IsNotEmpty(&sd->sd.rd.rq)) {
114 tp = queue_First(&sd->sd.rd.rq, rx_packet);
115 sd->sd.rd.nextByte = rx_DataOf(tp);
116 sd->sd.rd.nLeft = rx_GetDataSize(tp);
120 return totalBytes - nbytes;
/*
 * rx_stream_FinishRead: release every packet still held by the read side
 * of `sd`, from both the read queue and the freeTheseQ list.
 */
123 void rx_stream_FinishRead(sd)
124 struct rx_stream *sd;
/* Each queue is only handed to rx_FreePackets when it is non-empty. */
126 if (queue_IsNotEmpty(&sd->sd.rd.rq)) rx_FreePackets(&sd->sd.rd.rq);
127 if (queue_IsNotEmpty(&sd->sd.rd.freeTheseQ)) rx_FreePackets(&sd->sd.rd.freeTheseQ);
/*
 * rx_stream_InitWrite: set up the write side of stream descriptor `sd`
 * for the call `call`.
 *
 * Clears the write cursor, records the call, initialises the write queue
 * and stores the per-packet payload size obtained from
 * rx_MaxDataSize(rx_ConnectionOf(call)).
 *
 * NOTE(review): one statement between the freePtr reset and the call
 * assignment is not visible in this excerpt -- confirm what it sets.
 */
130 void rx_stream_InitWrite(sd, call)
131 struct rx_stream *sd;
132 struct rx_call *call;
/* No packet has been allocated yet, so there is nowhere to write. */
134 sd->sd.wd.freePtr = 0;
136 sd->sd.wd.call = call;
137 queue_Init(&sd->sd.wd.wq);
138 sd->sd.wd.packetSize = rx_MaxDataSize(rx_ConnectionOf(call));
141 /* The real write procedure (rx_stream_Write is a macro) */
/*
 * rx_stream_WriteProc: append `nbytes` bytes to the stream's outgoing
 * packets, allocating and flushing packets as they fill up.
 *
 * When `buf` is null no data is copied; only the write cursor and the
 * byte counts advance. Returns the number of bytes accepted
 * (totalBytes - nbytes).
 *
 * NOTE(review): the `break` statements imply an enclosing loop whose header
 * is not visible in this excerpt, and the `else` between the two paths is
 * also missing -- confirm against the full file.
 */
142 int rx_stream_WriteProc(sd, buf, nbytes)
143 struct rx_stream *sd;
147 int totalBytes = nbytes;
/* No packet on hand: allocate one and aim the write cursor at its data. */
149 if (queue_IsEmpty(&sd->sd.wd.wq)) {
150 if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, 1, RX_WAIT)) break;
151 sd->sd.wd.nFree = sd->sd.wd.packetSize;
152 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
/* Data fits with room to spare: copy it (if any) and advance the cursor. */
154 if (nbytes < sd->sd.wd.nFree) {
155 if (buf) bcopy(buf, sd->sd.wd.freePtr, nbytes), buf += nbytes;
156 sd->sd.wd.nFree -= nbytes;
157 sd->sd.wd.freePtr += nbytes;
/* Otherwise fill the rest of the current packet ... */
161 if (buf) bcopy(buf, sd->sd.wd.freePtr, sd->sd.wd.nFree), buf += sd->sd.wd.nFree;
162 nbytes -= sd->sd.wd.nFree;
/* ... and send it; a non-zero flush result stops the write. */
164 if (rx_stream_FlushWrite(sd)) break;
166 return totalBytes - nbytes;
169 /* Returns nbytes allocated */
/*
 * rx_stream_AllocIov: reserve write-buffer space for up to `nbytes` bytes
 * and describe it in `iovp`, one entry per packet, so the caller can fill
 * the packets directly.
 *
 * At most *iovlenp entries are considered. The request is halved
 * repeatedly (totalBytes >>= 1) while searching for a size that fits.
 * Returns totalBytes - nbytes.
 *
 * NOTE(review): several lines (declarations, branch and loop exits, the
 * `else` arms and the per-entry update of nbytes) are not visible in this
 * excerpt, so the exact control flow must be confirmed against the full
 * file.
 */
170 int rx_stream_AllocIov(sd, iovlenp, iovp, nbytes)
171 struct rx_stream *sd;
176 struct rx_packet *p, *nxp;
177 int maxiovlen = *iovlenp;
/* Total the space in packets already on the write queue ... */
183 for (nFree = 0, queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet)) {
184 nFree += sd->sd.wd.packetSize;
/* ... counting only the unused tail of one packet when nFree is non-zero. */
186 if (sd->sd.wd.nFree) nFree = nFree - sd->sd.wd.packetSize + sd->sd.wd.nFree;
188 /* Allocate the number of bytes requested, or an even portion */
189 for (totalBytes = nbytes; ; totalBytes >>= 1) {
190 /* Compute number of additional buffers, beyond current partial buffer, required */
192 nbytes = totalBytes - nFree;
/* Round up to whole packets. */
197 niovs = nPackets = (nbytes + sd->sd.wd.packetSize - 1)/sd->sd.wd.packetSize;
199 if (niovs <= maxiovlen) {
/*
 * NOTE(review): the last argument is the literal 1 here, whereas
 * rx_stream_WriteProc passes RX_WAIT -- confirm they are equivalent.
 */
200 if (rx_AllocPackets(sd->sd.wd.call, &sd->sd.wd.wq, nPackets, 1)) break;
202 /* Since there weren't any packets allocated previously, setup new description for first packet */
203 sd->sd.wd.nFree = sd->sd.wd.packetSize;
204 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
210 /* Create an iovec to describe the set of allocated buffers */
/*
 * nFree starts as the free space in the first packet and is reset to a
 * full packetSize for every later packet, so only the first entry can
 * start part-way into its packet.
 */
211 for (nFree = sd->sd.wd.nFree, nbytes = totalBytes, iovIndex = 0,
212 queue_Scan(&sd->sd.wd.wq, p, nxp, rx_packet),
213 iovIndex++, nFree = sd->sd.wd.packetSize) {
214 if (iovIndex >= niovs) break;
/* Skip the part of the packet that is already in use. */
215 iovp[iovIndex].iov_base = rx_DataOf(p) + sd->sd.wd.packetSize - nFree;
/* Last entry takes what is left of the request; others take all free space. */
216 if (nbytes <= nFree) {
217 iovp[iovIndex].iov_len = nbytes;
222 iovp[iovIndex].iov_len = nFree;
225 return totalBytes - nbytes;
228 /* Wrong, wrong, wrong */
/*
 * rx_stream_FlushWrite: send the current, partly or fully written packet
 * of `sd`.
 *
 * Declared without a return type; the visible return passes back the
 * result of rx_SendData, and rx_stream_WriteProc treats a non-zero result
 * as failure.
 *
 * NOTE(review): the declaration and initialisation of the local queue `q`
 * and the value returned when nothing needs flushing are not visible in
 * this excerpt -- confirm against the full file.
 */
229 rx_stream_FlushWrite(sd)
230 struct rx_stream *sd;
/* Only flush when a packet exists and something has been written into it. */
234 if (queue_IsNotEmpty(&sd->sd.wd.wq) && sd->sd.wd.nFree < sd->sd.wd.packetSize) {
/* Detach the first packet onto q and record how many bytes it carries. */
235 struct rx_packet *tp = queue_First(&sd->sd.wd.wq, rx_packet);
236 queue_MoveAppend(&q, tp);
237 rx_SetDataSize(queue_First(&q, rx_packet), sd->sd.wd.packetSize - sd->sd.wd.nFree);
/* Re-aim the write cursor at the next packet, or mark that none is left. */
238 if (queue_IsNotEmpty(&sd->sd.wd.wq)) {
239 sd->sd.wd.nFree = sd->sd.wd.packetSize;
240 sd->sd.wd.freePtr = rx_DataOf(queue_First(&sd->sd.wd.wq, rx_packet));
241 } else sd->sd.wd.nFree = 0;
242 return (rx_SendData(sd->sd.wd.call, &q));
/*
 * rx_stream_FinishWrite: flush any pending data on the write side of `sd`
 * and free whatever packets remain on its write queue.
 */
247 void rx_stream_FinishWrite(sd)
248 struct rx_stream *sd;
/* The flush result is ignored here. */
250 rx_stream_FlushWrite(sd);
251 if (queue_IsNotEmpty(&sd->sd.wd.wq)) rx_FreePackets(&sd->sd.wd.wq);