Сигнал отката от JMS MessageListener

Я работал с JMS и ActiveMQ. Все творит чудеса. Я не использую Spring и не могу.

Интерфейс javax.jms.MessageListener имеет только один метод, onMessage . Изнутри реализации есть вероятность, что будет выброшено исключение. Если на самом деле возникает исключение, я говорю, что сообщение не было обработано должным образом и его нужно повторить. Итак, мне нужно, чтобы ActiveMQ немного подождал, а затем повторил попытку. т.е. мне нужно сгенерированное исключение для отката транзакции JMS.

Как я могу выполнить такое поведение?

Может быть, в ActiveMQ есть какая-то конфигурация, которую я не смог найти.

Или ... может быть, мог бы покончить с регистрацией MessageListener для потребителей и потреблять сообщения самостоятельно, в цикле вроде:

while (true) {
    // ... some administrative stuff like ...
    session = connection.createSesstion(true, SESSION_TRANSACTED)
    try {
        Message m = receiver.receive(queue, 1000L);
        theMessageListener.onMessage(m);
        session.commit();
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    // ... some more administrative stuff
}

в паре потоков вместо регистрации слушателя.

Или ...Я мог бы как-то украсить / AOP / байтовое манипулирование MessageListener , чтобы сделать это самостоятельно.

Какой путь вы выберете и почему?

примечание : У меня нет полного контроля через код MessageListener .

EDIT Тест для подтверждения концепции:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        message.acknowledge();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        message.acknowledge();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    brokerService.stop();

    assertEquals(3, atomicInteger.get());
}
13
задан H Marcelo Morales 28 August 2011 в 14:27
поделиться