/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.copycat.client.session;

import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.catalyst.util.concurrent.Futures;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.copycat.client.session.ClientSessionState;
import io.atomix.copycat.error.UnknownSessionException;
import io.atomix.copycat.protocol.PublishRequest;
import io.atomix.copycat.protocol.PublishResponse;
import io.atomix.copycat.protocol.Response;
import io.atomix.copycat.session.Event;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class ClientSessionListener {
    private final ClientSessionState state;
    private final ThreadContext context;
    private final Map<String, Listeners<Object>> eventListeners = new ConcurrentHashMap<String, Listeners<Object>>();

    public ClientSessionListener(Connection connection, ClientSessionState state, ThreadContext context) {
        this.state = Assert.notNull(state, "state");
        this.context = Assert.notNull(context, "context");
        connection.handler(PublishRequest.class, this::handlePublish);
    }

    public Listener<Void> onEvent(String event, Runnable callback) {
        return this.onEvent(event, (T v) -> callback.run());
    }

    public <T> Listener<T> onEvent(String event, Consumer listener) {
        return this.eventListeners.computeIfAbsent(Assert.notNull(event, "event"), e -> new Listeners()).add(Assert.notNull(listener, "listener"));
    }

    private CompletableFuture<PublishResponse> handlePublish(PublishRequest request) {
        this.state.getLogger().debug("{} - Received {}", (Object)this.state.getSessionId(), (Object)request);
        if (request.session() != this.state.getSessionId()) {
            this.state.getLogger().debug("{} - Inconsistent session ID: {}", (Object)this.state.getSessionId(), (Object)request.session());
            return Futures.exceptionalFuture(new UnknownSessionException("incorrect session ID", new Object[0]));
        }
        if (request.previousIndex() != this.state.getEventIndex()) {
            this.state.getLogger().debug("{} - Inconsistent event index: {}", (Object)this.state.getSessionId(), (Object)request.previousIndex());
            return CompletableFuture.completedFuture(((PublishResponse.Builder)PublishResponse.builder().withStatus(Response.Status.ERROR)).withIndex(this.state.getEventIndex()).build());
        }
        this.state.setEventIndex(request.eventIndex());
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(request.events().size());
        for (Event<?> event : request.events()) {
            Listeners<Object> listeners = this.eventListeners.get(event.name());
            if (listeners == null) continue;
            futures.add(listeners.accept(event.message()));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).handleAsync((result, error) -> {
            this.state.setCompleteIndex(request.eventIndex());
            return ((PublishResponse.Builder)PublishResponse.builder().withStatus(Response.Status.OK)).withIndex(this.state.getCompleteIndex()).build();
        }, this.context.executor());
    }

    public CompletableFuture<Void> close() {
        return CompletableFuture.completedFuture(null);
    }
}

