/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.raftlog.segmented;

import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.server.raftlog.segmented.LogSegment;
import org.apache.ratis.server.raftlog.segmented.LogSegmentStartEnd;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.DataBlockingQueue;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SegmentedRaftLogWorker {
    static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
    static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1L, TimeUnit.SECONDS);
    private final Consumer<Object> infoIndexChange = s2 -> LOG.info("{}: {}", (Object)this, s2);
    private final Consumer<Object> traceIndexChange = s2 -> LOG.trace("{}: {}", (Object)this, s2);
    private final String name;
    private final DataBlockingQueue<SegmentedRaftLog.Task> queue;
    private final WriteLogTasks writeTasks = new WriteLogTasks();
    private volatile boolean running = true;
    private final ExecutorService workerThreadExecutor;
    private final RaftStorage storage;
    private volatile SegmentedRaftLogOutputStream out;
    private final Runnable submitUpdateCommitEvent;
    private final StateMachine stateMachine;
    private final Timer logFlushTimer;
    private final Timer raftLogSyncTimer;
    private final Timer raftLogQueueingTimer;
    private final Timer raftLogEnqueueingDelayTimer;
    private final SegmentedRaftLogMetrics raftLogMetrics;
    private final ByteBuffer writeBuffer;
    private final AtomicReference<byte[]> sharedBuffer;
    private int pendingFlushNum = 0;
    private long lastWrittenIndex;
    private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0L);
    private final RaftLogIndex safeCacheEvictIndex = new RaftLogIndex("safeCacheEvictIndex", 0L);
    private final int forceSyncNum;
    private final long segmentMaxSize;
    private final long preallocatedSize;
    private final RaftServer.Division server;
    private int flushBatchSize;
    private final boolean asyncFlush;
    private final boolean unsafeFlush;
    private final ExecutorService flushExecutor;
    private final StateMachineDataPolicy stateMachineDataPolicy;

    SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent, RaftServer.Division server, RaftStorage storage, RaftProperties properties, SegmentedRaftLogMetrics metricRegistry) {
        this.name = memberId + "-" + JavaUtils.getClassSimpleName(this.getClass());
        LOG.info("new {} for {}", (Object)this.name, (Object)storage);
        this.submitUpdateCommitEvent = submitUpdateCommitEvent;
        this.stateMachine = stateMachine;
        this.raftLogMetrics = metricRegistry;
        this.storage = storage;
        this.server = server;
        SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
        int queueElementLimit = RaftServerConfigKeys.Log.queueElementLimit(properties);
        this.queue = new DataBlockingQueue<SegmentedRaftLog.Task>(this.name, queueByteLimit, queueElementLimit, SegmentedRaftLog.Task::getSerializedSize);
        this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
        this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
        this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
        this.flushBatchSize = 0;
        this.stateMachineDataPolicy = new StateMachineDataPolicy(properties, metricRegistry);
        this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(this.name);
        metricRegistry.addDataQueueSizeGauge(this.queue);
        metricRegistry.addLogWorkerQueueSizeGauge(this.writeTasks.q);
        metricRegistry.addFlushBatchSizeGauge(() -> () -> this.flushBatchSize);
        this.logFlushTimer = metricRegistry.getFlushTimer();
        this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
        this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
        this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
        int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
        this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
        int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
        this.sharedBuffer = new AtomicReference<byte[]>(new byte[logEntryLimit + 8]);
        this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
        this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
        if (this.asyncFlush && this.unsafeFlush) {
            throw new IllegalStateException("Cannot enable both raft.server.log.unsafe-flush.enabled and raft.server.log.async-flush.enabled");
        }
        this.flushExecutor = !this.asyncFlush && !this.unsafeFlush ? null : ConcurrentUtils.newSingleThreadExecutor(this.name + "-flush");
    }

    void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
        LOG.trace("{} start(latestIndex={}, openSegmentFile={})", this.name, latestIndex, openSegmentFile);
        this.lastWrittenIndex = latestIndex;
        this.flushIndex.setUnconditionally(latestIndex, this.infoIndexChange);
        this.safeCacheEvictIndex.setUnconditionally(evictIndex, this.infoIndexChange);
        if (openSegmentFile != null) {
            Preconditions.assertTrue(openSegmentFile.exists());
            this.allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
        }
        this.workerThreadExecutor.submit(this::run);
    }

    void close() {
        this.running = false;
        this.sharedBuffer.set(null);
        Optional.ofNullable(this.flushExecutor).ifPresent(ExecutorService::shutdown);
        ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3.0), this.workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + timeout, (Object)this.name));
        IOUtils.cleanup(LOG, this.out);
        LOG.info("{} close()", (Object)this.name);
    }

    void syncWithSnapshot(long lastSnapshotIndex) {
        this.queue.clear();
        this.lastWrittenIndex = lastSnapshotIndex;
        this.flushIndex.setUnconditionally(lastSnapshotIndex, this.infoIndexChange);
        this.safeCacheEvictIndex.setUnconditionally(lastSnapshotIndex, this.infoIndexChange);
        this.pendingFlushNum = 0;
    }

    public String toString() {
        return this.name;
    }

    private SegmentedRaftLog.Task addIOTask(SegmentedRaftLog.Task task) {
        LOG.debug("{} adds IO task {}", (Object)this.name, (Object)task);
        try {
            Timer.Context enqueueTimerContext = this.raftLogEnqueueingDelayTimer.time();
            while (!this.queue.offer(task, ONE_SECOND)) {
                Preconditions.assertTrue(this.isAlive(), "the worker thread is not alive");
            }
            enqueueTimerContext.stop();
            task.startTimerOnEnqueue(this.raftLogQueueingTimer);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException && !this.running) {
                LOG.info("Got InterruptedException when adding task " + task + ". The SegmentedRaftLogWorker already stopped.");
            }
            LOG.error("Failed to add IO task {}", (Object)task, (Object)e);
            Optional.ofNullable(this.server).ifPresent(RaftServer.Division::close);
        }
        return task;
    }

    boolean isAlive() {
        return this.running && !this.workerThreadExecutor.isTerminated();
    }

    private void run() {
        RaftLogIOException logIOException = null;
        while (this.running) {
            try {
                SegmentedRaftLog.Task task = this.queue.poll(ONE_SECOND);
                if (task == null) continue;
                task.stopTimerOnDequeue();
                try {
                    if (logIOException != null) {
                        throw logIOException;
                    }
                    Timer.Context executionTimeContext = this.raftLogMetrics.getRaftLogTaskExecutionTimer(JavaUtils.getClassSimpleName(task.getClass()).toLowerCase()).time();
                    task.execute();
                    executionTimeContext.stop();
                }
                catch (IOException e) {
                    if (task.getEndIndex() < this.lastWrittenIndex) {
                        LOG.info("Ignore IOException when handling task " + task + " which is smaller than the lastWrittenIndex. There should be a snapshot installed.", e);
                    }
                    task.failed(e);
                    if (logIOException != null) continue;
                    logIOException = new RaftLogIOException("Log already failed at index " + task.getEndIndex() + " for task " + task, e);
                    continue;
                }
                task.done();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (this.running) {
                    LOG.warn("{} got interrupted while still running", (Object)Thread.currentThread().getName());
                }
                LOG.info(Thread.currentThread().getName() + " was interrupted, exiting. There are " + this.queue.getNumElements() + " tasks remaining in the queue.");
                return;
            }
            catch (Exception e) {
                if (!this.running) {
                    LOG.info("{} got closed and hit exception", (Object)Thread.currentThread().getName(), (Object)e);
                    continue;
                }
                LOG.error("{} hit exception", (Object)Thread.currentThread().getName(), (Object)e);
                Optional.ofNullable(this.server).ifPresent(RaftServer.Division::close);
            }
        }
    }

    private boolean shouldFlush() {
        if (this.out == null) {
            return false;
        }
        if (this.pendingFlushNum >= this.forceSyncNum) {
            return true;
        }
        return this.pendingFlushNum > 0 && this.queue.isEmpty();
    }

    private void flushIfNecessary() throws IOException {
        if (this.shouldFlush()) {
            this.raftLogMetrics.onRaftLogFlush();
            LOG.debug("{}: flush {}", (Object)this.name, (Object)this.out);
            Timer.Context timerContext = this.logFlushTimer.time();
            try {
                CompletableFuture<Object> f;
                CompletableFuture<Object> completableFuture = f = this.stateMachine != null ? this.stateMachine.data().flush(this.lastWrittenIndex) : CompletableFuture.completedFuture(null);
                if (this.stateMachineDataPolicy.isSync()) {
                    this.stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
                }
                this.flushBatchSize = (int)(this.lastWrittenIndex - this.flushIndex.get());
                if (this.unsafeFlush) {
                    this.unsafeFlushOutStream();
                    this.updateFlushedIndexIncreasingly();
                } else if (this.asyncFlush) {
                    this.asyncFlushOutStream(f);
                } else {
                    this.flushOutStream();
                    if (!this.stateMachineDataPolicy.isSync()) {
                        IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
                    }
                    this.updateFlushedIndexIncreasingly();
                }
            }
            finally {
                timerContext.stop();
            }
        }
    }

    private void unsafeFlushOutStream() throws IOException {
        Timer.Context logSyncTimerContext = this.raftLogSyncTimer.time();
        this.out.asyncFlush(this.flushExecutor).whenComplete((v, e) -> logSyncTimerContext.stop());
    }

    private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) throws IOException {
        Timer.Context logSyncTimerContext = this.raftLogSyncTimer.time();
        ((CompletableFuture)this.out.asyncFlush(this.flushExecutor).thenCombine(stateMachineFlush, (async, sm) -> async)).whenComplete((v, e) -> {
            this.updateFlushedIndexIncreasingly(this.lastWrittenIndex);
            logSyncTimerContext.stop();
        });
    }

    private void flushOutStream() throws IOException {
        Timer.Context logSyncTimerContext = this.raftLogSyncTimer.time();
        try {
            this.out.flush();
        }
        finally {
            logSyncTimerContext.stop();
        }
    }

    private void updateFlushedIndexIncreasingly() {
        this.updateFlushedIndexIncreasingly(this.lastWrittenIndex);
    }

    private void updateFlushedIndexIncreasingly(long index) {
        long i = index;
        this.flushIndex.updateIncreasingly(i, this.traceIndexChange);
        this.postUpdateFlushedIndex(Math.toIntExact(this.lastWrittenIndex - index));
        this.writeTasks.updateIndex(i);
    }

    private void postUpdateFlushedIndex(int count) {
        this.pendingFlushNum = count;
        Optional.ofNullable(this.submitUpdateCommitEvent).ifPresent(Runnable::run);
    }

    void startLogSegment(long startIndex) {
        LOG.info("{}: Starting segment from index:{}", (Object)this.name, (Object)startIndex);
        this.addIOTask(new StartLogSegment(startIndex));
    }

    void rollLogSegment(LogSegment segmentToClose) {
        LOG.info("{}: Rolling segment {} to index:{}", this.name, segmentToClose.toString(), segmentToClose.getEndIndex());
        this.addIOTask(new FinalizeLogSegment(segmentToClose));
        this.addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1L));
    }

    SegmentedRaftLog.Task writeLogEntry(RaftProtos.LogEntryProto entry) {
        return this.addIOTask(new WriteLog(entry));
    }

    SegmentedRaftLog.Task truncate(SegmentedRaftLogCache.TruncationSegments ts, long index) {
        LOG.info("{}: Truncating segments {}, start index {}", this.name, ts, index);
        return this.addIOTask(new TruncateLog(ts, index));
    }

    void closeLogSegment(LogSegment segmentToClose) {
        LOG.info("{}: Closing segment {} to index: {}", this.name, segmentToClose.toString(), segmentToClose.getEndIndex());
        this.addIOTask(new FinalizeLogSegment(segmentToClose));
    }

    SegmentedRaftLog.Task purge(SegmentedRaftLogCache.TruncationSegments ts) {
        return this.addIOTask(new PurgeLog(ts));
    }

    File getFile(long startIndex, Long endIndex) {
        return LogSegmentStartEnd.valueOf(startIndex, endIndex).getFile(this.storage);
    }

    long getFlushIndex() {
        return this.flushIndex.get();
    }

    long getSafeCacheEvictIndex() {
        return this.safeCacheEvictIndex.get();
    }

    private void freeSegmentedRaftLogOutputStream() {
        IOUtils.cleanup(LOG, this.out);
        this.out = null;
        Preconditions.assertTrue(this.writeBuffer.position() == 0);
    }

    private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
        Preconditions.assertTrue(this.out == null && this.writeBuffer.position() == 0);
        this.out = new SegmentedRaftLogOutputStream(file, append, this.segmentMaxSize, this.preallocatedSize, this.writeBuffer, this.sharedBuffer::get);
    }

    private class TruncateLog
    extends SegmentedRaftLog.Task {
        private final SegmentedRaftLogCache.TruncationSegments segments;
        private CompletableFuture<Void> stateMachineFuture = null;

        TruncateLog(SegmentedRaftLogCache.TruncationSegments ts, long index) {
            this.segments = ts;
            if (SegmentedRaftLogWorker.this.stateMachine != null) {
                this.stateMachineFuture = SegmentedRaftLogWorker.this.stateMachine.data().truncate(index);
            }
        }

        @Override
        void execute() throws IOException {
            SegmentedRaftLogWorker.this.freeSegmentedRaftLogOutputStream();
            if (this.segments.getToTruncate() != null) {
                File fileToTruncate = this.segments.getToTruncate().getFile(SegmentedRaftLogWorker.this.storage);
                Preconditions.assertTrue(fileToTruncate.exists(), "File %s to be truncated does not exist", fileToTruncate);
                FileUtils.truncateFile(fileToTruncate, this.segments.getToTruncate().getTargetLength());
                File dstFile = this.segments.getToTruncate().getNewFile(SegmentedRaftLogWorker.this.storage);
                Preconditions.assertTrue(!dstFile.exists(), "Truncated file %s already exists ", dstFile);
                FileUtils.move(fileToTruncate, dstFile);
                LOG.info("{}: Truncated log file {} to length {} and moved it to {}", SegmentedRaftLogWorker.this.name, fileToTruncate, this.segments.getToTruncate().getTargetLength(), dstFile);
                SegmentedRaftLogWorker.this.lastWrittenIndex = this.segments.getToTruncate().getNewEndIndex();
            }
            if (this.segments.getToDelete() != null && this.segments.getToDelete().length > 0) {
                long minStart = this.segments.getToDelete()[0].getStartIndex();
                for (SegmentedRaftLogCache.SegmentFileInfo del : this.segments.getToDelete()) {
                    File delFile = del.getFile(SegmentedRaftLogWorker.this.storage);
                    Preconditions.assertTrue(delFile.exists(), "File %s to be deleted does not exist", delFile);
                    FileUtils.deleteFile(delFile);
                    LOG.info("{}: Deleted log file {}", (Object)SegmentedRaftLogWorker.this.name, (Object)delFile);
                    minStart = Math.min(minStart, del.getStartIndex());
                }
                if (this.segments.getToTruncate() == null) {
                    SegmentedRaftLogWorker.this.lastWrittenIndex = minStart - 1L;
                }
            }
            if (this.stateMachineFuture != null) {
                IOUtils.getFromFuture(this.stateMachineFuture, () -> this + "-truncateStateMachineData");
            }
            SegmentedRaftLogWorker.this.flushIndex.setUnconditionally(SegmentedRaftLogWorker.this.lastWrittenIndex, SegmentedRaftLogWorker.this.infoIndexChange);
            SegmentedRaftLogWorker.this.safeCacheEvictIndex.setUnconditionally(SegmentedRaftLogWorker.this.lastWrittenIndex, SegmentedRaftLogWorker.this.infoIndexChange);
            SegmentedRaftLogWorker.this.postUpdateFlushedIndex(0);
        }

        @Override
        long getEndIndex() {
            if (this.segments.getToTruncate() != null) {
                return this.segments.getToTruncate().getNewEndIndex();
            }
            if (this.segments.getToDelete().length > 0) {
                return this.segments.getToDelete()[this.segments.getToDelete().length - 1].getEndIndex();
            }
            return -1L;
        }

        @Override
        public String toString() {
            return super.toString() + ": " + this.segments;
        }
    }

    private class StartLogSegment
    extends SegmentedRaftLog.Task {
        private final long newStartIndex;

        StartLogSegment(long newStartIndex) {
            this.newStartIndex = newStartIndex;
        }

        @Override
        void execute() throws IOException {
            File openFile = SegmentedRaftLogWorker.this.getFile(this.newStartIndex, null);
            Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s", openFile, SegmentedRaftLogWorker.this.name);
            Preconditions.assertTrue(SegmentedRaftLogWorker.this.pendingFlushNum == 0);
            SegmentedRaftLogWorker.this.allocateSegmentedRaftLogOutputStream(openFile, false);
            Preconditions.assertTrue(openFile.exists(), "Failed to create file %s for %s", openFile.getAbsolutePath(), SegmentedRaftLogWorker.this.name);
            LOG.info("{}: created new log segment {}", (Object)SegmentedRaftLogWorker.this.name, (Object)openFile);
        }

        @Override
        long getEndIndex() {
            return this.newStartIndex;
        }
    }

    private class FinalizeLogSegment
    extends SegmentedRaftLog.Task {
        private final long startIndex;
        private final long endIndex;

        FinalizeLogSegment(LogSegment segmentToClose) {
            Preconditions.assertTrue(segmentToClose != null, "Log segment to be rolled is null");
            this.startIndex = segmentToClose.getStartIndex();
            this.endIndex = segmentToClose.getEndIndex();
        }

        @Override
        public void execute() throws IOException {
            SegmentedRaftLogWorker.this.freeSegmentedRaftLogOutputStream();
            File openFile = SegmentedRaftLogWorker.this.getFile(this.startIndex, null);
            Preconditions.assertTrue(openFile.exists(), () -> SegmentedRaftLogWorker.this.name + ": File " + openFile + " to be rolled does not exist");
            if (this.endIndex - this.startIndex + 1L > 0L) {
                File dstFile = SegmentedRaftLogWorker.this.getFile(this.startIndex, this.endIndex);
                Preconditions.assertTrue(!dstFile.exists());
                FileUtils.move(openFile, dstFile);
                LOG.info("{}: Rolled log segment from {} to {}", SegmentedRaftLogWorker.this.name, openFile, dstFile);
            } else {
                FileUtils.deleteFile(openFile);
                LOG.info("{}: Deleted empty log segment {}", (Object)SegmentedRaftLogWorker.this.name, (Object)openFile);
            }
            SegmentedRaftLogWorker.this.updateFlushedIndexIncreasingly();
            SegmentedRaftLogWorker.this.safeCacheEvictIndex.updateToMax(this.endIndex, SegmentedRaftLogWorker.this.traceIndexChange);
        }

        @Override
        void failed(IOException e) {
            SegmentedRaftLogWorker.this.stateMachine.event().notifyLogFailed(e, null);
            super.failed(e);
        }

        @Override
        long getEndIndex() {
            return this.endIndex;
        }

        @Override
        public String toString() {
            return super.toString() + ": startIndex=" + this.startIndex + " endIndex=" + this.endIndex;
        }
    }

    private class WriteLog
    extends SegmentedRaftLog.Task {
        private final RaftProtos.LogEntryProto entry;
        private final CompletableFuture<?> stateMachineFuture;
        private final CompletableFuture<Long> combined;

        WriteLog(RaftProtos.LogEntryProto entry) {
            this.entry = LogProtoUtils.removeStateMachineData(entry);
            if (this.entry == entry) {
                RaftProtos.StateMachineLogEntryProto proto;
                RaftProtos.StateMachineLogEntryProto stateMachineLogEntryProto = proto = entry.hasStateMachineLogEntry() ? entry.getStateMachineLogEntry() : null;
                if (SegmentedRaftLogWorker.this.stateMachine != null && proto != null && proto.getType() == RaftProtos.StateMachineLogEntryProto.Type.DATASTREAM) {
                    ClientInvocationId invocationId = ClientInvocationId.valueOf(proto);
                    CompletableFuture<StateMachine.DataStream> removed = SegmentedRaftLogWorker.this.server.getDataStreamMap().remove(invocationId);
                    this.stateMachineFuture = removed == null ? SegmentedRaftLogWorker.this.stateMachine.data().link(null, entry) : removed.thenApply(stream -> SegmentedRaftLogWorker.this.stateMachine.data().link((StateMachine.DataStream)stream, entry));
                } else {
                    this.stateMachineFuture = null;
                }
            } else {
                try {
                    this.stateMachineFuture = SegmentedRaftLogWorker.this.stateMachine.data().write(entry);
                }
                catch (Exception e) {
                    LOG.error(SegmentedRaftLogWorker.this.name + ": writeStateMachineData failed for index " + entry.getIndex() + ", entry=" + LogProtoUtils.toLogEntryString(entry, SegmentedRaftLogWorker.this.stateMachine::toStateMachineLogEntryString), e);
                    throw e;
                }
            }
            this.combined = this.stateMachineFuture == null ? super.getFuture() : super.getFuture().thenCombine(this.stateMachineFuture, (index, stateMachineResult) -> index);
        }

        @Override
        void failed(IOException e) {
            SegmentedRaftLogWorker.this.stateMachine.event().notifyLogFailed(e, this.entry);
            super.failed(e);
        }

        @Override
        int getSerializedSize() {
            return LogProtoUtils.getSerializedSize(this.entry);
        }

        @Override
        CompletableFuture<Long> getFuture() {
            return this.combined;
        }

        @Override
        void done() {
            SegmentedRaftLogWorker.this.writeTasks.offerOrCompleteFuture(this);
        }

        @Override
        public void execute() throws IOException {
            if (SegmentedRaftLogWorker.this.stateMachineDataPolicy.isSync() && this.stateMachineFuture != null) {
                SegmentedRaftLogWorker.this.stateMachineDataPolicy.getFromFuture(this.stateMachineFuture, () -> this + "-writeStateMachineData");
            }
            SegmentedRaftLogWorker.this.raftLogMetrics.onRaftLogAppendEntry();
            Preconditions.assertTrue(SegmentedRaftLogWorker.this.out != null);
            Preconditions.assertTrue(SegmentedRaftLogWorker.this.lastWrittenIndex + 1L == this.entry.getIndex(), "lastWrittenIndex == %s, entry == %s", SegmentedRaftLogWorker.this.lastWrittenIndex, this.entry);
            SegmentedRaftLogWorker.this.out.write(this.entry);
            SegmentedRaftLogWorker.this.lastWrittenIndex = this.entry.getIndex();
            SegmentedRaftLogWorker.this.pendingFlushNum++;
            SegmentedRaftLogWorker.this.flushIfNecessary();
        }

        @Override
        long getEndIndex() {
            return this.entry.getIndex();
        }

        @Override
        public String toString() {
            return super.toString() + ": " + LogProtoUtils.toLogEntryString(this.entry, SegmentedRaftLogWorker.this.stateMachine == null ? null : SegmentedRaftLogWorker.this.stateMachine::toStateMachineLogEntryString);
        }
    }

    private final class PurgeLog
    extends SegmentedRaftLog.Task {
        private final SegmentedRaftLogCache.TruncationSegments segments;

        private PurgeLog(SegmentedRaftLogCache.TruncationSegments segments) {
            this.segments = segments;
        }

        @Override
        void execute() throws IOException {
            if (this.segments.getToDelete() != null) {
                Timer.Context purgeLogContext = SegmentedRaftLogWorker.this.raftLogMetrics.getRaftLogPurgeTimer().time();
                for (SegmentedRaftLogCache.SegmentFileInfo fileInfo : this.segments.getToDelete()) {
                    FileUtils.deleteFile(fileInfo.getFile(SegmentedRaftLogWorker.this.storage));
                }
                purgeLogContext.stop();
            }
        }

        @Override
        long getEndIndex() {
            return this.segments.maxEndIndex();
        }
    }

    static class WriteLogTasks {
        private final Queue<WriteLog> q = new LinkedList<WriteLog>();
        private volatile long index;

        WriteLogTasks() {
        }

        void offerOrCompleteFuture(WriteLog writeLog) {
            if (writeLog.getEndIndex() <= this.index || !this.offer(writeLog)) {
                writeLog.completeFuture();
            }
        }

        private synchronized boolean offer(WriteLog writeLog) {
            if (writeLog.getEndIndex() <= this.index) {
                return false;
            }
            this.q.offer(writeLog);
            return true;
        }

        synchronized void updateIndex(long i) {
            this.index = i;
            SegmentedRaftLog.Task peeked;
            while ((peeked = (SegmentedRaftLog.Task)this.q.peek()) != null && peeked.getEndIndex() <= this.index) {
                SegmentedRaftLog.Task polled = this.q.poll();
                Preconditions.assertTrue(polled == peeked);
                polled.completeFuture();
            }
            return;
        }
    }

    static class StateMachineDataPolicy {
        private final boolean sync;
        private final TimeDuration syncTimeout;
        private final int syncTimeoutRetry;
        private final SegmentedRaftLogMetrics metrics;

        StateMachineDataPolicy(RaftProperties properties, SegmentedRaftLogMetrics metricRegistry) {
            this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
            this.syncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
            this.syncTimeoutRetry = RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties);
            this.metrics = metricRegistry;
            Preconditions.assertTrue(this.syncTimeoutRetry >= -1);
        }

        boolean isSync() {
            return this.sync;
        }

        void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) throws IOException {
            Preconditions.assertTrue(this.isSync());
            TimeoutIOException lastException = null;
            for (int retry = 0; this.syncTimeoutRetry == -1 || retry <= this.syncTimeoutRetry; ++retry) {
                try {
                    IOUtils.getFromFuture(future, getName, this.syncTimeout);
                    return;
                }
                catch (TimeoutIOException e) {
                    LOG.warn("Timeout " + retry + (this.syncTimeoutRetry == -1 ? "/~" : "/" + this.syncTimeoutRetry), e);
                    lastException = e;
                    this.metrics.onStateMachineDataWriteTimeout();
                    continue;
                }
            }
            Objects.requireNonNull(lastException, "lastException == null");
            throw lastException;
        }
    }
}

