package io.simplesource.kafka.internal.cluster;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.simplesource.api.CommandAPI;
import io.simplesource.data.FutureResult;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.RemoteCommandResponseStore;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.streams.state.HostInfo;

/* loaded from: input_file:io/simplesource/kafka/internal/cluster/ClusterSubsystem.class */
public final class ClusterSubsystem implements RemoteCommandResponseStore {
    private final ClusterConfig config;
    private final Server server;
    private final ClientImpl client;
    private final RequestResponseMapper requestResponseMapper;

    public ClusterSubsystem(CommandAPILoader commandAPILoader, ClusterConfig clusterConfig, ScheduledExecutorService scheduledExecutorService) {
        this.config = clusterConfig;
        EventLoopGroup eventLoopGroup = getEventLoopGroup(clusterConfig.bossThreadCount(), clusterConfig.isNative(), new NamedThreadFactory("SimpleSourcing-Cluster-BossThread"));
        EventLoopGroup eventLoopGroup2 = getEventLoopGroup(clusterConfig.bossThreadCount(), clusterConfig.isNative(), new NamedThreadFactory("SimpleSourcing-Cluster-WorkerThread"));
        this.requestResponseMapper = new RequestResponseMapper(new HostInfo(clusterConfig.iface(), clusterConfig.port()), scheduledExecutorService);
        PipelineInitializer pipelineInitializer = new PipelineInitializer(new MessageHandler((hostInfo, message) -> {
            r4[0].send(hostInfo, message);
        }, commandAPILoader, this.requestResponseMapper));
        this.server = new Server(clusterConfig, eventLoopGroup, eventLoopGroup2, getServerSocketChannel(), pipelineInitializer);
        this.client = new ClientImpl(clusterConfig, eventLoopGroup2, getClientSocketChannel(), pipelineInitializer);
        Client[] clientArr = {this.client};
    }

    @Override // io.simplesource.kafka.api.RemoteCommandResponseStore
    public FutureResult<CommandAPI.CommandError, NonEmptyList<Sequence>> get(HostInfo hostInfo, String str, UUID uuid, Duration duration) {
        MappedCommandRequest newCommandRequest = this.requestResponseMapper.newCommandRequest(hostInfo, str, uuid, duration);
        this.client.send(hostInfo, newCommandRequest.commandRequest());
        return FutureResult.ofCompletableFuture(newCommandRequest.completableFuture());
    }

    private EventLoopGroup getEventLoopGroup(int i, boolean z, ThreadFactory threadFactory) {
        return z ? new EpollEventLoopGroup(i, threadFactory) : new NioEventLoopGroup(i, threadFactory);
    }

    private Class getServerSocketChannel() {
        return this.config.isNative() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
    }

    private Class getClientSocketChannel() {
        return this.config.isNative() ? EpollSocketChannel.class : NioSocketChannel.class;
    }

    public void start() {
        this.server.start();
    }

    public void stop() {
        try {
            this.client.stop();
        } finally {
            this.server.stop();
        }
    }
}
