/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.transport;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.broker.ConsumerInfoListener;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.transport.NetworkMessageBridge;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.composite.CompositeTransportChannel;

public class NetworkChannel
implements Service,
ConsumerInfoListener {
    private static final Log log = LogFactory.getLog((Class)NetworkChannel.class);
    private String uri;
    private BrokerContainer brokerContainer;
    private ActiveMQConnection localConnection;
    private ActiveMQConnection remoteConnection;
    private ConcurrentHashMap consumerMap = new ConcurrentHashMap();
    private String remoteUserName;
    private String remotePassword;
    private String remoteBrokerName;
    private String remoteClusterName;
    private int maximumRetries = 0;
    private long reconnectSleepTime = 1000L;

    public NetworkChannel() {
    }

    public NetworkChannel(BrokerContainer brokerContainer, String uri) {
        this();
        this.brokerContainer = brokerContainer;
        this.uri = uri;
    }

    public String toString() {
        return super.toString() + "[uri=" + this.uri + "]";
    }

    public void start() throws JMSException {
        Thread runner = new Thread(new Runnable(){

            public void run() {
                try {
                    NetworkChannel.this.initialize();
                    NetworkChannel.this.brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
                    NetworkChannel.this.startSubscriptions();
                    log.info((Object)("Started NetworkChannel to " + NetworkChannel.this.uri));
                }
                catch (JMSException jmsEx) {
                    log.error((Object)("Failed to start NetworkChannel: " + NetworkChannel.this.uri));
                }
            }
        }, "NetworkChannel Starter");
        runner.setDaemon(true);
        runner.start();
    }

    public void stop() throws JMSException {
        this.consumerMap.clear();
        if (this.remoteConnection != null) {
            this.remoteConnection.close();
            this.remoteConnection = null;
        }
        if (this.localConnection != null) {
            this.localConnection.close();
            this.localConnection = null;
        }
        Iterator i = this.consumerMap.values().iterator();
        while (i.hasNext()) {
            NetworkMessageBridge consumer = (NetworkMessageBridge)i.next();
            consumer.stop();
        }
    }

    public void onConsumerInfo(BrokerClient client, ConsumerInfo info) {
        if (!client.isClusteredConnection() && !info.hasVisited(this.remoteBrokerName)) {
            if (info.isStarted()) {
                this.addConsumerInfo(info);
            } else {
                this.removeConsumerInfo(info);
            }
        }
    }

    public String getUri() {
        return this.uri;
    }

    public void setUri(String uri) {
        this.uri = uri;
    }

    public String getRemotePassword() {
        return this.remotePassword;
    }

    public void setRemotePassword(String remotePassword) {
        this.remotePassword = remotePassword;
    }

    public String getRemoteUserName() {
        return this.remoteUserName;
    }

    public void setRemoteUserName(String remoteUserName) {
        this.remoteUserName = remoteUserName;
    }

    public BrokerContainer getBrokerContainer() {
        return this.brokerContainer;
    }

    public void setBrokerContainer(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }

    public int getMaximumRetries() {
        return this.maximumRetries;
    }

    public void setMaximumRetries(int maximumRetries) {
        this.maximumRetries = maximumRetries;
    }

    public long getReconnectSleepTime() {
        return this.reconnectSleepTime;
    }

    public void setReconnectSleepTime(long reconnectSleepTime) {
        this.reconnectSleepTime = reconnectSleepTime;
    }

    public String getRemoteBrokerName() {
        return this.remoteBrokerName;
    }

    public void setRemoteBrokerName(String remoteBrokerName) {
        this.remoteBrokerName = remoteBrokerName;
    }

    private void addConsumerInfo(ConsumerInfo info) {
        this.addConsumerInfo(info.getDestination(), info.isDurableTopic());
    }

    private void addConsumerInfo(ActiveMQDestination destination, boolean durableTopic) {
        NetworkMessageBridge key = new NetworkMessageBridge();
        key.setDestination(destination);
        key.setDurableTopic(durableTopic);
        NetworkMessageBridge bridge = (NetworkMessageBridge)this.consumerMap.get((Object)key);
        if (bridge == null) {
            try {
                bridge = key;
                bridge.setLocalBrokerName(this.brokerContainer.getBroker().getBrokerName());
                bridge.setLocalSession(this.localConnection.createSession(false, 2));
                bridge.setRemoteSession(this.remoteConnection.createSession(false, 2));
                this.consumerMap.put((Object)bridge, (Object)bridge);
                bridge.start();
                log.info((Object)("started NetworkMessageBridge for destination: " + destination));
            }
            catch (JMSException jmsEx) {
                log.error((Object)("Failed to start NetworkMessageBridge for destination: " + destination));
            }
        }
        bridge.incrementReferenceCount();
    }

    private void removeConsumerInfo(final ConsumerInfo info) {
        NetworkMessageBridge key = new NetworkMessageBridge();
        key.setDestination(info.getDestination());
        key.setDurableTopic(info.isDurableTopic());
        final NetworkMessageBridge bridge = (NetworkMessageBridge)this.consumerMap.get((Object)key);
        if (bridge != null && bridge.decrementReferenceCount() <= 0 && !bridge.isDurableTopic() && (bridge.getDestination().isTopic() || bridge.getDestination().isTemporary())) {
            Thread runner = new Thread(new Runnable(){

                public void run() {
                    bridge.stop();
                    NetworkChannel.this.consumerMap.remove((Object)bridge);
                    log.info((Object)("stopped MetworkMessageBridge for destination: " + info.getDestination()));
                }
            });
            runner.setDaemon(true);
            runner.start();
        }
    }

    private void startSubscriptions() {
        MessageContainerManager durableTopicMCM = this.brokerContainer.getBroker().getPersistentTopicContainerManager();
        if (durableTopicMCM != null) {
            Map map = durableTopicMCM.getDestinations();
            this.startSubscriptions(map, true);
        }
        Iterator i = this.brokerContainer.getBroker().getContainerManagerMap().values().iterator();
        while (i.hasNext()) {
            MessageContainerManager mcm = (MessageContainerManager)i.next();
            if (mcm == durableTopicMCM) continue;
            this.startSubscriptions(mcm.getDestinations(), false);
        }
    }

    private void startSubscriptions(Map destinations, boolean durableTopic) {
        if (destinations != null) {
            Iterator i = destinations.values().iterator();
            while (i.hasNext()) {
                ActiveMQDestination dest = (ActiveMQDestination)i.next();
                this.addConsumerInfo(dest, durableTopic);
            }
        }
    }

    private void initialize() throws JMSException {
        this.initializeRemote();
        this.initializeLocal();
    }

    private void initializeRemote() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.remoteUserName, this.remotePassword, this.uri);
        factory.setUseAsyncSend(true);
        this.remoteConnection = (ActiveMQConnection)factory.createConnection();
        this.remoteConnection.setClientID(this.brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
        TransportChannel transportChannel = this.remoteConnection.getTransportChannel();
        if (transportChannel instanceof CompositeTransportChannel) {
            CompositeTransportChannel composite = (CompositeTransportChannel)transportChannel;
            composite.setMaximumRetries(this.maximumRetries);
            composite.setFailureSleepTime(this.reconnectSleepTime);
        }
        this.remoteConnection.start();
        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(this.brokerContainer.getBroker().getBrokerName());
        info.setClusterName(this.brokerContainer.getBroker().getBrokerClusterName());
        Receipt receipt = this.remoteConnection.syncSendRequest(info);
        this.remoteBrokerName = receipt.getBrokerName();
        this.remoteClusterName = receipt.getClusterName();
    }

    private void initializeLocal() throws JMSException {
        String brokerName = this.brokerContainer.getBroker().getBrokerName();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
        factory.setUseAsyncSend(true);
        factory.setBrokerName(brokerName);
        this.localConnection = (ActiveMQConnection)factory.createConnection();
        this.localConnection.start();
        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(this.remoteBrokerName);
        info.setClusterName(this.remoteClusterName);
        this.localConnection.asyncSendPacket(info);
    }
}

