/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Options;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsStatistics;
import io.nats.client.support.NatsConstants;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Background task that drains the connection's outgoing message queues and
 * writes the serialized messages to the connection's {@link DataPort}.
 *
 * <p>Two queues are maintained: {@code outgoing} for regular traffic and
 * {@code reconnectOutgoing}, which is the only queue drained while the writer
 * is in reconnect mode (see {@link #setReconnectMode(boolean)}).
 *
 * <p>{@link #sendMessageBatch} and {@link #flushBuffer} are {@code synchronized}
 * on this instance, so a batch write and a flush never interleave.
 */
class NatsConnectionWriter
implements Runnable {
    /** Upper bound on the number of messages requested from a queue per accumulate call. */
    private static final int MAX_ACCUMULATE = 1000;
    /** Carriage return, first byte of the CRLF line terminator written after each section. */
    private static final byte CR = 13;
    /** Line feed, second byte of the CRLF line terminator written after each section. */
    private static final byte LF = 10;

    private final NatsConnection connection;
    /** Completion handle of the most recent {@link #run()} submission; pre-completed until first start. */
    private Future<Boolean> stopped;
    private Future<DataPort> dataPortFuture;
    /** Resolved from {@link #dataPortFuture} at the beginning of {@link #run()}; null until then. */
    private DataPort dataPort = null;
    private final AtomicBoolean running;
    private final AtomicBoolean reconnectMode;
    private final ReentrantLock startStopLock;
    // Not read or written anywhere else in this class; kept for layout compatibility.
    private final AtomicBoolean isWriting;
    /** Staging buffer for a batch; replaced by a larger array when a single message does not fit. */
    private byte[] sendBuffer;
    private final MessageQueue outgoing;
    private final MessageQueue reconnectOutgoing;

    /**
     * Creates a writer in the stopped state.
     *
     * <p>The send buffer is sized from {@code Options.getBufferSize()} and the
     * regular queue is configured from the outgoing-queue limits in the options.
     *
     * @param connection the owning connection; supplies options, executor and statistics
     */
    NatsConnectionWriter(NatsConnection connection) {
        this.connection = connection;
        this.running = new AtomicBoolean(false);
        this.reconnectMode = new AtomicBoolean(false);
        this.isWriting = new AtomicBoolean(false);
        this.startStopLock = new ReentrantLock();
        // Until start() is called, stop() must hand back an already-completed future.
        this.stopped = CompletableFuture.completedFuture(Boolean.TRUE);
        Options options = connection.getOptions();
        int bufSize = options.getBufferSize();
        this.sendBuffer = new byte[bufSize];
        // NOTE(review): the leading 'true' is presumably a single-reader flag - confirm against MessageQueue.
        this.outgoing = new MessageQueue(true, options.getMaxMessagesInOutgoingQueue(), options.isDiscardMessagesWhenOutgoingQueueFull());
        this.reconnectOutgoing = new MessageQueue(true, 0);
    }

    /**
     * Marks the writer as running, resumes both queues and submits this writer
     * to the connection's executor.
     *
     * @param dataPortFuture future that yields the data port to write to; it is
     *                       awaited on the writer thread, not here
     */
    void start(Future<DataPort> dataPortFuture) {
        this.startStopLock.lock();
        try {
            this.dataPortFuture = dataPortFuture;
            this.running.set(true);
            this.outgoing.resume();
            this.reconnectOutgoing.resume();
            this.stopped = this.connection.getExecutor().submit(this, Boolean.TRUE);
        }
        finally {
            this.startStopLock.unlock();
        }
    }

    /**
     * Signals the write loop to exit and pauses both queues.
     *
     * <p>The running flag is cleared before the lock is taken so the loop can
     * observe it as early as possible.
     *
     * @return the future of the most recent {@link #start} submission, or an
     *         already-completed future if the writer was never started
     */
    Future<Boolean> stop() {
        this.running.set(false);
        this.startStopLock.lock();
        try {
            this.outgoing.pause();
            this.reconnectOutgoing.pause();
            // NOTE(review): presumably drops queued PING/PONG protocol messages so stale
            // keep-alives are not sent after a restart - confirm the polarity of MessageQueue.filter.
            this.outgoing.filter(msg -> Arrays.equals(NatsConstants.OP_PING_BYTES, msg.getProtocolBytes()) || Arrays.equals(NatsConstants.OP_PONG_BYTES, msg.getProtocolBytes()));
        }
        finally {
            this.startStopLock.unlock();
        }
        return this.stopped;
    }

    /**
     * Serializes a linked chain of messages into the send buffer and writes it
     * to {@code dataPort}, flushing intermediate chunks whenever the next
     * message would not fit.
     *
     * <p>Each message is written as its protocol bytes followed by CRLF; for
     * non-protocol messages the serialized header (if any) and the data follow,
     * terminated by another CRLF.
     *
     * @param msg      head of the chain, linked through {@code next}; may be null
     * @param dataPort destination of the bytes
     * @param stats    receives one out-message and out-byte increment per message
     * @throws IOException if writing to the data port fails
     */
    synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, NatsStatistics stats) throws IOException {
        int sendPosition = 0;
        while (msg != null) {
            long size = msg.getSizeInBytes();
            if ((long)sendPosition + size > (long)this.sendBuffer.length) {
                if (sendPosition > 0) {
                    // Flush what has been staged so far, then retry the SAME message
                    // against the now-empty buffer; it must not be skipped.
                    this.writeChunk(dataPort, sendPosition);
                    sendPosition = 0;
                }
                if (size > (long)this.sendBuffer.length) {
                    // Even an empty buffer cannot hold this message: grow to at least
                    // double, computed in long to avoid int overflow. Nothing is staged
                    // at this point, so no copy of the old contents is needed.
                    long grown = Math.max((long)this.sendBuffer.length + size, 2L * (long)this.sendBuffer.length);
                    this.sendBuffer = new byte[(int)grown];
                }
            }
            byte[] bytes = msg.getProtocolBytes();
            System.arraycopy(bytes, 0, this.sendBuffer, sendPosition, bytes.length);
            sendPosition += bytes.length;
            this.sendBuffer[sendPosition++] = CR;
            this.sendBuffer[sendPosition++] = LF;
            if (!msg.isProtocol()) {
                bytes = msg.getSerializedHeader();
                if (bytes != null && bytes.length > 0) {
                    System.arraycopy(bytes, 0, this.sendBuffer, sendPosition, bytes.length);
                    sendPosition += bytes.length;
                }
                bytes = msg.getData();
                if (bytes != null && bytes.length > 0) {
                    System.arraycopy(bytes, 0, this.sendBuffer, sendPosition, bytes.length);
                    sendPosition += bytes.length;
                }
                this.sendBuffer[sendPosition++] = CR;
                this.sendBuffer[sendPosition++] = LF;
            }
            stats.incrementOutMsgs();
            stats.incrementOutBytes(size);
            msg = msg.next;
        }
        this.writeChunk(dataPort, sendPosition);
    }

    /** Writes the first {@code length} bytes of the send buffer and records the write in the connection statistics. */
    private void writeChunk(DataPort dataPort, int length) throws IOException {
        dataPort.write(this.sendBuffer, length);
        this.connection.getNatsStatistics().registerWrite(length);
    }

    /**
     * Write loop: waits for the data port, then repeatedly accumulates a batch
     * from the active queue and sends it until the running flag is cleared.
     *
     * <p>In reconnect mode the reconnect queue is polled with a 1 ms wait;
     * otherwise the regular queue is polled with a 2 minute wait. I/O failures
     * are reported through {@code NatsConnection.handleCommunicationIssue}.
     * Interruption, cancellation or a failed data-port future end the loop
     * quietly. The running flag is always cleared on exit.
     */
    @Override
    public void run() {
        Duration waitForMessage = Duration.ofMinutes(2L);
        Duration reconnectWait = Duration.ofMillis(1L);
        try {
            this.dataPort = this.dataPortFuture.get();
            NatsStatistics stats = this.connection.getNatsStatistics();
            while (this.running.get()) {
                NatsMessage msg = this.reconnectMode.get()
                        ? this.reconnectOutgoing.accumulate(this.sendBuffer.length, MAX_ACCUMULATE, reconnectWait)
                        : this.outgoing.accumulate(this.sendBuffer.length, MAX_ACCUMULATE, waitForMessage);
                if (msg == null) {
                    continue;
                }
                this.sendMessageBatch(msg, this.dataPort, stats);
            }
        }
        catch (IOException | BufferOverflowException io) {
            this.connection.handleCommunicationIssue(io);
        }
        catch (InterruptedException interrupted) {
            // Exit the loop, but restore the flag so the owner of this thread can see the interrupt.
            Thread.currentThread().interrupt();
        }
        catch (CancellationException | ExecutionException ignored) {
            // The data port never became available; nothing to write, just exit.
        }
        finally {
            this.running.set(false);
        }
    }

    /**
     * Switches between draining the regular queue and the reconnect queue.
     *
     * @param tf {@code true} to drain only the reconnect queue
     */
    void setReconnectMode(boolean tf) {
        this.reconnectMode.set(tf);
    }

    /**
     * Checks whether adding {@code msg} keeps the regular queue under a byte limit.
     *
     * @param msg     candidate message
     * @param maxSize byte limit (exclusive); a negative value disables the check
     * @return {@code true} if the limit is disabled or the queued bytes plus the
     *         message size stay strictly below {@code maxSize}
     */
    boolean canQueue(NatsMessage msg, long maxSize) {
        if (maxSize < 0L) {
            return true;
        }
        return this.outgoing.sizeInBytes() + msg.getSizeInBytes() < maxSize;
    }

    /**
     * Pushes a message onto the regular outgoing queue.
     *
     * @param msg message to enqueue
     * @return the result of {@code MessageQueue.push}
     */
    boolean queue(NatsMessage msg) {
        return this.outgoing.push(msg);
    }

    /**
     * Enqueues an internally generated message: onto the reconnect queue while
     * in reconnect mode, otherwise onto the regular queue using the
     * two-argument {@code push(msg, true)} variant.
     *
     * @param msg message to enqueue
     */
    void queueInternalMessage(NatsMessage msg) {
        if (this.reconnectMode.get()) {
            this.reconnectOutgoing.push(msg);
        } else {
            // NOTE(review): the 'true' flag presumably marks the push as internal - confirm against MessageQueue.push.
            this.outgoing.push(msg, true);
        }
    }

    /**
     * Best-effort flush of the data port. Does nothing when the writer is not
     * running or the data port has not been resolved yet; failures are ignored.
     */
    synchronized void flushBuffer() {
        try {
            DataPort port = this.dataPort;
            if (port != null && this.running.get()) {
                port.flush();
            }
        }
        catch (Exception ignored) {
            // Deliberately swallowed: a failed flush is not reported from here.
        }
    }
}

