/* * DataChannelInputStream.java - an input stream using data channels. * * Copyright (c) 1996 Chuck McManis, All Rights Reserved. * * Permission to use, copy, modify, and distribute this software * and its documentation for NON-COMMERCIAL purposes and without * fee is hereby granted provided that this copyright notice * appears in all copies. * * CHUCK MCMANIS MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE * SUITABILITY OF THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING * BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT. CHUCK MCMANIS * SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT * OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES. */ package util.comm; import java.io.*; /** * This class implements an InputStream that runs through a * DataChannel. The advantages of this are that the stream * is many to one and one to many. Anyone can open an input * stream on the DataChannel and get copies of all data sent * to the channel, anyone can write to the data channel and * send output to all readers. */ public class DataChannelInputStream extends InputStream implements Runnable { private Thread reader; DataChannel dc; StreamBuffer current; int hasBeenRead = 0; StreamBufferList bufferList = new StreamBufferList(); synchronized void addBuffer(StreamBuffer sb) { if (current == null) { current = sb; hasBeenRead = 0; } else bufferList.putBuffer(sb); notifyAll(); } /** * Return the total number of bytes available for reading from * the stream. */ public int avail() { int total = 0; if (current != null) { total = (current.avail() - hasBeenRead); } if (bufferList != null) { total += bufferList.avail(); } return total; } /** * Remove one value from the buffer, block if buffer * is empty. Note that if several threads call this * they will each get a value until the buffer is empty * or they stop asking. The order in which the threads * wake up and get values is system dependent. */ synchronized int getFromBuffer() { int r; // our eventual result; while (true) { // Wait for some data to appear if necessary. while (current == null) { try { wait(); } catch (InterruptedException e) { return -1; } } // Retrieve a byte from the current StreamBuffer r = current.read(hasBeenRead++); if (r != -1) { notifyAll(); return r; // return it to caller } // Current buffer is empty, get the next one. StreamBuffer tmp = current; hasBeenRead = 0; current = bufferList.getBuffer(); tmp.recycle(); // Recycle this buffer. tmp = null; // drop our reference to it. } } /** * Create a new input stream on the DataChannel named * chanID */ public DataChannelInputStream(String chanID) { reader = new Thread(this); dc = DataChannel.getChannel(chanID, reader, 8); dc.setBlocking(reader); reader.start(); } /** * Read a "byte" from the stream. Return -1 if the * stream is "empty", and return 255 if you read * a byte 0xff. */ public synchronized int read() throws IOException { return getFromBuffer(); } /** * The run method waits for StreamBuffer objects and * puts them in the queue to be read. */ public void run() { while (reader == Thread.currentThread()) { Object q = null; try { q = dc.getValue(); } catch (DataChannelTimeout e) { continue; } catch (DataChannelShutdown e) { return; } catch (DataChannelOverrun e) { continue; } catch (DataChannelException e) { return; } if (q instanceof StreamBuffer) { addBuffer((StreamBuffer) q); } } } }