/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import com.google.common.collect.ImmutableMap;
import java.time.Clock;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;

/**
 * Runs a single {@link Consumer} in a loop on the thread that executes {@link #run()} and steers
 * it with {@link Signal}s queued through {@link #accept(Signal)}.
 *
 * <p>The process keeps a healthcheck timestamp that is refreshed every time the signal-processing
 * callback runs, and a timesheet holding the time at which each signal type was last processed.
 * Two processes are considered equal when they serve the same subscription name.
 *
 * <p>NOTE(review): the signal queue is a single-producer/single-consumer queue, so signals are
 * presumably expected to be submitted from one thread only - confirm against the callers.
 */
public class ConsumerProcess implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcess.class);

    /** Capacity requested for the queue of signals waiting to be processed. */
    private static final int SIGNALS_QUEUE_CAPACITY = 100;

    private final SpscArrayQueue<Signal> signals = new SpscArrayQueue<>(SIGNALS_QUEUE_CAPACITY);
    private final Clock clock;
    private final Consumer consumer;
    private final Retransmitter retransmitter;
    private final java.util.function.Consumer<SubscriptionName> onConsumerStopped;
    private final long unhealthyAfter;

    /** Cleared by a STOP signal; read by the main loop in {@link #run()}. */
    private volatile boolean running = true;

    /** Clock reading (milliseconds) taken at the most recent healthcheck refresh. */
    private volatile long healthcheckRefreshTime;

    /** Clock reading (milliseconds) at which each signal type was last processed. */
    private final Map<Signal.SignalType, Long> signalTimesheet = new ConcurrentHashMap<>();

    /**
     * Creates the process and queues {@code startSignal} as its first signal.
     *
     * @param startSignal first signal placed on the signal queue
     * @param consumer consumer driven by this process
     * @param retransmitter used to reload offsets when a RETRANSMIT signal is processed
     * @param clock time source for healthcheck and timing measurements
     * @param unhealthyAfter number of milliseconds without a healthcheck refresh after which
     *     {@link #isHealthy()} returns {@code false}
     * @param onConsumerStopped callback invoked with the subscription name when {@link #run()}
     *     finishes
     */
    public ConsumerProcess(Signal startSignal, Consumer consumer, Retransmitter retransmitter, Clock clock, long unhealthyAfter, java.util.function.Consumer<SubscriptionName> onConsumerStopped) {
        this.consumer = consumer;
        this.retransmitter = retransmitter;
        this.onConsumerStopped = onConsumerStopped;
        this.clock = clock;
        this.healthcheckRefreshTime = clock.millis();
        this.unhealthyAfter = unhealthyAfter;
        this.signals.add(startSignal);
    }

    /**
     * Main loop: keeps calling {@code consumer.consume(...)} until a STOP signal clears the
     * running flag or the current thread is interrupted. The consumer is handed
     * {@code processSignals} as a callback.
     *
     * <p>On exit - normal or exceptional - the healthcheck is refreshed, the consumer is torn
     * down, {@code onConsumerStopped} is notified and the thread is renamed. The notification and
     * the rename happen even if tearing the consumer down fails; such a failure is logged.
     */
    @Override
    public void run() {
        try {
            Thread.currentThread().setName("consumer-" + getSubscriptionName());
            while (running && !Thread.currentThread().isInterrupted()) {
                consumer.consume(this::processSignals);
            }
        } catch (Exception ex) {
            logger.error("Consumer process of subscription {} failed", getSubscriptionName(), ex);
        } finally {
            logger.info("Releasing consumer process thread of subscription {}", getSubscriptionName());
            refreshHealthcheck();
            try {
                stop();
            } catch (Exception stopFailure) {
                // A failing tearDown must not prevent the stop notification below.
                logger.error("An error occurred while stopping consumer process of subscription {}",
                        getSubscriptionName(), stopFailure);
            } finally {
                onConsumerStopped.accept(getSubscriptionName());
                Thread.currentThread().setName("consumer-released-thread");
            }
        }
    }

    /**
     * Queues a signal for processing by the consumer thread.
     *
     * @param signal signal to queue
     * @return this process, to allow call chaining
     * @throws IllegalStateException presumably when the signal queue is full, per the
     *     {@code Queue.add} contract - confirm for the queue implementation in use
     */
    public ConsumerProcess accept(Signal signal) {
        signals.add(signal);
        return this;
    }

    /**
     * @return {@code true} while the time since the last healthcheck refresh is strictly below
     *     the {@code unhealthyAfter} threshold
     */
    public boolean isHealthy() {
        return unhealthyAfter > lastSeen();
    }

    /** @return milliseconds elapsed since the last healthcheck refresh */
    public long lastSeen() {
        return clock.millis() - healthcheckRefreshTime;
    }

    /** @return clock reading (milliseconds) taken at the last healthcheck refresh */
    public long healthcheckRefreshTime() {
        return healthcheckRefreshTime;
    }

    /**
     * Callback handed to the consumer: drains and processes all queued signals, refreshing the
     * healthcheck both before and after so that signal handling time is accounted for.
     */
    private void processSignals() {
        refreshHealthcheck();
        signals.drain(this::process);
        refreshHealthcheck();
    }

    private void refreshHealthcheck() {
        healthcheckRefreshTime = clock.millis();
    }

    /**
     * Dispatches a single signal by type and records the processing time in the timesheet.
     * Any exception raised while handling the signal is logged and swallowed, in which case the
     * timesheet is not updated for that signal.
     */
    private void process(Signal signal) {
        try {
            switch (signal.getType()) {
                case START:
                    start(signal);
                    break;
                case STOP:
                    logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
                    // Only flips the flag; the actual teardown happens when run() exits its loop.
                    running = false;
                    break;
                case RETRANSMIT:
                    retransmit(signal);
                    break;
                case UPDATE_SUBSCRIPTION:
                    consumer.updateSubscription((Subscription) signal.getPayload());
                    break;
                case UPDATE_TOPIC:
                    consumer.updateTopic((Topic) signal.getPayload());
                    break;
                case COMMIT:
                    consumer.commit((Set) signal.getPayload());
                    break;
                default:
                    logger.warn("Unhandled signal found {}", signal);
                    break;
            }
            signalTimesheet.put(signal.getType(), clock.millis());
        } catch (Exception ex) {
            logger.error("Failed to process signal {}", signal, ex);
        }
    }

    /** Initializes the consumer and logs how long the initialization took. */
    private void start(Signal signal) {
        long startTime = clock.millis();
        logger.info("Starting consumer for subscription {}. {}", getSubscriptionName(), signal.getLogWithIdAndType());
        consumer.initialize();
        long initializationTime = clock.millis();
        logger.info("Started consumer for subscription {} in {}ms. {}",
                getSubscriptionName(), initializationTime - startTime, signal.getLogWithIdAndType());
        // process() records START again right after this method returns.
        signalTimesheet.put(Signal.SignalType.START, initializationTime);
    }

    /** Tears the consumer down and logs how long it took. */
    private void stop() {
        long startTime = clock.millis();
        logger.info("Stopping consumer for subscription {}", getSubscriptionName());
        consumer.tearDown();
        logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime);
    }

    /**
     * Asks the retransmitter to reload offsets for this subscription's consumer. Failures are
     * logged here and not propagated.
     */
    private void retransmit(Signal signal) {
        long startTime = clock.millis();
        logger.info("Starting retransmission for consumer of subscription {}. {}", getSubscriptionName(), signal.getLogWithIdAndType());
        try {
            retransmitter.reloadOffsets(getSubscriptionName(), consumer);
            logger.info("Done retransmission for consumer of subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime);
        } catch (Exception ex) {
            // The trailing exception has no placeholder, so SLF4J logs it as the throwable.
            logger.error("Failed retransmission for consumer of subscription {} in {}ms",
                    getSubscriptionName(), clock.millis() - startTime, ex);
        }
    }

    @Override
    public String toString() {
        return "ConsumerProcess{subscriptionName=" + getSubscriptionName() + '}';
    }

    /** Two processes are equal when they are of the same class and serve the same subscription name. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ConsumerProcess that = (ConsumerProcess) o;
        return Objects.equals(getSubscriptionName(), that.getSubscriptionName());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getSubscriptionName());
    }

    /** @return the subscription currently reported by the underlying consumer */
    public Subscription getSubscription() {
        return consumer.getSubscription();
    }

    /**
     * @return an immutable snapshot mapping each signal type to the clock reading (milliseconds)
     *     at which it was last processed without an exception
     */
    public Map<Signal.SignalType, Long> getSignalTimesheet() {
        return ImmutableMap.copyOf(signalTimesheet);
    }

    /** @return the qualified name of the subscription reported by the underlying consumer */
    SubscriptionName getSubscriptionName() {
        return getSubscription().getQualifiedName();
    }
}

