/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.store.journal;

import EDU.oswego.cs.dl.util.concurrent.Callable;
import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.broker.BrokerContainer;
import org.activemq.broker.impl.BrokerContainerImpl;
import org.activemq.message.ActiveMQQueue;

public class JournalBrokerBenchmark
extends TestCase {
    private static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("MESSAGE_COUNT", "10000"));
    private BrokerContainerImpl broker;
    private ActiveMQQueue dest;
    private ActiveMQConnectionFactory connectionFactory;

    public static void main(String[] args) {
        TestRunner.run((Class)JournalBrokerBenchmark.class);
    }

    protected void setUp() throws Exception {
        this.broker = new BrokerContainerImpl("localhost");
        this.broker.addConnector("tcp://localhost:61616");
        this.broker.start();
        this.connectionFactory = new ActiveMQConnectionFactory((BrokerContainer)this.broker, "tcp://localhost:61616");
        this.dest = new ActiveMQQueue("TEST");
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
    }

    public void testConcurrentSendReceive() throws Throwable {
        int i;
        final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
        final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
        final ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT * 2, 5);
        final Semaphore connectionsEstablished = new Semaphore((long)(1 - (CONSUMER_COUNT + PRODUCER_COUNT)));
        final Latch startTest = new Latch();
        final Semaphore testsFinished = new Semaphore((long)(1 - (CONSUMER_COUNT + PRODUCER_COUNT)));
        final Callable producer = new Callable(){

            public Object call() throws JMSException, InterruptedException {
                Connection connection = JournalBrokerBenchmark.this.connectionFactory.createConnection();
                Session session = connection.createSession(false, 1);
                MessageProducer producer = session.createProducer((Destination)JournalBrokerBenchmark.this.dest);
                producer.setDeliveryMode(2);
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(new byte[1024]);
                connection.start();
                connectionsEstablished.release();
                startTest.acquire();
                int msgs = MESSAGE_COUNT / PRODUCER_COUNT + 1;
                for (int i = 0; i < msgs; ++i) {
                    pp.increment();
                    producer.send((Message)message);
                }
                testsFinished.release();
                connection.close();
                return null;
            }
        };
        final Callable consumer = new Callable(){

            public Object call() throws JMSException, InterruptedException {
                Latch doneLatch = new Latch();
                Connection connection = JournalBrokerBenchmark.this.connectionFactory.createConnection();
                Session session = connection.createSession(false, 1);
                MessageConsumer consumer = session.createConsumer((Destination)JournalBrokerBenchmark.this.dest);
                connectionsEstablished.release();
                startTest.acquire();
                int msgs = MESSAGE_COUNT / CONSUMER_COUNT - 1;
                consumer.setMessageListener(new MessageListener(this, msgs, doneLatch){
                    int counter;
                    private final /* synthetic */ int val$msgs;
                    private final /* synthetic */ Latch val$doneLatch;
                    private final /* synthetic */ 2 this$1;
                    {
                        this.this$1 = this$1;
                        this.val$msgs = val$msgs;
                        this.val$doneLatch = val$doneLatch;
                        this.counter = 0;
                    }

                    public void onMessage(Message msg) {
                        2.access$300(this.this$1).increment();
                        ++this.counter;
                        if (this.counter >= this.val$msgs) {
                            this.val$doneLatch.release();
                        }
                    }
                });
                connection.start();
                doneLatch.acquire();
                testsFinished.release();
                connection.close();
                return null;
            }

            static /* synthetic */ ProgressPrinter access$300(2 x0) {
                return x0.pp;
            }
        };
        final Throwable[] workerError = new Throwable[1];
        for (i = 0; i < PRODUCER_COUNT; ++i) {
            new Thread("Producer:" + i){

                public void run() {
                    try {
                        producer.call();
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                        workerError[0] = e;
                    }
                }
            }.start();
        }
        for (i = 0; i < CONSUMER_COUNT; ++i) {
            new Thread("Consumer:" + i){

                public void run() {
                    try {
                        consumer.call();
                    }
                    catch (Throwable e) {
                        workerError[0] = e;
                    }
                }
            }.start();
        }
        connectionsEstablished.acquire();
        startTest.release();
        long start = System.currentTimeMillis();
        testsFinished.acquire();
        long end = System.currentTimeMillis();
        System.out.println(this.getName() + ": test duration: " + (end - start) + " ms, published+acked msg/s: " + (float)MESSAGE_COUNT * 1000.0f / (float)(end - start));
        if (workerError[0] != null) {
            throw workerError[0];
        }
    }

    static class ProgressPrinter {
        private final int total;
        private final int interval;
        int percentDone = 0;
        int counter = 0;

        public ProgressPrinter(int total, int interval) {
            this.total = total;
            this.interval = interval;
        }

        public synchronized void increment() {
            this.update(++this.counter);
        }

        public synchronized void update(int current) {
            int at = 100 * current / this.total;
            if (this.percentDone / this.interval != at / this.interval) {
                this.percentDone = at;
                System.out.println("Completed: " + this.percentDone + "%");
            }
        }
    }
}

