/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.core.partitioned.impl;

import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback;
import de.zalando.paradox.nakadi.consumer.core.partitioned.impl.AbstractPartitionCoordinator;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.LoggerFactory;

public class SimplePartitionCoordinator
extends AbstractPartitionCoordinator {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private boolean startNewestAvailableOffset = true;

    public SimplePartitionCoordinator() {
        super(LoggerFactory.getLogger(SimplePartitionCoordinator.class));
    }

    @Override
    public void init() {
        if (this.running.compareAndSet(false, true)) {
            this.log.info("Init coordinator");
        } else {
            this.log.info("Coordinator is already running");
        }
    }

    @Override
    public void close() {
        if (this.running.compareAndSet(true, false)) {
            this.log.info("Closing coordinator");
        } else {
            this.log.warn("Coordinator is already closed");
        }
    }

    @Override
    public void rebalance(EventTypePartitions consumerPartitions, Collection<NakadiPartition> nakadiPartitions) {
        if (this.running.get()) {
            this.revokePartitions(consumerPartitions.getEventType(), this.getPartitionsToRevoke(consumerPartitions, nakadiPartitions));
            this.assignPartitions(consumerPartitions.getEventType(), this.getPartitionsToAssign(consumerPartitions, nakadiPartitions), nakadiPartitions, this.getOffsetSelector(consumerPartitions.getEventType()));
        } else {
            this.log.warn("Coordinator is not running.");
        }
    }

    private Function<NakadiPartition, EventTypeCursor> getOffsetSelector(EventType eventType) {
        return entry -> {
            String offset;
            if (this.startNewestAvailableOffset) {
                offset = entry.getNewestAvailableOffset();
            } else {
                offset = entry.getOldestAvailableOffset();
                this.log.warn("Using oldest available offset [{}] without persistent storage.", (Object)offset);
            }
            return EventTypeCursor.of(EventTypePartition.of(eventType, entry.getPartition()), offset);
        };
    }

    @Override
    public void commit(EventTypeCursor cursor) {
        this.log.debug("Commit {} ", (Object)cursor);
        PartitionCommitCallback callback = this.getPartitionCommitCallback(cursor.getEventTypePartition());
        if (null != callback) {
            callback.onCommitComplete(cursor);
        }
    }

    @Override
    public void flush(EventTypePartition eventTypePartition) {
        this.log.debug("Flush {} ", (Object)eventTypePartition);
    }

    @Override
    public void error(Throwable t, EventTypePartition eventTypePartition) {
        if (ThrowableUtils.isUnrecoverableException(t)) {
            this.log.error("Error [{}] reason [{}]", (Object)eventTypePartition, (Object)ExceptionUtils.getMessage((Throwable)t));
            ThrowableUtils.throwException(t);
        } else {
            this.log.error("Error [{}] reason [{}]", new Object[]{eventTypePartition, ExceptionUtils.getMessage((Throwable)t), t});
        }
    }

    @Override
    public void error(int statusCode, String content, EventTypePartition eventTypePartition) {
        this.log.error("Error [{}] code [{} / {}]", new Object[]{eventTypePartition, statusCode, content});
    }

    public void setStartNewestAvailableOffset(boolean startNewestAvailableOffset) {
        this.startNewestAvailableOffset = startNewestAvailableOffset;
    }
}

