Производитель/Потребитель распараллеливает использование Очереди

Я хотел бы создать своего рода Producer/Consumer поточная обработка приложения. Но я не уверен что лучший способ реализовать очередь между двумя.

Таким образом, у меня есть некоторые с двумя идеями (оба из которых могли быть совершенно неправильными). Я хотел бы знать, который будет лучше и если бы они оба сосут затем, что было бы лучшим способом реализовать очередь. Главным образом моя реализация очереди в этих примерах я обеспокоен. Я расширяю класс Очереди, который является в классе дома и ориентирован на многопотоковое исполнение. Ниже два примера с 4 классами каждый.

Основной класс -

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Потребительский класс -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Класс производителя -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Класс очереди -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

ИЛИ

Основной класс -

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Потребительский класс -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Класс производителя -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Класс очереди -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

И пойдите!

56
задан Ravindra babu 17 September 2017 в 20:20
поделиться

3 ответа

В Java 5+ есть все инструменты, необходимые для такого рода вещь. Вам нужно:

  1. Поместить всех ваших Продюсеров в одну ExecutorService ;
  2. Поместить всех ваших Потребителей в другую ExecutorService ;
  3. При необходимости, общаться между двумя, используя a BlockingQueue .

Я говорю «если необходимо» для (3), потому что по моему опыту это ненужный шаг. Все, что вам нужно сделать, это отправить новые задачи в службу исполнителя-потребителя. Итак:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

Итак, производители подчиняются напрямую потребителям .

76
ответ дан 26 November 2019 в 17:18
поделиться

Вы заново изобретаете колесо.

Если вам нужна персистентность и другие корпоративные возможности, используйте JMS (я бы предложил ActiveMq).

Если вам нужны быстрые очереди in-memory, используйте одну из имплементаций Queue от java.

Если вам нужна поддержка java 1.4 или более ранней версии, используйте отличный пакет concurrent от Doug Lea.

8
ответ дан 26 November 2019 в 17:18
поделиться

Хорошо, как отмечают другие, лучше всего использовать пакет java.util.concurrent . Я настоятельно рекомендую "Java Concurrency in Practice". Это отличная книга, охватывающая практически все, что вам нужно знать.

Что касается вашей конкретной реализации, как я отмечал в комментариях, не запускайте потоки из конструкторов - это может быть небезопасно.

Если оставить это в стороне, вторая реализация кажется лучше. Вы не хотите помещать очереди в статические поля. Вы, вероятно, просто зря теряете гибкость.

Если вы хотите продолжить работу над своей собственной реализацией (я полагаю, в целях обучения?), Предоставьте хотя бы метод start () . Вы должны создать объект (вы можете создать экземпляр объекта Thread ), а затем вызвать start () , чтобы запустить поток.

Редактировать: ExecutorService имеют свою собственную очередь, так что это может сбивать с толку .. Вот кое-что для начала.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

Дальнейшее РЕДАКТИРОВАНИЕ: Для производителя вместо while (true) вы можете сделать что-то вроде:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

Таким образом, вы можете выключить исполнителя, вызвав .shutdownNow () . Если вы используете while (true) , он не завершится.

Также обратите внимание, что Производитель все еще уязвим для RuntimeExceptions (то есть одно RuntimeException остановит обработку)

17
ответ дан 26 November 2019 в 17:18
поделиться
Другие вопросы по тегам:

Похожие вопросы: