/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.core.partitioned.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallbackProvider;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListenerProvider;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;

public abstract class AbstractPartitionCoordinator
implements PartitionCoordinator,
PartitionCommitCallbackProvider,
PartitionRebalanceListenerProvider {
    protected final Logger log;
    private final ConcurrentMap<EventType, PartitionRebalanceListener> rebalanceListeners = new ConcurrentHashMap<EventType, PartitionRebalanceListener>();
    private final ConcurrentMap<EventTypePartition, PartitionCommitCallback> commitCallbacks = new ConcurrentHashMap<EventTypePartition, PartitionCommitCallback>();

    protected AbstractPartitionCoordinator(Logger log) {
        this.log = log;
    }

    @Override
    public void registerRebalanceListener(EventType eventType, PartitionRebalanceListener listener) {
        this.log.info("Register PartitionRebalanceListener for [{}]", (Object)eventType);
        Preconditions.checkState((null == this.rebalanceListeners.putIfAbsent(eventType, listener) ? 1 : 0) != 0, (String)"PartitionRebalanceListener for [%s] already registered", (Object[])new Object[]{eventType});
    }

    @Override
    public void unregisterRebalanceListener(EventType eventType) {
        this.log.info("Unregister PartitionRebalanceListener for [{}]", (Object)eventType);
        if (null == this.rebalanceListeners.remove(eventType)) {
            this.log.warn("PartitionRebalanceListener for [{}] is already unregistered", (Object)eventType);
        }
    }

    @Override
    public void registerCommitCallback(EventTypePartition eventTypePartition, PartitionCommitCallback callback) {
        PartitionCommitCallback registeredPartitionCommitCallback = this.commitCallbacks.putIfAbsent(eventTypePartition, callback);
        Preconditions.checkState((null == registeredPartitionCommitCallback ? 1 : 0) != 0, (String)"PartitionCommitCallback for [%s] already registered (try to register callback [%s] but found [%s])", (Object[])new Object[]{eventTypePartition, callback, registeredPartitionCommitCallback});
    }

    @Override
    public void unregisterCommitCallback(EventTypePartition eventTypePartition) {
        if (null == this.commitCallbacks.remove(eventTypePartition)) {
            this.log.warn("PartitionCommitCallback for [%s] is already unregistered", (Object)eventTypePartition);
        }
    }

    @Override
    public void finished(EventTypePartition eventTypePartition) {
        this.log.info("Revoke partition on finished [{}]", (Object)eventTypePartition);
        this.revokePartition(eventTypePartition.getEventType(), eventTypePartition.getPartition());
    }

    protected Set<String> getPartitionsToAssign(EventTypePartitions consumerPartitions, Collection<NakadiPartition> nakadiPartitions) {
        Set<String> currentPartitions = this.getPartitions(nakadiPartitions);
        return Sets.difference(currentPartitions, consumerPartitions.getPartitions());
    }

    protected Set<String> getPartitionsToRevoke(EventTypePartitions consumerPartitions, Collection<NakadiPartition> nakadiPartitions) {
        Set<String> currentPartitions = this.getPartitions(nakadiPartitions);
        return Sets.difference(consumerPartitions.getPartitions(), currentPartitions);
    }

    protected Set<String> getPartitions(Collection<NakadiPartition> nakadiPartitions) {
        return nakadiPartitions.stream().map(NakadiPartition::getPartition).collect(Collectors.toSet());
    }

    protected void revokePartition(EventType eventType, String partitionsToRevoke) {
        this.revokePartitions(eventType, Collections.singleton(partitionsToRevoke));
    }

    protected void revokePartitions(EventType eventType, Set<String> partitionsToRevoke) {
        if (!partitionsToRevoke.isEmpty()) {
            PartitionRebalanceListener listener = (PartitionRebalanceListener)this.rebalanceListeners.get(eventType);
            Preconditions.checkState((null != listener ? 1 : 0) != 0, (String)"PartitionRebalanceListener for [%s] is not registered", (Object[])new Object[]{eventType});
            List<EventTypePartition> partitions = partitionsToRevoke.stream().map(entry -> EventTypePartition.of(eventType, entry)).collect(Collectors.toList());
            listener.onPartitionsRevoked(partitions);
        }
    }

    protected void assignPartition(EventType eventType, NakadiPartition nakadiPartition, Function<NakadiPartition, EventTypeCursor> offsetSelector) {
        EventTypeCursor cursor = offsetSelector.apply(nakadiPartition);
        PartitionRebalanceListener listener = (PartitionRebalanceListener)this.rebalanceListeners.get(eventType);
        Preconditions.checkState((null != listener ? 1 : 0) != 0, (String)"PartitionRebalanceListener for [%s] is not registered", (Object[])new Object[]{eventType});
        listener.onPartitionsAssigned(Collections.singleton(cursor));
    }

    protected void assignPartitions(EventType eventType, Set<String> partitionsToAssign, Collection<NakadiPartition> nakadiPartitions, Function<NakadiPartition, EventTypeCursor> offsetSelector) {
        if (!partitionsToAssign.isEmpty()) {
            List<EventTypeCursor> cursors = nakadiPartitions.stream().filter(entry -> partitionsToAssign.contains(entry.getPartition())).map(offsetSelector).collect(Collectors.toList());
            PartitionRebalanceListener listener = (PartitionRebalanceListener)this.rebalanceListeners.get(eventType);
            Preconditions.checkState((null != listener ? 1 : 0) != 0, (String)"PartitionRebalanceListener for [%s] is not registered", (Object[])new Object[]{eventType});
            listener.onPartitionsAssigned(cursors);
        } else {
            PartitionRebalanceListener listener = (PartitionRebalanceListener)this.rebalanceListeners.get(eventType);
            if (null != listener) {
                listener.onPartitionsHealthCheck();
            }
        }
    }

    @Override
    public PartitionCommitCallback getPartitionCommitCallback(EventTypePartition partition) {
        return (PartitionCommitCallback)this.commitCallbacks.get(partition);
    }

    @Override
    public PartitionRebalanceListener getPartitionRebalanceListener(EventType eventType) {
        return (PartitionRebalanceListener)this.rebalanceListeners.get(eventType);
    }
}

