6.10. Streams¶
6.10.1. Utility Functions¶
-
pyslet.streams.
io_blocked
(err)¶ Returns True if IO operation is blocked
- err
- An IOError exception (or similar object with errno attribute).
Bear in mind that EAGAIN and EWOULDBLOCK are not necessarily the same value and that when running under windows WSAEWOULDBLOCK may be raised instead. This function removes this complexity making it easier to write cross platform non-blocking IO code.
-
pyslet.streams.
io_timedout
(err)¶ Returns True if an IO operation timed out
- err
- An IOError exception (or similar object with errno attribute).
Tests for ETIMEDOUT and when running under windows WSAETIMEDOUT too.
6.10.2. Stream Classes¶
-
class
pyslet.streams.
Pipe
(bsize=8192, rblocking=True, wblocking=True, timeout=None, name=None)¶ Bases:
io.RawIOBase
Buffered pipe for inter-thread communication
The purpose of this class is to provide a thread-safe buffer to use for communicating between two parts of an application that support non-blocking io while reducing to a minimum the amount of byte-copying that takes place.
Essentially, write calls with immutable byte strings are simply cached without copying (and always succeed) enabling them to be passed directly through to the corresponding read operation in streaming situations. However, to improve flow control a canwrite method is provided to help writers moderate the amount of data that has to be held in the buffer:
# data producer thread while busy: wmax = p.canwrite() if wmax: data = get_at_most_max_bytes(wmax) p.write(data) else: # do something else while the pipe is blocked spin_the_beach_ball()
- bsize
- The buffer size, this is used as a guide only. When writing immutable bytes objects to the pipe the buffer size may be exceeded as these can simply be cached and returned directly to the reader more efficiently than slicing them up just to adhere to the buffer size. However, if the buffer already contains bsize bytes all calls to write will block or return None. Defaults to io.DEFAULT_BUFFER_SIZE.
- rblocking
- Controls the blocking behaviour of the read end of this pipe. True indicates reads may block waiting for data, False that they will not and read may return None. Defaults to True.
- wblocking
- Controls the blocking behaviour of the write end of the this pipe. True indicates writes may block waiting for data, False that they will not and write may return None. Defaults to True.
- timeout
- The number of seconds before a blocked read or write operation will timeout. Defaults to None, which indicates ‘wait forever’. A value of 0 is not the same as placing both ends of the pipe in non-blocking mode (though the effect may be similar).
- name
- An optional name to use for this pipe, the name is used when raising errors and when logging
-
name
= None¶ the name of the pipe
-
close
()¶ closed the Pipe
This implementation works on a ‘reader closes’ principle. The writer should simply write the EOF marker to the Pipe (see
write_eof()
.If the buffer still contains data when it is closed a warning is logged.
-
readable
()¶ Pipe’s are always readable
-
writable
()¶ Pipe’s are always writeable
-
readblocking
()¶ Returns True if reads may block
-
set_readblocking
(blocking=True)¶ Sets the readblocking mode of the Pipe.
- blocking
- A boolean, defaults to True indicating that reads may block.
-
writeblocking
()¶ Returns True if writes may block
-
set_writeblocking
(blocking=True)¶ Sets the writeblocking mode of the Pipe.
- blocking
- A boolean, defaults to True indicating that writes may block.
-
empty
()¶ Returns True if the buffer is currently empty
-
buffered
()¶ Returns the number of buffered bytes in the Pipe
-
canwrite
()¶ Returns the number of bytes that can be written.
This value is the number of bytes that can be written in a single non-blocking call to write. 0 indicates that the pipe’s buffer is full. A call to write may accept more than this but the next call to write will always accept at least this many.
This class is fully multithreaded so in situations where there are multiple threads writing this call is of limited use.
If called on a pipe that has had the EOF mark written then IOError is raised.
-
set_rflag
(rflag)¶ Sets an Event triggered when a reader is detected.
- rflag
- An Event instance from the threading module.
The event will be set each time the Pipe is read. The flag may be cleared at any time by the caller but as a convenience it will always be cleared when
canwrite()
returns 0.The purpose of this flag is to allow a writer to use a custom event to monitor whether or not the Pipe is ready to be written. If the Pipe is full then the writer will want to wait on this flag until a reader appears before attempting to write again. Therefore, when canwrite indicates that the buffer is full it makes sense that the flag is also cleared.
If the pipe is closed then the event is set as a warning that the pipe will never be read. (The next call to write will then fail.)
-
write_wait
(timeout=None)¶ Waits for the pipe to become writable or raises IOError
- timeout
- Defaults to None: wait forever. Otherwise the maximum number of seconds to wait for.
-
flush_wait
(timeout=None)¶ Waits for the pipe to become empty or raises IOError
- timeout
- Defaults to None: wait forever. Otherwise the maximum number of seconds to wait for.
-
canread
()¶ Returns True if the next call to read will not block.
False indicates that the pipe’s buffer is empty and that a call to read will block.
Note that if the buffer is empty but the EOF signal has been given with
write_eof()
then canread returns True! The next call to read will not block but return an empty string indicating the EOF.
-
read_wait
(timeout=None)¶ Waits for the pipe to become readable or raises IOError
- timeout
- Defaults to None: wait forever. Otherwise the maximum number of seconds to wait for.
-
write
(b)¶ writes data to the pipe
The implementation varies depending on the type of b. If b is an immutable bytes object then it is accepted even if this overfills the internal buffer (as it is not actually copied). If b is a bytearray then data is copied, up to the maximum buffer size.
-
write_eof
()¶ Writes the EOF flag to the Pipe
Any waiting readers are notified and will wake to process the Pipe. After this call the Pipe will not accept any more data.
-
flush
()¶ flushes the Pipe
The intention of flush to push any written data out to the destination, in this case the thread that is reading the data.
In write-blocking mode this call will wait until the buffer is empty, though if the reader is idle for more than
timeout
seconds then it will raise IOError.In non-blocking mode it simple raises IOError with EWOULDBLOCK if the buffer is not empty.
Given that flush is called automatically by
close()
for classes that inherit from the base io classes our implementation of close discards the buffer rather than risk an exception.
-
readall
()¶ Overridden to take care of non-blocking behaviour.
Warning: readall always blocks until it has read EOF, regardless of the rblocking status of the Pipe.
The problem is that, if the Pipe is set for non-blocking reads then we seem to have the choice of returning a partial read (and failing to signal that some of the data is still in the pipe) or raising an error and losing the partially read data.
Perhaps ideally we’d return None indicating that we are blocked from reading the entire stream but this isn’t listed as a possible return result for io.RawIOBase.readall and it would be tricky to implement anyway as we still need to deal with partially read data.
Ultimately the safe choices are raise an error if called on a non-blocking Pipe or simply block. We do the latter on the basis that anyone calling readall clearly intends to wait.
For a deep discussion of the issues around non-blocking behaviour see http://bugs.python.org/issue13322
-
readmatch
(match='\r\n')¶ Read until a byte string is matched
- match
- A binary string, defaults to CRLF.
This operation will block if the string is not matched unless the buffer becomes full without a match, in which case IOError is raised with code ENOBUFS.
-
read
(nbytes=-1)¶ read data from the pipe
May return fewer than nbytes if the result can be returned without copying data. Otherwise
readinto()
is used.
-
readinto
(b)¶ Reads data from the Pipe into a bytearray.
Returns the number of bytes read. 0 indicates EOF, None indicates an operation that would block in a Pipe that is non-blocking for read operations. May return fewer bytes than would fit into the bytearray as it returns as soon as it has at least some data.
-
class
pyslet.streams.
BufferedStreamWrapper
(src, buffsize=8192)¶ Bases:
io.RawIOBase
A buffered wrapper for file-like objects.
- src
- A file-like object, we only require a read method
- buffsize
- The maximum size of the internal buffer
On construction the src is read until an end of file condition is encountered or until buffsize bytes have been read. EOF is signaled by an empty string returned by src’s read method. Instances then behave like readable streams transparently reading from the buffer or from the remainder of the src as applicable.
Instances behave differently depending on whether or not the entire src is buffered. If it is they become seekable and set a value for the length attribute. Otherwise they are not seekable and the length attribute is None.
If src is a non-blocking data source and it becomes blocked, indicated by read returning None rather than an empty string, then the instance reverts to non-seekable behaviour.
-
peek
(nbytes)¶ Read up to nbytes without advancing the position
If the stream is not seekable and we have read past the end of the internal buffer then an empty string will be returned.