/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.transport.vm;

import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import javax.jms.JMSException;
import org.activemq.broker.BrokerConnector;
import org.activemq.message.Packet;
import org.activemq.message.PacketListener;
import org.activemq.transport.TransportChannelListener;
import org.activemq.transport.TransportChannelSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A transport channel that moves {@link Packet}s between two endpoints living
 * in the same VM.
 *
 * <p>Two delivery modes are supported:
 * <ul>
 *   <li><b>Queued</b> - when a send channel is present, {@link #asyncSend(Packet)}
 *       puts the packet on that channel and the peer's worker thread (see
 *       {@link #run()}) takes it off its receive channel.</li>
 *   <li><b>Direct</b> - when no send channel is present, the packet is handed
 *       straight to the {@link PacketListener} stored in {@code sendListener}.</li>
 * </ul>
 *
 * <p>A server-side endpoint is created from a client-side endpoint with
 * {@link #createServerSide()}; it uses the client's channels with send and
 * receive swapped.
 */
public class VmTransportChannel extends TransportChannelSupport implements Runnable {
    private static final Log log = LogFactory.getLog(VmTransportChannel.class);

    /** Sentinel placed on a channel to tell the reader that the channel is shutting down. */
    private static final Object TERMINATE = new Object();

    /** Counter used to give each worker thread a distinct name; guarded by the class lock. */
    private static int lastThreadId = 0;

    private Channel sendChannel;
    private Channel receiveChannel;
    private int sendCapacity = 10;
    private int receiveCapacity = 10;
    private boolean asyncSend = false;
    private final SynchronizedBoolean closed = new SynchronizedBoolean(false);
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);

    /** Worker thread running {@link #run()}; only created by {@link #start()} in async mode. */
    private Thread thread;

    /** Target for direct (non-queued) sends. */
    private PacketListener sendListener;

    /** The endpoint this instance was created from by {@link #createServerSide()}, if any. */
    private VmTransportChannel clientSide;

    /**
     * Creates a channel with no queues; they are created lazily by
     * {@link #getSendChannel()} / {@link #getReceiveChannel()} when async send is enabled.
     */
    public VmTransportChannel() {
    }

    /**
     * Creates a channel over the given queues.
     *
     * @param sendChannel    queue that outgoing packets are put on (may be {@code null})
     * @param receiveChannel queue that incoming packets are taken from (may be {@code null})
     */
    public VmTransportChannel(Channel sendChannel, Channel receiveChannel) {
        this();
        this.sendChannel = sendChannel;
        this.receiveChannel = receiveChannel;
    }

    /**
     * Creates a channel whose send and receive queues are each a new
     * {@link BoundedLinkedQueue} of the given capacity.
     *
     * @param capacity capacity of each queue
     */
    public VmTransportChannel(int capacity) {
        this(new BoundedLinkedQueue(capacity), new BoundedLinkedQueue(capacity));
    }

    /**
     * Starts the channel. The first call marks the channel started; if async send
     * is enabled it also makes sure both queues exist and launches the worker
     * thread. Subsequent calls do nothing.
     *
     * @throws JMSException declared by the signature; not thrown by the code in this method
     */
    public void start() throws JMSException {
        // commit() flips started from false to true at most once. Note that the flag is
        // flipped even when async send is disabled, exactly as the original '&&' did.
        if (!this.started.commit(false, true) || !this.isAsyncSend()) {
            return;
        }
        this.getSendChannel();
        this.getReceiveChannel();
        this.thread = new Thread(this, "VM Transport: " + VmTransportChannel.getNextThreadId());
        if (this.isServerSide()) {
            this.thread.setDaemon(true);
        }
        this.thread.start();
    }

    /**
     * Stops the channel. The first call marks the channel closed, invokes
     * {@code super.stop()}, puts the {@code TERMINATE} sentinel on both queues
     * (when present) and waits for the worker thread to finish. Subsequent calls
     * do nothing.
     *
     * <p>This method may be invoked from the worker thread itself (for example
     * from {@link #run()} when the sentinel is received); in that case the join is
     * skipped, because a thread joining itself would block forever.
     *
     * <p>If the calling thread is interrupted while stopping, the remaining steps
     * are abandoned and the thread's interrupt status is restored.
     */
    public void stop() {
        if (!this.closed.commit(false, true)) {
            return;
        }
        super.stop();
        try {
            if (this.sendChannel != null) {
                this.sendChannel.put(TERMINATE);
            }
            if (this.receiveChannel != null) {
                this.receiveChannel.put(TERMINATE);
            }
            Thread worker = this.thread;
            // Never join the current thread: run() and doClose() can reach stop() from the worker.
            if (worker != null && worker != Thread.currentThread()) {
                worker.join();
            }
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently discarding it.
            Thread.currentThread().interrupt();
            log.trace(this.toString() + " now closed with exception: " + e);
        }
        catch (Exception e) {
            log.trace(this.toString() + " now closed with exception: " + e);
        }
    }

    /**
     * Not supported by this transport.
     *
     * @throws IllegalStateException always
     */
    public void forceDisconnect() {
        throw new IllegalStateException("Disconnection not applicable for VM transport");
    }

    /**
     * Sends a packet to the peer.
     *
     * <p>When a send channel is present the packet is queued on it; the call blocks
     * until the queue accepts the packet, retrying if the thread is interrupted
     * (the interrupt status is restored afterwards). Otherwise the packet is passed
     * directly to the send listener, which is created on demand from the client
     * side endpoint when one is known.
     *
     * @param packet the packet to send
     * @throws JMSException if there is neither a send channel nor a send listener
     */
    public void asyncSend(Packet packet) throws JMSException {
        Channel channel = this.sendChannel;
        if (channel != null) {
            putUninterruptibly(channel, packet);
            return;
        }
        if (this.sendListener == null && this.clientSide != null) {
            this.sendListener = this.clientSide.createPacketListenerSender();
        }
        if (this.sendListener == null) {
            throw new JMSException("No sendListener available");
        }
        this.sendListener.consume(packet);
    }

    /** Puts {@code item} on {@code channel}, retrying on interruption and restoring the interrupt flag afterwards. */
    private static void putUninterruptibly(Channel channel, Object item) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    channel.put(item);
                    return;
                }
                catch (InterruptedException e) {
                    // Remember the interrupt and retry; re-asserting it here would make
                    // the next put() fail immediately.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return {@code false}; this transport is never multicast
     */
    public boolean isMulticast() {
        return false;
    }

    /**
     * Worker loop: takes objects off the receive channel until the channel is
     * closed. The {@code TERMINATE} sentinel stops this channel and ends the loop;
     * {@code null} entries are skipped; anything else is cast to {@link Packet}
     * and dispatched through {@code doConsumePacket}.
     */
    public void run() {
        while (!this.closed.get()) {
            try {
                Object answer = this.receiveChannel.take();
                if (answer == TERMINATE) {
                    log.trace("The socket peer is now closed");
                    this.stop();
                    return;
                }
                if (answer == null) {
                    continue;
                }
                Packet packet = (Packet) answer;
                if (this.closed.get()) {
                    break;
                }
                this.doConsumePacket(packet);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: the loop condition re-checks the closed flag,
                // so an interrupt simply wakes the worker to re-evaluate it.
            }
        }
    }

    /**
     * @return the class label followed by the string form of the send channel
     */
    public String toString() {
        return "VmTransportChannel: " + this.sendChannel;
    }

    /**
     * Connects this (client-side) channel to a broker connector by creating the
     * server-side endpoint, registering it with the connector and starting it.
     *
     * @param brokerConnector the connector; it is cast to {@link TransportChannelListener}
     * @throws JMSException if creating or starting the server-side endpoint fails
     */
    public void connect(BrokerConnector brokerConnector) throws JMSException {
        // The intermediate Object cast is kept from the original so this compiles
        // regardless of how BrokerConnector is declared.
        TransportChannelListener listener = (TransportChannelListener) ((Object) brokerConnector);
        VmTransportChannel serverSide = this.createServerSide();
        listener.addClient(serverSide);
        serverSide.start();
    }

    /**
     * Creates the peer endpoint for this channel. The peer sends on this channel's
     * receive queue and receives on this channel's send queue, and remembers this
     * instance as its client side.
     *
     * @return the new server-side endpoint
     * @throws JMSException declared by the signature; not thrown by the code in this method
     */
    public VmTransportChannel createServerSide() throws JMSException {
        VmTransportChannel channel = new VmTransportChannel(this.getReceiveChannel(), this.getSendChannel());
        channel.clientSide = this;
        return channel;
    }

    /**
     * Sets the listener for incoming packets. When this endpoint has a client
     * side, the same listener also becomes the client side's send listener, so
     * the client's direct sends are delivered to it.
     *
     * @param listener the packet listener
     */
    public void setPacketListener(PacketListener listener) {
        super.setPacketListener(listener);
        if (this.clientSide != null) {
            this.clientSide.sendListener = listener;
        }
    }

    /**
     * @param version the wire format version being queried (unused)
     * @return {@code true} for every version
     */
    public boolean canProcessWireFormatVersion(int version) {
        return true;
    }

    /**
     * @return {@code false}
     */
    public boolean doesSupportWireFormatVersioning() {
        return false;
    }

    /**
     * @return {@code -1}
     */
    public int getCurrentWireFormatVersion() {
        return -1;
    }

    /**
     * @return {@code false}
     */
    public boolean doesSupportMessageFragmentation() {
        return false;
    }

    /**
     * @return {@code false}
     */
    public boolean doesSupportMessageCompression() {
        return false;
    }

    /**
     * @return the capacity used when the receive channel is created lazily
     */
    public int getReceiveCapacity() {
        return this.receiveCapacity;
    }

    /**
     * @param receiveCapacity the capacity to use when the receive channel is created lazily
     */
    public void setReceiveCapacity(int receiveCapacity) {
        this.receiveCapacity = receiveCapacity;
    }

    /**
     * @return the capacity used when the send channel is created lazily
     */
    public int getSendCapacity() {
        return this.sendCapacity;
    }

    /**
     * @param sendCapacity the capacity to use when the send channel is created lazily
     */
    public void setSendCapacity(int sendCapacity) {
        this.sendCapacity = sendCapacity;
    }

    /**
     * @return whether sends go through queues and a worker thread
     */
    public boolean isAsyncSend() {
        return this.asyncSend;
    }

    /**
     * @param asyncSend {@code true} to send through queues and a worker thread
     */
    public void setAsyncSend(boolean asyncSend) {
        this.asyncSend = asyncSend;
    }

    /**
     * Returns the send channel, creating it first when async send is enabled and
     * none has been set.
     *
     * @return the send channel, or {@code null} if none exists and async send is disabled
     */
    public Channel getSendChannel() {
        if (this.isAsyncSend() && this.sendChannel == null) {
            this.sendChannel = this.createChannel(this.getSendCapacity());
        }
        return this.sendChannel;
    }

    /**
     * @param sendChannel the channel outgoing packets are put on
     */
    public void setSendChannel(Channel sendChannel) {
        this.sendChannel = sendChannel;
    }

    /**
     * Returns the receive channel, creating it first when async send is enabled
     * and none has been set.
     *
     * @return the receive channel, or {@code null} if none exists and async send is disabled
     */
    public Channel getReceiveChannel() {
        if (this.isAsyncSend() && this.receiveChannel == null) {
            this.receiveChannel = this.createChannel(this.getReceiveCapacity());
        }
        return this.receiveChannel;
    }

    /**
     * @param receiveChannel the channel incoming packets are taken from
     */
    public void setReceiveChannel(Channel receiveChannel) {
        this.receiveChannel = receiveChannel;
    }

    /**
     * @return the next worker-thread id; each call returns the previous value plus one, starting at 0
     */
    protected static synchronized int getNextThreadId() {
        return lastThreadId++;
    }

    /**
     * Factory for the queues used in async mode.
     *
     * @param capacity capacity of the queue
     * @return a new {@link BoundedLinkedQueue} of that capacity
     */
    protected Channel createChannel(int capacity) {
        return new BoundedLinkedQueue(capacity);
    }

    /**
     * Creates a listener that delivers each packet it consumes to this channel's
     * current packet listener.
     *
     * @return the forwarding listener
     */
    protected PacketListener createPacketListenerSender() {
        return new PacketListener() {

            public void consume(Packet packet) {
                // The packet listener is looked up on every call, so a listener
                // installed later is picked up.
                VmTransportChannel.this.doConsumePacket(packet, VmTransportChannel.this.getPacketListener());
            }
        };
    }

    /**
     * Reports a failure and stops the channel, unless it is already closed. The
     * exception is wrapped in a {@link JMSException} (as its linked exception) and
     * passed to {@code onAsyncException}.
     *
     * @param ex the failure that triggered the close
     */
    protected void doClose(Exception ex) {
        if (this.closed.get()) {
            return;
        }
        JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
        jmsEx.setLinkedException(ex);
        this.onAsyncException(jmsEx);
        this.stop();
    }

    /**
     * @return the listener used for direct sends, or {@code null} if none is set
     */
    public PacketListener getSendListener() {
        return this.sendListener;
    }

    /**
     * @param sendListener the listener to use for direct sends
     */
    public void setSendListener(PacketListener sendListener) {
        this.sendListener = sendListener;
    }
}

