Я хотел бы создать своего рода Producer/Consumer
поточная обработка приложения. Но я не уверен что лучший способ реализовать очередь между двумя.
Таким образом, у меня есть некоторые с двумя идеями (оба из которых могли быть совершенно неправильными). Я хотел бы знать, который будет лучше и если бы они оба сосут затем, что было бы лучшим способом реализовать очередь. Главным образом моя реализация очереди в этих примерах я обеспокоен. Я расширяю класс Очереди, который является в классе дома и ориентирован на многопотоковое исполнение. Ниже два примера с 4 классами каждый.
Основной класс -
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Потребительский класс -
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
Класс производителя -
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
Класс очереди -
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
ИЛИ
Основной класс -
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
Потребительский класс -
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
Класс производителя -
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
Класс очереди -
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
И пойдите!
В Java 5+ есть все инструменты, необходимые для такого рода вещь. Вам нужно:
ExecutorService
; ExecutorService
; BlockingQueue
. Я говорю «если необходимо» для (3), потому что по моему опыту это ненужный шаг. Все, что вам нужно сделать, это отправить новые задачи в службу исполнителя-потребителя. Итак:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Итак, производители
подчиняются напрямую потребителям
.
Вы заново изобретаете колесо.
Если вам нужна персистентность и другие корпоративные возможности, используйте JMS (я бы предложил ActiveMq).
Если вам нужны быстрые очереди in-memory, используйте одну из имплементаций Queue от java.
Если вам нужна поддержка java 1.4 или более ранней версии, используйте отличный пакет concurrent от Doug Lea.
Хорошо, как отмечают другие, лучше всего использовать пакет java.util.concurrent
. Я настоятельно рекомендую "Java Concurrency in Practice". Это отличная книга, охватывающая практически все, что вам нужно знать.
Что касается вашей конкретной реализации, как я отмечал в комментариях, не запускайте потоки из конструкторов - это может быть небезопасно.
Если оставить это в стороне, вторая реализация кажется лучше. Вы не хотите помещать очереди в статические поля. Вы, вероятно, просто зря теряете гибкость.
Если вы хотите продолжить работу над своей собственной реализацией (я полагаю, в целях обучения?), Предоставьте хотя бы метод start ()
. Вы должны создать объект (вы можете создать экземпляр объекта Thread
), а затем вызвать start ()
, чтобы запустить поток.
Редактировать: ExecutorService
имеют свою собственную очередь, так что это может сбивать с толку .. Вот кое-что для начала.
public class Main {
public static void main(String[] args) {
//The numbers are just silly tune parameters. Refer to the API.
//The important thing is, we are passing a bounded queue.
ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
//No need to bound the queue for this executor.
//Use utility method instead of the complicated Constructor.
ExecutorService producer = Executors.newSingleThreadExecutor();
Runnable produce = new Produce(consumer);
producer.submit(produce);
}
}
class Produce implements Runnable {
private final ExecutorService consumer;
public Produce(ExecutorService consumer) {
this.consumer = consumer;
}
@Override
public void run() {
Pancake cake = Pan.cook();
Runnable consume = new Consume(cake);
consumer.submit(consume);
}
}
class Consume implements Runnable {
private final Pancake cake;
public Consume(Pancake cake){
this.cake = cake;
}
@Override
public void run() {
cake.eat();
}
}
Дальнейшее РЕДАКТИРОВАНИЕ:
Для производителя вместо while (true)
вы можете сделать что-то вроде:
@Override
public void run(){
while(!Thread.currentThread().isInterrupted()){
//do stuff
}
}
Таким образом, вы можете выключить исполнителя, вызвав .shutdownNow ()
. Если вы используете while (true)
, он не завершится.
Также обратите внимание, что Производитель
все еще уязвим для RuntimeExceptions
(то есть одно RuntimeException
остановит обработку)