package org.apache.cassandra.streaming;

import com.ning.compress.lzf.LZFOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.net.Header;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/FileStreamTask.class */
/**
 * Runnable that streams the file sections described by a {@link StreamHeader} to a
 * remote endpoint over a dedicated socket.
 *
 * <p>A run connects to the peer (with retries), writes the serialized stream header,
 * pushes each requested file section through an LZF-compressing stream in chunks of at
 * most {@link #CHUNK_SIZE} bytes, and then reads the peer's reply message from the same
 * socket and hands it to a {@link StreamReplyVerbHandler}.
 */
public class FileStreamTask extends WrappedRunnable {
    private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);

    /** Maximum number of bytes read from the file and written to the stream per {@link #write} call. */
    public static final int CHUNK_SIZE = 65536;
    /** Total number of connection attempts made before the last failure is rethrown. */
    public static final int MAX_CONNECT_ATTEMPTS = 4;

    protected final StreamHeader header;
    protected final InetAddress to;

    private Socket socket;
    private OutputStream output;
    private OutputStream compressedoutput;
    private DataInputStream input;

    private final byte[] transferBuffer = new byte[CHUNK_SIZE];
    private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
    private final Throttle throttle;

    /**
     * Creates a task that will stream {@code streamHeader.file} (if any) to {@code inetAddress}.
     *
     * @param streamHeader header describing the session and the file to stream
     * @param inetAddress  destination endpoint
     */
    public FileStreamTask(StreamHeader streamHeader, InetAddress inetAddress) {
        this.header = streamHeader;
        this.to = inetAddress;
        // The throttle is named after toString(), which reads header and to. It must therefore
        // be created here, after both are assigned; as a field initializer it would run first
        // and fail with a NullPointerException.
        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction() {
            /**
             * @return per-stream throughput target derived from the configured outbound
             *         megabits-per-second value (converted to bytes per millisecond and divided
             *         by the number of active outbound streams), or 0 when the configured
             *         value is below 1
             */
            @Override
            public int targetThroughput() {
                int megabitsPerSec = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
                if (megabitsPerSec < 1) {
                    return 0;
                }
                // Use long arithmetic: megabits * 1024 * 1024 overflows int from 2048 Mbit/s upwards.
                long totalBytesPerMs = megabitsPerSec * 1024L * 1024L / 8 / 1000;
                long perStream = totalBytesPerMs / Math.max(1, MessagingService.instance().getActiveStreamsOutbound());
                return (int) Math.min(Integer.MAX_VALUE, perStream);
            }
        });
    }

    /**
     * Connects, streams the file, and processes the final reply when the session has no files left.
     *
     * <p>On an {@link IOException} the matching {@link StreamOutSession}, if one exists, is closed
     * with {@code close(false)} and the exception is rethrown. The connection is always closed at
     * the end; a failure while closing is only logged at debug level.
     *
     * @throws IOException if connecting, streaming or reading a reply fails
     */
    @Override
    public void runMayThrow() throws IOException {
        try {
            connectAttempt();
            stream();

            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
            if (session == null) {
                logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
            } else if (session.getFiles().size() == 0) {
                receiveReply();
                logger.info("Finished streaming session to {}", to);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Done streaming " + header.file);
            }
        } catch (IOException e) {
            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
            if (session != null) {
                session.close(false);
            }
            throw e;
        } finally {
            try {
                close();
            } catch (IOException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("error closing socket", e);
                }
            }
        }
    }

    /**
     * Writes the stream header and, when the header references a file, every requested section
     * of that file through the compressed stream, then waits for the peer's reply.
     *
     * @throws IOException on any read, write or reply-parsing failure
     */
    private void stream() throws IOException {
        byte[] headerBytes = ByteBufferUtil.getArray(
                MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to).intValue()));
        output.write(headerBytes);

        if (header.file == null) {
            // Header-only message: nothing further to send.
            return;
        }

        RandomAccessReader file = header.file.sstable.compression
                ? CompressedRandomAccessReader.open(header.file.getFilename(), header.file.sstable.getCompressionMetadata(), true)
                : RandomAccessReader.open(new File(header.file.getFilename()), true);
        try {
            compressedoutput = new LZFOutputStream(output);
            MessagingService.instance().incrementActiveStreamsOutbound();
            try {
                for (Pair<Long, Long> section : header.file.sections) {
                    file.seek(section.left.longValue());
                    long length = section.right.longValue() - section.left.longValue();
                    long bytesTransferred = 0;
                    while (bytesTransferred < length) {
                        long lastWrite = write(file, length, bytesTransferred);
                        bytesTransferred += lastWrite;
                        header.file.progress += lastWrite;
                    }
                    // Push the whole section out before moving on to the next one.
                    compressedoutput.flush();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
                    }
                }
                receiveReply();
            } finally {
                MessagingService.instance().decrementActiveStreamsOutbound();
            }
        } finally {
            // Also covers failures before the outbound counter was incremented.
            FileUtils.closeQuietly(file);
        }
    }

    /**
     * Reads one message from the socket input, checks (via assertions) that it is a
     * {@code STREAM_REPLY} rather than a stream, and dispatches it to the reply handler.
     *
     * @throws IOException if the reply cannot be read
     */
    private void receiveReply() throws IOException {
        MessagingService.validateMagic(input.readInt());
        int msgheader = input.readInt();
        assert MessagingService.getBits(msgheader, 3, 1) == 0 : "Stream received before stream reply";
        int version = MessagingService.getBits(msgheader, 15, 8);

        // This int is read and discarded; only the fields that follow it are used.
        input.readInt();
        String id = input.readUTF();
        Header replyHeader = Header.serializer().deserialize2(input, version);

        byte[] body = new byte[input.readInt()];
        input.readFully(body);
        Message reply = new Message(replyHeader, body, version);
        assert reply.getVerb() == StorageService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";

        handler.doVerb(reply, id);
    }

    /**
     * Copies the next chunk of the current section from the reader to the compressed stream
     * and reports the chunk size to the throttle.
     *
     * @param randomAccessReader reader positioned at the next byte to send
     * @param j                  total length of the section being streamed
     * @param j2                 number of bytes of the section already sent
     * @return number of bytes written by this call (at most {@link #CHUNK_SIZE})
     * @throws IOException on read or write failure
     */
    protected long write(RandomAccessReader randomAccessReader, long j, long j2) throws IOException {
        int toTransfer = (int) Math.min(CHUNK_SIZE, j - j2);
        randomAccessReader.readFully(transferBuffer, 0, toTransfer);
        compressedoutput.write(transferBuffer, 0, toTransfer);
        throttle.throttleDelta(toTransfer);
        return toTransfer;
    }

    /**
     * Opens the socket and its streams, retrying with a doubling delay based on the RPC timeout.
     * After {@link #MAX_CONNECT_ATTEMPTS} failed attempts the last {@link IOException} is rethrown.
     *
     * @throws IOException      if every attempt fails
     * @throws RuntimeException if the thread is interrupted while waiting to retry (the interrupt
     *                          flag is restored before throwing)
     */
    private void connectAttempt() throws IOException {
        int attempts = 0;
        while (true) {
            Socket candidate = null;
            try {
                candidate = MessagingService.instance().getConnectionPool(to).newSocket();
                candidate.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
                OutputStream out = candidate.getOutputStream();
                DataInputStream in = new DataInputStream(candidate.getInputStream());
                // Publish the fields only once the connection is fully set up.
                socket = candidate;
                output = out;
                input = in;
                return;
            } catch (IOException e) {
                // Do not leak a socket that was created but could not be fully initialised.
                closeQuietly(candidate);
                attempts++;
                if (attempts >= MAX_CONNECT_ATTEMPTS) {
                    throw e;
                }
                long waitms = DatabaseDescriptor.getRpcTimeout() * (1L << attempts);
                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.file
                            + ". Retrying in " + waitms + " ms. (" + e + ")");
                try {
                    Thread.sleep(waitms);
                } catch (InterruptedException wtf) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(wtf);
                }
            }
        }
    }

    /** Closes the given socket, ignoring a null argument and any failure while closing. */
    private static void closeQuietly(Socket candidate) {
        if (candidate == null) {
            return;
        }
        try {
            candidate.close();
        } catch (IOException ignored) {
            // Best effort: the connect failure that led here is the error worth reporting.
        }
    }

    /**
     * Closes the output stream and the underlying socket, if they were opened.
     *
     * @throws IOException if closing fails
     */
    protected void close() throws IOException {
        try {
            if (output != null) {
                output.close();
            }
        } finally {
            if (socket != null) {
                socket.close();
            }
        }
    }

    @Override
    public String toString() {
        return String.format("FileStreamTask(session=%s, to=%s)", Long.valueOf(header.sessionId), to);
    }
}
