/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.event.internal;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.kuujo.copycat.EventListener;
import net.kuujo.copycat.event.EventLog;
import net.kuujo.copycat.event.EventLogConfig;
import net.kuujo.copycat.log.LogSegment;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;

/**
 * {@link EventLog} implementation that commits entries through its
 * {@link ResourceManager} context, forwards consumed entries to a registered
 * {@link EventListener}, and periodically removes log segments that the
 * configured retention policy no longer retains.
 *
 * <p>Log access is performed via {@code context.execute(...)}, while results and
 * listener notifications are handed to the resource's {@code executor}.
 *
 * @param <T> the type of entries stored in the event log
 */
public class DefaultEventLog<T>
extends AbstractResource<EventLog<T>>
implements EventLog<T> {
    private static final Logger LOGGER = Logger.getLogger(DefaultEventLog.class.getName());

    /** Listener notified of each consumed entry; {@code null} until one is registered. */
    private EventListener<T> consumer;
    /** Handle of the periodic retention check scheduled by {@link #open()}. */
    private ScheduledFuture<?> retentionFuture;
    /** Index of the most recently consumed entry; {@code null} until the first entry is consumed. */
    private Long commitIndex;

    /**
     * Creates the event log and registers {@link #consume} as the context's entry consumer.
     *
     * @param context the resource manager backing this event log
     */
    public DefaultEventLog(ResourceManager context) {
        super(context);
        context.consumer(this::consume);
    }

    /**
     * Registers the listener that receives every consumed entry.
     *
     * @param consumer the listener to notify, or {@code null} to stop notifications
     * @return this event log
     */
    @Override
    public EventLog<T> consumer(EventListener<T> consumer) {
        this.consumer = consumer;
        return this;
    }

    /**
     * Reads the entry stored at the given log index.
     *
     * @param index the log index to read
     * @return a future completed with the deserialized entry, with {@code null} when the
     *         log returns no buffer for the index, or exceptionally with an
     *         {@link IndexOutOfBoundsException} when the log does not contain the index;
     *         any runtime failure while reading or deserializing also completes the
     *         future exceptionally
     */
    @Override
    public CompletableFuture<T> get(long index) {
        CompletableFuture<T> future = new CompletableFuture<>();
        this.context.execute(() -> {
            try {
                if (!this.context.log().containsIndex(index)) {
                    this.executor.execute(() -> future.completeExceptionally(new IndexOutOfBoundsException(String.format("Log index %d out of bounds", index))));
                    return;
                }
                ByteBuffer buffer = this.context.log().getEntry(index);
                if (buffer == null) {
                    this.executor.execute(() -> future.complete(null));
                    return;
                }
                @SuppressWarnings("unchecked")
                T entry = (T) this.serializer.readObject(buffer);
                this.executor.execute(() -> future.complete(entry));
            } catch (RuntimeException e) {
                // Without this the returned future would never complete and callers would hang.
                this.executor.execute(() -> future.completeExceptionally(e));
            }
        });
        return future;
    }

    /**
     * Serializes the entry and commits it through the context.
     *
     * @param entry the entry to commit
     * @return a future completed on the resource executor with the long value read from
     *         the commit result (the index written by {@link #consume})
     */
    @Override
    public CompletableFuture<Long> commit(T entry) {
        return this.context.commit(this.serializer.writeObject(entry)).thenApplyAsync(ByteBuffer::getLong, this.executor);
    }

    /**
     * Handles an entry delivered by the context: notifies the registered listener (if any)
     * on the resource executor and records the entry's index as the latest commit index.
     *
     * @param term  the term passed by the context (unused here)
     * @param index the index of the consumed entry
     * @param entry the serialized entry
     * @return an 8-byte buffer, flipped for reading, containing {@code index}
     */
    private ByteBuffer consume(long term, Long index, ByteBuffer entry) {
        ByteBuffer result = ByteBuffer.allocateDirect(8);
        result.putLong(index);
        // Capture the listener once so the deferred call cannot observe a later null assignment.
        EventListener<T> listener = this.consumer;
        if (listener != null) {
            @SuppressWarnings("unchecked")
            T value = (T) this.serializer.readObject(entry);
            this.executor.execute(() -> listener.accept(value));
        }
        this.commitIndex = index;
        result.flip();
        return result;
    }

    /**
     * Removes, closes and deletes every log segment that is not the last segment, has a
     * non-null last index at or below the latest commit index, and is not retained by the
     * configured retention policy. Does nothing until an entry has been consumed.
     */
    private synchronized void compact() {
        Long committed = this.commitIndex;
        if (committed == null) {
            return;
        }
        Iterator<Map.Entry<Long, LogSegment>> iterator = this.context.log().segments().entrySet().iterator();
        while (iterator.hasNext()) {
            LogSegment segment = iterator.next().getValue();
            if (this.context.log().lastSegment() == segment) {
                continue;
            }
            Long lastIndex = segment.lastIndex();
            if (lastIndex == null || lastIndex > committed) {
                continue;
            }
            if (((EventLogConfig)this.context.config().getResourceConfig()).getRetentionPolicy().retain(segment)) {
                continue;
            }
            iterator.remove();
            try {
                segment.close();
                segment.delete();
            }
            catch (IOException e) {
                // Compaction is best-effort: report the failure and keep processing other segments.
                LOGGER.log(Level.WARNING, "Failed to close or delete compacted log segment", e);
            }
        }
    }

    /**
     * Runs the startup tasks, opens the context, and then schedules {@link #compact()} to
     * run with a fixed delay of the configured retention check interval (milliseconds),
     * starting immediately.
     *
     * @return a future completed with this event log once the steps above have finished
     */
    @Override
    public synchronized CompletableFuture<EventLog<T>> open() {
        return this.runStartupTasks()
            .thenComposeAsync(v -> this.context.open(), this.executor)
            .thenRun(() -> {
                this.retentionFuture = this.context.scheduleWithFixedDelay(this::compact, 0L, ((EventLogConfig)this.context.config().getResourceConfig()).getRetentionCheckInterval(), TimeUnit.MILLISECONDS);
            })
            .thenApply(v -> this);
    }

    /**
     * Cancels the scheduled retention check (without interrupting a running check), closes
     * the context, and then runs the shutdown tasks.
     *
     * @return a future completed once the shutdown tasks have finished
     */
    @Override
    public synchronized CompletableFuture<Void> close() {
        if (this.retentionFuture != null) {
            this.retentionFuture.cancel(false);
        }
        return this.context.close().thenCompose(v -> this.runShutdownTasks());
    }
}

