/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that opens a socket channel to a remote endpoint, sends a serialized
 * {@link StreamHeader}, and then transfers the file sections listed in that
 * header to the peer.
 *
 * <p>The connection hooks ({@link #bind()}, {@link #connect()},
 * {@link #write(FileChannel, Pair, long, long)}, {@link #writeHeader(ByteBuffer)}
 * and {@link #close()}) are {@code protected} so subclasses can substitute a
 * different transport.
 */
public class FileStreamTask
extends WrappedRunnable {
    private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);

    /** Maximum number of bytes handed to a single transfer call (10 MiB). */
    public static final int CHUNK_SIZE = 10 * 1024 * 1024;

    /** Number of connection attempts made before the last failure is rethrown. */
    public static final int MAX_CONNECT_ATTEMPTS = 8;

    protected final StreamHeader header;
    protected final InetAddress to;
    private SocketChannel channel;

    /**
     * @param header header describing what is being streamed; its {@code file} may be null,
     *               in which case only the header itself is sent
     * @param to     address of the endpoint to stream to
     */
    public FileStreamTask(StreamHeader header, InetAddress to) {
        this.header = header;
        this.to = to;
    }

    /**
     * Connects to the target, streams the header and file sections, and always
     * closes the connection afterwards.
     *
     * @throws IOException if connecting or streaming fails; a failure while closing
     *                     is only logged at debug level
     */
    @Override
    public void runMayThrow() throws IOException {
        try {
            connectAttempt();
            stream();
        }
        finally {
            try {
                close();
            }
            catch (IOException e) {
                // Best effort: a close failure must not mask the outcome of the transfer.
                if (logger.isDebugEnabled()) {
                    logger.debug("error closing socket", (Throwable)e);
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Done streaming " + header.file);
        }
    }

    /**
     * Sends the stream header followed by every section of the file named in it.
     *
     * @throws IOException on any I/O failure, or if the file ends before a section
     *                     has been fully transferred
     */
    private void stream() throws IOException {
        ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to));
        writeHeader(buffer);
        if (header.file == null) {
            // Header-only message: nothing else to send.
            return;
        }

        RandomAccessFile raf = new RandomAccessFile(new File(header.file.getFilename()), "r");
        try {
            FileChannel fc = raf.getChannel();
            for (Pair<Long, Long> section : header.file.sections) {
                long length = section.right - section.left;
                long bytesTransferred = 0L;
                while (bytesTransferred < length) {
                    long lastWrite = write(fc, section, length, bytesTransferred);
                    // A transfer that moves nothing while positioned at or past the end of the
                    // file can never make progress; fail instead of spinning forever.
                    if (lastWrite <= 0 && section.left + bytesTransferred >= fc.size()) {
                        throw new IOException("Unexpected end of file " + header.file.getFilename()
                                              + " while streaming section [" + section.left + ", " + section.right
                                              + "): file size is " + fc.size());
                    }
                    bytesTransferred += lastWrite;
                    header.file.progress += lastWrite;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
                }
            }
        }
        finally {
            FileUtils.closeQuietly(raf);
        }
    }

    /**
     * Transfers the next chunk (at most {@link #CHUNK_SIZE} bytes) of a section to the peer.
     *
     * @param fc               channel of the file being streamed
     * @param section          start (left) and end (right) offsets of the section within the file
     * @param length           total length of the section in bytes
     * @param bytesTransferred number of bytes of this section already sent
     * @return the number of bytes actually transferred by this call, possibly zero
     * @throws IOException if the transfer fails
     */
    protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException {
        long toTransfer = Math.min((long)CHUNK_SIZE, length - bytesTransferred);
        return fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
    }

    /**
     * Writes the whole serialized header to the peer.
     *
     * @param buffer serialized header; fully consumed on return
     * @throws IOException if the write fails
     */
    protected void writeHeader(ByteBuffer buffer) throws IOException {
        // A single write() is not guaranteed to drain the buffer, so loop until it is empty
        // rather than relying on an assertion that is disabled in production.
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    /**
     * Establishes the connection, retrying with exponential backoff.
     *
     * <p>{@link #bind()} is invoked before every attempt because a socket channel is
     * closed when a connection attempt on it fails and therefore cannot be reused.
     * A failure in {@link #bind()} itself is not retried.
     *
     * @throws IOException the failure of the last attempt once {@link #MAX_CONNECT_ATTEMPTS}
     *                     attempts have been made, or any failure raised by {@link #bind()}
     */
    private void connectAttempt() throws IOException {
        int attempts = 0;
        while (true) {
            bind();
            try {
                connect();
                return;
            }
            catch (IOException e) {
                if (++attempts >= MAX_CONNECT_ATTEMPTS) {
                    throw e;
                }
                discardChannel();
                // Back off: rpc timeout * 2^attempts.
                long waitms = DatabaseDescriptor.getRpcTimeout() * (1L << attempts);
                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.file + ". Retrying in " + waitms + " ms. (" + e + ")");
                try {
                    Thread.sleep(waitms);
                }
                catch (InterruptedException interrupted) {
                    // Preserve the interrupt status for whoever handles the failure.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(interrupted);
                }
            }
        }
    }

    /** Closes and forgets the current channel, ignoring close failures, so a fresh one can be bound. */
    private void discardChannel() {
        SocketChannel stale = channel;
        channel = null;
        if (stale == null) {
            return;
        }
        try {
            stale.close();
        }
        catch (IOException ignored) {
            // The channel already failed to connect; there is nothing left to salvage.
        }
    }

    /**
     * Opens a new socket channel bound to the local address on an ephemeral port.
     * May be called more than once per task (once per connection attempt).
     *
     * @throws IOException if the channel cannot be opened or bound
     */
    protected void bind() throws IOException {
        channel = SocketChannel.open();
        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
    }

    /**
     * Connects the bound channel to the target endpoint on the storage port.
     *
     * @throws IOException if the connection cannot be established
     */
    protected void connect() throws IOException {
        channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
    }

    /**
     * Closes the channel if one was opened.
     *
     * @throws IOException if closing the channel fails
     */
    protected void close() throws IOException {
        // The channel is null when bind() failed before assigning it.
        if (channel != null) {
            channel.close();
        }
    }
}

