/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import com.google.common.collect.ImmutableMap;
import java.time.Clock;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;

/**
 * Runnable that drives a single {@link Consumer} for one subscription.
 *
 * <p>The process repeatedly calls {@link Consumer#consume(Runnable)}, handing it a callback that
 * drains the pending {@link Signal}s and refreshes the healthcheck timestamp. Signals are queued
 * through {@link #accept(Signal)} and handled on the thread executing {@link #run()}.
 *
 * <p>The signal queue is a JCTools {@link SpscArrayQueue}; it is presumably fed by a single
 * supervisor thread and drained only by the thread running this process
 * (NOTE(review): confirm that callers honour the single-producer assumption).
 */
public class ConsumerProcess implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcess.class);

    /** Capacity requested for the pending-signals queue. */
    private static final int SIGNALS_QUEUE_CAPACITY = 100;

    private final SpscArrayQueue<Signal> signals = new SpscArrayQueue<>(SIGNALS_QUEUE_CAPACITY);
    private final Clock clock;
    private final SubscriptionName subscriptionName;
    private final Consumer consumer;
    private final Retransmitter retransmitter;
    private final java.util.function.Consumer<Signal> shutdownCallback;
    private final long unhealthyAfter;
    private final Map<Signal.SignalType, Long> signalTimesheet = new ConcurrentHashMap<>();

    private volatile boolean running = true;
    private volatile long healthcheckRefreshTime;
    /** Most recently processed signal; written and read only from {@link #run()}'s thread. */
    private Signal lastSignal;

    /**
     * Creates a process for the subscription targeted by {@code startSignal}.
     *
     * @param startSignal signal whose target names the subscription and whose payload is the
     *     {@link Consumer} to drive; it is enqueued as the first signal to process
     * @param retransmitter used to reload offsets when a RETRANSMIT signal is processed
     * @param shutdownCallback invoked with the last processed signal when {@link #run()} finishes
     * @param clock time source for healthcheck and timesheet timestamps
     * @param unhealthyAfter threshold in milliseconds; see {@link #isHealthy()}
     */
    public ConsumerProcess(
            Signal startSignal,
            Retransmitter retransmitter,
            java.util.function.Consumer<Signal> shutdownCallback,
            Clock clock,
            long unhealthyAfter) {
        this.subscriptionName = startSignal.getTarget();
        this.consumer = (Consumer) startSignal.getPayload();
        this.retransmitter = retransmitter;
        this.shutdownCallback = shutdownCallback;
        this.clock = clock;
        this.healthcheckRefreshTime = clock.millis();
        this.unhealthyAfter = unhealthyAfter;
        this.signals.add(startSignal);
    }

    /**
     * Main loop: consumes until a STOP signal is processed or the thread is interrupted, then
     * tears the consumer down and notifies the shutdown callback.
     *
     * <p>The consumer is torn down even when the loop terminates with an exception, so that a
     * failing consumer does not leave its resources allocated.
     */
    @Override
    public void run() {
        try {
            Thread.currentThread().setName("consumer-" + subscriptionName);
            // Thread.interrupted() clears the interrupt flag, so the teardown below runs with
            // the flag reset.
            while (running && !Thread.interrupted()) {
                consumer.consume(this::processSignals);
            }
        } catch (Exception ex) {
            logger.error("Consumer process of subscription {} failed", subscriptionName, ex);
        } finally {
            stopQuietly();
            logger.info("Releasing consumer process thread of subscription {}", subscriptionName);
            try {
                shutdownCallback.accept(lastSignal);
            } finally {
                // Must happen even if the callback throws, otherwise the thread keeps the
                // subscription's name and the healthcheck timestamp goes stale.
                refreshHealthcheck();
                Thread.currentThread().setName("consumer-released-thread");
            }
        }
    }

    /**
     * Enqueues a signal for processing on the consumer thread.
     *
     * @param signal signal to enqueue
     * @return this process, for chaining
     * @throws IllegalStateException presumably when the queue is full, per the
     *     {@link java.util.Queue#add(Object)} contract (NOTE(review): confirm for the JCTools
     *     version in use)
     */
    public ConsumerProcess accept(Signal signal) {
        signals.add(signal);
        return this;
    }

    /**
     * @return {@code true} while the time since the last healthcheck refresh is below the
     *     {@code unhealthyAfter} threshold
     */
    public boolean isHealthy() {
        return unhealthyAfter > lastSeen();
    }

    /** @return milliseconds elapsed since the healthcheck timestamp was last refreshed */
    public long lastSeen() {
        return clock.millis() - healthcheckRefreshTime;
    }

    /** @return clock time in milliseconds of the last healthcheck refresh */
    public long healthcheckRefreshTime() {
        return healthcheckRefreshTime;
    }

    /** Drains all pending signals, refreshing the healthcheck before and after. */
    private void processSignals() {
        refreshHealthcheck();
        signals.drain(this::process);
        refreshHealthcheck();
    }

    private void refreshHealthcheck() {
        healthcheckRefreshTime = clock.millis();
    }

    /**
     * Dispatches one signal by type and records its processing time in the timesheet.
     * Failures are logged and do not propagate; a failed signal is not recorded.
     */
    private void process(Signal signal) {
        lastSignal = signal;
        try {
            switch (signal.getType()) {
                case START:
                    start(signal);
                    break;
                case RESTART:
                    restart(signal);
                    break;
                case STOP:
                    logger.info("Stopping main loop for consumer {}. {}",
                            signal.getTarget(), signal.getLogWithIdAndType());
                    running = false;
                    break;
                case RETRANSMIT:
                    retransmit(signal);
                    break;
                case UPDATE_SUBSCRIPTION:
                    consumer.updateSubscription((Subscription) signal.getPayload());
                    break;
                case UPDATE_TOPIC:
                    consumer.updateTopic((Topic) signal.getPayload());
                    break;
                case COMMIT:
                    consumer.commit((Set) signal.getPayload());
                    break;
                default:
                    logger.warn("Unhandled signal found {}", signal);
                    break;
            }
            signalTimesheet.put(signal.getType(), clock.millis());
        } catch (Exception ex) {
            logger.error("Failed to process signal {}", signal, ex);
        }
    }

    /** Initializes the consumer and records the START time in the timesheet. */
    private void start(Signal signal) {
        long startTime = clock.millis();
        logger.info("Starting consumer for subscription {}. {}",
                subscriptionName, signal.getLogWithIdAndType());
        consumer.initialize();
        long initializationTime = clock.millis();
        logger.info("Started consumer for subscription {} in {}ms. {}",
                subscriptionName, initializationTime - startTime, signal.getLogWithIdAndType());
        signalTimesheet.put(Signal.SignalType.START, initializationTime);
    }

    /** Tears the consumer down, logging how long it took. */
    private void stop() {
        long startTime = clock.millis();
        logger.info("Stopping consumer for subscription {}", subscriptionName);
        consumer.tearDown();
        logger.info("Stopped consumer for subscription {} in {}ms",
                subscriptionName, clock.millis() - startTime);
    }

    /**
     * Calls {@link #stop()} from the shutdown path, logging instead of propagating a failure so
     * that the shutdown callback is still invoked.
     */
    private void stopQuietly() {
        try {
            stop();
        } catch (Exception ex) {
            logger.error("Consumer process of subscription {} failed", subscriptionName, ex);
        }
    }

    /** Reloads offsets through the retransmitter; failures are logged and not propagated. */
    private void retransmit(Signal signal) {
        long startTime = clock.millis();
        logger.info("Starting retransmission for consumer of subscription {}. {}",
                subscriptionName, signal.getLogWithIdAndType());
        try {
            retransmitter.reloadOffsets(subscriptionName, consumer);
            logger.info("Done retransmission for consumer of subscription {} in {}ms",
                    subscriptionName, clock.millis() - startTime);
        } catch (Exception ex) {
            logger.error("Failed retransmission for consumer of subscription {} in {}ms",
                    subscriptionName, clock.millis() - startTime, ex);
        }
    }

    /**
     * Stops and then starts the consumer again. If either step fails the error is logged and
     * not propagated; a failure in {@link #stop()} means {@link #start(Signal)} is not attempted.
     */
    private void restart(Signal signal) {
        long startTime = clock.millis();
        try {
            logger.info("Restarting consumer for subscription {}. {}",
                    subscriptionName, signal.getLogWithIdAndType());
            stop();
            start(signal);
            logger.info("Done restarting consumer for subscription {} in {}ms. {}",
                    subscriptionName, clock.millis() - startTime, signal.getLogWithIdAndType());
        } catch (Exception e) {
            logger.error("Failed restarting consumer for subscription {} in {}ms. {}",
                    subscriptionName, clock.millis() - startTime, signal.getLogWithIdAndType(), e);
        }
    }

    @Override
    public String toString() {
        return "ConsumerProcess{subscriptionName=" + subscriptionName + '}';
    }

    /** Two processes are equal when they serve the same subscription name. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ConsumerProcess that = (ConsumerProcess) o;
        return Objects.equals(subscriptionName, that.subscriptionName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subscriptionName);
    }

    public SubscriptionName getSubscriptionName() {
        return subscriptionName;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    /** @return immutable snapshot mapping each signal type to the clock time it was last recorded */
    public Map<Signal.SignalType, Long> getSignalTimesheet() {
        return ImmutableMap.copyOf(signalTimesheet);
    }
}

