/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryChannel
extends MessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryChannel.class);
    private final List<Message<String>> eventQueue;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    public InMemoryChannel(String channelName) {
        super(channelName, new ObjectMapper().registerModule((Module)new JavaTimeModule()));
        this.eventQueue = Collections.synchronizedList(new ArrayList());
    }

    public InMemoryChannel(String channelName, ObjectMapper objectMapper) {
        super(channelName, objectMapper);
        this.eventQueue = Collections.synchronizedList(new ArrayList());
    }

    public void send(Message<String> message) {
        LOG.info("Sending {} to {}", message, (Object)this.getChannelName());
        this.eventQueue.add(message);
    }

    @Override
    @Nonnull
    public ChannelPosition consume(@Nonnull ChannelPosition startFrom, @Nonnull Predicate<Message<?>> stopCondition) {
        int pos;
        boolean shouldStop = false;
        int n = pos = startFrom.shard(this.getChannelName()).startFrom() == StartFrom.HORIZON ? -1 : Integer.valueOf(startFrom.shard(this.getChannelName()).position());
        do {
            if (this.hasMessageAfter(pos)) {
                Message<String> receivedMessage;
                Message<String> interceptedMessage;
                if ((interceptedMessage = this.intercept(Message.message((receivedMessage = this.eventQueue.get(++pos)).getKey(), Header.responseHeader(null, Instant.now()), receivedMessage.getPayload()))) == null) continue;
                this.getMessageDispatcher().accept(interceptedMessage);
                shouldStop = stopCondition.test(interceptedMessage);
                continue;
            }
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        } while (!shouldStop && !this.stopSignal.get());
        return ChannelPosition.channelPosition(ShardPosition.fromPosition(this.getChannelName(), String.valueOf(pos)));
    }

    @Override
    public void stop() {
        this.stopSignal.set(true);
    }

    private boolean hasMessageAfter(int pos) {
        return this.eventQueue.size() > pos + 1;
    }
}

