/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import java.time.Clock;
import java.util.Objects;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;

public class ConsumerProcess
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcess.class);
    private final SpscArrayQueue<Signal> signals = new SpscArrayQueue(100);
    private final Clock clock;
    private final SubscriptionName subscriptionName;
    private final Consumer consumer;
    private final Retransmitter retransmitter;
    private volatile boolean running = true;
    private volatile long healtcheckRefreshTime;

    public ConsumerProcess(SubscriptionName subscriptionName, Consumer consumer, Retransmitter retransmitter, Clock clock) {
        this.subscriptionName = subscriptionName;
        this.consumer = consumer;
        this.retransmitter = retransmitter;
        this.clock = clock;
        this.healtcheckRefreshTime = clock.millis();
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName("consumer-" + this.subscriptionName);
            this.start();
            while (this.running) {
                this.consumer.consume(() -> this.processSignals());
            }
            this.stop();
        }
        finally {
            this.refreshHealthcheck();
            Thread.currentThread().setName("consumer-released-thread");
        }
    }

    public ConsumerProcess accept(Signal signal) {
        this.signals.add((Object)signal);
        return this;
    }

    public long healtcheckRefreshTime() {
        return this.healtcheckRefreshTime;
    }

    private void processSignals() {
        this.refreshHealthcheck();
        this.signals.drain(this::process);
        this.refreshHealthcheck();
    }

    private void refreshHealthcheck() {
        this.healtcheckRefreshTime = this.clock.millis();
    }

    private void process(Signal signal) {
        switch (signal.getType()) {
            case RESTART: {
                this.restart();
                break;
            }
            case STOP: {
                this.running = false;
                break;
            }
            case RETRANSMIT: {
                this.retransmit();
                break;
            }
            case UPDATE_SUBSCRIPTION: {
                this.consumer.updateSubscription((Subscription)signal.getExtractedPayload());
                break;
            }
            case UPDATE_TOPIC: {
                this.consumer.updateTopic((Topic)signal.getExtractedPayload());
            }
        }
    }

    private void start() {
        long startTime = this.clock.millis();
        logger.info("Starting consumer for subscription {}", (Object)this.subscriptionName);
        this.consumer.initialize();
        logger.info("Started consumer for subscription {} in {}ms", (Object)this.subscriptionName, (Object)(this.clock.millis() - startTime));
    }

    private void stop() {
        long startTime = this.clock.millis();
        logger.info("Stopping consumer for subscription {}", (Object)this.subscriptionName);
        this.consumer.tearDown();
        logger.info("Stopped consumer for subscription {} in {}ms", (Object)this.subscriptionName, (Object)(this.clock.millis() - startTime));
    }

    private void retransmit() {
        long startTime = this.clock.millis();
        logger.info("Starting retransmission for consumer of subscription {}", (Object)this.subscriptionName);
        this.stop();
        this.retransmitter.reloadOffsets(this.subscriptionName);
        this.start();
        logger.info("Done retransmission for consumer of subscription {} in {}ms", (Object)this.subscriptionName, (Object)(this.clock.millis() - startTime));
    }

    private void restart() {
        long startTime = this.clock.millis();
        logger.info("Restarting consumer for subscription {}", (Object)this.subscriptionName);
        this.stop();
        this.start();
        logger.info("Done restarting consumer for subscription {} in {}ms", (Object)this.subscriptionName, (Object)(this.clock.millis() - startTime));
    }

    public String toString() {
        return "ConsumerProcess{subscriptionName=" + this.subscriptionName + '}';
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        ConsumerProcess that = (ConsumerProcess)o;
        return Objects.equals(this.subscriptionName, that.subscriptionName);
    }

    public int hashCode() {
        return Objects.hash(this.subscriptionName);
    }

    public SubscriptionName getSubscriptionName() {
        return this.subscriptionName;
    }

    public Consumer getConsumer() {
        return this.consumer;
    }
}

