/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Iterables;
import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/**
 * Message-log receiver endpoint that keeps all sent messages in an in-process list.
 *
 * <p>Messages are appended with {@link #send(Message)} and are never removed. A consumer
 * started with {@link #consumeUntil(ChannelPosition, Instant)} walks the list by index and
 * polls for new entries until it is told to stop.
 */
public class InMemoryChannel extends AbstractMessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryChannel.class);

    /** Pause between two polls of the queue while no unread message is available. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Append-only message store; the list index of a message doubles as its channel position. */
    private final List<Message<String>> eventQueue;

    /** Set by {@link #stop()}; it is never reset by this class. */
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * Creates a channel with a default {@link ObjectMapper} (with {@link JavaTimeModule}
     * registered) and no event publisher.
     *
     * @param channelName name of the channel
     */
    public InMemoryChannel(String channelName) {
        super(channelName, new ObjectMapper().registerModule(new JavaTimeModule()), null);
        this.eventQueue = Collections.synchronizedList(new ArrayList<>());
    }

    /**
     * Creates a channel using the given mapper and event publisher.
     *
     * @param channelName    name of the channel
     * @param objectMapper   mapper handed to the superclass
     * @param eventPublisher publisher handed to the superclass
     */
    public InMemoryChannel(String channelName, ObjectMapper objectMapper, ApplicationEventPublisher eventPublisher) {
        super(channelName, objectMapper, eventPublisher);
        this.eventQueue = Collections.synchronizedList(new ArrayList<>());
    }

    /**
     * Logs the message and appends it to the end of the in-memory queue.
     *
     * @param message the message to append
     */
    public void send(Message<String> message) {
        LOG.info("Sending {} to {}", message, this.getChannelName());
        this.eventQueue.add(message);
    }

    /**
     * Asynchronously dispatches queued messages, starting after the given position.
     *
     * <p>{@code STARTING} and {@code STARTED} events are published on the calling thread before
     * the asynchronous loop is scheduled. The loop ends when a dispatched message has an arrival
     * timestamp that is not before {@code until}, when the queue is drained and the current time
     * is not before {@code until}, when {@link #stop()} has been called, or when the consuming
     * thread is interrupted. A {@code FINISHED} event is published when the loop ends.
     *
     * @param startFrom position to resume from; {@link StartFrom#HORIZON} starts with the first
     *                  queued message, otherwise the shard position is parsed as the integer
     *                  index of the last message already consumed
     * @param until     instant after which consumption stops
     * @return a future completing with the position of the last message read from the queue
     * @throws NumberFormatException inside the returned future if the shard position is not an
     *                               integer
     */
    @Override
    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition startFrom, @Nonnull Instant until) {
        this.publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + this.getChannelName(), null);
        Message<String> lastMessage = this.eventQueue.isEmpty() ? null : Iterables.getLast(this.eventQueue);
        // Computed once up front; the same value is reported again with the FINISHED event.
        ChannelDurationBehind durationBehind = lastMessage != null
                ? ChannelDurationBehind.channelDurationBehind()
                        .with(this.getChannelName(), Duration.between(lastMessage.getHeader().getArrivalTimestamp(), Instant.now()))
                        .build()
                : null;
        this.publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + this.getChannelName(), durationBehind);
        return CompletableFuture.supplyAsync(() -> {
            // pos is the index of the last message read; -1 means nothing has been read yet.
            int pos = startFrom.shard(this.getChannelName()).startFrom() == StartFrom.HORIZON
                    ? -1
                    : Integer.parseInt(startFrom.shard(this.getChannelName()).position());
            boolean shouldStop = false;
            do {
                if (this.hasMessageAfter(pos)) {
                    Message<String> receivedMessage = this.eventQueue.get(++pos);
                    // The stored header is replaced by a fresh one stamped with the current time.
                    Message<String> interceptedMessage = this.intercept(
                            Message.message(receivedMessage.getKey(), Header.responseHeader(null, Instant.now()), receivedMessage.getPayload()));
                    // NOTE(review): a null result presumably means an interceptor dropped the message - confirm.
                    if (interceptedMessage != null) {
                        this.getMessageDispatcher().accept(interceptedMessage);
                        shouldStop = !until.isAfter(interceptedMessage.getHeader().getArrivalTimestamp());
                    }
                } else {
                    shouldStop = !until.isAfter(Instant.now());
                    try {
                        Thread.sleep(POLL_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the executor and stop polling instead of
                        // silently continuing on a thread that was asked to terminate.
                        Thread.currentThread().interrupt();
                        shouldStop = true;
                    }
                }
            } while (!shouldStop && !this.stopSignal.get());
            this.publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + this.getChannelName(), durationBehind);
            return ChannelPosition.channelPosition(ShardPosition.fromPosition(this.getChannelName(), String.valueOf(pos)));
        });
    }

    /**
     * Signals running consumers to finish after their current loop iteration.
     *
     * <p>The signal is not cleared afterwards, so a later {@code consumeUntil} call performs a
     * single loop iteration and then finishes.
     */
    @Override
    public void stop() {
        this.stopSignal.set(true);
    }

    /** Returns whether the queue holds at least one message beyond index {@code pos}. */
    private boolean hasMessageAfter(int pos) {
        return this.eventQueue.size() > pos + 1;
    }
}

