/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadi.service.subscription;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.exceptions.ExceptionWrapper;
import org.zalando.nakadi.service.subscription.KafkaClient;
import org.zalando.nakadi.service.subscription.StreamParameters;
import org.zalando.nakadi.service.subscription.SubscriptionOutput;
import org.zalando.nakadi.service.subscription.SubscriptionStreamer;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.service.subscription.state.CleanupState;
import org.zalando.nakadi.service.subscription.state.DummyState;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.state.State;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;

/**
 * Holds the collaborators of one subscription stream and drives it as a task loop.
 *
 * <p>Work is expressed as {@link Runnable} tasks placed on an internal queue via
 * {@link #addTask(Runnable)}. {@link #streamInternal(State)} takes tasks off that queue one at a
 * time and runs them on the calling thread until the current state becomes {@link #DEAD_STATE}.
 * State transitions themselves are queued as tasks (see {@link #switchState(State)}), so they are
 * executed by that same loop.
 */
public class StreamingContext
implements SubscriptionStreamer {
    private final StreamParameters parameters;
    private final Session session;
    private final ZkSubscriptionClient zkClient;
    private final KafkaClient kafkaClient;
    private final SubscriptionOutput out;
    private final long kafkaPollTimeout;
    private final AtomicBoolean connectionReady;
    private final ScheduledExecutorService timer;
    /** Pending tasks; drained only by {@link #streamInternal(State)}. */
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final BiFunction<Session[], Partition[], Partition[]> rebalancer;
    /** Logging path handed to every state on entry: the constructor argument plus {@code ".stream"}. */
    private final String loggingPath;
    /** State whose callbacks are currently active; replaced only inside a queued switch task. */
    private State currentState = new DummyState();
    /** Session-list subscription created by {@link #registerSession()}; {@code null} when none is held. */
    private ZKSubscription clientListChanges;
    /** Sentinel compared by identity: the streaming loop stops once this is the current state. */
    public static final State DEAD_STATE = new DummyState();
    private final Logger log;

    /**
     * Stores the collaborators and creates the logger.
     *
     * @param out              output of the stream, exposed via {@link #getOut()}
     * @param parameters       stream parameters, exposed via {@link #getParameters()}
     * @param session          session that is registered/unregistered in ZooKeeper
     * @param timer            executor used by {@link #scheduleTask(Runnable, long, TimeUnit)}
     * @param zkClient         ZooKeeper client for the subscription
     * @param kafkaClient      Kafka client, exposed via {@link #getKafkaClient()}
     * @param rebalancer       function applied to the listed sessions and partitions; its result
     *                         is the set of partitions to update
     * @param kafkaPollTimeout value exposed via {@link #getKafkaPollTimeout()}
     * @param loggingPath      base logging path
     * @param connectionReady  flag read by {@link #isConnectionReady()}
     */
    StreamingContext(SubscriptionOutput out, StreamParameters parameters, Session session, ScheduledExecutorService timer, ZkSubscriptionClient zkClient, KafkaClient kafkaClient, BiFunction<Session[], Partition[], Partition[]> rebalancer, long kafkaPollTimeout, String loggingPath, AtomicBoolean connectionReady) {
        this.out = out;
        this.parameters = parameters;
        this.session = session;
        this.rebalancer = rebalancer;
        this.timer = timer;
        this.zkClient = zkClient;
        this.kafkaClient = kafkaClient;
        this.kafkaPollTimeout = kafkaPollTimeout;
        this.loggingPath = loggingPath + ".stream";
        // The logger is named after the base path (the constructor argument), not the
        // ".stream"-suffixed path that is passed on to the states.
        this.log = LoggerFactory.getLogger(loggingPath);
        this.connectionReady = connectionReady;
    }

    /** @return the stream parameters given at construction */
    public StreamParameters getParameters() {
        return parameters;
    }

    /** @return the ZooKeeper subscription client given at construction */
    public ZkSubscriptionClient getZkClient() {
        return zkClient;
    }

    /** @return the id of the session this context was created with */
    public String getSessionId() {
        return session.getId();
    }

    /** @return the Kafka client given at construction */
    public KafkaClient getKafkaClient() {
        return kafkaClient;
    }

    /** @return the subscription output given at construction */
    public SubscriptionOutput getOut() {
        return out;
    }

    /** @return the Kafka poll timeout given at construction (unit is not defined in this class) */
    public long getKafkaPollTimeout() {
        return kafkaPollTimeout;
    }

    /**
     * Runs the streaming loop starting from a new {@link StartingState}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a task
     */
    @Override
    public void stream() throws InterruptedException {
        streamInternal(new StartingState());
    }

    /**
     * Queues a switch to {@code firstState} and then executes queued tasks until the current
     * state is {@link #DEAD_STATE}.
     *
     * <p>A task that throws a {@link RuntimeException} does not end the loop: the failure is
     * logged and a switch to a {@link CleanupState} carrying the error is queued instead.
     *
     * @param firstState state to enter first
     * @throws InterruptedException if the thread is interrupted while waiting for a task
     */
    void streamInternal(State firstState) throws InterruptedException {
        switchState(firstState);
        while (currentState != DEAD_STATE) {
            Runnable task = taskQueue.poll(1L, TimeUnit.HOURS);
            if (task == null) {
                // Nothing arrived within the poll window; re-check the state and wait again.
                continue;
            }
            try {
                task.run();
            } catch (ExceptionWrapper ex) {
                // The throwable is passed last so SLF4J logs it with its stack trace.
                log.error("Failed to process task {}, will rethrow original error", task, ex);
                // Clean up with the wrapped exception rather than the wrapper itself.
                switchState(new CleanupState(ex.getWrapped()));
            } catch (RuntimeException ex) {
                log.error("Failed to process task {}, code carefully!", task, ex);
                switchState(new CleanupState(ex));
            }
        }
    }

    /**
     * Queues a transition to {@code newState}.
     *
     * <p>The transition does not happen in this call. When the queued task runs it calls
     * {@code onExit()} on the current state, makes {@code newState} current, gives it this
     * context and the logging path, and calls its {@code onEnter()}.
     *
     * @param newState state to switch to
     */
    public void switchState(State newState) {
        addTask(() -> {
            log.info("Switching state from {}", currentState.getClass().getSimpleName());
            currentState.onExit();
            currentState = newState;
            log.info("Switching state to {}", currentState.getClass().getSimpleName());
            currentState.setContext(this, loggingPath);
            currentState.onEnter();
        });
    }

    /**
     * Subscribes for session-list changes and then registers the session with the ZooKeeper
     * client. Each session-list change notification queues a rebalance task.
     */
    public void registerSession() {
        log.info("Registering session {}", session);
        clientListChanges = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance));
        zkClient.registerSession(session);
    }

    /**
     * Cancels the session-list subscription and unregisters the session.
     *
     * <p>Does nothing beyond logging when no subscription is held. When one is held, the session
     * is unregistered even if cancelling the subscription throws.
     */
    public void unregisterSession() {
        log.info("Unregistering session {}", session);
        if (clientListChanges == null) {
            return;
        }
        try {
            clientListChanges.cancel();
        } finally {
            clientListChanges = null;
            zkClient.unregisterSession(session);
        }
    }

    /**
     * @param state state to compare against
     * @return {@code true} if {@code state} is the very same instance as the current state
     */
    public boolean isInState(State state) {
        return currentState == state;
    }

    /**
     * Appends a task to the queue consumed by the streaming loop.
     *
     * @param task task to run later
     */
    public void addTask(Runnable task) {
        taskQueue.offer(task);
    }

    /**
     * Uses the timer to enqueue {@code task} after the given delay. The timer only adds the task
     * to the queue; the task itself is run by the streaming loop.
     *
     * @param task    task to enqueue
     * @param timeout delay before the task is enqueued
     * @param unit    unit of {@code timeout}
     */
    public void scheduleTask(Runnable task, long timeout, TimeUnit unit) {
        timer.schedule(() -> addTask(task), timeout, unit);
    }

    /** @return the current value of the connection-ready flag given at construction */
    public boolean isConnectionReady() {
        return connectionReady.get();
    }

    /**
     * Refreshes the session-list subscription and, inside {@code zkClient.runLocked}, applies the
     * rebalancer to the listed sessions and partitions. If the rebalancer returns any partitions,
     * each one is written back and the topology is incremented. Does nothing when no
     * subscription is held.
     */
    private void rebalance() {
        if (clientListChanges == null) {
            return;
        }
        clientListChanges.refresh();
        zkClient.runLocked(() -> {
            Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions());
            if (changeset.length > 0) {
                for (Partition changed : changeset) {
                    zkClient.updatePartitionConfiguration(changed);
                }
                zkClient.incrementTopology();
            }
        });
    }
}

