/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.AndFilter;
import org.activemq.filter.DestinationMap;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.boundedvm.DurableMessagePointer;
import org.activemq.service.boundedvm.DurableQueueBoundedMessageContainer;
import org.activemq.service.boundedvm.DurableQueueSubscription;
import org.activemq.service.boundedvm.DurableSubscription;
import org.activemq.store.MessageStore;
import org.activemq.store.PersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DurableQueueBoundedMessageManager
implements MessageContainerManager,
Runnable {
    private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10;
    private static final long DEFAULT_INACTIVE_TIMEOUT = 30000L;
    private static final Log log = LogFactory.getLog((Class)DurableQueueBoundedMessageManager.class);
    private MemoryBoundedQueueManager queueManager;
    private ConcurrentHashMap containers;
    private ConcurrentHashMap subscriptions;
    private FilterFactory filterFactory;
    private SynchronizedBoolean started;
    private SynchronizedBoolean doingGarbageCollection;
    private Map destinations;
    private DestinationMap destinationMap;
    private PooledExecutor threadPool;
    private long inactiveTimeout;
    private int garbageCoolectionCapacityLimit;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;
    private final PersistenceAdapter persistenceAdapter;

    public DurableQueueBoundedMessageManager(PersistenceAdapter persistenceAdapter, MemoryBoundedQueueManager mgr, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.persistenceAdapter = persistenceAdapter;
        this.queueManager = mgr;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        this.containers = new ConcurrentHashMap();
        this.destinationMap = new DestinationMap();
        this.destinations = new ConcurrentHashMap();
        this.subscriptions = new ConcurrentHashMap();
        this.filterFactory = new FilterFactoryImpl();
        this.started = new SynchronizedBoolean(false);
        this.doingGarbageCollection = new SynchronizedBoolean(false);
        this.threadPool = new PooledExecutor();
        this.threadPool.setThreadFactory((ThreadFactory)new DurableQueueThreadFactory());
        this.inactiveTimeout = 30000L;
        this.garbageCoolectionCapacityLimit = 10;
    }

    public int getGarbageCoolectionCapacityLimit() {
        return this.garbageCoolectionCapacityLimit;
    }

    public void setGarbageCoolectionCapacityLimit(int garbageCoolectionCapacityLimit) {
        this.garbageCoolectionCapacityLimit = garbageCoolectionCapacityLimit;
    }

    public long getInactiveTimeout() {
        return this.inactiveTimeout;
    }

    public void setInactiveTimeout(long inactiveTimeout) {
        this.inactiveTimeout = inactiveTimeout;
    }

    public void start() throws JMSException {
        if (this.started.commit(false, true)) {
            Iterator i = this.containers.values().iterator();
            while (i.hasNext()) {
                DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)i.next();
                container.start();
            }
            try {
                this.threadPool.execute((Runnable)this);
            }
            catch (InterruptedException e) {
                JMSException jmsEx = new JMSException("Garbage collection interupted on start()");
                jmsEx.setLinkedException((Exception)e);
                throw jmsEx;
            }
        }
    }

    public void stop() throws JMSException {
        if (this.started.commit(true, false)) {
            Iterator i = this.containers.values().iterator();
            while (i.hasNext()) {
                DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)i.next();
                container.stop();
            }
            this.threadPool.interruptAll();
            this.threadPool.shutdownNow();
        }
    }

    public void run() {
        while (this.started.get()) {
            this.doGarbageCollection();
            try {
                Thread.sleep(2000L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        String name;
        DurableQueueSubscription ts;
        ActiveMQDestination destination = info.getDestination();
        if (!destination.isQueue() || destination.isTemporary()) {
            return;
        }
        DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)this.containers.get((Object)destination);
        if (container == null) {
            container = this.createContainer(destination, false);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Adding consumer: " + info));
        }
        if ((ts = container.addConsumer(this.createFilter(info), info, client)) != null) {
            this.subscriptions.put((Object)info.getConsumerId(), (Object)ts);
        }
        if (!this.destinations.containsKey(name = destination.getPhysicalName())) {
            this.destinations.put(name, destination);
        }
    }

    private DurableQueueBoundedMessageContainer createContainer(ActiveMQDestination destination, boolean isDeadLetterQueue) throws JMSException {
        MessageStore messageStore = this.persistenceAdapter.createQueueMessageStore(destination.getPhysicalName());
        DurableQueueBoundedMessageContainer container = new DurableQueueBoundedMessageContainer(messageStore, (Executor)this.threadPool, this.queueManager, destination, isDeadLetterQueue ? null : this.redeliveryPolicy, this.deadLetterPolicy);
        this.addContainer(container);
        if (this.started.get()) {
            container.start();
        }
        return container;
    }

    public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        ActiveMQDestination destination = info.getDestination();
        if (!destination.isQueue() || destination.isTemporary()) {
            return;
        }
        Iterator i = this.containers.values().iterator();
        while (i.hasNext()) {
            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)i.next();
            if (container == null) continue;
            container.removeConsumer(info);
        }
        this.subscriptions.remove((Object)info.getConsumerId());
    }

    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
    }

    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        DurableQueueBoundedMessageContainer container;
        ActiveMQDestination destination = message.getJMSActiveMQDestination();
        if (!destination.isQueue() || destination.isTemporary() || !message.isPersistent()) {
            return;
        }
        if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
            this.doGarbageCollection();
        }
        if ((container = (DurableQueueBoundedMessageContainer)this.containers.get((Object)destination)) == null) {
            container = this.createContainer(destination, false);
        }
        Set set = this.destinationMap.get(message.getJMSActiveMQDestination());
        Iterator i = set.iterator();
        while (i.hasNext()) {
            container = (DurableQueueBoundedMessageContainer)i.next();
            container.enqueue(message);
        }
    }

    public void acknowledgeMessage(BrokerClient client, final MessageAck ack) throws JMSException {
        ActiveMQDestination destination = ack.getDestination();
        if (destination == null) {
            log.warn((Object)("Ignoring acknowledgeMessage() on null destination: " + ack));
            return;
        }
        if (!destination.isQueue() || destination.isTemporary() || !ack.isPersistent()) {
            return;
        }
        final DurableQueueSubscription ts = (DurableQueueSubscription)this.subscriptions.get((Object)ack.getConsumerId());
        if (ts == null) {
            return;
        }
        DurableMessagePointer messagePointer = ts.acknowledgeMessage(ack.getMessageID());
        if (messagePointer == null) {
            return;
        }
        if (ts.isBrowser()) {
            ts.addAckedMessage(messagePointer);
            return;
        }
        if (!ack.isMessageRead() || ack.isExpired()) {
            this.redeliverMessage(ts, ack, messagePointer);
            return;
        }
        messagePointer.getMessageStore().removeMessage(ack);
        if (TransactionManager.isCurrentTransaction()) {
            if (!ts.hasAckedMessage()) {
                TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(){

                    public void execute() throws Throwable {
                        List ackList = ts.listAckedMessages();
                        HashMap<DurableQueueBoundedMessageContainer, LinkedList<DurableMessagePointer>> redeliverMap = new HashMap<DurableQueueBoundedMessageContainer, LinkedList<DurableMessagePointer>>();
                        for (int x = 0; x < ackList.size(); ++x) {
                            DurableMessagePointer messagePointer = (DurableMessagePointer)ackList.get(x);
                            ActiveMQMessage message = messagePointer.getMessage();
                            message.setJMSRedelivered(true);
                            if (message.incrementDeliveryCount() >= DurableQueueBoundedMessageManager.this.redeliveryPolicy.getMaximumRetryCount()) {
                                if (log.isDebugEnabled()) {
                                    log.debug((Object)("Message: " + message + " has exceeded its retry count"));
                                }
                                DurableQueueBoundedMessageManager.this.deadLetterPolicy.sendToDeadLetter(message);
                                continue;
                            }
                            if (ack.isExpired()) {
                                if (log.isDebugEnabled()) {
                                    log.debug((Object)("Message: " + message + " has expired"));
                                }
                                DurableQueueBoundedMessageManager.this.deadLetterPolicy.sendToDeadLetter(message);
                                continue;
                            }
                            Set containers = DurableQueueBoundedMessageManager.this.destinationMap.get(message.getJMSActiveMQDestination());
                            Iterator i = containers.iterator();
                            while (i.hasNext()) {
                                DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)i.next();
                                LinkedList<DurableMessagePointer> l = (LinkedList<DurableMessagePointer>)redeliverMap.get(container);
                                if (l == null) {
                                    l = new LinkedList<DurableMessagePointer>();
                                    redeliverMap.put(container, l);
                                }
                                l.add(messagePointer);
                            }
                        }
                        Iterator i = redeliverMap.keySet().iterator();
                        while (i.hasNext()) {
                            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)i.next();
                            List l = (List)redeliverMap.get(container);
                            container.redeliver(l);
                        }
                        ts.removeAllAckedMessages();
                    }
                });
                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){

                    public void execute() throws Throwable {
                        ts.removeAllAckedMessages();
                    }
                });
            }
            ts.addAckedMessage(messagePointer);
        }
    }

    private void redeliverMessage(DurableQueueSubscription ts, MessageAck ack, DurableMessagePointer message) throws JMSException {
        block3: {
            block4: {
                block2: {
                    message.getMessage().setJMSRedelivered(true);
                    if (message.incrementDeliveryCount() < this.redeliveryPolicy.getMaximumRetryCount()) break block2;
                    if (log.isDebugEnabled()) {
                        log.debug((Object)("Message: " + message + " has exceeded its retry count"));
                    }
                    this.deadLetterPolicy.sendToDeadLetter(message.getMessage());
                    break block3;
                }
                if (!ack.isExpired()) break block4;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Message: " + message + " has expired"));
                }
                this.deadLetterPolicy.sendToDeadLetter(message.getMessage());
                break block3;
            }
            Set set = this.destinationMap.get(message.getMessage().getJMSActiveMQDestination());
            Iterator i = set.iterator();
            if (!i.hasNext()) break block3;
            DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)i.next();
            container.redeliver(message);
        }
    }

    public void poll() throws JMSException {
    }

    public MessageContainer getContainer(String physicalName) throws JMSException {
        ActiveMQDestination key = (ActiveMQDestination)this.destinations.get(physicalName);
        if (key != null) {
            return (MessageContainer)this.containers.get((Object)key);
        }
        return null;
    }

    public Map getDestinations() {
        return Collections.unmodifiableMap(this.containers);
    }

    public Map getLocalDestinations() {
        HashMap<String, ActiveMQDestination> localDestinations = new HashMap<String, ActiveMQDestination>();
        Iterator iter = this.subscriptions.values().iterator();
        while (iter.hasNext()) {
            DurableSubscription sub = (DurableSubscription)iter.next();
            if (!sub.isLocalSubscription()) continue;
            ActiveMQDestination dest = sub.getDestination();
            localDestinations.put(dest.getPhysicalName(), dest);
        }
        return Collections.unmodifiableMap(localDestinations);
    }

    public DeadLetterPolicy getDeadLetterPolicy() {
        return this.deadLetterPolicy;
    }

    public void setDeadLetterPolicy(DeadLetterPolicy policy) {
        this.deadLetterPolicy = policy;
    }

    protected Filter createFilter(ConsumerInfo info) throws JMSException {
        Filter filter = this.filterFactory.createFilter(info.getDestination(), info.getSelector());
        if (info.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
        }
        return filter;
    }

    private void doGarbageCollection() {
        if (this.doingGarbageCollection.commit(true, false)) {
            DurableQueueBoundedMessageContainer container;
            Iterator i;
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                i = this.containers.values().iterator();
                while (i.hasNext()) {
                    container = (DurableQueueBoundedMessageContainer)i.next();
                    container.removeExpiredMessages();
                }
            }
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                i = this.containers.values().iterator();
                while (i.hasNext()) {
                    container = (DurableQueueBoundedMessageContainer)i.next();
                    if (container.isActive() || container.getIdleTimestamp() >= System.currentTimeMillis() - this.inactiveTimeout) continue;
                    this.removeContainer(container);
                    log.warn((Object)("memory limit low - forced to remove inactive and idle queue: " + container.getDestinationName()));
                }
            }
            i = this.containers.values().iterator();
            while (i.hasNext()) {
                container = (DurableQueueBoundedMessageContainer)i.next();
                if (container.isActive() || container.isEmpty()) continue;
                this.removeContainer(container);
            }
            this.doingGarbageCollection.set(false);
        }
    }

    private synchronized void addContainer(DurableQueueBoundedMessageContainer container) {
        this.containers.put((Object)container.getDestination(), (Object)container);
        this.destinationMap.put(container.getDestination(), container);
    }

    private synchronized void removeContainer(DurableQueueBoundedMessageContainer container) {
        try {
            container.close();
            log.info((Object)("closed inactive Durable queue container: " + container.getDestinationName()));
        }
        catch (JMSException e) {
            log.warn((Object)"failure closing container", (Throwable)e);
        }
        this.containers.remove((Object)container.getDestination());
        this.destinationMap.remove(container.getDestination(), container);
    }

    protected Executor getThreadPool() {
        return this.threadPool;
    }

    public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
        this.createContainer(dest, false);
    }

    public Map getMessageContainerAdmins() throws JMSException {
        return Collections.unmodifiableMap(this.containers);
    }

    public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
        if (!dest.isQueue()) {
            return;
        }
        DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)this.containers.remove((Object)dest);
        if (container != null) {
            container.empty();
            container.stop();
        }
        this.destinationMap.removeAll(dest);
    }

    public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException {
        ActiveMQQueue destination = new ActiveMQQueue(deadLetterName);
        DurableQueueBoundedMessageContainer container = (DurableQueueBoundedMessageContainer)this.containers.get((Object)destination);
        if (container == null) {
            container = this.createContainer(destination, true);
        }
        container.enqueue(message);
    }

    protected static class DurableQueueThreadFactory
    implements ThreadFactory {
        protected DurableQueueThreadFactory() {
        }

        public Thread newThread(Runnable command) {
            Thread result = new Thread(command, "Durable Queue Worker");
            result.setPriority(6);
            result.setDaemon(true);
            return result;
        }
    }
}

