/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.edison.logging;

import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.eventsource.EventSource;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * Logs, during startup, how long each channel needed to get "in sync" and how long it took
 * until all channels of the injected event sources were in sync.
 *
 * <p>A channel counts as in sync when a {@link MessageReceiverStatus#RUNNING} notification
 * carries a duration-behind of at most {@link #IN_SYNC_THRESHOLD_MILLIS} milliseconds. Once all
 * known channels have been reported in sync, further notifications are ignored.
 *
 * <p>All mutable bookkeeping is kept in concurrent/atomic holders so that notifications for
 * different channels may be processed concurrently. NOTE(review): the threading model of the
 * event publisher is not visible from this class - confirm against the publisher.
 */
@Component
public class EventSourceSyncDurationLogger {
    private static final Logger LOG = LoggerFactory.getLogger(EventSourceSyncDurationLogger.class);

    /** Maximum duration behind (in milliseconds) for a channel to be considered in sync. */
    private static final long IN_SYNC_THRESHOLD_MILLIS = 10000L;

    /** Names of the channels of all injected event sources. */
    private final Set<String> allChannels;

    /** Names of the channels that have already been reported in sync. */
    private final Set<String> healthyChannels;

    /** Flips to {@code true} exactly once, when every channel in {@link #allChannels} is healthy. */
    private final AtomicBoolean startupDone = new AtomicBoolean(false);

    private Clock clock = Clock.systemDefaultZone();

    /** Instant recorded on the first STARTING notification; empty until then. */
    private final AtomicReference<Instant> startTime = new AtomicReference<>();

    /** Start instant per channel; an entry is removed once the channel has been logged as in sync. */
    private final Map<String, Instant> mapChannelToStartTime = new ConcurrentHashMap<>();

    /**
     * Replaces the clock used for all time measurements.
     *
     * @param clock the clock to read instants from
     */
    void setClock(Clock clock) {
        this.clock = clock;
    }

    /**
     * Creates the logger and collects the channel names of the given event sources.
     *
     * @param eventSources the event sources whose channels are tracked; an empty
     *                     {@code Optional} is treated like an empty list
     */
    @Autowired
    public EventSourceSyncDurationLogger(Optional<List<EventSource>> eventSources) {
        this.allChannels = eventSources.orElse(Collections.emptyList()).stream()
                .map(EventSource::getChannelName)
                .collect(Collectors.toSet());
        this.healthyChannels = ConcurrentHashMap.newKeySet();
    }

    /**
     * Handles a receiver notification: records start instants on STARTING and logs the sync
     * duration on the first in-sync RUNNING notification of a channel that was seen STARTING.
     * Does nothing once all channels have been reported up to date.
     *
     * @param notification the notification to process
     */
    @EventListener
    public void on(MessageReceiverNotification notification) {
        if (this.startupDone.get()) {
            return;
        }
        final String channelName = notification.getChannelName();
        final MessageReceiverStatus status = notification.getStatus();
        if (status == MessageReceiverStatus.STARTING) {
            this.recordStart(channelName);
        } else if (status == MessageReceiverStatus.RUNNING
                && this.mapChannelToStartTime.containsKey(channelName)
                && this.isInSync(notification.getChannelDurationBehind())) {
            // remove() atomically claims the entry, so concurrent in-sync notifications for the
            // same channel cannot both log it, and none of them can observe a missing start instant.
            final Instant channelStart = this.mapChannelToStartTime.remove(channelName);
            if (channelStart != null) {
                this.reportInSync(channelName, channelStart);
            }
        }
    }

    /** Records the start instant of a channel and, on the very first call, the overall start instant. */
    private void recordStart(String channelName) {
        final Instant channelStart = this.clock.instant();
        // The clock is read a second time only while no overall start exists yet, which keeps the
        // sequence of clock reads identical to the previous implementation.
        if (this.startTime.get() == null) {
            this.startTime.compareAndSet(null, this.clock.instant());
        }
        // Publish the channel only after the overall start instant is set, so that any thread
        // that finds the channel in the map also finds a non-null overall start instant.
        this.mapChannelToStartTime.put(channelName, channelStart);
    }

    /** Logs the sync duration of one channel and, once all channels are healthy, the total duration. */
    private void reportInSync(String channelName, Instant channelStart) {
        final Instant stopTime = this.clock.instant();
        this.log(String.format("KinesisEventSource '%s' duration for getting in sync: %s",
                channelName, Duration.between(channelStart, stopTime)));
        this.healthyChannels.add(channelName);
        // compareAndSet guarantees the summary line is emitted at most once.
        if (this.allChannelsAreUpToDate() && this.startupDone.compareAndSet(false, true)) {
            this.log(String.format("All channels up to date after %s",
                    Duration.between(this.startTime.get(), stopTime)));
        }
    }

    /** Returns whether every known channel has been reported in sync. */
    private boolean allChannelsAreUpToDate() {
        return this.healthyChannels.containsAll(this.allChannels);
    }

    /**
     * Returns whether the given duration-behind is present and does not exceed
     * {@link #IN_SYNC_THRESHOLD_MILLIS}.
     */
    private boolean isInSync(Optional<ChannelDurationBehind> channelDurationBehind) {
        return channelDurationBehind
                .map(ChannelDurationBehind::getDurationBehind)
                .filter(behind -> behind.toMillis() <= IN_SYNC_THRESHOLD_MILLIS)
                .isPresent();
    }

    /**
     * Writes the message to the logger at INFO level.
     *
     * @param message the fully formatted message
     */
    void log(String message) {
        LOG.info(message);
    }
}

