/*
 * Decompiled with CFR 0.152.
 */
package org.activemq;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.Iterator;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.TestSupport;
import org.activemq.broker.BrokerContainer;
import org.activemq.broker.impl.BrokerContainerImpl;
import org.activemq.broker.impl.DefaultBroker;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ActiveMQTopic;
import org.activemq.util.IdGenerator;

public abstract class LargeMessageTestSupport
extends TestCase
implements MessageListener {
    protected static final int LARGE_MESSAGE_SIZE = 131072;
    protected static final int MESSAGE_COUNT = 100;
    protected BrokerContainer broker;
    protected Connection producerConnection;
    protected Connection consumerConnection;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected Session producerSession;
    protected Session consumerSession;
    protected byte[] largeMessageData;
    protected Destination destination;
    protected boolean isTopic = true;
    protected boolean isDurable = true;
    protected int deliveryMode = 2;
    protected IdGenerator idGen = new IdGenerator();
    protected boolean validMessageConsumption = true;
    protected SynchronizedInt messageCount = new SynchronizedInt(0);
    protected String URL = "tcp://localhost:61717";
    protected DefaultBroker defaultBroker;
    protected int prefetchValue = 10000000;

    protected Destination createDestination() {
        String subject = ((Object)((Object)this)).getClass().getName();
        if (this.isTopic) {
            return new ActiveMQTopic(subject);
        }
        return new ActiveMQQueue(subject);
    }

    protected MessageConsumer createConsumer() throws Exception {
        if (this.isTopic && this.isDurable) {
            return this.consumerSession.createDurableSubscriber((Topic)this.destination, this.idGen.generateId());
        }
        return this.consumerSession.createConsumer(this.destination);
    }

    protected abstract ActiveMQConnectionFactory createConnectionFactory();

    protected void setUp() throws Exception {
        TestSupport.removeMessageStore();
        System.out.println("Setting up");
        this.messageCount.set(0);
        this.broker = new BrokerContainerImpl("TestLargeMessages");
        this.broker.addConnector(this.URL);
        this.broker.start();
        this.destination = this.createDestination();
        this.largeMessageData = new byte[131072];
        for (int i = 0; i < 131072; ++i) {
            this.largeMessageData[i] = i % 2 == 0 ? 97 : 122;
        }
        this.defaultBroker = (DefaultBroker)this.broker.getBroker();
        Thread.sleep(1000L);
        ActiveMQConnectionFactory fac = this.createConnectionFactory();
        fac.setQuickClose(true);
        this.producerConnection = fac.createConnection();
        this.setPrefetchPolicy((ActiveMQConnection)this.producerConnection);
        this.producerConnection.start();
        this.consumerConnection = fac.createConnection();
        this.setPrefetchPolicy((ActiveMQConnection)this.consumerConnection);
        this.consumerConnection.setClientID(this.idGen.generateId());
        this.consumerConnection.start();
        this.producerSession = this.producerConnection.createSession(false, 1);
        this.producer = this.producerSession.createProducer(this.createDestination());
        this.producer.setDeliveryMode(this.deliveryMode);
        this.consumerSession = this.consumerConnection.createSession(false, 1);
        this.consumer = this.createConsumer();
        this.consumer.setMessageListener((MessageListener)this);
        System.out.println("Setup complete");
    }

    protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
        activeMQConnection.getPrefetchPolicy().setTopicPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueuePrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(this.prefetchValue);
    }

    protected void tearDown() throws Exception {
        Thread.sleep(1000L);
        this.producerConnection.close();
        this.consumerConnection.close();
        this.broker.stop();
        this.largeMessageData = null;
    }

    protected boolean isSame(BytesMessage msg1) throws Exception {
        boolean result = false;
        for (int i = 0; i < 131072; ++i) {
            boolean bl = result = msg1.readByte() == this.largeMessageData[i];
            if (!result) break;
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onMessage(Message msg) {
        try {
            BytesMessage ba = (BytesMessage)msg;
            this.validMessageConsumption &= this.isSame(ba);
            LargeMessageTestSupport.assertTrue((ba.getBodyLength() == 131072L ? 1 : 0) != 0);
            if (this.messageCount.increment() >= 100) {
                SynchronizedInt synchronizedInt = this.messageCount;
                synchronized (synchronizedInt) {
                    this.messageCount.notify();
                }
            }
            System.out.println("got message = " + this.messageCount);
            if (this.messageCount.get() % 50 == 0) {
                System.out.println("count = " + this.messageCount);
                System.out.println(this.defaultBroker.getQueueManager().dumpContents());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testLargeMessages() throws Exception {
        for (int i = 0; i < 100; ++i) {
            System.out.println("Sending message: " + i);
            BytesMessage msg = this.producerSession.createBytesMessage();
            msg.writeBytes(this.largeMessageData);
            this.producer.send((Message)msg);
        }
        long now = System.currentTimeMillis();
        while (now + 60000L > System.currentTimeMillis() && this.messageCount.get() < 100) {
            System.out.println("message count = " + this.messageCount);
            SynchronizedInt synchronizedInt = this.messageCount;
            synchronized (synchronizedInt) {
                this.messageCount.wait(1000L);
            }
        }
        System.out.println("Finished count = " + this.messageCount);
        System.out.println(this.defaultBroker.getQueueManager().dumpContents());
        LargeMessageTestSupport.assertTrue((String)("Not enough messages - expected 100 but got " + this.messageCount), (this.messageCount.get() == 100 ? 1 : 0) != 0);
        LargeMessageTestSupport.assertTrue((String)"received messages are not valid", (boolean)this.validMessageConsumption);
        Thread.sleep(1000L);
        System.out.println("FINAL count = " + this.messageCount);
        System.out.println(this.defaultBroker.getQueueManager().dumpContents());
        List list = this.defaultBroker.getQueueManager().getMemoryBoundedQueues();
        Iterator i = list.iterator();
        while (i.hasNext()) {
            MemoryBoundedQueue q = (MemoryBoundedQueue)i.next();
            if (q.getLocalMemoryUsedByThisQueue() <= 0L) continue;
            System.err.println("Queue with enqueued messages at end of test: " + q.getName() + "= " + q.getLocalMemoryUsedByThisQueue());
        }
    }
}

