/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.boot;

import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.boot.ReplayHandler;
import de.zalando.paradox.nakadi.consumer.boot.components.EventReceiverRegistry;
import de.zalando.paradox.nakadi.consumer.boot.components.EventTypeConsumer;
import de.zalando.paradox.nakadi.consumer.core.EventHandler;
import de.zalando.paradox.nakadi.consumer.core.FailedEventSource;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.FailedEvent;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replays events that previously failed processing.
 *
 * <p>Failed events are pulled from a {@link FailedEventSource} selected by its name, dispatched
 * through the {@link ReplayHandler} to the handler registered for the event's type and consumer
 * name, and then committed back to the source.
 */
public class FailedEventReplayer {
    private static final String EVENT_SOURCE_NAME_IS_NOT_AVAILABLE_MESSAGE = "Event source name is not available.";
    private static final Logger LOGGER = LoggerFactory.getLogger(FailedEventReplayer.class);

    private final EventReceiverRegistry eventReceiverRegistry;
    // Raw FailedEventSource is kept on purpose: it matches the public constructor signature.
    private final Map<String, FailedEventSource> failedEventSourceMap;
    private final ReplayHandler replayHandler;

    /**
     * Creates a replayer and indexes the given sources by {@code getEventSourceName()}.
     *
     * @param eventReceiverRegistry registry used to look up the handler for a failed event
     * @param failedEventSources    sources of failed events; must not be {@code null}
     * @param replayHandler         handler that performs the actual re-delivery
     * @throws IllegalStateException if two sources report the same name
     *                               (behaviour of {@link Collectors#toMap})
     */
    public FailedEventReplayer(EventReceiverRegistry eventReceiverRegistry, List<FailedEventSource> failedEventSources, ReplayHandler replayHandler) {
        this.eventReceiverRegistry = eventReceiverRegistry;
        this.replayHandler = replayHandler;
        this.failedEventSourceMap = failedEventSources.stream()
                .collect(Collectors.toMap(FailedEventSource::getEventSourceName, Function.identity()));
    }

    /**
     * Returns the size reported by the named source.
     *
     * @param eventSourceName name of a registered failed-event source
     * @return the value of {@code getSize()} of that source
     * @throws IllegalArgumentException if no source with that name is registered
     */
    public Long getTotalNumberOfFailedEvents(String eventSourceName) {
        return this.requireSource(eventSourceName).getSize();
    }

    /**
     * Returns the names of all registered failed-event sources.
     *
     * @return a read-only view of the registered source names
     */
    public Collection<String> getFailedEventSources() {
        // Read-only view: callers must not be able to unregister sources through the key set.
        return Collections.unmodifiableSet(this.failedEventSourceMap.keySet());
    }

    /**
     * Replays up to {@code numberOfFailedEvents} events from the named source.
     *
     * <p>The number of attempts is the smaller of {@code numberOfFailedEvents} and the size the
     * source reports when this method is entered.
     *
     * @param eventSourceName            name of a registered failed-event source
     * @param numberOfFailedEvents       maximum number of events to replay; must not be {@code null}
     * @param breakProcessingOnException if {@code true}, the first failure aborts the replay with an
     *                                   {@link IllegalStateException}; if {@code false}, the failure
     *                                   is logged and the loop moves on to the next attempt
     * @throws IllegalArgumentException if no source with that name is registered
     * @throws NullPointerException     if {@code numberOfFailedEvents} is {@code null}
     * @throws IllegalStateException    if an attempt fails and {@code breakProcessingOnException}
     *                                  is {@code true}
     */
    public void replay(String eventSourceName, Long numberOfFailedEvents, boolean breakProcessingOnException) {
        FailedEventSource failedEventSource = this.requireSource(eventSourceName);
        // Fail with a descriptive message instead of an anonymous unboxing NPE inside Math.min.
        Objects.requireNonNull(numberOfFailedEvents, "numberOfFailedEvents must not be null");

        long totalNumberOfFailedEvents = failedEventSource.getSize();
        long upperBound = Math.min(numberOfFailedEvents, totalNumberOfFailedEvents);

        for (long counter = 0L; counter < upperBound; ++counter) {
            // Scoped to the iteration so a failure while fetching never reports the event of a
            // previous, unrelated iteration.
            Object currentEvent = null;
            try {
                Optional<?> fetched = failedEventSource.getFailedEvent();
                if (fetched.isPresent()) {
                    currentEvent = fetched.get();
                    this.replay((FailedEvent) currentEvent, (FailedEventSource<FailedEvent>) failedEventSource);
                }
            } catch (Exception ex) {
                if (breakProcessingOnException) {
                    throw new IllegalStateException(String.format("Exception occurred while processing the event. Event = [%s]", currentEvent), ex);
                }
                // A trailing Throwable argument is logged as the exception, not as a placeholder value.
                LOGGER.error("Exception occurred while processing the event source = [{}]", eventSourceName, ex);
            }
        }
    }

    /**
     * Looks up a registered source by name.
     *
     * @throws IllegalArgumentException if no source with that name is registered
     */
    private FailedEventSource requireSource(String eventSourceName) {
        Preconditions.checkArgument(this.failedEventSourceMap.containsKey(eventSourceName), EVENT_SOURCE_NAME_IS_NOT_AVAILABLE_MESSAGE);
        return this.failedEventSourceMap.get(eventSourceName);
    }

    /**
     * Re-delivers a single failed event and commits it to its source afterwards.
     *
     * <p>The commit is only reached when handler lookup and re-delivery complete without throwing.
     *
     * @throws NullPointerException if no handler is registered for the event's type and consumer name
     */
    private void replay(FailedEvent failedEvent, FailedEventSource<FailedEvent> failedEventSource) {
        EventTypeConsumer consumerKey = new EventTypeConsumer(failedEvent.getEventType().getName(), failedEvent.getConsumerName());
        EventHandler<?> handler = Objects.requireNonNull(this.eventReceiverRegistry.getEventTypeConsumerHandler(consumerKey), "handler not found");
        EventTypePartition partition = new EventTypePartition(failedEvent.getEventType(), failedEvent.getPartition());
        this.replayHandler.handle(failedEvent.getConsumerName(), handler, partition, failedEvent.getRawEvent());
        failedEventSource.commit(failedEvent);
    }
}

