6.10. Streams

6.10.1. Utility Functions

pyslet.streams.io_blocked(err)

Returns True if an IO operation is blocked

err
An IOError exception (or similar object with errno attribute).

Bear in mind that EAGAIN and EWOULDBLOCK are not necessarily the same value and that, when running under Windows, WSAEWOULDBLOCK may be raised instead. This function hides that complexity, making it easier to write cross-platform non-blocking IO code.
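The kind of check described above can be sketched with the standard errno module. This is a minimal illustration, not pyslet's implementation: is_blocked_sketch is a hypothetical name, and the socket pair is only there to provoke a real "would block" error.

```python
import errno
import socket


def is_blocked_sketch(err):
    """Hypothetical stand-in: True if err carries a 'would block' errno."""
    codes = {errno.EAGAIN, errno.EWOULDBLOCK}
    # WSAEWOULDBLOCK is only defined by the errno module on Windows.
    wsa = getattr(errno, "WSAEWOULDBLOCK", None)
    if wsa is not None:
        codes.add(wsa)
    return getattr(err, "errno", None) in codes


# Provoke a blocked read: nothing has been sent to the non-blocking end.
a, b = socket.socketpair()
a.setblocking(False)
try:
    a.recv(16)
    blocked = False
except OSError as err:
    blocked = is_blocked_sketch(err)
finally:
    a.close()
    b.close()
```

Code that calls pyslet.streams.io_blocked would pass the caught exception in the same way as the except clause does here.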

pyslet.streams.io_timedout(err)

Returns True if an IO operation timed out

err
An IOError exception (or similar object with errno attribute).

Tests for ETIMEDOUT and, when running under Windows, WSAETIMEDOUT too.
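A comparable test for timeouts might look like the following. Again this is a sketch under assumptions rather than pyslet's own code; is_timedout_sketch is a hypothetical name.

```python
import errno


def is_timedout_sketch(err):
    """Hypothetical stand-in: True if err carries a timeout errno."""
    codes = {errno.ETIMEDOUT}
    # WSAETIMEDOUT is only defined by the errno module on Windows.
    wsa = getattr(errno, "WSAETIMEDOUT", None)
    if wsa is not None:
        codes.add(wsa)
    return getattr(err, "errno", None) in codes


timed_out = is_timedout_sketch(OSError(errno.ETIMEDOUT, "timed out"))
other_error = is_timedout_sketch(OSError(errno.ENOENT, "missing"))
```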

6.10.2. Stream Classes

class pyslet.streams.Pipe(bsize=8192, rblocking=True, wblocking=True, timeout=None, name=None)

Bases: io.RawIOBase

Buffered pipe for inter-thread communication

The purpose of this class is to provide a thread-safe buffer for communicating between two parts of an application that support non-blocking IO, while minimising the amount of byte-copying that takes place.

Essentially, write calls with immutable byte strings are simply cached without copying (and always succeed) enabling them to be passed directly through to the corresponding read operation in streaming situations. However, to improve flow control a canwrite method is provided to help writers moderate the amount of data that has to be held in the buffer:

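# data producer thread
# p is a Pipe instance; busy, get_at_most_max_bytes and
# spin_the_beach_ball are application-defined placeholders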
while busy:
    wmax = p.canwrite()
    if wmax:
        data = get_at_most_max_bytes(wmax)
        p.write(data)
    else:
        # do something else while the pipe is blocked
        spin_the_beach_ball()

bsize
The buffer size. This is used as a guide only. When writing immutable bytes objects to the pipe the buffer size may be exceeded, as these can simply be cached and returned directly to the reader more efficiently than slicing them up just to adhere to the buffer size. However, if the buffer already contains bsize bytes all calls to write will block or return None. Defaults to io.DEFAULT_BUFFER_SIZE.
rblocking
Controls the blocking behaviour of the read end of this pipe. True indicates reads may block waiting for data, False that they will not and read may return None. Defaults to True.
wblocking
Controls the blocking behaviour of the write end of this pipe. True indicates writes may block waiting for space in the buffer, False that they will not and write may return None. Defaults to True.
timeout
The number of seconds before a blocked read or write operation will time out. Defaults to None, which indicates ‘wait forever’. A value of 0 is not the same as placing both ends of the pipe in non-blocking mode (though the effect may be similar).
name
An optional name to use for this pipe. The name is used when raising errors and when logging.
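The caching behaviour described for write and bsize can be illustrated with a small stand-in class. MiniPipe is hypothetical and is not pyslet's Pipe: it has no blocking, timeout or EOF handling, and only shows how immutable bytes objects can be queued without copying while canwrite reports the remaining capacity.

```python
import collections
import threading


class MiniPipe:
    """Hypothetical stand-in for illustration only (not pyslet's Pipe)."""

    def __init__(self, bsize=8192):
        self.bsize = bsize
        self._chunks = collections.deque()
        self._size = 0
        self._lock = threading.Lock()

    def canwrite(self):
        # Remaining capacity; 0 once bsize has been reached or exceeded.
        with self._lock:
            return max(0, self.bsize - self._size)

    def write(self, data):
        # The bytes object is queued as-is, so no bytes are copied and
        # the nominal buffer size may be exceeded.
        with self._lock:
            self._chunks.append(data)
            self._size += len(data)
            return len(data)

    def read(self):
        # Hands back the very object that was written, or None if empty.
        with self._lock:
            if not self._chunks:
                return None
            data = self._chunks.popleft()
            self._size -= len(data)
            return data


pipe = MiniPipe(bsize=4)
chunk = b"hello world"
pipe.write(chunk)                   # accepted although longer than bsize
full = pipe.canwrite()              # no room left for further writes
same_object = pipe.read() is chunk  # the reader gets the original object
```

The design choice shown here is that bsize limits when new writes are accepted, not how large a single cached bytes object may be.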
name = None

the name of the pipe

close()

Closes the Pipe

This implementation works on a ‘reader closes’ principle. The writer should simply write the EOF marker to the Pipe (see write_eof()).

If the buffer still contains data when it is closed a warning is logged.

readable()

Pipes are always readable

writable()

Pipes are always writable

readblocking()

Returns True if reads may block

set_readblocking(blocking=True)

Sets the readblocking mode of the Pipe.

blocking
A boolean, defaults to True indicating that reads may block.
writeblocking()

Returns True if writes may block

set_writeblocking(blocking=True)

Sets the writeblocking mode of the Pipe.

blocking
A boolean, defaults to True indicating that writes may block.
empty()

Returns True if the buffer is currently empty

buffered()

Returns the number of buffered bytes in the Pipe

canwrite()

Returns the number of bytes that can be written.

This value is the number of bytes that can be written in a single non-blocking call to write. 0 indicates that the pipe’s buffer is full. A call to write may accept more than this but the next call to write will always accept at least this many.

This class is fully multithreaded so in situations where there are multiple threads writing this call is of limited use.

If called on a pipe that has had the EOF mark written then IOError is raised.

set_rflag(rflag)

Sets an Event triggered when a reader is detected.

rflag
An Event instance from the threading module.

The event will be set each time the Pipe is read. The flag may be cleared at any time by the caller but as a convenience it will always be cleared when canwrite() returns 0.

The purpose of this flag is to allow a writer to use a custom event to monitor whether or not the Pipe is ready to be written. If the Pipe is full then the writer will want to wait on this flag until a reader appears before attempting to write again. Therefore, when canwrite indicates that the buffer is full it makes sense that the flag is also cleared.

If the pipe is closed then the event is set as a warning that the pipe will never be read. (The next call to write will then fail.)
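The waiting pattern described above can be sketched with threading.Event alone. In this illustration a Timer stands in for the Pipe setting the flag when a read takes place; simulated_reader and wait_for_reader are hypothetical names, and in real use the event would be passed to set_rflag instead.

```python
import threading

rflag = threading.Event()  # in real use: passed to Pipe.set_rflag
events = []


def simulated_reader():
    # Stand-in for the Pipe setting the event each time it is read.
    events.append("read")
    rflag.set()


def wait_for_reader(flag, timeout):
    """Return True if a reader appeared within timeout seconds."""
    return flag.wait(timeout)


# The 'reader' shows up shortly after the writer starts waiting.
timer = threading.Timer(0.05, simulated_reader)
timer.start()
woke = wait_for_reader(rflag, timeout=2.0)
timer.join()
```

A writer would typically follow a successful wait with another call to canwrite before attempting to write again.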

write_wait(timeout=None)

Waits for the pipe to become writable or raises IOError

timeout
Defaults to None: wait forever. Otherwise the maximum number of seconds to wait for.
flush_wait(timeout=None)

Waits for the pipe to become empty or raises IOError

timeout
Defaults to None: wait forever. Otherwise the maximum number of seconds to wait for.
canread()

Returns True if the next call to read will not block.

False indicates that the pipe’s buffer is empty and that a call to read will block.

Note that if the buffer is empty but the EOF signal has been given with write_eof() then canread returns True! The next call to read will not block but return an empty string indicating the EOF.

read_wait(timeout=None)

Waits for the pipe to become readable or raises IOError

timeout
Defaults to None: wait forever. Otherwise the maximum number of seconds to wait for.
write(b)

writes data to the pipe

The implementation varies depending on the type of b. If b is an immutable bytes object then it is accepted even if this overfills the internal buffer (as it is not actually copied). If b is a bytearray then data is copied, up to the maximum buffer size.

write_eof()

Writes the EOF flag to the Pipe

Any waiting readers are notified and will wake to process the Pipe. After this call the Pipe will not accept any more data.

flush()

Flushes the Pipe

The intention of flush is to push any written data out to the destination, in this case the thread that is reading the data.

In write-blocking mode this call will wait until the buffer is empty, though if the reader is idle for more than timeout seconds then it will raise IOError.

In non-blocking mode it simply raises IOError with EWOULDBLOCK if the buffer is not empty.

Given that flush is called automatically by close() for classes that inherit from the base io classes, our implementation of close discards the buffer rather than risk an exception.

readall()

Overridden to take care of non-blocking behaviour.

Warning: readall always blocks until it has read EOF, regardless of the rblocking status of the Pipe.

The problem is that, if the Pipe is set for non-blocking reads then we seem to have the choice of returning a partial read (and failing to signal that some of the data is still in the pipe) or raising an error and losing the partially read data.

Perhaps ideally we’d return None indicating that we are blocked from reading the entire stream but this isn’t listed as a possible return result for io.RawIOBase.readall and it would be tricky to implement anyway as we still need to deal with partially read data.

Ultimately the safe choices are to raise an error if called on a non-blocking Pipe or simply to block. We do the latter on the basis that anyone calling readall clearly intends to wait.

For a deep discussion of the issues around non-blocking behaviour see http://bugs.python.org/issue13322

readmatch(match='\r\n')

Read until a byte string is matched

match
A binary string, defaults to CRLF.

This operation will block if the string is not matched unless the buffer becomes full without a match, in which case IOError is raised with code ENOBUFS.
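The matching rule can be sketched against a plain bytearray. readmatch_sketch is a hypothetical helper, not pyslet's method: it returns None where the real call would block, and returning the delimiter as part of the result is an assumption made for illustration.

```python
import errno


def readmatch_sketch(buffer, match=b"\r\n", bsize=8192):
    """Hypothetical helper: take data from buffer up to and including match."""
    pos = buffer.find(match)
    if pos >= 0:
        end = pos + len(match)
        line = bytes(buffer[:end])
        del buffer[:end]  # consume the matched data
        return line
    if len(buffer) >= bsize:
        # Buffer is full and no match can ever be completed.
        raise IOError(errno.ENOBUFS, "buffer full without a match")
    return None  # the real method would block here


buf = bytearray(b"GET / HTTP/1.1\r\nHost: x")
line = readmatch_sketch(buf)
pending = readmatch_sketch(buf)  # no CRLF left in the buffer yet
```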

read(nbytes=-1)

Reads data from the pipe

May return fewer than nbytes if the result can be returned without copying data. Otherwise readinto() is used.

readinto(b)

Reads data from the Pipe into a bytearray.

Returns the number of bytes read. 0 indicates EOF, None indicates an operation that would block in a Pipe that is non-blocking for read operations. May return fewer bytes than would fit into the bytearray as it returns as soon as it has at least some data.
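The three possible results of readinto (a positive count, 0 for EOF, None for a blocked read) might be handled as in the sketch below. ScriptedRaw is a hypothetical raw stream that replays a fixed script so the loop can be run without a Pipe; drain is likewise a hypothetical helper.

```python
import io


class ScriptedRaw(io.RawIOBase):
    """Hypothetical raw stream: replays chunks, None meaning 'blocked'."""

    def __init__(self, script):
        super().__init__()
        self._script = list(script)

    def readable(self):
        return True

    def readinto(self, b):
        if not self._script:
            return 0          # EOF
        item = self._script.pop(0)
        if item is None:
            return None       # would block
        n = len(item)         # chunks are assumed to fit into b
        b[:n] = item
        return n


def drain(raw, bufsize=16):
    """Read until EOF, counting how often the stream was blocked."""
    buf = bytearray(bufsize)
    chunks, blocked = [], 0
    while True:
        n = raw.readinto(buf)
        if n is None:
            blocked += 1      # real code would wait or do other work
            continue
        if n == 0:
            break
        chunks.append(bytes(buf[:n]))
    return b"".join(chunks), blocked


data, blocked_count = drain(ScriptedRaw([b"ab", None, b"cd"]))
```

With a real non-blocking Pipe the blocked branch would normally call read_wait or return control to the application rather than loop immediately.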

class pyslet.streams.BufferedStreamWrapper(src, buffsize=8192)

Bases: io.RawIOBase

A buffered wrapper for file-like objects.

src
A file-like object. Only a read method is required.
buffsize
The maximum size of the internal buffer

On construction the src is read until an end of file condition is encountered or until buffsize bytes have been read. EOF is signaled by an empty string returned by src’s read method. Instances then behave like readable streams transparently reading from the buffer or from the remainder of the src as applicable.

Instances behave differently depending on whether or not the entire src is buffered. If it is they become seekable and set a value for the length attribute. Otherwise they are not seekable and the length attribute is None.

If src is a non-blocking data source and it becomes blocked, indicated by read returning None rather than an empty string, then the instance reverts to non-seekable behaviour.
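The buffering performed on construction can be sketched as a standalone function. prebuffer is a hypothetical helper and not the wrapper's actual code; it returns the buffered bytes together with a length that is only known when EOF was seen before buffsize bytes were read.

```python
import io


def prebuffer(src, buffsize=8192):
    """Hypothetical helper: read up to buffsize bytes from src.

    Returns (buffered, length); length is None unless EOF was reached.
    """
    chunks, total, eof = [], 0, False
    while total < buffsize:
        data = src.read(buffsize - total)
        if data is None:   # non-blocking source is blocked
            break
        if not data:       # empty string signals EOF
            eof = True
            break
        chunks.append(data)
        total += len(data)
    buffered = b"".join(chunks)
    return buffered, (len(buffered) if eof else None)


small = prebuffer(io.BytesIO(b"short"), buffsize=64)
large = prebuffer(io.BytesIO(b"x" * 100), buffsize=10)
```

In this sketch a known length corresponds to the seekable case described above, and a length of None to the non-seekable case.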

peek(nbytes)

Read up to nbytes without advancing the position

If the stream is not seekable and we have read past the end of the internal buffer then an empty string will be returned.