/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.ArrayList;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.TransactionManager;
import org.codehaus.activemq.service.TransactionTask;
import org.codehaus.activemq.service.impl.DefaultQueueList;
import org.codehaus.activemq.service.impl.MessagePointer;

public class SubscriptionImpl
implements Subscription {
    private static final Log log = LogFactory.getLog((Class)SubscriptionImpl.class);
    private String clientId;
    private String subscriberName;
    private ActiveMQDestination destination;
    private String selector;
    private int prefetchLimit;
    private boolean noLocal;
    private boolean active;
    private int consumerNumber;
    private String consumerId;
    private boolean browser;
    protected Dispatcher dispatch;
    protected String brokerName;
    protected String clusterName;
    protected MessageIdentity lastMessageIdentity;
    Filter filter;
    protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
    QueueList messagePtrs = new DefaultQueueList();
    private boolean usePrefetch = false;
    private SubscriberEntry subscriberEntry;
    private ConsumerInfo activeConsumer;
    private BrokerClient activeClient;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;

    public SubscriptionImpl(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.dispatch = dispatcher;
        this.filter = filter;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        this.setActiveConsumer(client, info);
    }

    public void setActiveConsumer(BrokerClient client, ConsumerInfo info) {
        BrokerInfo brokerInfo;
        BrokerConnector brokerConnector;
        if (info != null) {
            this.clientId = info.getClientId();
            this.subscriberName = info.getConsumerName();
            this.noLocal = info.isNoLocal();
            this.destination = info.getDestination();
            this.selector = info.getSelector();
            this.prefetchLimit = info.getPrefetchNumber();
            this.consumerNumber = info.getConsumerNo();
            this.consumerId = info.getConsumerId();
            this.browser = info.isBrowser();
        }
        this.activeClient = client;
        this.activeConsumer = info;
        if (client != null && (brokerConnector = client.getBrokerConnector()) != null && (brokerInfo = brokerConnector.getBrokerInfo()) != null) {
            this.brokerName = brokerInfo.getBrokerName();
            this.clusterName = brokerInfo.getClusterName();
        }
    }

    public String toString() {
        String str = "SubscriptionImpl(" + super.hashCode() + ")[" + this.consumerId + "]" + this.clientId + ": " + this.subscriberName + " : " + this.destination;
        return str;
    }

    public synchronized void clear() throws JMSException {
        QueueListEntry entry = this.messagePtrs.getFirstEntry();
        while (entry != null) {
            MessagePointer pointer = (MessagePointer)entry.getElement();
            pointer.clear();
            entry = this.messagePtrs.getNextEntry(entry);
        }
        this.messagePtrs.clear();
    }

    public synchronized void reset() throws JMSException {
        MessagePointer pointer;
        QueueListEntry entry = this.messagePtrs.getFirstEntry();
        while (entry != null && (pointer = (MessagePointer)entry.getElement()).isDispatched()) {
            pointer.reset();
            pointer.setRedelivered(true);
            entry = this.messagePtrs.getNextEntry(entry);
        }
    }

    public String getClientId() {
        return this.clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public Filter getFilter() {
        return this.filter;
    }

    public void setFilter(Filter filter) {
        this.filter = filter;
    }

    public boolean isWildcard() {
        return this.filter.isWildcard();
    }

    public String getPersistentKey() {
        return null;
    }

    public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
        if (this.isDurableTopic()) {
            return SubscriptionImpl.equal(this.clientId, info.getClientId()) && SubscriptionImpl.equal(this.subscriberName, info.getConsumerName());
        }
        return false;
    }

    public boolean isNoLocal() {
        return this.noLocal;
    }

    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    public String getSubscriberName() {
        return this.subscriberName;
    }

    public void setSubscriberName(String subscriberName) {
        this.subscriberName = subscriberName;
    }

    public RedeliveryPolicy getRedeliveryPolicy() {
        return this.redeliveryPolicy;
    }

    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    public boolean isTarget(ActiveMQMessage message) throws JMSException {
        boolean result = false;
        if (!(message == null || this.activeClient != null && this.brokerName != null && this.clusterName != null && this.activeClient.isClusteredConnection() && message.isEntryCluster(this.clusterName) && !message.isEntryBroker(this.brokerName))) {
            boolean bl = result = message.isDispatchedFromDLQ() || this.filter.matches(message);
            if (this.noLocal && result && this.clientIDsEqual(message)) {
                result = false;
            }
        }
        return result;
    }

    public synchronized void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Adding to subscription: " + this + " message: " + message));
        }
        MessagePointer pointer = new MessagePointer(container, message.getJMSMessageIdentity());
        this.messagePtrs.add(pointer);
        this.dispatch.wakeup(this);
        this.lastMessageIdentity = message.getJMSMessageIdentity();
    }

    public synchronized void messageConsumed(final MessageAck ack) throws JMSException {
        int count = 0;
        boolean found = false;
        QueueListEntry entry = this.messagePtrs.getFirstEntry();
        while (entry != null) {
            final MessagePointer pointer = (MessagePointer)entry.getElement();
            ++count;
            if (!ack.isPartOfTransaction() || pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
                if ((ack.isExpired() || ack.isMessageRead()) && !this.browser) {
                    pointer.delete(ack);
                }
                this.unconsumedMessagesDispatched.decrement();
                TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(){

                    public void execute() throws Throwable {
                        SubscriptionImpl.this.unconsumedMessagesDispatched.increment();
                        pointer.reset();
                        pointer.setRedelivered(true);
                        SubscriptionImpl.this.dispatch.wakeup(SubscriptionImpl.this);
                    }
                });
                final QueueListEntry theEntry = entry;
                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){

                    public void execute() throws Throwable {
                        ActiveMQMessage msg;
                        SubscriptionImpl.this.messagePtrs.remove(theEntry);
                        if ((ack.isExpired() || ack.isMessageRead()) && !SubscriptionImpl.this.browser && ack.isExpired() && !pointer.getContainer().isDeadLetterQueue() && (msg = pointer.getContainer().getMessage(pointer.getMessageIdentity())) != null) {
                            SubscriptionImpl.this.deadLetterPolicy.sendToDeadLetter(msg);
                        }
                    }
                });
                if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
                    found = true;
                    break;
                }
            }
            entry = this.messagePtrs.getNextEntry(entry);
        }
        if (!found && log.isDebugEnabled()) {
            log.debug((Object)("Did not find a matching message for identity: " + ack.getMessageIdentity()));
        }
        this.dispatch.wakeup(this);
    }

    public synchronized ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
        if (this.usePrefetch) {
            return this.getMessagesWithPrefetch();
        }
        ArrayList<ActiveMQMessage> tmpList = new ArrayList<ActiveMQMessage>();
        QueueListEntry entry = this.messagePtrs.getFirstEntry();
        while (entry != null) {
            MessagePointer pointer = (MessagePointer)entry.getElement();
            if (!pointer.isDispatched()) {
                ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
                if (msg != null) {
                    if (pointer.isDispatched() || pointer.isRedelivered()) {
                        msg.setJMSRedelivered(true);
                        if (this.redeliveryPolicy.isBackOffMode() && msg.getDeliveryCount() < this.redeliveryPolicy.getMaximumRetryCount()) {
                            long sleepTime = this.redeliveryPolicy.getInitialRedeliveryTimeout();
                            sleepTime = (long)((double)sleepTime * ((double)msg.getDeliveryCount() * this.redeliveryPolicy.getBackOffIncreaseRate()));
                            try {
                                Thread.sleep(sleepTime);
                            }
                            catch (InterruptedException e) {
                                // empty catch block
                            }
                        }
                        msg.incrementDeliveryCount();
                    }
                    if (!pointer.getContainer().isDeadLetterQueue() && (msg.isExpired() || msg.getDeliveryCount() >= this.redeliveryPolicy.getMaximumRetryCount())) {
                        if (msg.isExpired()) {
                            log.warn((Object)("Message: " + msg + " has expired"));
                        } else {
                            log.warn((Object)("Message: " + msg + " exceeded retry count: " + msg.getDeliveryCount()));
                        }
                        this.deadLetterPolicy.sendToDeadLetter(msg);
                        QueueListEntry discarded = entry;
                        entry = this.messagePtrs.getPrevEntry(discarded);
                        this.messagePtrs.remove(discarded);
                    } else {
                        pointer.setDispatched(true);
                        msg.setDispatchedFromDLQ(pointer.getContainer().isDeadLetterQueue());
                        tmpList.add(msg);
                    }
                } else {
                    log.info((Object)("Message probably expired: " + msg));
                    QueueListEntry discarded = entry;
                    entry = this.messagePtrs.getPrevEntry(discarded);
                    this.messagePtrs.remove(discarded);
                    if (msg != null) {
                        this.deadLetterPolicy.sendToDeadLetter(msg);
                    }
                }
            }
            entry = this.messagePtrs.getNextEntry(entry);
        }
        ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
        return tmpList.toArray(messages);
    }

    public synchronized SubscriberEntry getSubscriptionEntry() {
        if (this.subscriberEntry == null) {
            this.subscriberEntry = this.createSubscriptionEntry();
        }
        return this.subscriberEntry;
    }

    public boolean isLocalSubscription() {
        if (this.activeClient != null) {
            return !this.activeClient.isClusteredConnection() && !this.activeClient.isBrokerConnection();
        }
        return true;
    }

    protected SubscriberEntry createSubscriptionEntry() {
        SubscriberEntry answer = new SubscriberEntry();
        answer.setClientID(this.clientId);
        answer.setConsumerName(this.subscriberName);
        answer.setDestination(this.destination.getPhysicalName());
        answer.setSelector(this.selector);
        return answer;
    }

    protected synchronized ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
        ArrayList<ActiveMQMessage> tmpList = new ArrayList<ActiveMQMessage>();
        QueueListEntry entry = this.messagePtrs.getFirstEntry();
        int count = 0;
        int maxNumberToDispatch = this.prefetchLimit - this.unconsumedMessagesDispatched.get();
        while (entry != null && count < maxNumberToDispatch) {
            MessagePointer pointer = (MessagePointer)entry.getElement();
            if (!pointer.isDispatched()) {
                ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
                if (msg != null && !msg.isExpired()) {
                    if (pointer.isDispatched() || pointer.isRedelivered()) {
                        msg.setJMSRedelivered(true);
                    }
                    pointer.setDispatched(true);
                    tmpList.add(msg);
                    this.unconsumedMessagesDispatched.increment();
                    ++count;
                } else {
                    log.info((Object)("Message probably expired: " + msg));
                    QueueListEntry discarded = entry;
                    entry = this.messagePtrs.getPrevEntry(discarded);
                    this.messagePtrs.remove(discarded);
                    if (msg != null) {
                        this.deadLetterPolicy.sendToDeadLetter(msg);
                    }
                }
            }
            entry = this.messagePtrs.getNextEntry(entry);
        }
        ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
        return tmpList.toArray(messages);
    }

    public synchronized boolean isAtPrefetchLimit() throws JMSException {
        if (this.usePrefetch) {
            int underlivedMessageCount = this.messagePtrs.size() - this.unconsumedMessagesDispatched.get();
            return underlivedMessageCount >= this.prefetchLimit;
        }
        return false;
    }

    public synchronized boolean isReadyToDispatch() throws JMSException {
        boolean answer = this.active && this.messagePtrs.size() > 0;
        return answer;
    }

    public ActiveMQDestination getDestination() {
        return this.destination;
    }

    public String getSelector() {
        return this.selector;
    }

    public synchronized boolean isActive() {
        return this.active;
    }

    public synchronized void setActive(boolean active) throws JMSException {
        this.active = active;
        if (!active) {
            this.reset();
        }
    }

    public int getConsumerNumber() {
        return this.consumerNumber;
    }

    public String getConsumerId() {
        return this.consumerId;
    }

    public boolean isDurableTopic() throws JMSException {
        return this.destination.isTopic() && this.subscriberName != null && this.subscriberName.length() > 0;
    }

    public boolean isBrowser() throws JMSException {
        return this.browser;
    }

    public MessageIdentity getLastMessageIdentity() throws JMSException {
        return this.lastMessageIdentity;
    }

    public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
        this.lastMessageIdentity = messageIdentity;
    }

    protected boolean clientIDsEqual(ActiveMQMessage message) {
        String msgClientID = message.getJMSClientID();
        String subClientID = this.clientId;
        if (msgClientID == null || subClientID == null) {
            return false;
        }
        return msgClientID.equals(subClientID);
    }

    protected static final boolean equal(Object left, Object right) {
        return left == right || left != null && right != null && left.equals(right);
    }
}

