/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import io.atomix.catalyst.concurrent.CatalystThreadFactory;
import io.atomix.catalyst.concurrent.SingleThreadContext;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.Server;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.impl.CopycatTransport;
import org.onosproject.store.primitives.impl.CopycatTransportConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Server} implementation that receives its traffic through a {@link MessagingService}
 * instead of owning a socket.
 *
 * <p>All inbound messages for one partition arrive on a single messaging subject
 * ({@code onos-copycat-<partitionId>}). Every payload starts with a {@code long} connection id,
 * followed by the request bytes. The id is used to look up, or lazily create, the
 * {@link CopycatTransportConnection} that handles the request.
 */
public class CopycatTransportServer implements Server {

    /** Upper bound on the number of threads backing contexts created by this server. */
    private static final int MAX_THREADS = 4;

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Set once the messaging handler has been (or is being) registered. */
    private final AtomicBoolean listening = new AtomicBoolean(false);

    /** Shared by every {@link #listen(Address, Consumer)} caller; completed after the handler is registered. */
    private final CompletableFuture<Void> listenFuture = new CompletableFuture<>();

    private final ScheduledExecutorService executorService;
    private final PartitionId partitionId;
    private final MessagingService messagingService;
    private final String messageSubject;

    /** Live server-side connections keyed by the connection id carried in each message. */
    private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();

    /**
     * Creates a server for the given partition.
     *
     * @param partitionId      partition whose traffic this server handles; must not be null
     * @param messagingService messaging service used to receive requests; must not be null
     * @throws NullPointerException if either argument is null
     */
    CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
        this.partitionId = Preconditions.checkNotNull(partitionId);
        this.messagingService = Preconditions.checkNotNull(messagingService);
        this.messageSubject = String.format("onos-copycat-%s", partitionId);
        this.executorService = Executors.newScheduledThreadPool(
                Math.min(MAX_THREADS, Runtime.getRuntime().availableProcessors()),
                new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
    }

    /**
     * Starts accepting connections. Only the first successful call registers the messaging
     * handler; later calls simply return the same future.
     *
     * <p>The registering call must run on a thread that has a current {@link ThreadContext}.
     *
     * @param address  address to listen on; not used by this implementation beyond being passed along
     * @param listener callback invoked once for every newly created incoming connection
     * @return future completed once the handler has been registered
     */
    public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
        if (listening.compareAndSet(false, true)) {
            try {
                listen(address, listener, ThreadContext.currentContextOrThrow());
            } catch (RuntimeException e) {
                // Roll back so a later call can retry; otherwise every subsequent caller
                // would receive a future that is never completed.
                listening.set(false);
                throw e;
            }
        }
        return listenFuture;
    }

    /**
     * Registers the handler for this partition's message subject and schedules completion of
     * the listen future on {@code context}.
     *
     * @param address  unused here
     * @param listener callback notified about each new connection
     * @param context  context of the thread that called {@link #listen(Address, Consumer)}
     */
    private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
        messagingService.registerHandler(messageSubject, (sender, payload) -> {
            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
                long connectionId = input.readLong();

                // Records whether *this* message caused the connection to be created, so the
                // listener is told about a connection exactly once.
                AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
                CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
                    newConnectionCreated.set(true);
                    CopycatTransportConnection newConnection = new CopycatTransportConnection(
                            connectionId,
                            CopycatTransport.Mode.SERVER,
                            partitionId,
                            CopycatTransport.toAddress(sender),
                            messagingService,
                            getOrCreateContext(context));
                    log.debug("Created new incoming connection {}", connectionId);
                    // Two-argument remove: only drop the entry if it still maps to this connection.
                    newConnection.closeListener(c -> connections.remove(connectionId, c));
                    return newConnection;
                });

                // Everything after the connection id is the request body.
                byte[] request = IOUtils.toByteArray(input);

                // Announce a new connection on the listening context before handing the
                // request to it.
                return CompletableFuture.supplyAsync(() -> {
                    if (newConnectionCreated.get()) {
                        listener.accept(connection);
                    }
                    return connection;
                }, context.executor()).thenCompose(c -> c.handle(request));
            } catch (IOException e) {
                // Includes payloads too short to contain a connection id.
                return Tools.exceptionalFuture(e);
            }
        });
        context.execute(() -> listenFuture.complete(null));
    }

    /**
     * Stops receiving messages for this partition and shuts down the server's executor.
     * Connections still held in the connection map are not closed by this method.
     *
     * @return an already completed future
     */
    public CompletableFuture<Void> close() {
        messagingService.unregisterHandler(messageSubject);
        executorService.shutdown();
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns the calling thread's {@link ThreadContext} if it has one; otherwise creates a
     * new single-threaded context on this server's executor, using a clone of the parent
     * context's serializer.
     *
     * @param parentContext context supplying the serializer for a newly created context
     * @return the current or a newly created context
     */
    private ThreadContext getOrCreateContext(ThreadContext parentContext) {
        ThreadContext current = ThreadContext.currentContext();
        if (current != null) {
            return current;
        }
        return new SingleThreadContext(executorService, parentContext.serializer().clone());
    }
}

