WvStreams
wvencoderstream.cc
1 /*
2  * Worldvisions Tunnel Vision Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * WvEncoderStream chains a series of encoders on the input and
6  * output ports of the underlying stream to effect on-the-fly data
7  * transformations.
8  */
9 #include "wvencoderstream.h"
10 
12 {
    // NOTE(review): the signature line for this body (listing line 11) is
    // missing from this copy; presumably the WvEncoderStream constructor,
    // since it sits directly above the destructor -- confirm against upstream.
    //
    // is_closing is the guard tested and set by the close path further down;
    // it starts cleared.
13  is_closing = false;
    // min_readsize is the lower bound uread() applies to the amount it asks
    // pull() for; zero means "no minimum".
14  min_readsize = 0;
15 }
16 
17 
18 WvEncoderStream::~WvEncoderStream()
19 {
20  close();
21 }
22 
23 
25 {
    // NOTE(review): the signature line for this body (listing line 24) is
    // missing from this copy; presumably WvEncoderStream::close() -- confirm.
    //
    // Finishes both encoder chains exactly once; the is_closing flag makes a
    // nested call (triggered while finishing) return immediately.
26  // fprintf(stderr, "Encoderstream close!\n");
27 
28  // we want to finish the encoders even if !isok() since we
29  // might just have encountered an EOF condition, and we want
30  // to ensure that the remaining data is processed, but this
31  // might cause recursion if the encoders set a new error condition
32  if (is_closing) return;
33  is_closing = true;
34 
35  // finish encoders
36  finish_read();
37  finish_write();
38 
39  // flush write chain and close the stream
    // NOTE(review): the listing jumps from line 39 to 41 here, so the
    // statement the comment above announces is absent from this copy.
    // Restore it from the upstream file before building.
41 }
42 
43 
45 {
    // NOTE(review): the signature line for this body (listing line 44) is
    // missing from this copy; presumably WvEncoderStream::isok() -- confirm.
    //
    // Returns false when WvStream::isok() fails, when there is no cloned
    // stream, or when the cloned stream reports a non-zero geterr();
    // returns true otherwise.
46  //fprintf(stderr, "encoderstream isok: %d %p %d %d\n",
47  // WvStream::isok(), cloned, cloned->isok(), cloned->geterr());
48 
49  // handle encoder error conditions
50  if (!WvStream::isok())
51  return false;
52 
53  // handle substream error conditions
54  // we don't check substream isok() because that is handled
55  // during read operations to distinguish EOF from errors
56  if (!cloned || cloned->geterr() != 0)
57  return false;
58 
59  return true;
60 }
61 
62 
63 bool WvEncoderStream::flush_internal(time_t msec_timeout)
64 {
65  flush_write();
66  return WvStreamClone::flush_internal(msec_timeout);
67 }
68 
69 
71 {
    // NOTE(review): the signature line for this body (listing line 70) is
    // missing from this copy; presumably WvEncoderStream::flush_read(),
    // the read-side counterpart of the push(true, false) body below -- confirm.
    //
    // Flushes the read chain from readinbuf into readoutbuf and returns the
    // chain's flush result.
72  bool success = readchain.flush(readinbuf, readoutbuf);
    // record a read-chain failure (if any) as a stream error
73  checkreadisok();
    // hand everything produced so far over to inbuf
74  inbuf.merge(readoutbuf);
75  return success;
76 }
77 
78 
80 {
    // NOTE(review): the signature line for this body (listing line 79) is
    // missing from this copy; presumably WvEncoderStream::flush_write(),
    // which flush_internal() calls -- confirm.
    //
    // Runs push() with flush requested and finish not requested, and
    // returns push()'s result unchanged.
81  bool success = push(true /*flush*/, false /*finish*/);
82  return success;
83 }
84 
85 
87 {
    // NOTE(review): the signature line for this body (listing line 86) is
    // missing from this copy; presumably WvEncoderStream::finish_read(),
    // which the close path calls -- confirm.
    //
    // Flushes and then finishes the read chain. The result is true only if
    // both the flush and the finish report success.
88  bool success = readchain.flush(readinbuf, readoutbuf);
89  if (!readchain.finish(readoutbuf))
90  success = false;
    // record a read-chain failure (if any) as a stream error
91  checkreadisok();
    // hand everything produced so far over to inbuf
92  inbuf.merge(readoutbuf);
93  // noread();
94  return success;
95 }
96 
97 
99 {
    // NOTE(review): the signature line for this body (listing line 98) is
    // missing from this copy; presumably WvEncoderStream::finish_write(),
    // which the close path calls -- confirm.
    //
    // Runs push() with both flush and finish requested and returns its result.
100  return push(true /*flush*/, true /*finish*/);
101 }
102 
103 
104 void WvEncoderStream::pull(size_t size)
105 {
106  // fprintf(stderr, "encoder pull %d\n", size);
107 
108  // pull a chunk of unencoded input
109  bool finish = false;
110  if (cloned)
111  {
112  if (size != 0)
113  cloned->read(readinbuf, size);
114  if (!cloned->isok())
115  finish = true; // underlying stream hit EOF or error
116  }
117 
118  // deal with any encoders that have been added recently
119  WvDynBuf tmpbuf;
120  tmpbuf.merge(readoutbuf);
121  readchain.continue_encode(tmpbuf, readoutbuf);
122 
123  // apenwarr 2004/11/06: always flush on read, because otherwise there's
124  // no clear way to decide when we need to flush. Anyway, most "decoders"
125  // (the kind of thing you'd put in the readchain) don't care whether you
126  // flush or not.
127  readchain.encode(readinbuf, readoutbuf, true);
128  //readchain.encode(readinbuf, readoutbuf, finish /*flush*/);
129  if (finish)
130  {
131  readchain.finish(readoutbuf);
132  // if (readoutbuf.used() == 0 && inbuf.used() == 0)
133  // noread();
134  close();
135  // otherwise defer EOF until the buffered data has been read
136  }
137  else if (!readoutbuf.used() && !inbuf.used() && readchain.isfinished())
138  {
139  // only get EOF when the chain is finished and we have no
140  // more data
141  //noread();
142  close();
143  }
144  checkreadisok();
145 }
146 
147 
148 bool WvEncoderStream::push(bool flush, bool finish)
149 {
150  WvDynBuf writeoutbuf;
151 
152  // encode the output
153  if (flush)
154  writeinbuf.merge(outbuf);
155  bool success = writechain.encode(writeinbuf, writeoutbuf, flush);
156  if (finish)
157  if (!writechain.finish(writeoutbuf))
158  success = false;
159  checkwriteisok();
160 
161 #if 0
162  // push encoded output to cloned stream
163  size_t size = writeoutbuf.used();
164  if (size != 0)
165  {
166  const unsigned char *writeout = writeoutbuf.get(size);
167  size_t len = WvStreamClone::uwrite(writeout, size);
168  writeoutbuf.unget(size - len);
169  }
170 #endif
171  if (cloned)
172  cloned->write(writeoutbuf, writeoutbuf.used());
173 
174  return success;
175 }
176 
177 
178 size_t WvEncoderStream::uread(void *buf, size_t size)
179 {
180  // fprintf(stderr, "encstream::uread(%d)\n", size);
181  if (size && readoutbuf.used() == 0)
182  pull(min_readsize > size ? min_readsize : size);
183  size_t avail = readoutbuf.used();
184  if (size > avail)
185  size = avail;
186  readoutbuf.move(buf, size);
187  return size;
188 }
189 
190 
191 size_t WvEncoderStream::uwrite(const void *buf, size_t size)
192 {
193  writeinbuf.put(buf, size);
194  push(false /*flush*/, false /*finish*/);
195  return size;
196 }
197 
198 
200 {
    // NOTE(review): the signature line for this body (listing line 199) and
    // listing line 201 are both missing from this copy; presumably this is
    // WvEncoderStream::pre_select(SelectInfo &si) -- confirm and restore the
    // missing statement from upstream.
202 
    // if the caller wants readability and readoutbuf already holds data,
    // request a zero timeout
203  if (si.wants.readable && readoutbuf.used() != 0)
204  si.msec_timeout = 0;
205 }
206 
207 
209 {
    // NOTE(review): the signature line for this body (listing line 208) is
    // missing from this copy; presumably
    // WvEncoderStream::post_select(SelectInfo &si) -- confirm.
    //
    // Returns true if readoutbuf still holds data after a zero-size pull
    // (when readability was requested), or if WvStreamClone::post_select()
    // returns true.
210  bool sure = false;
211 
212  // if we have buffered input data and we want to check for
213  // readability, then cause a callback to occur that will
214  // hopefully ask us for more data via uread()
215  if (si.wants.readable && readoutbuf.used() != 0)
216  {
217  pull(0); // try an encode
218  if (readoutbuf.used() != 0)
219  sure = true;
220  }
221 
222  // try to push pending encoded output to cloned stream
223  // outbuf_delayed_flush condition already handled by uwrite()
224  push(false /*flush*/, false /*finish*/);
225 
226  // consult the underlying stream
227  sure |= WvStreamClone::post_select(si);
228 
229  return sure;
230 }
231 
232 
233 void WvEncoderStream::checkreadisok()
234 {
235  if (!readchain.isok())
236  {
237  seterr(WvString("read chain: %s", readchain.geterror()));
238  noread();
239  }
240 }
241 
242 
243 void WvEncoderStream::checkwriteisok()
244 {
245  if (!writechain.isok())
246  seterr(WvString("write chain: %s", writechain.geterror()));
247 }