/* * DataItem.java * * Copyright (c) 1996 Chuck McManis, All Rights Reserved. * * Permission to use, copy, modify, and distribute this software * and its documentation for NON-COMMERCIAL purposes and without * fee is hereby granted provided that this copyright notice * appears in all copies. * * CHUCK MCMANIS MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE * SUITABILITY OF THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING * BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT. CHUCK MCMANIS * SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT * OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES. */ package util.comm; /** * This class defines a single item in the channel. Each * consumer thread gets its own entry to allow us to track * if all threads have consumed a value before the next * value is written. */ class DataItem { private Thread TID; // The consumer thread private Object q[]; // Queued values for thread private int nQueue; // Number of items queued private int dataStatus; // status of the queue private long timeout; // legal Q status values final static int NONE = 0; final static int NEW = 1; final static int OVERRUN = 2; final static int SHUTDOWN = 3; /** * Return a new DataItem for this thread, with the given * queue size. Slower threads would need a longer queue * to prevent overruns. */ DataItem(Thread t, int Q, long timeo) { q = new Object[Q]; nQueue = 0; TID = t; timeout = timeo; dataStatus = NONE; } DataItem(Thread t, int Q) { this(t, Q, 600000l); } /** * Shorthand for a queue size of 2. */ DataItem(Thread t) { this(t, 2, 600000l); } /** * Insert data into this data item, note overrun if it * occurs. (Note no asynchronous exceptions so we can't * post this to the consuming thread.) */ synchronized void insert(Object x) { if (nQueue == q.length) { dataStatus = OVERRUN; q[nQueue-1] = x; } else { q[nQueue] = x; dataStatus = NEW; nQueue++; } notify(); } /** * Return the next data item in the queue. If an overrun has * occurred, this method will throw DataChannelOverrun and * then reset the overrun condition. The client thread can * simply toss overruns if it chooses. It can call again * to get the last value received. * * This function will block the thread if there is no * data in the queue. */ synchronized Object fetch() throws DataChannelOverrun, DataChannelTimeout, DataChannelShutdown { Object r = null; /* If nothing is waiting, we sleep. */ if (dataStatus == NONE) { try { long was = System.currentTimeMillis(); wait(timeout); if ((System.currentTimeMillis() - was) >= timeout) throw new DataChannelTimeout(); } catch (InterruptedException e) { dataStatus = SHUTDOWN; } } /* Data (or status) has arrived so dispatch it. */ switch (dataStatus) { case NONE: break; case NEW: r = q[0]; System.arraycopy(q, 1, q, 0, q.length - 1); nQueue--; break; case OVERRUN: dataStatus = NEW; // Next call will get the data throw new DataChannelOverrun(); case SHUTDOWN: throw new DataChannelShutdown(); } if (nQueue == 0) dataStatus = NONE; return r; } /** available items to be read */ int queueSize() { return nQueue; } /** Data is waiting check */ boolean hasData() { return (nQueue > 0); } /** last value posted (doesn't block) */ Object lastValue() { return q[0]; } synchronized void delete() { dataStatus = SHUTDOWN; notify(); } }