/*
 * Decompiled with CFR 0.152.
 */
package org.activemq;

import java.io.IOException;
import java.io.InputStream;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQDispatcher;
import org.activemq.MessageDispatchChannel;
import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQMessage;
import org.activemq.command.ConsumerId;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.command.ProducerId;
import org.activemq.selector.SelectorParser;
import org.activemq.util.IOExceptionSupport;
import org.activemq.util.JMSExceptionSupport;

public class ActiveMQInputStream
extends InputStream
implements ActiveMQDispatcher {
    private final ActiveMQConnection connection;
    private final ConsumerInfo info;
    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
    private int deliveredCounter = 0;
    private MessageDispatch lastDelivered;
    private boolean eosReached;
    private byte[] buffer;
    private int pos;
    private ProducerId producerId;
    private long nextSequenceId = 0L;

    public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch) throws JMSException {
        this.connection = connection;
        if (dest == null) {
            throw new InvalidDestinationException("Don't understand null destinations");
        }
        if (dest.isTemporary()) {
            String physicalName = dest.getPhysicalName();
            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
            }
            String connectionID = connection.getConnectionInfo().getConnectionId().getConnectionId();
            if (physicalName.indexOf(connectionID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }
            if (connection.isDeleted(dest)) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
        }
        this.info = new ConsumerInfo(consumerId);
        this.info.setDestination(dest);
        this.info.setSubcriptionName(name);
        selector = selector != null && selector.trim().length() != 0 ? "JMSType='org.activemq.Stream' AND ( " + selector + " ) " : "JMSType='org.activemq.Stream'";
        new SelectorParser().parse(selector);
        this.info.setSelector(selector);
        this.info.setPrefetchSize(prefetch);
        this.info.setNoLocal(noLocal);
        this.info.setBrowser(false);
        this.info.setDispatchAsync(false);
        this.connection.addInputStream(this);
        this.connection.addDispatcher(this.info.getConsumerId(), this);
        this.connection.syncSendPacket(this.info);
        this.unconsumedMessages.start();
    }

    public void close() throws IOException {
        if (!this.unconsumedMessages.isClosed()) {
            try {
                if (this.lastDelivered != null) {
                    MessageAck ack = new MessageAck(this.lastDelivered, 2, this.deliveredCounter);
                    this.connection.asyncSendPacket(ack);
                }
                this.dispose();
                this.connection.syncSendPacket(this.info.createRemoveCommand());
            }
            catch (JMSException e) {
                throw IOExceptionSupport.create((Exception)((Object)e));
            }
        }
    }

    public void dispose() {
        if (!this.unconsumedMessages.isClosed()) {
            this.unconsumedMessages.close();
            this.connection.removeDispatcher(this.info.getConsumerId());
            this.connection.removeInputStream(this);
        }
    }

    public ActiveMQMessage receive() throws JMSException {
        MessageDispatch md;
        this.checkClosed();
        try {
            md = this.unconsumedMessages.dequeue(-1L);
        }
        catch (InterruptedException e) {
            throw JMSExceptionSupport.create(e);
        }
        if (md == null || this.unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
            return null;
        }
        ++this.deliveredCounter;
        if (0.75 * (double)this.info.getPrefetchSize() <= (double)this.deliveredCounter) {
            MessageAck ack = new MessageAck(md, 2, this.deliveredCounter);
            this.connection.asyncSendPacket(ack);
            this.deliveredCounter = 0;
            this.lastDelivered = null;
        } else {
            this.lastDelivered = md;
        }
        return (ActiveMQMessage)md.getMessage();
    }

    protected void checkClosed() throws IllegalStateException {
        if (this.unconsumedMessages.isClosed()) {
            throw new IllegalStateException("The Consumer is closed");
        }
    }

    public int read() throws IOException {
        this.fillBuffer();
        if (this.eosReached) {
            return -1;
        }
        return this.buffer[this.pos++] & 0xFF;
    }

    public int read(byte[] b, int off, int len) throws IOException {
        this.fillBuffer();
        if (this.eosReached) {
            return -1;
        }
        int max = Math.min(len, this.buffer.length - this.pos);
        System.arraycopy(this.buffer, this.pos, b, off, max);
        this.pos += max;
        return max;
    }

    private void fillBuffer() throws IOException {
        if (this.eosReached || this.buffer != null && this.buffer.length > this.pos) {
            return;
        }
        try {
            block8: {
                ActiveMQMessage m;
                while ((m = this.receive()) != null && m.getDataStructureType() == 24) {
                    if (this.producerId == null) {
                        if (m.getMessageId().getProducerSequenceId() != 0L) continue;
                        ++this.nextSequenceId;
                        this.producerId = m.getMessageId().getProducerId();
                    } else {
                        if (!m.getMessageId().getProducerId().equals(this.producerId)) {
                            throw new IOException("Received an unexpected message: invalid producer: " + m);
                        }
                        if (m.getMessageId().getProducerSequenceId() != this.nextSequenceId++) {
                            throw new IOException("Received an unexpected message: invalid sequence id: " + m);
                        }
                    }
                    ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
                    this.buffer = new byte[(int)bm.getBodyLength()];
                    bm.readBytes(this.buffer);
                    this.pos = 0;
                    break block8;
                }
                this.eosReached = true;
            }
            return;
        }
        catch (JMSException e) {
            this.eosReached = true;
            throw IOExceptionSupport.create((Exception)((Object)e));
        }
    }

    public void dispatch(MessageDispatch md) {
        this.unconsumedMessages.enqueue(md);
    }
}

