/*
 * Decompiled with CFR 0.152.
 */
package org.reveno.atp.clustering.core.components;

import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.reveno.atp.clustering.api.ClusterView;
import org.reveno.atp.clustering.api.SyncMode;
import org.reveno.atp.clustering.core.RevenoClusterConfiguration;
import org.reveno.atp.clustering.core.api.StorageTransferServer;
import org.reveno.atp.clustering.util.Utils;
import org.reveno.atp.commons.NamedThreadFactory;
import org.reveno.atp.core.api.storage.JournalsStorage;
import org.reveno.atp.core.api.storage.SnapshotStorage;
import org.reveno.atp.core.storage.FileSystemStorage;
import org.reveno.atp.utils.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * TCP server that streams journal and snapshot files of a {@link FileSystemStorage}
 * to nodes that connect to the configured data-sync port.
 *
 * <p>Wire protocol, as read by {@link #sendStoragesToNode(SocketChannel)}: the peer sends a
 * fixed-size request of {@value #REQUEST_SIZE} bytes laid out as
 * {@code long viewId, byte transferType, long transactionId}. In {@link SyncMode#SNAPSHOT} mode
 * only {@code viewId} is interpreted and the snapshot fixed for that view is sent back;
 * otherwise the journal files chosen by {@link #select(long, long)} are sent, using the
 * transaction-commit file for type {@code 1} and the event-commit file for type {@code 2}.
 * The connection is closed once the response has been written.
 *
 * <p>Threading: the accept loop runs on the single-threaded {@code mainListener} executor and
 * every accepted connection is served on the {@code executor} pool. The per-view maps are
 * therefore touched from several threads; this class guards its own accesses with an internal
 * lock (subclasses accessing the protected maps directly are not covered by it).
 */
public class FileStorageTransferServer implements StorageTransferServer {
    protected static final Logger LOG = LoggerFactory.getLogger(FileStorageTransferServer.class);

    /** Size of a request in bytes: view id (long) + transfer type (byte) + transaction id (long). */
    private static final int REQUEST_SIZE = Long.BYTES + Byte.BYTES + Long.BYTES;
    /** Transfer type code requesting the transaction-commit files. */
    private static final byte TRANSFER_TRANSACTIONS = 1;
    /** Transfer type code requesting the event-commit files. */
    private static final byte TRANSFER_EVENTS = 2;

    protected RevenoClusterConfiguration config;
    protected FileSystemStorage storage;
    protected ExecutorService mainListener;
    protected ExecutorService executor;
    /** Written by the accept thread in {@link #startup()}, read by the caller of {@link #shutdown()}. */
    protected volatile ServerSocketChannel listener;
    protected Long2ObjectMap<JournalsStorage.JournalStore[]> journals = new Long2ObjectOpenHashMap<>();
    protected Long2ObjectMap<SnapshotStorage.SnapshotStore> snapshots = new Long2ObjectOpenHashMap<>();

    /** Guards {@link #journals} and {@link #snapshots}, which are not thread-safe on their own. */
    private final Object storesLock = new Object();

    /**
     * Creates the server and its thread pools; nothing is bound until {@link #startup()}.
     *
     * @param config  cluster configuration supplying the data-sync port, mode, pool size and timeouts
     * @param storage file storage whose journals and snapshots are served
     */
    public FileStorageTransferServer(RevenoClusterConfiguration config, FileSystemStorage storage) {
        this.config = config;
        this.storage = storage;
        ThreadFactory workerThreads = new NamedThreadFactory("stf");
        ThreadFactory acceptThread = new NamedThreadFactory("stf-main");
        this.executor = Executors.newFixedThreadPool(config.revenoDataSync().threadPoolSize(), workerThreads);
        this.mainListener = Executors.newSingleThreadExecutor(acceptThread);
    }

    /**
     * Asynchronously binds the server socket and starts accepting connections.
     * Each accepted connection is handed to the worker pool.
     */
    @Override
    public void startup() {
        this.mainListener.execute(() -> {
            try {
                InetSocketAddress listenAddr = new InetSocketAddress(this.config.revenoDataSync().port());
                try {
                    this.listener = this.listen(listenAddr);
                } catch (IOException e) {
                    LOG.error("FSTF: failed to open server socket.", e);
                    return;
                }
                LOG.debug("FSTF: transfer Server is started on {}", listenAddr);
                while (!Thread.interrupted()) {
                    SocketChannel conn = this.accept(this.listener);
                    try {
                        this.executor.execute(() -> this.sendStoragesToNode(conn));
                    } catch (RuntimeException e) {
                        // The task was not accepted (e.g. the pool is already shut down), so
                        // nobody else would ever close this connection.
                        this.closeQuietly(conn);
                        throw e;
                    }
                }
                LOG.debug("FSTF: transfer Server stopped on {}", listenAddr);
            } catch (Throwable t) {
                ServerSocketChannel opened = this.listener;
                if (opened != null && !opened.isOpen()) {
                    // shutdown() closes the listener to unblock accept(); that is not an error.
                    LOG.debug("FSTF: transfer Server stopped, listener is closed.", t);
                } else {
                    LOG.error("FSTF: file server executor error.", t);
                }
            }
        });
    }

    /**
     * Closes the server socket (if it was opened) and initiates an orderly shutdown of both
     * executors. Already running transfers are not interrupted.
     */
    @Override
    public void shutdown() {
        try {
            ServerSocketChannel opened = this.listener;
            if (opened != null) {
                opened.close();
            }
        } catch (IOException e) {
            LOG.error("FSTF: can't close file storage transfer server.", e);
        }
        this.mainListener.shutdown();
        this.executor.shutdown();
    }

    /**
     * Records the storage's current journal stores and last snapshot store under the id of the
     * given view, so that later requests for that view are answered from this fixed set.
     *
     * @param view cluster view whose id is used as the lookup key
     */
    @Override
    public void fixJournals(ClusterView view) {
        // Query the storage outside the lock; only the map updates need to be guarded.
        JournalsStorage.JournalStore[] allStores = this.storage.getAllStores();
        SnapshotStorage.SnapshotStore lastSnapshot = this.storage.getLastSnapshotStore();
        synchronized (this.storesLock) {
            this.journals.put(view.viewId(), allStores);
            this.snapshots.put(view.viewId(), lastSnapshot);
        }
    }

    /**
     * Serves one connection: reads the request, sends the requested files and closes the
     * connection in all cases.
     *
     * @param conn accepted connection to the requesting node
     * @throws IllegalArgumentException if the request carries an unknown transfer type
     */
    protected void sendStoragesToNode(SocketChannel conn) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(REQUEST_SIZE);
            if (!this.waitForData(conn, buffer)) {
                LOG.error("Can't receive data from {}", conn.getRemoteAddress());
                return;
            }
            buffer.rewind();
            long viewId = buffer.getLong();
            if (this.config.revenoDataSync().mode() == SyncMode.SNAPSHOT) {
                SnapshotStorage.SnapshotStore snapshot;
                synchronized (this.storesLock) {
                    snapshot = this.snapshots.get(viewId);
                }
                if (snapshot == null) {
                    LOG.error("FSTF: no snapshot is fixed for view {}, nothing to transfer.", viewId);
                    return;
                }
                this.transfer(conn, snapshot.getSnapshotPath());
                return;
            }
            byte transferType = buffer.get();
            if (transferType != TRANSFER_TRANSACTIONS && transferType != TRANSFER_EVENTS) {
                throw new IllegalArgumentException(String.format("Unknown transfer type %s", transferType));
            }
            long transactionId = buffer.getLong();
            for (JournalsStorage.JournalStore store : this.select(transactionId, viewId)) {
                String address = transferType == TRANSFER_TRANSACTIONS
                        ? store.getTransactionCommitsAddress()
                        : store.getEventsCommitsAddress();
                this.transfer(conn, address);
            }
        } catch (IOException e) {
            LOG.error("FSTF: Failed to accept incoming connection", e);
        } finally {
            LOG.info("FSTF: Closing transfer server connection.");
            // A failing close must not mask an exception thrown by the transfer itself.
            this.closeQuietly(conn);
        }
    }

    /**
     * Writes the whole file at {@code path} (relative to the storage base directory) to the
     * connection.
     *
     * @param conn destination connection
     * @param path file path relative to the storage base directory
     * @throws RuntimeException wrapping any failure, as produced by {@code Exceptions.runtime}
     */
    protected void transfer(SocketChannel conn, String path) {
        try {
            File file = new File(this.storage.getBaseDir(), path);
            LOG.debug("FSTF: transfering {} to {}", file, conn.getRemoteAddress());
            try (FileInputStream in = new FileInputStream(file);
                 FileChannel fc = in.getChannel()) {
                long size = fc.size();
                long position = 0L;
                // transferTo may move fewer bytes than requested, so keep going until the
                // size observed at the start has been sent.
                while (position < size) {
                    long sent = fc.transferTo(position, size - position, conn);
                    if (sent == 0L && position >= fc.size()) {
                        // The file shrank underneath us; there is nothing more to send.
                        break;
                    }
                    position += sent;
                }
            }
        } catch (Exception e) {
            throw Exceptions.runtime(e);
        }
    }

    /**
     * Polls the connection until a complete request of {@value #REQUEST_SIZE} bytes has been
     * read into {@code buffer} or the configured sync timeout elapses.
     *
     * @param conn   connection to read from
     * @param buffer destination buffer for the request bytes
     * @return the result of {@code Utils.waitFor}; presumably {@code true} once the full
     *         request arrived in time and {@code false} on timeout
     * @throws IOException declared for callers; read failures surface as runtime exceptions
     *                     from {@link #readSilent(SocketChannel, ByteBuffer)}
     */
    protected boolean waitForData(SocketChannel conn, ByteBuffer buffer) throws IOException {
        int[] received = new int[]{0};
        return Utils.waitFor(() -> {
            int read = this.readSilent(conn, buffer);
            if (read != -1) {
                received[0] += read;
            }
            LOG.debug("FSTF: received next {} bytes from {}", read, conn);
            return received[0] == REQUEST_SIZE;
        }, this.config.revenoElectionTimeouts().syncTimeoutNanos());
    }

    /**
     * Reads from the connection, converting a checked {@link IOException} into a runtime one so
     * the call can be used inside a lambda.
     *
     * @return number of bytes read, or {@code -1} at end of stream
     */
    protected int readSilent(SocketChannel conn, ByteBuffer buffer) {
        try {
            return conn.read(buffer);
        } catch (IOException e) {
            throw Exceptions.runtime(e);
        }
    }

    /**
     * Accepts the next connection and switches it to blocking mode.
     *
     * @param listener server channel to accept on
     * @return the accepted, blocking connection
     * @throws RuntimeException wrapping any failure; a partially set-up connection is closed first
     */
    protected SocketChannel accept(ServerSocketChannel listener) {
        SocketChannel conn = null;
        try {
            conn = listener.accept();
            conn.configureBlocking(true);
            LOG.info("FSTF: Accepted new connection: {}", conn);
            return conn;
        } catch (Exception e) {
            if (conn != null) {
                this.closeQuietly(conn);
            }
            throw Exceptions.runtime(e);
        }
    }

    /**
     * Closes the given connection.
     *
     * @throws RuntimeException wrapping any failure raised by the close
     */
    protected void close(SocketChannel channel) {
        try {
            channel.close();
        } catch (Exception e) {
            throw Exceptions.runtime(e);
        }
    }

    /**
     * Opens a server channel bound to {@code listenAddr} with address reuse enabled.
     *
     * @param listenAddr local address to bind to
     * @return the bound server channel
     * @throws IOException if the channel cannot be opened or bound; the channel is closed in that case
     */
    protected ServerSocketChannel listen(InetSocketAddress listenAddr) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            ServerSocket ss = channel.socket();
            ss.setReuseAddress(true);
            ss.bind(listenAddr);
            return channel;
        } catch (IOException | RuntimeException e) {
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Chooses, in their stored order, the journal stores fixed for {@code viewId} that should be
     * sent to a node whose last known transaction is {@code transactionId}: every store whose
     * last transaction id is greater than or equal to {@code transactionId}, plus the immediate
     * predecessor of each store whose last transaction id is strictly greater.
     *
     * <p>NOTE(review): the previous guard {@code i - 1 > 0} never included the predecessor at
     * index 0 while including every other predecessor; this is treated as an off-by-one and
     * corrected here. Confirm against the journal naming semantics.
     *
     * @param transactionId last transaction id known to the requesting node
     * @param viewId        id of the view whose fixed journals are consulted
     * @return insertion-ordered set of stores to transfer; empty if nothing is fixed for the view
     */
    protected Set<JournalsStorage.JournalStore> select(long transactionId, long viewId) {
        JournalsStorage.JournalStore[] stores;
        synchronized (this.storesLock) {
            stores = this.journals.get(viewId);
        }
        Set<JournalsStorage.JournalStore> selected = new LinkedHashSet<>();
        if (stores == null) {
            LOG.warn("FSTF: no journals are fixed for view {}, nothing to transfer.", viewId);
            return selected;
        }
        for (int i = 0; i < stores.length; i++) {
            long lastTransactionId = stores[i].getLastTransactionId();
            if (lastTransactionId > transactionId) {
                if (i > 0) {
                    selected.add(stores[i - 1]);
                }
                selected.add(stores[i]);
            } else if (lastTransactionId == transactionId) {
                selected.add(stores[i]);
            }
        }
        return selected;
    }

    /** Closes the connection via {@link #close(SocketChannel)}, logging instead of propagating a failure. */
    private void closeQuietly(SocketChannel channel) {
        try {
            this.close(channel);
        } catch (RuntimeException e) {
            LOG.error("FSTF: failed to close transfer connection.", e);
        }
    }
}

