/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.transport.peer;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ActiveMQTextMessage;
import org.activemq.message.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Starts {@link #NUMBER_IN_CLUSTER} connections over the <code>peer://</code>
 * transport, each with one producer and one consumer on the same destination,
 * and checks that every consumer receives every message sent by every producer.
 *
 * The test instance itself is the {@link MessageListener} for all consumers, so
 * {@link #receivedMessageCount} is the total across the whole cluster.
 */
public class PeerTransportTest
extends TestCase
implements MessageListener {
    /** System property that is pointed at a distinct directory per connection. */
    private static final String STORE_DIR_PROPERTY = "activemq.store.dir";
    /** Upper bound, in milliseconds, on waiting for all messages to arrive. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 20000L;

    protected Log log = LogFactory.getLog(getClass());
    protected Destination destination;
    protected boolean topic = true;
    protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
    protected static int MESSAGE_COUNT = 50;
    protected static int NUMBER_IN_CLUSTER = 3;
    // 1 is the value of javax.jms.DeliveryMode.NON_PERSISTENT.
    protected int deliveryMode = 1;
    protected MessageProducer[] producers;
    protected Connection[] connections;

    /** Value of {@link #STORE_DIR_PROPERTY} before setUp changed it (may be null). */
    private String originalStoreDir;
    /** True once setUp has overwritten {@link #STORE_DIR_PROPERTY}. */
    private boolean storeDirOverridden;

    /**
     * Creates the connections, producers and consumers, then sleeps for five
     * seconds to give the peers time to connect to each other.
     *
     * @throws Exception if a connection, session, producer or consumer cannot be created
     */
    protected void setUp() throws Exception {
        this.connections = new Connection[NUMBER_IN_CLUSTER];
        this.producers = new MessageProducer[NUMBER_IN_CLUSTER];
        // Assign the field as well; it was previously shadowed by a local and stayed null.
        this.destination = this.createDestination();
        Destination target = this.destination;

        this.originalStoreDir = System.getProperty(STORE_DIR_PROPERTY);
        this.storeDirOverridden = true;
        for (int i = 0; i < NUMBER_IN_CLUSTER; ++i) {
            // Each connection gets its own store directory name.
            // NOTE(review): when the property is unset this yields "null_broker_<i>",
            // exactly as before - confirm whether a real default is wanted.
            System.setProperty(STORE_DIR_PROPERTY, this.originalStoreDir + "_broker_" + i);
            Connection connection = this.createConnection();
            this.connections[i] = connection;
            connection.setClientID("ClusterTest" + i);
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(target);
            producer.setDeliveryMode(this.deliveryMode);
            this.producers[i] = producer;

            MessageConsumer consumer = this.createMessageConsumer(session, target);
            consumer.setMessageListener(this);
        }
        System.out.println("Sleeping to ensure cluster is fully connected");
        Thread.sleep(5000L);
    }

    /**
     * Closes every connection that was opened and restores the store-directory
     * system property. All connections are attempted even if one fails to close;
     * the first failure is rethrown afterwards.
     *
     * @throws Exception the first exception raised while closing a connection
     */
    protected void tearDown() throws Exception {
        Exception firstFailure = null;
        if (this.connections != null) {
            for (int i = 0; i < this.connections.length; ++i) {
                // Slots stay null when setUp failed before filling them.
                if (this.connections[i] == null) {
                    continue;
                }
                try {
                    this.connections[i].close();
                }
                catch (Exception e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
        }
        this.restoreStoreDir();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Puts {@link #STORE_DIR_PROPERTY} back to the value it had before setUp ran. */
    private void restoreStoreDir() {
        if (!this.storeDirOverridden) {
            return;
        }
        if (this.originalStoreDir == null) {
            System.getProperties().remove(STORE_DIR_PROPERTY);
        } else {
            System.setProperty(STORE_DIR_PROPERTY, this.originalStoreDir);
        }
        this.storeDirOverridden = false;
    }

    /**
     * Creates the consumer used by each connection.
     *
     * @param session session to create the consumer on
     * @param destination destination to consume from
     * @return a consumer for <code>destination</code>
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
        return session.createConsumer(destination);
    }

    /**
     * @return total number of deliveries expected: every one of the
     *         NUMBER_IN_CLUSTER producers sends MESSAGE_COUNT messages and
     *         each is counted once per consumer (NUMBER_IN_CLUSTER of them)
     */
    protected int expectedReceiveCount() {
        return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
    }

    /**
     * Creates a connection from a factory whose URL is <code>peer://</code>
     * followed by this class's name.
     *
     * @return a new, not yet started connection
     * @throws JMSException if the connection cannot be created
     */
    protected Connection createConnection() throws JMSException {
        System.err.println("creating connection ....");
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName());
        return fac.createConnection();
    }

    /**
     * @return a destination named after this class
     */
    protected Destination createDestination() {
        return this.createDestination(getClass().getName());
    }

    /**
     * @param name destination name
     * @return a topic when {@link #topic} is true, otherwise a queue
     */
    protected Destination createDestination(String name) {
        if (this.topic) {
            return new ActiveMQTopic(name);
        }
        return new ActiveMQQueue(name);
    }

    /**
     * Counts a delivery and wakes the waiting test thread once the expected
     * total has been reached.
     *
     * @param msg the delivered message (only counted, not inspected)
     */
    public void onMessage(Message msg) {
        synchronized (this.receivedMessageCount) {
            this.receivedMessageCount.increment();
            if (this.receivedMessageCount.get() >= this.expectedReceiveCount()) {
                // notifyAll so no waiter can be missed if more than one exists.
                this.receivedMessageCount.notifyAll();
            }
        }
    }

    /**
     * Sends MESSAGE_COUNT messages through every producer, waits up to
     * {@link #RECEIVE_TIMEOUT_MILLIS} for the expected number of deliveries,
     * and asserts the final count.
     *
     * @throws Exception if sending fails or the thread is interrupted while waiting
     */
    public void testSendReceive() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("MSG-NO:" + i);
            for (int x = 0; x < this.producers.length; ++x) {
                this.producers[x].send(textMessage);
            }
        }

        long deadline = System.currentTimeMillis() + RECEIVE_TIMEOUT_MILLIS;
        synchronized (this.receivedMessageCount) {
            // Loop rather than a single wait: a wakeup does not guarantee the
            // condition holds. wait(0) would block forever, hence the > 0 guard.
            long remaining = deadline - System.currentTimeMillis();
            while (this.receivedMessageCount.get() < this.expectedReceiveCount() && remaining > 0L) {
                this.receivedMessageCount.wait(remaining);
                remaining = deadline - System.currentTimeMillis();
            }
        }
        // Presumably a grace period so surplus deliveries show up in the count
        // before the exact-equality assertion below.
        Thread.sleep(2000L);
        System.err.println("GOT: " + this.receivedMessageCount.get());
        assertEquals("Expected message count not correct", this.expectedReceiveCount(), this.receivedMessageCount.get());
    }
}

