/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Iterables;
import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

public class InMemoryChannel
extends AbstractMessageLogReceiverEndpoint
implements MessageLogReceiverEndpoint,
MessageQueueReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryChannel.class);
    private final List<Message<String>> eventQueue;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    public InMemoryChannel(String channelName) {
        super(channelName, new ObjectMapper().registerModule((Module)new JavaTimeModule()), null);
        this.eventQueue = Collections.synchronizedList(new ArrayList());
    }

    public InMemoryChannel(String channelName, ObjectMapper objectMapper, ApplicationEventPublisher eventPublisher) {
        super(channelName, objectMapper, eventPublisher);
        this.eventQueue = Collections.synchronizedList(new ArrayList());
    }

    public synchronized void send(Message<String> message) {
        LOG.info("Sending {} to {} at position{}", new Object[]{message, this.getChannelName(), this.eventQueue.size()});
        this.eventQueue.add(message);
    }

    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition startFrom, @Nonnull Instant until) {
        this.publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + this.getChannelName(), null);
        Message lastMessage = this.eventQueue.isEmpty() ? null : (Message)Iterables.getLast(this.eventQueue);
        ChannelDurationBehind durationBehind = lastMessage != null ? ChannelDurationBehind.channelDurationBehind().with(this.getChannelName(), Duration.between(lastMessage.getHeader().getArrivalTimestamp(), Instant.now())).build() : null;
        this.publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + this.getChannelName(), durationBehind);
        return CompletableFuture.supplyAsync(() -> {
            boolean shouldStop = false;
            AtomicInteger pos = new AtomicInteger(startFrom.shard(this.getChannelName()).startFrom() == StartFrom.HORIZON ? -1 : Integer.valueOf(startFrom.shard(this.getChannelName()).position()));
            do {
                if (this.hasMessageAfter(pos.get())) {
                    int index = pos.incrementAndGet();
                    Message<String> receivedMessage = this.eventQueue.get(index);
                    LOG.info("Received message from channel={} at position={}: message={}", new Object[]{this.getChannelName(), index, receivedMessage});
                    Message interceptedMessage = this.intercept(Message.message((String)receivedMessage.getKey(), (Header)Header.responseHeader(null, (Instant)Instant.now()), (Object)receivedMessage.getPayload()));
                    if (interceptedMessage == null) continue;
                    this.getMessageDispatcher().accept(interceptedMessage);
                    shouldStop = !until.isAfter(interceptedMessage.getHeader().getArrivalTimestamp());
                    continue;
                }
                shouldStop = !until.isAfter(Instant.now());
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while (!shouldStop && !this.stopSignal.get());
            this.publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + this.getChannelName(), durationBehind);
            return ChannelPosition.channelPosition((ShardPosition[])new ShardPosition[]{ShardPosition.fromPosition((String)this.getChannelName(), (String)String.valueOf(pos))});
        }, Executors.newSingleThreadExecutor());
    }

    public CompletableFuture<Void> consume() {
        this.publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + this.getChannelName(), null);
        Message lastMessage = this.eventQueue.isEmpty() ? null : (Message)Iterables.getLast(this.eventQueue);
        ChannelDurationBehind durationBehind = lastMessage != null ? ChannelDurationBehind.channelDurationBehind().with(this.getChannelName(), Duration.between(lastMessage.getHeader().getArrivalTimestamp(), Instant.now())).build() : null;
        this.publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + this.getChannelName(), durationBehind);
        return CompletableFuture.supplyAsync(() -> {
            do {
                if (!this.eventQueue.isEmpty()) {
                    Message<String> receivedMessage = this.eventQueue.remove(0);
                    Message interceptedMessage = this.intercept(Message.message((String)receivedMessage.getKey(), (Header)Header.responseHeader(null, (Instant)Instant.now()), (Object)receivedMessage.getPayload()));
                    if (interceptedMessage == null) continue;
                    this.getMessageDispatcher().accept(interceptedMessage);
                    continue;
                }
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while (!this.stopSignal.get());
            this.publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + this.getChannelName(), durationBehind);
            return null;
        }, Executors.newSingleThreadExecutor());
    }

    public void stop() {
        this.stopSignal.set(true);
    }

    private synchronized boolean hasMessageAfter(int pos) {
        return this.eventQueue.size() > pos + 1;
    }
}

