/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload;

import java.util.List;
import java.util.Optional;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignment;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentAware;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentPathSerializer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentView;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges;

public class HierarchicalConsumerAssignmentRegistry
implements ConsumerAssignmentRegistry,
SubscriptionAssignmentAware {
    private static final Logger logger = LoggerFactory.getLogger(HierarchicalConsumerAssignmentRegistry.class);
    public static final byte[] AUTO_ASSIGNED_MARKER = "AUTO_ASSIGNED".getBytes();
    private final CuratorFramework curator;
    private final SubscriptionAssignmentPathSerializer pathSerializer;
    private final ConsumerAssignmentCache assignmentCache;
    private final CreateMode assignmentNodeCreationMode;

    public HierarchicalConsumerAssignmentRegistry(CuratorFramework curator, ConsumerAssignmentCache assignmentCache, SubscriptionAssignmentPathSerializer pathSerializer, CreateMode assignmentNodeCreationMode) {
        this.curator = curator;
        this.pathSerializer = pathSerializer;
        this.assignmentCache = assignmentCache;
        this.assignmentNodeCreationMode = assignmentNodeCreationMode;
        this.assignmentCache.registerAssignmentCallback(this);
    }

    @Override
    public void onSubscriptionAssigned(SubscriptionName subscriptionName) {
    }

    @Override
    public void onAssignmentRemoved(SubscriptionName subscriptionName) {
        this.removeSubscriptionEntryIfEmpty(subscriptionName);
    }

    @Override
    public Optional<String> watchedConsumerId() {
        return Optional.empty();
    }

    @Override
    public WorkDistributionChanges updateAssignments(SubscriptionAssignmentView initialState, SubscriptionAssignmentView targetState) {
        if (initialState.equals(targetState)) {
            return WorkDistributionChanges.NO_CHANGES;
        }
        List<SubscriptionAssignment> deletions = initialState.deletions(targetState).getAllAssignments();
        List<SubscriptionAssignment> additions = initialState.additions(targetState).getAllAssignments();
        deletions.forEach(this::dropAssignment);
        additions.forEach(this::addAssignment);
        return new WorkDistributionChanges(deletions.size(), additions.size());
    }

    private void dropAssignment(SubscriptionAssignment assignment) {
        String message = String.format("Dropping assignment [consumer=%s, subscription=%s]", assignment.getConsumerNodeId(), assignment.getSubscriptionName().getQualifiedName());
        logger.info(message);
        this.askCuratorPolitely(() -> {
            Void cfr_ignored_0 = (Void)this.curator.delete().guaranteed().forPath(this.pathSerializer.serialize(assignment.getSubscriptionName(), assignment.getConsumerNodeId()));
        }, message);
    }

    private void addAssignment(SubscriptionAssignment assignment) {
        String message = String.format("Adding assignment [consumer=%s, subscription=%s]", assignment.getConsumerNodeId(), assignment.getSubscriptionName().getQualifiedName());
        logger.info(message);
        this.askCuratorPolitely(() -> {
            String path = this.pathSerializer.serialize(assignment.getSubscriptionName(), assignment.getConsumerNodeId());
            ((ACLBackgroundPathAndBytesable)this.curator.create().creatingParentsIfNeeded().withMode(this.assignmentNodeCreationMode)).forPath(path, AUTO_ASSIGNED_MARKER);
        }, message);
    }

    private void removeSubscriptionEntryIfEmpty(SubscriptionName subscriptionName) {
        String message = String.format("Removing empty assignment node [subscription=%s]", subscriptionName.getQualifiedName());
        this.askCuratorPolitely(() -> {
            if (((List)this.curator.getChildren().forPath(this.pathSerializer.serialize(subscriptionName))).isEmpty()) {
                logger.info(message);
                this.curator.delete().guaranteed().forPath(this.pathSerializer.serialize(subscriptionName));
            }
        }, message);
    }

    private void askCuratorPolitely(CuratorTask task, String description) {
        try {
            task.run();
        }
        catch (KeeperException.NoNodeException | KeeperException.NodeExistsException ex) {
            logger.warn("An error occurred while writing to assignment registry, ignoring. Action: {}", (Object)description, (Object)ex);
        }
        catch (Exception ex) {
            logger.error("An error occurred while writing to assignment registry. Action: {}", (Object)description, (Object)ex);
            throw new InternalProcessingException((Throwable)ex);
        }
    }

    static interface CuratorTask {
        public void run() throws Exception;
    }
}

