/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.store.journal;

import EDU.oswego.cs.dl.util.concurrent.Callable;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.OutputStream;
import javax.jms.Destination;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.activeio.adapter.PacketByteArrayOutputStream;
import org.activemq.io.impl.DefaultWireFormat;
import org.activemq.message.ActiveMQBytesMessage;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.MessageAck;
import org.activemq.message.Packet;
import org.activemq.store.MessageStore;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.cache.SimpleCachePersistenceAdapter;
import org.activemq.store.journal.JournalTestHelper;

/**
 * Throughput benchmark for a queue {@link MessageStore} obtained from the
 * persistence adapter that {@link JournalTestHelper} creates, wrapped in a
 * {@link SimpleCachePersistenceAdapter}.
 *
 * <p>Each benchmark prints its wall-clock duration and a messages-per-second
 * figure to standard output; nothing is asserted about the numbers. The number
 * of messages is taken from the {@code MESSAGE_COUNT} system property
 * (default 100000).
 */
public class JournalStoreBenchmark extends TestCase {
    /** Number of messages processed per benchmark; override with {@code -DMESSAGE_COUNT=n}. */
    private static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("MESSAGE_COUNT", "100000"));
    private PersistenceAdapter adapter;
    private MessageStore store;
    /** Template message (1 KiB body, destination "TEST") reused or copied by the benchmarks. */
    private ActiveMQBytesMessage message;
    /** Acknowledgement reused by the single-threaded benchmark. */
    private MessageAck ack;

    /**
     * Runs the benchmark from the command line through the JUnit text runner.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        TestRunner.run(JournalStoreBenchmark.class);
    }

    /**
     * Starts the persistence adapter and the "TEST" queue store, then builds the
     * template message and its matching acknowledgement.
     *
     * @throws Exception if the adapter, the store or the message cannot be set up
     */
    protected void setUp() throws Exception {
        JournalTestHelper helper = new JournalTestHelper();
        this.adapter = helper.createPersistenceAdapter("default");
        this.adapter = new SimpleCachePersistenceAdapter(this.adapter);
        this.adapter.start();
        this.store = this.adapter.createQueueMessageStore("TEST");
        this.store.start();
        this.message = new ActiveMQBytesMessage();
        this.message.writeBytes(new byte[1024]);
        this.message.setJMSDestination(new ActiveMQQueue("TEST"));
        this.ack = new MessageAck();
        this.ack.setDestination(this.message.getJMSActiveMQDestination());
    }

    /**
     * Stops the store and then the adapter.
     *
     * @throws Exception if either component fails to stop
     */
    protected void tearDown() throws Exception {
        try {
            this.store.stop();
        }
        finally {
            // Stop the adapter even when stopping the store fails, so it is not left running.
            this.adapter.stop();
        }
    }

    /**
     * Runs {@code test} once on each of {@code workers} freshly started threads
     * and blocks until all of them have finished.
     *
     * <p>Every thread is named after its index ("0", "1", ...) before the task
     * is invoked, so the task can use the thread name as a unique prefix.
     *
     * @param workers number of threads to start
     * @param test    task executed once per thread
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Throwable            an error raised by one of the workers; when
     *                              several workers fail, the one recorded last is thrown
     */
    private void runConcurrentTest(int workers, final Callable test) throws InterruptedException, Throwable {
        final Throwable[] workerError = new Throwable[1];
        // Starts at 1 - workers permits, so the single acquire() below can only
        // succeed after every worker has released once.
        final Semaphore doneSemaphore = new Semaphore(1 - workers);
        for (int i = 0; i < workers; ++i) {
            final String name = "" + i;
            new Thread(){

                public void run() {
                    try {
                        Thread.currentThread().setName(name);
                        test.call();
                    }
                    catch (Throwable e) {
                        workerError[0] = e;
                    }
                    finally {
                        // Always signal completion, otherwise the caller would wait forever.
                        doneSemaphore.release();
                    }
                }
            }.start();
        }
        doneSemaphore.acquire();
        if (workerError[0] != null) {
            throw workerError[0];
        }
    }

    /**
     * Adds and immediately acknowledges {@code MESSAGE_COUNT} messages from the
     * calling thread, with receipts disabled on both message and ack.
     *
     * @throws Exception if the store rejects an add or a remove
     */
    public void testAsyncAddMessage() throws Exception {
        this.message.setReceiptRequired(false);
        this.ack.setReceiptRequired(false);
        ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT, 5);
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            pp.increment();
            this.message.setJMSMessageID("id:" + i);
            this.store.addMessage(this.message);
            this.ack.setMessageID(this.message.getJMSMessageID());
            this.store.removeMessage(this.ack);
        }
        long end = System.currentTimeMillis();
        this.printThroughput("published+acked msg/s", MESSAGE_COUNT, start, end);
    }

    /**
     * Same add-then-acknowledge cycle as {@link #testAsyncAddMessage()}, split
     * across two worker threads that each handle {@code MESSAGE_COUNT / workers}
     * messages using a private copy of the template message and a private ack.
     *
     * @throws Throwable any error raised by a worker thread
     */
    public void testConcurrentAsyncAddMessage() throws Throwable {
        final ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT, 5);
        final int workers = 2;
        Callable task = new Callable(){

            public Object call() throws Exception {
                ActiveMQMessage messageCopy;
                // The template is shared by all workers; copy it under its own lock.
                synchronized (JournalStoreBenchmark.this.message) {
                    messageCopy = JournalStoreBenchmark.this.message.deepCopy();
                }
                messageCopy.setReceiptRequired(false);
                MessageAck workerAck = new MessageAck();
                workerAck.setReceiptRequired(false);
                workerAck.setDestination(messageCopy.getJMSActiveMQDestination());
                int count = MESSAGE_COUNT / workers;
                // The thread name is unique per worker, which keeps message ids distinct.
                String idPrefix = "id:" + Thread.currentThread().getName() + ":";
                for (int i = 0; i < count; ++i) {
                    pp.increment();
                    messageCopy.setJMSMessageID(idPrefix + i);
                    JournalStoreBenchmark.this.store.addMessage(messageCopy);
                    // Acknowledge the message that was just added: take the id from
                    // this worker's copy, not from the shared template.
                    workerAck.setMessageID(messageCopy.getJMSMessageID());
                    JournalStoreBenchmark.this.store.removeMessage(workerAck);
                }
                return null;
            }
        };
        long start = System.currentTimeMillis();
        this.runConcurrentTest(workers, task);
        long end = System.currentTimeMillis();
        this.printThroughput("published+acked msg/s", MESSAGE_COUNT, start, end);
    }

    /**
     * Serializes the template message {@code MESSAGE_COUNT} times with a
     * {@link DefaultWireFormat} and reports the rate.
     *
     * <p>The name carries an X prefix, so it does not match the test* naming
     * pattern; presumably it is disabled on purpose.
     *
     * @throws Exception if serialization fails
     */
    public void XtestMessageSerialization() throws Exception {
        this.message.setReceiptRequired(false);
        DefaultWireFormat wireFormat = new DefaultWireFormat();
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            this.message.setJMSMessageID("id:" + i);
            JournalStoreBenchmark.writeFramedPacket(wireFormat, this.message);
        }
        long end = System.currentTimeMillis();
        this.printThroughput("msg/s", MESSAGE_COUNT, start, end);
    }

    /**
     * Serializes {@code MESSAGE_COUNT} messages on each of ten worker threads,
     * every worker using its own wire format and its own copy of the template.
     *
     * <p>The name carries an X prefix, so it does not match the test* naming
     * pattern; presumably it is disabled on purpose.
     *
     * @throws Throwable any error raised by a worker thread
     */
    public void XtestConcurrentMessageSerialization() throws Throwable {
        final int workers = 10;
        Callable task = new Callable(){

            public Object call() throws Exception {
                DefaultWireFormat wireFormat = new DefaultWireFormat();
                ActiveMQMessage copy;
                // Same guard as testConcurrentAsyncAddMessage: the template is shared.
                synchronized (JournalStoreBenchmark.this.message) {
                    copy = JournalStoreBenchmark.this.message.deepCopy();
                }
                copy.setReceiptRequired(false);
                for (int i = 0; i < MESSAGE_COUNT; ++i) {
                    copy.setJMSMessageID("id:" + i);
                    JournalStoreBenchmark.writeFramedPacket(wireFormat, copy);
                }
                return null;
            }
        };
        long start = System.currentTimeMillis();
        this.runConcurrentTest(workers, task);
        long end = System.currentTimeMillis();
        // Widen before multiplying so large MESSAGE_COUNT values cannot overflow int.
        this.printThroughput("msg/s", (long)MESSAGE_COUNT * workers, start, end);
    }

    /**
     * Writes one framed packet (a byte {@code 1}, the UTF string "Test", then
     * the packet itself) into a fresh in-memory stream and closes the stream.
     *
     * @param wireFormat format used to encode the packet
     * @param packet     packet to encode
     * @throws Exception if writing fails
     */
    private static void writeFramedPacket(DefaultWireFormat wireFormat, Packet packet) throws Exception {
        PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeByte(1);
            out.writeUTF("Test");
            wireFormat.writePacket(packet, out);
        }
        finally {
            out.close();
        }
    }

    /**
     * Prints the elapsed time and the resulting message rate for the current test.
     *
     * @param unit     label placed in front of the rate, e.g. "msg/s"
     * @param messages number of messages processed between {@code start} and {@code end}
     * @param start    start time in milliseconds
     * @param end      end time in milliseconds
     */
    private void printThroughput(String unit, long messages, long start, long end) {
        long elapsed = end - start;
        System.out.println(this.getName() + ": test duration: " + elapsed + " ms, " + unit + ": " + (float)messages * 1000.0f / (float)elapsed);
    }

    /**
     * Prints a "Completed: N%" line each time progress crosses into a new
     * multiple of {@code interval} percent. Both public methods are
     * synchronized, so one instance can be shared between worker threads.
     */
    static class ProgressPrinter {
        private final int total;
        private final int interval;
        int percentDone = 0;
        int counter = 0;

        /**
         * @param total    number of steps that corresponds to 100%
         * @param interval reporting granularity in percent
         */
        public ProgressPrinter(int total, int interval) {
            this.total = total;
            this.interval = interval;
        }

        /** Records one more completed step and reports progress if due. */
        public synchronized void increment() {
            this.update(++this.counter);
        }

        /**
         * Reports progress for an absolute step count.
         *
         * <p>Nothing is reported when {@code total} or {@code interval} is zero,
         * since no percentage bucket can be computed in that case.
         *
         * @param current number of steps completed so far
         */
        public synchronized void update(int current) {
            if (this.total == 0 || this.interval == 0) {
                return;
            }
            // Multiply in long: 100 * current overflows int beyond ~21.4 million steps.
            int at = (int)(100L * current / this.total);
            if (this.percentDone / this.interval != at / this.interval) {
                this.percentDone = at;
                System.out.println("Completed: " + this.percentDone + "%");
            }
        }
    }
}

