/* * DataItem.java * * Copyright (c) 1996 Chuck McManis, All Rights Reserved. * * Permission to use, copy, modify, and distribute this software * and its documentation for NON-COMMERCIAL purposes and without * fee is hereby granted provided that this copyright notice * appears in all copies. * * CHUCK MCMANIS MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE * SUITABILITY OF THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING * BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT. CHUCK MCMANIS * SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT * OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES. */ package util.comm; import util.Recyclable; /** * This class defines a single item in the channel. Each * consumer thread gets its own entry to allow us to track * if all threads have consumed a value before the next * value is written. */ class DataItem { private Thread TID; // The consumer thread private Object q[]; // Queued values for thread private int nQueue; // Number of items queued private int dataStatus; // status of the queue private long timeout; // legal Q status values final static int NONE = 0; final static int NEW = 1; final static int OVERRUN = 2; final static int SHUTDOWN = 3; boolean waitOnQueue = false; /** * Return a new DataItem for this thread, with the given * queue size. Slower threads would need a longer queue * to prevent overruns. */ DataItem(Thread t, int Q, long timeo) { q = new Object[Q]; nQueue = 0; TID = t; timeout = timeo; dataStatus = NONE; } DataItem(Thread t, int Q) { this(t, Q, 600000l); } /** * Shorthand for a queue size of 2. */ DataItem(Thread t) { this(t, 2, 600000l); } /** * Insert data into this data item, note overrun if it * occurs. (Note no asynchronous exceptions so we can't * post this to the consuming thread.) */ synchronized void insert(Object x) { Referenced r = null; if (x instanceof Referenced) ((Referenced) x).addRefCount(); // if it would overrun and we're in block on overrun mode wait. if ((waitOnQueue) && (nQueue == q.length)) { System.out.println("Blocking on insert. QSIZE="+q.length); while (nQueue > 0) { notify(); try { wait(); } catch (InterruptedException e) { } } System.out.println("Resuming..."); } if (nQueue == q.length) { dataStatus = OVERRUN; /* * At this point we are going to discard the last value in * the Queue, so we check to see if it is a reference counting * object, and if so we decrement the reference to it, and if * it is a recyclable object we offer it back for recycling. */ if (q[nQueue - 1] instanceof Referenced) ((Referenced) q[nQueue-1]).decRefCount(); if (q[nQueue - 1] instanceof Recyclable) ((Recyclable) q[nQueue-1]).recycle(q[nQueue-1]); q[nQueue-1] = x; } else { q[nQueue] = x; dataStatus = NEW; nQueue++; } notify(); } /** * Return the next data item in the queue. If an overrun has * occurred, this method will throw DataChannelOverrun and * then reset the overrun condition. The client thread can * simply toss overruns if it chooses. It can call again * to get the last value received. * * This function will block the thread if there is no * data in the queue. */ synchronized Object fetch() throws DataChannelOverrun, DataChannelTimeout, DataChannelShutdown { Object r = null; /* If nothing is waiting, we sleep. */ if (dataStatus == NONE) { try { long was = System.currentTimeMillis(); wait(timeout); if ((System.currentTimeMillis() - was) >= timeout) throw new DataChannelTimeout(); } catch (InterruptedException e) { dataStatus = SHUTDOWN; } } /* Data (or status) has arrived so dispatch it. */ switch (dataStatus) { case NONE: break; case NEW: r = q[0]; System.arraycopy(q, 1, q, 0, q.length - 1); nQueue--; break; case OVERRUN: dataStatus = NEW; // Next call will get the data throw new DataChannelOverrun(); case SHUTDOWN: throw new DataChannelShutdown(); } if (nQueue == 0) dataStatus = NONE; notify(); return r; } /** available items to be read */ int queueSize() { return nQueue; } /** Data is waiting check */ boolean hasData() { return (nQueue > 0); } /** last value posted (doesn't block) */ Object lastValue() { return q[0]; } synchronized void delete() { dataStatus = SHUTDOWN; notify(); } void setBlocking() { waitOnQueue = true; } void setNonblocking() { waitOnQueue = false; } }