Using threads in Java, Part 2

Create communication channels that operate between threads

Summary
Communicating between threads is made easy by developing a class that uses the techniques discussed in the first part of this series. Herein, see firsthand how to write such a data channel class, and then create a simple example application that illustrates a real-world implementation of the class. (3,700 words)

By Chuck McManis

In part one of this series of columns, I showed you how two or more threads in the Java runtime can synchronize with the wait() and notify() methods in class Object. In this installment we will write a class that uses these techniques to create a communication channel that operates between threads.

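The communication channel performs four tasks. It opens a named channel (creating it if necessary), accepts values from writer threads, delivers a copy of each value to every reader thread registered on the channel, and notifies a reader when its connection to the channel is released.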

This model is pretty typical of such designs.

The first of these, opening a named channel, requires that the class be able to share information (the channel registry) between threads. In Java, all the threads in a virtual machine share one address space, and a static field exists only once per class, so every instance of the class, and every thread, sees the same field. You will recall that static variables (sometimes called class variables) do not require that an object instance be created to access them. Instead, they may be accessed using the Classname.variable_name syntax. For the purposes of our example, however, the more salient property is that they are visible to all threads.
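To make that last property concrete, here is a minimal sketch (StaticSharingDemo and writeFromOtherThread are hypothetical names, not part of the article's code). A worker thread stores a value in a static Hashtable, and the calling thread reads it back through the Classname.variable_name syntax without ever creating an instance.

```java
import java.util.Hashtable;

/** Hypothetical demo: a static field is one object shared by every thread. */
public class StaticSharingDemo {
    // Class variable: created once per class, not once per instance,
    // and reachable from any thread without an object reference.
    static final Hashtable<String, String> registry = new Hashtable<>();

    /** Writes key/value from a separate thread, then reads it from the calling thread. */
    public static String writeFromOtherThread(String key, String value) {
        Thread writer = new Thread(() -> registry.put(key, value));
        writer.start();
        try {
            writer.join(); // join() also makes the writer's update visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return StaticSharingDemo.registry.get(key); // Classname.variable syntax
    }

    public static void main(String[] args) {
        System.out.println(writeFromOtherThread("channel", "X")); // prints X
    }
}
```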

Let the games begin
To create our DataChannel class we start out with this code:

1    package util.comm;
2
3    import java.util.Hashtable;
4    import java.util.Enumeration;
5
6    public class DataChannel {
7
8        private Hashtable TIDs;      // reader thread -> its DataItem
9        private String myName;       // this channel's registered name
10       private DataChannel() {
11           TIDs = new Hashtable();
12       }
13
14       private static Hashtable registry = null;
15
16   /** getChannel (writer version) */
17       public static synchronized DataChannel getChannel(String name) {
18           if (registry == null)
19               registry = new Hashtable();
20           DataChannel it = (DataChannel) registry.get(name);
21           if (it == null) {
22               it = new DataChannel();
23               it.myName = name;
24               registry.put(name, it);
25           }
26           return it;
27       }

Line 1 puts this code in the util.comm package. There are two advantages to doing this: the classes for the data channels don't clutter up our main classes directory; and we can create a "helper" class named DataItem that is private to this package. Note that the package statement must be the first non-comment line in the file.

Lines 3 through 13 import the hash table classes, declare the class and its per-channel fields, and define the constructor for a new DataChannel object (lines 10 through 12). As you can see, the constructor is declared private. This declaration makes it impossible for any code outside the DataChannel class to construct a new instance of a DataChannel. This is a common technique for restricting the creation of new objects to a static method like the one in line 17.

Line 14 declares the registry hash table. Because this declaration is static, there is a single registry shared by all threads; because it is private, it is accessible only to the methods in DataChannel.

Finally, in lines 17 through 27, comes the channel-writer version of the getChannel() method. As a static method, it is accessed using the syntax:

DataChannel dc = DataChannel.getChannel("name");

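Because it is a method in DataChannel, it is allowed to manipulate the private registry hash table and to call the private constructor defined in the class. Static methods such as this one that return an object of their own class type are called factory methods. Unlike a constructor, which always allocates a new object from the heap, a factory method decides how to produce the object it returns: here, getChannel hands back the existing channel when the name is already registered and creates a new one only when it is not.

Because getChannel is static and synchronized, it locks the DataChannel class object, so the Java runtime guarantees that only one thread at a time can be executing it. That matters for the check-then-create sequence in lines 18 through 25: without the lock, two threads asking for the same new name could both find it missing, and each would create and register its own channel. As you can see, if the named channel doesn't exist, it is created.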
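A minimal sketch of the same check-then-create pattern, using a stripped-down stand-in rather than the full DataChannel (NamedRegistryDemo, get, and distinctInstances are hypothetical names): several threads request the same name at once, and the synchronized factory hands every one of them the same object.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Set;

/** Hypothetical stand-in for DataChannel's writer-side getChannel(). */
public class NamedRegistryDemo {
    private static Hashtable<String, NamedRegistryDemo> registry = null;

    private NamedRegistryDemo() { } // only the factory below can construct

    /** Synchronized factory: the lookup and the insert happen as one step. */
    public static synchronized NamedRegistryDemo get(String name) {
        if (registry == null)
            registry = new Hashtable<>();
        NamedRegistryDemo it = registry.get(name);
        if (it == null) {
            it = new NamedRegistryDemo();
            registry.put(name, it);
        }
        return it;
    }

    /** Starts n threads that all ask for the same name; returns how many distinct objects came back. */
    public static int distinctInstances(String name, int n) {
        // equals() is not overridden, so the set compares object identity
        Set<NamedRegistryDemo> seen = Collections.synchronizedSet(new HashSet<>());
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> seen.add(get(name)));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctInstances("res1", 16)); // prints 1
    }
}
```

Removing synchronized from get() opens a window in which two threads both see null and each register their own object; the demo may still print 1 by luck, which is what makes such races hard to find. On current JDKs, ConcurrentHashMap.computeIfAbsent gives the same one-object-per-name guarantee without locking the whole class.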

Reading is fundamental
The second version of the getChannel method creates a reader DataChannel. Here are the bits:

28   /** getChannel (reader version) */
29        public static DataChannel getChannel(String name, Thread myTID, int qSize) {
30            DataChannel it = getChannel(name);   // find or create the named channel
31
32            if (qSize == 0)
33                return it;                         // no DataItem is registered for myTID
34            it.TIDs.put(myTID, new DataItem(myTID, qSize));
35            return it;
36        }

This version takes the additional parameters of a thread identifier (the reader's Thread object) and a queue size. It starts by accessing the globally named DataChannel using the other getChannel method. When that channel is returned, it attaches to it an instance of a DataItem object, stored in the channel's TIDs table under the reader's thread. This object provides the actual channel for the thread. If qSize is zero, no data item is attached and the channel is simply returned. Note that every data channel has one data item for every thread that is interested in receiving data on the channel.

As you may now see, our data channels can have many readers, and they can also have many writers. Further, there can be several data channels active at one time. To prevent all of these threads from synchronizing on one object (the data channel), we'll create the DataItem class, which gives each reader an object of its own: the reader waits on its data item's monitor, and writers notify that same monitor.

The DataItem class is defined below; we'll describe it in sections. This version of DataItem is easy to understand, but it is not as flexible as it could be.

1    package util.comm;
2
3    class DataItem {
4        private Thread TID;           // The consumer thread
5        private Object q[];           // Queued values for thread
6        private int nQueue;           // Number of items in the Queue
7        private int dataStatus;       // Status of the queue
8        private long timeout = 10000; // Read timeout in ms (placeholder value)
9        // legal Queue status values
10       final static int NONE = 0;     // No data available
11       final static int NEW = 1;      // New data available
12       final static int OVERRUN = 2;  // Data overrun (queue overflow)
13       final static int SHUTDOWN = 3; // Channel released; reader must stop
14       DataItem(Thread t, int Q) {
15           q = new Object[Q];
16           nQueue = 0;
17           TID = t;
18           dataStatus = NONE;
19       }
20
21       /** shorter version for a queue size of two (2) */
22       DataItem(Thread t) {
23           this(t, 2);
24       }

Again, this class is part of the util.comm package. This is declared in line 1. Remember that the package statement must be the first non-comment line in the file.

Lines 3 through 13 declare the instance variables for this class and the status constants used within its methods. These maintain the state of this instance of the data item. The timeout field (line 8) and the SHUTDOWN status (line 13) are used by fetch() and delete(); the ten-second timeout shown is only a placeholder value.

Lines 14 through 24 are the constructors for DataItem. Neither is public, since they are used only by DataChannel, which is also defined in the util.comm package. The first constructor allocates a data item for the referenced thread with a queue size as specified in Q. The second always allocates a queue size of two, a convenient default.


25
26   /** insert is the "write" method */
27        synchronized void insert(Object x) {
28       if (nQueue == q.length) {
29           dataStatus = OVERRUN;
30           q[nQueue-1] = x;
31       } else {
32           q[nQueue] = x;
33           dataStatus = NEW;
34           nQueue++;
35       }
36       notify();
37        }
 

The method insert() puts new data into the queue. It is synchronized, so the writer must acquire the object's monitor before entering. This ensures that multiple writers can't run the method at the same time, and that a writer can't change the queue while the reader holds the monitor in fetch(); either could corrupt the state of the queue.

Lines 28 through 30 check for data overruns. An overrun occurs when an attempt is made to insert an object into a full queue. This version of the method implements a policy that the previous last value is discarded and the new value takes its place at the end of the queue.

Lines 32 through 34 insert the item into the queue, set dataStatus to NEW, and update the number of items queued. Finally, in line 36, the reader thread waiting on this data item (if any) is notified that there is data available to be consumed.
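The overrun policy is easiest to see with a single thread. This sketch uses a hypothetical class, OverrunPolicyDemo, that copies only the queue logic of insert() (no monitor, no notify), fills a two-slot queue, and then inserts a third value.

```java
import java.util.Arrays;

/** Hypothetical single-threaded model of DataItem.insert()'s overrun policy. */
public class OverrunPolicyDemo {
    private final Object[] q;
    private int nQueue = 0;
    private boolean overrun = false;

    public OverrunPolicyDemo(int size) { q = new Object[size]; }

    /** Same policy as insert(): when full, overwrite the newest entry. */
    public void insert(Object x) {
        if (nQueue == q.length) {
            overrun = true;
            q[nQueue - 1] = x; // discard the previous last value
        } else {
            q[nQueue++] = x;
        }
    }

    public boolean overrun() { return overrun; }

    /** Snapshot of the queued values, oldest first. */
    public Object[] contents() { return Arrays.copyOf(q, nQueue); }

    public static void main(String[] args) {
        OverrunPolicyDemo d = new OverrunPolicyDemo(2);
        d.insert(1);
        d.insert(2);
        d.insert(3); // queue is full, so 2 is replaced by 3
        System.out.println(Arrays.toString(d.contents()) + " overrun=" + d.overrun());
        // prints [1, 3] overrun=true
    }
}
```

Keeping the oldest entries and overwriting only the newest means a slow reader still sees values in arrival order, plus the most recent one; the overwritten value is what the overrun reports as lost.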

The data is read by fetch() below. It is only a bit more complicated than insert().


38
39   /** fetch is the "read" method */
40        synchronized Object fetch() throws DataChannelOverrun, DataChannelShutdown, DataChannelTimeout {
41            Object r = null;
42
43            /* If nothing is waiting, we sleep. */
44            if (dataStatus == NONE) {
45                try {
46                    long was = System.currentTimeMillis();
47                    wait(timeout);
48                    if (dataStatus == NONE && (System.currentTimeMillis() - was) >= timeout)
49                        throw new DataChannelTimeout();   // still no data: a real timeout
50                } catch (InterruptedException e) { dataStatus = SHUTDOWN; }
51            }
52
53            /* Data (or status) has arrived so dispatch it. */
54            switch (dataStatus) {
55                case NONE:
56                    break;
57                case NEW:
58                    r = q[0];
59                    System.arraycopy(q, 1, q, 0, q.length - 1);
60                    nQueue--;
61                    break;
62                case OVERRUN:
63                    dataStatus = NEW; // Next call will get the data
64                    throw new DataChannelOverrun();
65                case SHUTDOWN:
66                    throw new DataChannelShutdown();
67            }
68            if (nQueue == 0)
69                dataStatus = NONE;
70            return r;
71        }
 

In line 44 the code checks the value of dataStatus and goes to sleep in a wait if there is no data available. This is similar to the code in the earlier PingPong class described in Part 1 of this series ("Synchronizing threads in Java," April 1996 JavaWorld).

When the thread wakes up, either because it was notified or because the wait timed out, it checks how much time has passed; if the full timeout elapsed, it throws a DataChannelTimeout exception. If the wait was cut short by an InterruptedException (that is, another thread interrupted this one), the code immediately sets the data channel into "shutdown" mode and throws the DataChannelShutdown exception a bit later, in the switch statement.
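fetch() waits once and then compares the elapsed time against the timeout. A common, slightly more robust variant loops until the condition holds or a deadline passes, which also copes with wakeups that deliver no data. The sketch below shows that variant on a hypothetical one-slot TimedWaitDemo class, not on the article's DataItem.

```java
/** Hypothetical one-slot mailbox showing a deadline-based timed wait. */
public class TimedWaitDemo {
    private Object value = null;

    public synchronized void put(Object x) {
        value = x;
        notify();
    }

    /** Returns the value, or null if nothing arrives within timeoutMs. */
    public synchronized Object take(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (value == null) {                  // re-check after every wakeup
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return null;                     // timed out
            wait(remaining);                     // releases the monitor while waiting
        }
        Object r = value;
        value = null;
        return r;
    }

    /** Optionally posts a value from another thread, then takes with a timeout. */
    public static Object takeWithOptionalWriter(boolean send, long timeoutMs) {
        TimedWaitDemo box = new TimedWaitDemo();
        if (send)
            new Thread(() -> box.put("data")).start();
        try {
            return box.take(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(takeWithOptionalWriter(true, 5000)); // prints data
        System.out.println(takeWithOptionalWriter(false, 100)); // prints null after about 100 ms
    }
}
```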

If the state is NEW, the code returns the next object in the array and then shuffles the array to make room for more data. In this example, we use the static method arraycopy in the System class. Alternatively, we could use a read index and a write index into the queue.
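Here is a minimal sketch of that read-index/write-index alternative (RingQueue is a hypothetical class, not part of util.comm). Removing an item advances readIdx instead of copying the array, so removal takes constant time whatever the queue size.

```java
/** Hypothetical ring-buffer queue: read/write indices instead of System.arraycopy. */
public class RingQueue {
    private final Object[] q;
    private int readIdx = 0;   // next slot to read
    private int writeIdx = 0;  // next slot to write
    private int count = 0;     // items currently queued

    public RingQueue(int size) { q = new Object[size]; }

    /** Appends x; returns false (and stores nothing) if the queue is full. */
    public synchronized boolean offer(Object x) {
        if (count == q.length)
            return false;
        q[writeIdx] = x;
        writeIdx = (writeIdx + 1) % q.length;
        count++;
        return true;
    }

    /** Removes and returns the oldest item, or null if empty; no copying. */
    public synchronized Object poll() {
        if (count == 0)
            return null;
        Object r = q[readIdx];
        q[readIdx] = null;                  // drop the reference for the garbage collector
        readIdx = (readIdx + 1) % q.length;
        count--;
        return r;
    }

    public synchronized int size() { return count; }

    public static void main(String[] args) {
        RingQueue rq = new RingQueue(3);
        rq.offer("a");
        rq.offer("b");
        rq.offer("c");
        System.out.println(rq.poll());                         // prints a
        rq.offer("d");                                         // wraps around into slot 0
        System.out.println("" + rq.poll() + rq.poll() + rq.poll()); // prints bcd
    }
}
```

Unlike DataItem.insert(), this offer() refuses new data when the queue is full; to mimic the article's overwrite-the-last policy you would instead store x at index (writeIdx - 1 + q.length) % q.length and record an overrun.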

If the state is OVERRUN, the code throws the DataChannelOverrun exception. This exception has to be thrown here, rather than at write time, because Java doesn't currently support posting an asynchronous exception to a thread. Note that the only tricky thing here is that the data status changes from OVERRUN to NEW, so that the next time this function is called, the values in the queue will be read.

All of the exceptions are a subclass of DataChannelException. This is done so that clients can either catch individual exceptions if they are interested in those conditions, or they can simply catch DataChannelException if they wish to catch all possible exceptions thrown.
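The article doesn't show the exception classes themselves. This minimal sketch assumes they are ordinary checked exceptions extending Exception (an assumption), nested inside a hypothetical ExceptionHierarchyDemo class so the example stands alone, and shows both catch styles: one specific exception first, then the base class as a catch-all.

```java
/** Hypothetical demo of the DataChannelException family; the class bodies are assumed. */
public class ExceptionHierarchyDemo {
    // Assumed hierarchy: the article names these classes but does not show them.
    static class DataChannelException extends Exception {
        DataChannelException(String msg) { super(msg); }
    }
    static class DataChannelOverrun extends DataChannelException {
        DataChannelOverrun() { super("queue overflowed; a value was lost"); }
    }
    static class DataChannelShutdown extends DataChannelException {
        DataChannelShutdown() { super("channel released"); }
    }
    static class DataChannelTimeout extends DataChannelException {
        DataChannelTimeout() { super("no data before the timeout"); }
    }

    /** Throws the exception matching the code (1, 2, 3); any other code succeeds. */
    static void simulate(int code) throws DataChannelException {
        switch (code) {
            case 1: throw new DataChannelOverrun();
            case 2: throw new DataChannelShutdown();
            case 3: throw new DataChannelTimeout();
            default: break; // no error
        }
    }

    /** Catches one condition specifically and the rest through the base class. */
    public static String classify(int code) {
        try {
            simulate(code);
            return "ok";
        } catch (DataChannelOverrun e) {
            return "overrun";                                   // the one case we care about
        } catch (DataChannelException e) {
            return "other: " + e.getClass().getSimpleName();    // catch-all for the family
        }
    }

    public static void main(String[] args) {
        for (int code = 0; code <= 3; code++)
            System.out.println(code + " -> " + classify(code));
    }
}
```

Because Java tries catch clauses in order, the specific DataChannelOverrun clause must come before the DataChannelException clause; the reverse order does not compile.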

Finally, there are a few convenience methods that we use to monitor the state of the object. These are pretty self-explanatory.


72        /** available items to be read */
73        synchronized int queueSize() { return nQueue; }
74
75        /** data is waiting check */
76        synchronized boolean hasData() { return (nQueue > 0); }
77
78        /** value at the head of the queue (doesn't wait for data) */
79        synchronized Object lastValue() { return q[0]; }
80
81        synchronized void delete() { dataStatus = SHUTDOWN; notify(); }
82
83    }
 

The last of these, delete(), is used by the releaseChannel method of the DataChannel class. The trick is that the thread reading the channel needs to be notified that the channel is being shut down, but that thread is asleep waiting for data that will never come. So in releaseChannel, the DataItem object associated with the passed thread identifier has its delete() method called. This sets the state to SHUTDOWN and wakes up the thread. The next thing the thread will see is a DataChannelShutdown exception that it must be prepared to deal with.
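The following minimal sketch isolates that wake-up trick. ShutdownWakeDemo and its Shutdown exception are hypothetical stand-ins for DataItem and DataChannelShutdown. A reader thread blocks in fetch(); the main thread then calls delete(), which sets a flag and notifies, and the reader leaves wait() by throwing.

```java
/** Hypothetical mailbox showing how delete() rescues a reader stuck in wait(). */
public class ShutdownWakeDemo {
    static class Shutdown extends Exception { }

    private Object value = null;
    private boolean shutdown = false;

    /** Blocks until a value arrives or the mailbox is shut down. */
    public synchronized Object fetch() throws Shutdown, InterruptedException {
        while (value == null && !shutdown)
            wait();
        if (shutdown)
            throw new Shutdown();
        Object r = value;
        value = null;
        return r;
    }

    /** Like DataItem.delete(): flip the state, then wake the waiting reader. */
    public synchronized void delete() {
        shutdown = true;
        notifyAll();
    }

    /** Starts a reader, lets it block, shuts the mailbox down, and reports what the reader saw. */
    public static String runDemo() {
        ShutdownWakeDemo box = new ShutdownWakeDemo();
        String[] outcome = { "still waiting" };
        Thread reader = new Thread(() -> {
            try {
                box.fetch();
                outcome[0] = "got data";
            } catch (Shutdown e) {
                outcome[0] = "shutdown";
            } catch (InterruptedException e) {
                outcome[0] = "interrupted";
            }
        });
        reader.start();
        try {
            Thread.sleep(50);   // give the reader time to reach wait()
            box.delete();
            reader.join(2000);  // join() makes outcome[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints shutdown
    }
}
```

Because fetch() tests the flag in its loop condition, the reader also sees the shutdown if delete() happens to run before the reader ever reaches wait(); the 50 ms sleep only makes the blocked case the likely one.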

Straight is the gate
Using the DataItem class is reasonably straightforward. The DataChannel class has two methods that use it, one for writing data and one for reading it. The writing method is shown below.


37        public synchronized void putValue(Object x) {
38         for (Enumeration e = TIDs.elements(); e.hasMoreElements(); ) {
39           ((DataItem)(e.nextElement())).insert(x);
40       }
41        }
 

This method, putValue(), puts the object reference passed in x into the queues of all the data items associated with this channel. Remember that there is one data-channel object but many data-item objects (one for each thread that registered interest in this channel). This method uses an Enumeration to visit each element in the TIDs hash table. As insert() is called on each data item, the corresponding reader thread is notified and may begin running as soon as insert() releases the data item's monitor. Further, if no threads have yet registered an interest in this data channel, this method simply returns and the value is discarded.
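To see the fan-out on its own, here is a hypothetical miniature: BroadcastDemo, with a nested Item class standing in for DataItem and a LinkedList in place of the fixed array. Two reader threads register, one putValue() call runs, and each reader receives its own copy of the same reference.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical miniature of DataChannel.putValue(): one write, one queued copy per reader. */
public class BroadcastDemo {
    /** Per-reader queue, playing the role of DataItem. */
    static class Item {
        private final LinkedList<Object> q = new LinkedList<>();
        synchronized void insert(Object x) { q.addLast(x); notify(); }
        synchronized Object fetch() throws InterruptedException {
            while (q.isEmpty())
                wait();
            return q.removeFirst();
        }
    }

    private final Hashtable<Thread, Item> readers = new Hashtable<>();

    void register(Thread t) { readers.put(t, new Item()); }

    /** Same shape as putValue(): enumerate every registered reader's item. */
    synchronized void putValue(Object x) {
        for (Enumeration<Item> e = readers.elements(); e.hasMoreElements(); )
            e.nextElement().insert(x);
    }

    /** Same shape as getValue(): look up the caller's item, or return null. */
    Object getValue() throws InterruptedException {
        Item it = readers.get(Thread.currentThread());
        return (it != null) ? it.fetch() : null;
    }

    /** Two readers each receive the single value written once; returns what they got. */
    public static List<Object> runDemo(Object value) {
        BroadcastDemo ch = new BroadcastDemo();
        List<Object> received = Collections.synchronizedList(new ArrayList<>());
        Thread[] rs = new Thread[2];
        for (int i = 0; i < rs.length; i++) {
            rs[i] = new Thread(() -> {
                try { received.add(ch.getValue()); } catch (InterruptedException ignored) { }
            });
            ch.register(rs[i]); // register before start, so no value is missed
        }
        for (Thread t : rs) t.start();
        ch.putValue(value);
        for (Thread t : rs) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("hello")); // prints [hello, hello]
    }
}
```

Registering each reader before starting it matters: a value posted before a thread registers is never queued for that thread, just as putValue() skips threads that are not yet in TIDs.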

Reading values is straightforward as well. That is handled by the getValue() method shown here.

42        public Object getValue() throws DataChannelOverrun, DataChannelShutdown, DataChannelTimeout {
43            DataItem di = (DataItem) TIDs.get(Thread.currentThread());
44            return ((di != null) ? di.fetch() : null);
45        }

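This method looks up the data item associated with the current thread and returns the result of its fetch() method. If the data item has no information in its queue, the thread blocks in fetch() until a writer posts a value, the channel is released, or the timeout expires. Otherwise, it returns immediately with the next value from the data item. If the calling thread never registered as a reader on this channel, getValue() returns null.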

Quadrophenia: building a simple application
Let's take a moment to put together a simple application of these classes. The example application is completely contrived, of course, but it does demonstrate how the DataChannel class might be used. We'll implement a parallel quadratic equation computation engine. This engine takes an equation of the form

$AX^2 + BX + C$

and produces outputs for the values of X from 0 through 4.

The helper class in this application is called Computer. This class takes two values coming in on two data channels, applies a mathematical operation to those values, and writes the result out on a third (result) channel. Here it is.

import util.comm.*;

public class Computer implements Runnable {

    public final static int ADD = 1;
    public final static int SUB = 2;
    public final static int MUL = 3;
    public final static int DIV = 4;

    private volatile int op;     // set to 0 by stop()
    private Thread tid;

    DataChannel aChannel;
    DataChannel bChannel;
    DataChannel outChannel;

    public static Computer create(String inA, String inB, String out, int doOp) {
        if ((doOp < 1) || (doOp > 4))
            return null;                                                  // unknown operation

        Computer result   = new Computer();
        result.tid        = new Thread(result);
        result.tid.setName("Computer ("+out+")");
        result.outChannel = DataChannel.getChannel(out);                  // writer side
        result.aChannel   = DataChannel.getChannel(inA, result.tid, 4);   // reader, queue of 4
        result.bChannel   = DataChannel.getChannel(inB, result.tid, 4);   // reader, queue of 4
        result.op         = doOp;
        return result;
    }

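The factory method create() creates a new Computer object. It also creates a thread for this object to run in, registers that thread as a reader (with a queue size of four) on both input channels, and obtains the output channel for writing. If doOp isn't one of the four operation codes, create() returns null.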

    public void start() { tid.start(); }

    public void stop() {
        op = 0;
        // Make sure our thread wakes up: releasing the channels makes a
        // pending getValue() throw DataChannelShutdown.
        aChannel.releaseChannel(tid);
        bChannel.releaseChannel(tid);
    }
 

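Then there are two methods, start() and stop(), to turn the computer on and off. The stop() method clears the operation code and releases both input channels, which wakes the computer's thread if it is blocked waiting for input.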

And the computation is done in the computer's run() method.

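    public void run() {
        int a, b;

        while (true) {
            try {
                a = ((Integer) aChannel.getValue()).intValue();
                b = ((Integer) bChannel.getValue()).intValue();
            } catch (DataChannelException e) {
                // DataChannelShutdown is how stop() ends this loop, so it is
                // not reported; an overrun or a timeout is.
                if (!(e instanceof DataChannelShutdown))
                    System.out.println(e + " on " + tid);
                break;
            }
            System.out.print(tid.getName()+" : ");
            switch (op) {
            case 0:
                break;
            case ADD:
                System.out.println("ADD "+a+" + "+b+" -> "+(a+b));
                outChannel.putValue(new Integer(a+b));
                break;
            case SUB:
                System.out.println("SUB "+a+" - "+b+" -> "+(a-b));
                outChannel.putValue(new Integer(a-b));
                break;
            case MUL:
                System.out.println("MUL "+a+" * "+b+" -> "+(a*b));
                outChannel.putValue(new Integer(a*b));
                break;
            case DIV:
                if (b == 0) {
                    // Divide-by-zero handling is an assumption: report it, publish nothing.
                    System.out.println("DIV "+a+" / 0 -> error");
                } else {
                    System.out.println("DIV "+a+" / "+b+" -> "+(a/b));
                    outChannel.putValue(new Integer(a/b));
                }
                break;
            }
        }
    }
}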

For each operation, the class prints out a message telling you what operation it is performing. This is useful for watching the threads run on the console when you are running the application.

The primary application class is called Quadratic and takes advantage of the fact that the standard-form quadratic equation can be broken up into postfix notation.

Given:

$AX^2 + BX + C$                         Equation 1

We can write it in postfix notation as:

((((X X *) A *) (B X *) +) C +)         Equation 2

And diagramming the intermediate results gives us the following parse tree:

( X X * )    ( B X * )                  Figure 1
    |            |
    V            |
(  R1 A * )      |
      |          |
      V          V
(    R2         R3 + )
           |
           V
(         R4          C + )
                 |
                 V
               Answer
 

As you can see, there are five intermediate results, including the answer. Thus, we'll need a total of five Computer objects to compute the answer to the equation.
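As a cross-check on Equation 2 and on that count, here is a small stack-based postfix evaluator. PostfixDemo and its methods are hypothetical, and this is not how the article's engine computes (it uses threads and channels rather than a stack); each operator it applies corresponds to one Computer object.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stack-based evaluator for the postfix form of Equation 2. */
public class PostfixDemo {
    /** Evaluates space-separated postfix tokens; names are looked up in vars. */
    public static int eval(String postfix, Map<String, Integer> vars) {
        Deque<Integer> stack = new ArrayDeque<>();
        for (String tok : postfix.trim().split("\\s+")) {
            if (isOperator(tok)) {
                int b = stack.pop();   // right operand is on top
                int a = stack.pop();
                stack.push(apply(tok, a, b));
            } else {
                Integer v = vars.get(tok);
                if (v == null)
                    throw new IllegalArgumentException("unknown name: " + tok);
                stack.push(v);
            }
        }
        return stack.pop();
    }

    static boolean isOperator(String tok) {
        return tok.equals("+") || tok.equals("-") || tok.equals("*") || tok.equals("/");
    }

    static int apply(String op, int a, int b) {
        switch (op) {
            case "+": return a + b;
            case "-": return a - b;
            case "*": return a * b;
            default:  return a / b;
        }
    }

    /** Counts operators: each one becomes one Computer in the article's engine. */
    public static int operatorCount(String postfix) {
        int n = 0;
        for (String tok : postfix.trim().split("\\s+"))
            if (isOperator(tok)) n++;
        return n;
    }

    public static int evalQuadratic(String postfix, int a, int b, int c, int x) {
        Map<String, Integer> vars = new HashMap<>();
        vars.put("A", a);
        vars.put("B", b);
        vars.put("C", c);
        vars.put("X", x);
        return eval(postfix, vars);
    }

    public static void main(String[] args) {
        String eq2 = "X X * A * B X * + C +";     // Equation 2, parentheses dropped
        String asWired = "X X * A * B X * - C +"; // res4 subtracts in Quadratic
        System.out.println(operatorCount(eq2) + " operators"); // prints 5 operators
        for (int x = 0; x < 5; x++)
            System.out.println("X = " + x + ": " + evalQuadratic(eq2, 3, 5, 10, x)
                    + " (as wired: " + evalQuadratic(asWired, 3, 5, 10, x) + ")");
    }
}
```

With A = 3, B = 5, and C = 10, the "as wired" column reproduces the results in the sample run shown later (10, 8, 12, 22, 38), because Quadratic configures the res4 computer to subtract.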

Here is the Quadratic class.


import util.comm.*;

public class Quadratic {
    public static void main(String args[]) {
        DataChannel X = DataChannel.getChannel("X");
        DataChannel X2 = DataChannel.getChannel("X2");
        DataChannel A = DataChannel.getChannel("a");
        DataChannel B = DataChannel.getChannel("b");
        DataChannel C = DataChannel.getChannel("c");
        DataChannel answer = DataChannel.getChannel("res5", Thread.currentThread(), 2);
 

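These lines allocate writer data channels for X, X2, and the constants a, b, and c, and register the main thread as a reader (with a queue size of two) on the final result channel, res5. The intermediate channels res1 through res4 are created when the computers ask for them. Then we declare an array to hold the computers: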


        Computer res[] = new Computer[5];

Next we allocate five computers for the computation:

        res[0] = Computer.create("X", "X2", "res1", Computer.MUL);
        res[1] = Computer.create("X", "b", "res2", Computer.MUL);
        res[2] = Computer.create("res1", "a", "res3", Computer.MUL);
        res[3] = Computer.create("res3", "res2", "res4", Computer.SUB);
        res[4] = Computer.create("res4", "c", "res5", Computer.ADD);
 

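Note that we could not simply pass "X" twice to the first computer. The channel's TIDs table is keyed by thread, so a second registration by the same thread on the same channel would replace the first data item rather than add another; that is why the value of X is also fed through a separate X2 channel.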
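The replacement behavior follows from Hashtable.put(), which overwrites the value stored under an existing key. This minimal sketch uses hypothetical names (DuplicateRegistrationDemo, with FakeItem standing in for DataItem) and registers one thread twice on the same table.

```java
import java.util.Hashtable;

/** Hypothetical illustration of why one thread can't register twice on one channel. */
public class DuplicateRegistrationDemo {
    /** Stand-in for a DataItem; only the queue size matters here. */
    static class FakeItem {
        final int qSize;
        FakeItem(int qSize) { this.qSize = qSize; }
    }

    // Plays the role of DataChannel.TIDs for a single channel.
    private final Hashtable<Thread, FakeItem> tids = new Hashtable<>();

    void register(Thread t, int qSize) {
        tids.put(t, new FakeItem(qSize));   // same shape as the reader getChannel()
    }

    /** Registers the calling thread twice; returns {entries, surviving queue size}. */
    public static int[] registerTwice(int first, int second) {
        DuplicateRegistrationDemo channelX = new DuplicateRegistrationDemo();
        Thread me = Thread.currentThread();
        channelX.register(me, first);
        channelX.register(me, second);      // replaces the first FakeItem
        return new int[] { channelX.tids.size(), channelX.tids.get(me).qSize };
    }

    public static void main(String[] args) {
        int[] r = registerTwice(4, 2);
        System.out.println(r[0] + " entry, queue size " + r[1]); // prints 1 entry, queue size 2
    }
}
```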

All five computers are allocated, so start them up.

        /* Now start them up. */
        for (int i = 0; i < res.length; i++) res[i].start();

And now, for five iterations, feed the computers values in reverse order to maximize the delay time between starting threads and thus increase the chance we will see two running at the same time on the output.

        /* Now write in the values. */
        for (int i = 0; i < 5; i++) {
            C.putValue(new Integer(10));
            B.putValue(new Integer(5));
            A.putValue(new Integer(3));
            X.putValue(new Integer(i));
            X2.putValue(new Integer(i));

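Compute the same value directly to check our results. Note that the res4 computer subtracts rather than adds, so with A = 3, B = 5, and C = 10 the engine actually evaluates $AX^2 - BX + C$, and the check uses the same formula.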

            int check = ((i * i) * 3) - (5 * i) + 10;

And then wait for the result to appear in the answer data channel.

            try {
                System.out.println("X = "+i+", result = "+
                    ((Integer) answer.getValue())+" ["+check+"]");
            } catch (DataChannelOverrun e) {
                System.out.println("Overrun on the answer!");
            } catch (DataChannelException e) {
                System.out.println("Error on the answer channel: "+e);
            }
        }

Finally, shut down the computer threads we have allocated by telling them to stop.

        for (int i = 0; i < res.length; i++) res[i].stop();
        try {
            Thread.sleep(100);   // let the computer threads see the shutdown
        } catch (InterruptedException e) { }
    }
}
 

The short sleep gives the computer threads a moment to finish shutting down before the main() method exits.

When this runs you will see an output that is similar to this:

Computer (res2) : MUL 0 * 5 -> 0
Computer (res1) : MUL 0 * 0 -> 0
Computer (res3) : MUL 0 * 3 -> 0
Computer (res4) : SUB 0 - 0 -> 0
Computer (res5) : ADD 0 + 10 -> 10
X = 0, result = 10 [10]
Computer (res2) : MUL 1 * 5 -> 5
Computer (res1) : MUL 1 * 1 -> 1
Computer (res3) : MUL 1 * 3 -> 3
Computer (res4) : SUB 3 - 5 -> -2
Computer (res5) : ADD -2 + 10 -> 8
X = 1, result = 8 [8]
Computer (res2) : MUL 2 * 5 -> 10
Computer (res1) : MUL 2 * 2 -> 4
Computer (res3) : MUL 4 * 3 -> 12
Computer (res4) : SUB 12 - 10 -> 2
Computer (res5) : ADD 2 + 10 -> 12
X = 2, result = 12 [12]
Computer (res2) : MUL 3 * 5 -> 15
Computer (res1) : MUL 3 * 3 -> 9
Computer (res3) : MUL 9 * 3 -> 27
Computer (res4) : SUB 27 - 15 -> 12
Computer (res5) : ADD 12 + 10 -> 22
X = 3, result = 22 [22]
Computer (res2) : MUL 4 * 5 -> 20
Computer (res1) : MUL 4 * 4 -> 16
Computer (res3) : MUL 16 * 3 -> 48
Computer (res4) : SUB 48 - 20 -> 28
Computer (res5) : ADD 28 + 10 -> 38
X = 4, result = 38 [38]

As you can see, our parallel computer got the same answer that the code got the old-fashioned way.

Again, I stress that this example is contrived: it has reasonable illustrative value but limited practical use. The real power of DataChannel comes to light when it is combined with applets on a Web page, and that is the subject of the next part of this series.

About the author
Chuck McManis is currently the Director of Technology at GolfWeb Inc. (http://www.golfweb.com), a Web magazine devoted to the game of golf, where he develops technologies that make the presentation of the magazine interactive, compelling, and enjoyable. Before joining GolfWeb, he was a member of the Java group. He joined the Java group just after the formation of FirstPerson Inc. and was a member of the portable OS group (the group responsible for the OS portion of Java). Later, when FirstPerson was dissolved, he stayed with the group through the development of the alpha and beta versions of the software. He was responsible for creating the Java version of the Sun home page in May 1995. He also developed a cryptographic library for Java and versions of the Java class loader that could screen classes based on Digital Signatures. Before joining FirstPerson, Chuck worked in the Operating Systems area of SunSoft developing networking applications, where he did the initial design of NIS+.