/*
 * Decompiled with CFR 0.152.
 */
package me.lucko.helper.mongo.external.mongodriver.connection;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import me.lucko.helper.mongo.external.bson.ByteBuf;
import me.lucko.helper.mongo.external.bson.io.ByteBufferBsonInput;
import me.lucko.helper.mongo.external.mongodriver.MongoException;
import me.lucko.helper.mongo.external.mongodriver.MongoInternalException;
import me.lucko.helper.mongo.external.mongodriver.MongoInterruptedException;
import me.lucko.helper.mongo.external.mongodriver.MongoSocketClosedException;
import me.lucko.helper.mongo.external.mongodriver.MongoSocketReadException;
import me.lucko.helper.mongo.external.mongodriver.MongoSocketReadTimeoutException;
import me.lucko.helper.mongo.external.mongodriver.MongoSocketWriteException;
import me.lucko.helper.mongo.external.mongodriver.ServerAddress;
import me.lucko.helper.mongo.external.mongodriver.assertions.Assertions;
import me.lucko.helper.mongo.external.mongodriver.async.SingleResultCallback;
import me.lucko.helper.mongo.external.mongodriver.connection.AsyncCompletionHandler;
import me.lucko.helper.mongo.external.mongodriver.connection.ConnectionDescription;
import me.lucko.helper.mongo.external.mongodriver.connection.ConnectionId;
import me.lucko.helper.mongo.external.mongodriver.connection.InternalConnection;
import me.lucko.helper.mongo.external.mongodriver.connection.InternalConnectionInitializer;
import me.lucko.helper.mongo.external.mongodriver.connection.ReplyHeader;
import me.lucko.helper.mongo.external.mongodriver.connection.ResponseBuffers;
import me.lucko.helper.mongo.external.mongodriver.connection.ServerId;
import me.lucko.helper.mongo.external.mongodriver.connection.Stream;
import me.lucko.helper.mongo.external.mongodriver.connection.StreamFactory;
import me.lucko.helper.mongo.external.mongodriver.diagnostics.logging.Logger;
import me.lucko.helper.mongo.external.mongodriver.diagnostics.logging.Loggers;
import me.lucko.helper.mongo.external.mongodriver.internal.async.ErrorHandlingResultCallback;

class InternalStreamConnection
implements InternalConnection {
    private final ServerId serverId;
    private final StreamFactory streamFactory;
    private final InternalConnectionInitializer connectionInitializer;
    private final Lock writerLock = new ReentrantLock(false);
    private final Lock readerLock = new ReentrantLock(false);
    private final Deque<SendMessageRequest> writeQueue = new ArrayDeque<SendMessageRequest>();
    private final Map<Integer, SingleResultCallback<ResponseBuffers>> readQueue = new HashMap<Integer, SingleResultCallback<ResponseBuffers>>();
    private final Map<Integer, ResponseBuffers> messages = new ConcurrentHashMap<Integer, ResponseBuffers>();
    private boolean isWriting;
    private boolean isReading;
    private final AtomicReference<CountDownLatch> readingPhase = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
    private volatile MongoException exceptionThatPrecededStreamClosing;
    private volatile ConnectionDescription description;
    private volatile Stream stream;
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final AtomicBoolean opened = new AtomicBoolean();
    static final Logger LOGGER = Loggers.getLogger("connection");

    InternalStreamConnection(ServerId serverId, StreamFactory streamFactory, InternalConnectionInitializer connectionInitializer) {
        this.serverId = Assertions.notNull("serverId", serverId);
        this.streamFactory = Assertions.notNull("streamFactory", streamFactory);
        this.connectionInitializer = Assertions.notNull("connectionInitializer", connectionInitializer);
        this.description = new ConnectionDescription(serverId);
    }

    @Override
    public ConnectionDescription getDescription() {
        return this.description;
    }

    @Override
    public void open() {
        Assertions.isTrue("Open already called", this.stream == null);
        this.stream = this.streamFactory.create(this.serverId.getAddress());
        try {
            this.stream.open();
            this.description = this.connectionInitializer.initialize(this);
            this.opened.set(true);
            LOGGER.info(String.format("Opened connection [%s] to %s", this.getId(), this.serverId.getAddress()));
        }
        catch (Throwable t) {
            this.close();
            if (t instanceof MongoException) {
                throw (MongoException)t;
            }
            throw new MongoException(t.toString(), t);
        }
    }

    @Override
    public void openAsync(final SingleResultCallback<Void> callback) {
        Assertions.isTrue("Open already called", this.stream == null, callback);
        try {
            this.stream = this.streamFactory.create(this.serverId.getAddress());
        }
        catch (Throwable t) {
            callback.onResult(null, t);
            return;
        }
        this.stream.openAsync(new AsyncCompletionHandler<Void>(){

            @Override
            public void completed(Void aVoid) {
                InternalStreamConnection.this.connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback<ConnectionDescription>(){

                    @Override
                    public void onResult(ConnectionDescription result, Throwable t) {
                        if (t != null) {
                            InternalStreamConnection.this.close();
                            callback.onResult(null, t);
                        } else {
                            InternalStreamConnection.this.description = result;
                            InternalStreamConnection.this.opened.set(true);
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info(String.format("Opened connection [%s] to %s", InternalStreamConnection.this.getId(), InternalStreamConnection.this.serverId.getAddress()));
                            }
                            callback.onResult(null, null);
                        }
                    }
                });
            }

            @Override
            public void failed(Throwable t) {
                callback.onResult(null, t);
            }
        });
    }

    @Override
    public void close() {
        if (!this.isClosed.getAndSet(true)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Closing connection %s", this.getId()));
            }
            if (this.stream != null) {
                this.stream.close();
            }
        }
    }

    @Override
    public boolean opened() {
        return this.opened.get();
    }

    @Override
    public boolean isClosed() {
        return this.isClosed.get();
    }

    @Override
    public void sendMessage(List<ByteBuf> byteBuffers, int lastRequestId) {
        Assertions.notNull("stream is open", this.stream);
        if (this.isClosed()) {
            throw new MongoSocketClosedException("Cannot write to a closed stream", this.getServerAddress());
        }
        this.writerLock.lock();
        try {
            this.stream.write(byteBuffers);
        }
        catch (Exception e) {
            this.close();
            throw this.translateWriteException(e);
        }
        finally {
            this.writerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ResponseBuffers receiveMessage(int responseTo) {
        Assertions.notNull("stream is open", this.stream);
        if (this.isClosed()) {
            throw new MongoSocketClosedException("Cannot read from a closed stream", this.getServerAddress());
        }
        CountDownLatch localLatch = new CountDownLatch(1);
        this.readerLock.lock();
        try {
            ResponseBuffers responseBuffers = this.receiveResponseBuffers();
            this.messages.put(responseBuffers.getReplyHeader().getResponseTo(), responseBuffers);
            this.readingPhase.getAndSet(localLatch).countDown();
        }
        catch (Throwable t) {
            this.exceptionThatPrecededStreamClosing = this.translateReadException(t);
            this.close();
            this.readingPhase.getAndSet(localLatch).countDown();
        }
        finally {
            this.readerLock.unlock();
        }
        while (true) {
            if (this.isClosed()) {
                if (this.exceptionThatPrecededStreamClosing != null) {
                    throw this.exceptionThatPrecededStreamClosing;
                }
                throw new MongoSocketClosedException("Socket has been closed", this.getServerAddress());
            }
            ResponseBuffers myResponse = this.messages.remove(responseTo);
            if (myResponse != null) {
                return myResponse;
            }
            try {
                localLatch.await();
            }
            catch (InterruptedException e) {
                throw new MongoInterruptedException("Interrupted while reading from stream", e);
            }
            localLatch = this.readingPhase.get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendMessageAsync(List<ByteBuf> byteBuffers, int lastRequestId, SingleResultCallback<Void> callback) {
        Assertions.notNull("stream is open", this.stream, callback);
        if (this.isClosed()) {
            callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", this.getServerAddress()));
            return;
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(String.format("Queuing send message: %s. ([%s])", lastRequestId, this.getId()));
        }
        SendMessageRequest sendMessageRequest = new SendMessageRequest(byteBuffers, lastRequestId, ErrorHandlingResultCallback.errorHandlingCallback(callback, LOGGER));
        boolean mustWrite = false;
        this.writerLock.lock();
        try {
            if (this.isWriting) {
                this.writeQueue.add(sendMessageRequest);
            } else {
                this.isWriting = true;
                mustWrite = true;
            }
        }
        finally {
            this.writerLock.unlock();
        }
        if (mustWrite) {
            this.writeAsync(sendMessageRequest);
        }
    }

    private void writeAsync(final SendMessageRequest request) {
        this.stream.writeAsync(request.getByteBuffers(), new AsyncCompletionHandler<Void>(){

            @Override
            public void completed(Void v) {
                SendMessageRequest nextMessage = null;
                InternalStreamConnection.this.writerLock.lock();
                try {
                    nextMessage = (SendMessageRequest)InternalStreamConnection.this.writeQueue.poll();
                    if (nextMessage == null) {
                        InternalStreamConnection.this.isWriting = false;
                    }
                }
                finally {
                    InternalStreamConnection.this.writerLock.unlock();
                }
                request.getCallback().onResult(null, null);
                if (nextMessage != null) {
                    InternalStreamConnection.this.writeAsync(nextMessage);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void failed(Throwable t) {
                InternalStreamConnection.this.writerLock.lock();
                try {
                    SendMessageRequest nextMessage;
                    MongoException translatedWriteException = InternalStreamConnection.this.translateWriteException(t);
                    request.getCallback().onResult(null, translatedWriteException);
                    while ((nextMessage = (SendMessageRequest)InternalStreamConnection.this.writeQueue.poll()) != null) {
                        nextMessage.callback.onResult(null, translatedWriteException);
                    }
                    InternalStreamConnection.this.isWriting = false;
                    InternalStreamConnection.this.close();
                }
                finally {
                    InternalStreamConnection.this.writerLock.unlock();
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void receiveMessageAsync(int responseTo, SingleResultCallback<ResponseBuffers> callback) {
        Assertions.isTrue("stream is open", this.stream != null, callback);
        if (this.isClosed()) {
            callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", this.getServerAddress()));
            return;
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(String.format("Queuing read message: %s. ([%s])", responseTo, this.getId()));
        }
        ResponseBuffers response = null;
        this.readerLock.lock();
        boolean mustRead = false;
        try {
            response = this.messages.remove(responseTo);
            if (response == null) {
                this.readQueue.put(responseTo, callback);
            }
            if (!this.readQueue.isEmpty() && !this.isReading) {
                this.isReading = true;
                mustRead = true;
            }
        }
        finally {
            this.readerLock.unlock();
        }
        this.executeCallbackAndReceiveResponse(callback, response, mustRead);
    }

    private void executeCallbackAndReceiveResponse(SingleResultCallback<ResponseBuffers> callback, ResponseBuffers result, boolean mustRead) {
        if (callback != null && result != null) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(String.format("Executing callback for %s on %s", result.getReplyHeader().getResponseTo(), this.getId()));
            }
            callback.onResult(result, null);
        }
        if (mustRead) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(String.format("Start receiving response on %s", this.getId()));
            }
            this.receiveResponseAsync();
        }
    }

    private ConnectionId getId() {
        return this.description.getConnectionId();
    }

    private ServerAddress getServerAddress() {
        return this.description.getServerAddress();
    }

    private void receiveResponseAsync() {
        this.readAsync(36, ErrorHandlingResultCallback.errorHandlingCallback(new ResponseHeaderCallback(new ResponseBuffersCallback()), LOGGER));
    }

    private void readAsync(int numBytes, final SingleResultCallback<ByteBuf> callback) {
        if (this.isClosed()) {
            callback.onResult(null, new MongoSocketClosedException("Cannot read from a closed stream", this.getServerAddress()));
            return;
        }
        try {
            this.stream.readAsync(numBytes, new AsyncCompletionHandler<ByteBuf>(){

                @Override
                public void completed(ByteBuf buffer) {
                    callback.onResult(buffer, null);
                }

                @Override
                public void failed(Throwable t) {
                    InternalStreamConnection.this.close();
                    callback.onResult(null, InternalStreamConnection.this.translateReadException(t));
                }
            });
        }
        catch (Exception e) {
            callback.onResult(null, this.translateReadException(e));
        }
    }

    private MongoException translateWriteException(Throwable e) {
        if (e instanceof MongoException) {
            return (MongoException)e;
        }
        if (e instanceof IOException) {
            return new MongoSocketWriteException("Exception sending message", this.getServerAddress(), e);
        }
        if (e instanceof InterruptedException) {
            return new MongoInternalException("Thread interrupted exception", e);
        }
        return new MongoInternalException("Unexpected exception", e);
    }

    private MongoException translateReadException(Throwable e) {
        if (e instanceof MongoException) {
            return (MongoException)e;
        }
        if (e instanceof SocketTimeoutException) {
            return new MongoSocketReadTimeoutException("Timeout while receiving message", this.getServerAddress(), e);
        }
        if (e instanceof InterruptedIOException) {
            return new MongoInterruptedException("Interrupted while receiving message", (InterruptedIOException)e);
        }
        if (e instanceof ClosedByInterruptException) {
            return new MongoInterruptedException("Interrupted while receiving message", (ClosedByInterruptException)e);
        }
        if (e instanceof IOException) {
            return new MongoSocketReadException("Exception receiving message", this.getServerAddress(), e);
        }
        if (e instanceof RuntimeException) {
            return new MongoInternalException("Unexpected runtime exception", e);
        }
        if (e instanceof InterruptedException) {
            return new MongoInternalException("Interrupted exception", e);
        }
        return new MongoInternalException("Unexpected exception", e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ResponseBuffers receiveResponseBuffers() throws IOException {
        ReplyHeader replyHeader;
        ByteBuf headerByteBuffer = this.stream.read(36);
        ByteBufferBsonInput headerInputBuffer = new ByteBufferBsonInput(headerByteBuffer);
        try {
            replyHeader = new ReplyHeader(headerInputBuffer, this.description.getMaxMessageSize());
        }
        finally {
            headerInputBuffer.close();
        }
        ByteBuf bodyByteBuffer = null;
        if (replyHeader.getNumberReturned() > 0) {
            bodyByteBuffer = this.stream.read(replyHeader.getMessageLength() - 36);
        }
        return new ResponseBuffers(replyHeader, bodyByteBuffer);
    }

    @Override
    public ByteBuf getBuffer(int size) {
        Assertions.notNull("open", this.stream);
        return this.stream.getBuffer(size);
    }

    private void failAllQueuedReads(Throwable t) {
        this.close();
        Iterator<Map.Entry<Integer, SingleResultCallback<ResponseBuffers>>> it = this.readQueue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, SingleResultCallback<ResponseBuffers>> pairs = it.next();
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(String.format("Processing unknown failed message: %s. ([%s] %s)", pairs.getKey(), this.getId(), this.serverId));
            }
            SingleResultCallback<ResponseBuffers> callback = pairs.getValue();
            it.remove();
            try {
                callback.onResult(null, t);
            }
            catch (Throwable tr) {
                LOGGER.warn("Exception calling callback", tr);
            }
        }
    }

    private static class SendMessageRequest {
        private final SingleResultCallback<Void> callback;
        private final List<ByteBuf> byteBuffers;
        private final int messageId;

        SendMessageRequest(List<ByteBuf> byteBuffers, int messageId, SingleResultCallback<Void> callback) {
            this.byteBuffers = byteBuffers;
            this.messageId = messageId;
            this.callback = callback;
        }

        public SingleResultCallback<Void> getCallback() {
            return this.callback;
        }

        public List<ByteBuf> getByteBuffers() {
            return this.byteBuffers;
        }

        public int getMessageId() {
            return this.messageId;
        }
    }

    private class ResponseHeaderCallback
    implements SingleResultCallback<ByteBuf> {
        private final SingleResultCallback<ResponseBuffers> callback;

        ResponseHeaderCallback(SingleResultCallback<ResponseBuffers> callback) {
            this.callback = callback;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onResult(ByteBuf result, Throwable throwableFromCallback) {
            if (throwableFromCallback != null) {
                this.callback.onResult(null, throwableFromCallback);
            } else {
                try {
                    ReplyHeader replyHeader;
                    ByteBufferBsonInput headerInputBuffer = new ByteBufferBsonInput(result);
                    try {
                        replyHeader = new ReplyHeader(headerInputBuffer, InternalStreamConnection.this.description.getMaxMessageSize());
                    }
                    finally {
                        headerInputBuffer.close();
                    }
                    if (replyHeader.getMessageLength() == 36) {
                        this.onSuccess(new ResponseBuffers(replyHeader, null));
                    } else {
                        InternalStreamConnection.this.readAsync(replyHeader.getMessageLength() - 36, new ResponseBodyCallback(replyHeader));
                    }
                }
                catch (Throwable t) {
                    this.callback.onResult(null, t);
                }
            }
        }

        private void onSuccess(ResponseBuffers responseBuffers) {
            if (responseBuffers == null) {
                this.callback.onResult(null, new MongoException("Unexpected empty response buffers"));
                return;
            }
            try {
                this.callback.onResult(responseBuffers, null);
            }
            catch (Throwable t) {
                LOGGER.warn("Exception calling callback", t);
            }
        }

        private class ResponseBodyCallback
        implements SingleResultCallback<ByteBuf> {
            private final ReplyHeader replyHeader;

            ResponseBodyCallback(ReplyHeader replyHeader) {
                this.replyHeader = replyHeader;
            }

            @Override
            public void onResult(ByteBuf result, Throwable t) {
                if (t != null) {
                    try {
                        ResponseHeaderCallback.this.callback.onResult(new ResponseBuffers(this.replyHeader, result), t);
                    }
                    catch (Throwable tr) {
                        LOGGER.warn("Exception calling callback", tr);
                    }
                } else {
                    ResponseHeaderCallback.this.onSuccess(new ResponseBuffers(this.replyHeader, result));
                }
            }
        }
    }

    private class ResponseBuffersCallback
    implements SingleResultCallback<ResponseBuffers> {
        private ResponseBuffersCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onResult(ResponseBuffers result, Throwable t) {
            SingleResultCallback callback = null;
            boolean mustRead = false;
            InternalStreamConnection.this.readerLock.lock();
            try {
                if (t != null) {
                    InternalStreamConnection.this.failAllQueuedReads(t);
                    return;
                }
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace(String.format("Read response to message %s on %s", result.getReplyHeader().getResponseTo(), InternalStreamConnection.this.getId()));
                }
                callback = (SingleResultCallback)InternalStreamConnection.this.readQueue.remove(result.getReplyHeader().getResponseTo());
                if (InternalStreamConnection.this.readQueue.isEmpty()) {
                    InternalStreamConnection.this.isReading = false;
                } else {
                    mustRead = true;
                }
                if (callback == null) {
                    InternalStreamConnection.this.messages.put(result.getReplyHeader().getResponseTo(), result);
                }
            }
            finally {
                InternalStreamConnection.this.readerLock.unlock();
            }
            InternalStreamConnection.this.executeCallbackAndReceiveResponse(callback, result, mustRead);
        }
    }
}

