/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Key;
import de.otto.synapse.message.Message;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/**
 * Receiver endpoint backed by an in-memory list of messages.
 *
 * <p>Messages are appended with {@link #send(Message)}. They can then be read in two ways:
 * <ul>
 *   <li>{@link #consumeUntil(ChannelPosition, Predicate)} reads by index and leaves the
 *       messages in the list;</li>
 *   <li>{@link #consume()} removes messages from the head of the list while reading.</li>
 * </ul>
 *
 * <p>Both consumers poll the list on a dedicated single-thread executor and terminate when
 * {@link #stop()} has been called. The stop signal is never reset by this class.
 */
public class InMemoryChannel
extends AbstractMessageLogReceiverEndpoint
implements MessageLogReceiverEndpoint,
MessageQueueReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryChannel.class);

    /** Header attribute under which {@link #send(Message)} stores the arrival timestamp. */
    private static final String ARRIVAL_TS_ATTR = "synapse_msg_arrival_ts";

    /** Time to sleep between polls when no message is available. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    private final List<Message<String>> eventQueue;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * Creates a channel that is constructed without an application event publisher
     * ({@code null} is handed to the superclass).
     *
     * @param channelName         name of the channel
     * @param interceptorRegistry registry handed to the superclass
     */
    public InMemoryChannel(String channelName, MessageInterceptorRegistry interceptorRegistry) {
        super(channelName, interceptorRegistry, null);
        this.eventQueue = Collections.synchronizedList(new ArrayList<>());
    }

    /**
     * Creates a channel with an application event publisher.
     *
     * @param channelName         name of the channel
     * @param interceptorRegistry registry handed to the superclass
     * @param eventPublisher      publisher handed to the superclass
     */
    public InMemoryChannel(String channelName, MessageInterceptorRegistry interceptorRegistry, ApplicationEventPublisher eventPublisher) {
        super(channelName, interceptorRegistry, eventPublisher);
        this.eventQueue = Collections.synchronizedList(new ArrayList<>());
    }

    /**
     * Appends a copy of the message to the channel.
     *
     * <p>The stored copy keeps key, payload and all header attributes of the given message,
     * and additionally carries the current time as arrival timestamp and a shard position
     * equal to the list size at the time of sending.
     *
     * @param message the message to append
     */
    public synchronized void send(Message<String> message) {
        final int position = this.eventQueue.size();
        LOG.info("Sending {} to {} at position{}", message, this.getChannelName(), position);
        final ImmutableMap<String, String> attributes = ImmutableMap.<String, String>builder()
                .putAll(message.getHeader().getAll())
                .put(ARRIVAL_TS_ATTR, Instant.now().toString())
                .build();
        final ShardPosition shardPosition = ShardPosition.fromPosition(this.getChannelName(), String.valueOf(position));
        this.eventQueue.add(Message.message(message.getKey(), Header.of(shardPosition, attributes), message.getPayload()));
    }

    /**
     * Reads messages by index, starting after {@code startFrom}, until the stop condition
     * matches, {@link #stop()} has been called, or the polling thread is interrupted.
     *
     * <p>After every step (one message handled, or one idle poll) the stop condition is tested
     * with a {@link ShardResponse} holding the current position, the duration behind and the
     * message handled in that step (empty list for an idle poll).
     *
     * @param startFrom     position to start from; the shard named like this channel is used
     * @param stopCondition predicate deciding whether consumption should end
     * @return future completing with the position reached when the loop ended
     */
    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition startFrom, @Nonnull Predicate<ShardResponse> stopCondition) {
        this.publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + this.getChannelName(), null);
        this.publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + this.getChannelName(), null);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                ShardPosition shardPosition = startFrom.shard(this.getChannelName());
                int pos = this.positionOf(shardPosition);
                boolean interrupted = false;
                boolean shouldStop;
                do {
                    final ImmutableList<Message<String>> messages;
                    if (this.hasMessageAfter(pos)) {
                        pos++;
                        final Message<String> receivedMessage = this.eventQueue.get(pos);
                        messages = ImmutableList.of(receivedMessage);
                        LOG.info("Received message from channel={} at position={}: message={}", this.getChannelName(), pos, receivedMessage);
                        final Message<String> interceptedMessage = this.intercept(receivedMessage);
                        // A null result from intercept() means the message is not dispatched.
                        if (interceptedMessage != null) {
                            this.getMessageDispatcher().accept(interceptedMessage);
                        }
                    } else {
                        messages = ImmutableList.of();
                        interrupted = !sleepUntilNextPoll();
                    }
                    shardPosition = ShardPosition.fromPosition(this.getChannelName(), String.valueOf(pos));
                    shouldStop = stopCondition.test(ShardResponse.shardResponse(shardPosition, this.durationBehind(pos), messages));
                } while (!shouldStop && !this.stopSignal.get() && !interrupted);
                this.publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + this.getChannelName(), null);
                return ChannelPosition.channelPosition(shardPosition);
            }, executor);
        } finally {
            // The submitted task still runs to completion; afterwards the worker thread is
            // released instead of staying alive forever.
            executor.shutdown();
        }
    }

    /**
     * Translates a shard position into a list index.
     *
     * @param shardPosition the position to translate
     * @return {@code -1} for {@link StartFrom#HORIZON}, otherwise the position parsed as int
     * @throws NumberFormatException if the position is not a parsable integer
     */
    private int positionOf(ShardPosition shardPosition) {
        return shardPosition.startFrom() == StartFrom.HORIZON ? -1 : Integer.parseInt(shardPosition.position());
    }

    /**
     * Computes the absolute time between the arrival of the last message in the list and the
     * arrival of the message at {@code currentPos}.
     *
     * @param currentPos index of the most recently read message, or {@code -1} if none
     * @return the duration, or {@link Duration#ZERO} if {@code currentPos} does not refer to
     *         an existing message
     */
    private Duration durationBehind(int currentPos) {
        final int size = this.eventQueue.size();
        if (currentPos == -1 && size > 0) {
            // NOTE(review): both operands are the last message, so this is always zero.
            // Possibly the first message was intended - confirm before changing.
            return Duration.between(arrivalOf(Iterables.getLast(this.eventQueue)), arrivalOf(this.eventQueue.get(size - 1))).abs();
        }
        // Strictly less than size: currentPos == size is not a valid index.
        if (currentPos >= 0 && currentPos < size) {
            return Duration.between(arrivalOf(Iterables.getLast(this.eventQueue)), arrivalOf(this.eventQueue.get(currentPos))).abs();
        }
        return Duration.ZERO;
    }

    /**
     * Removes messages from the head of the list and dispatches them until {@link #stop()}
     * has been called or the polling thread is interrupted.
     *
     * <p>Each message is passed on without shard position but with all its header attributes.
     * The duration behind published with the STARTED and FINISHED events is computed once,
     * before consumption starts, from the arrival timestamp of the last message in the list;
     * it is {@code null} if the list is empty at that time.
     *
     * @return future completing with {@code null} when the loop ended
     */
    public CompletableFuture<Void> consume() {
        this.publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + this.getChannelName(), null);
        final Message<String> lastMessage = this.eventQueue.isEmpty() ? null : Iterables.getLast(this.eventQueue);
        final ChannelDurationBehind durationBehind = lastMessage != null
                ? ChannelDurationBehind.channelDurationBehind().with(this.getChannelName(), Duration.between(arrivalOf(lastMessage), Instant.now())).build()
                : null;
        this.publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + this.getChannelName(), durationBehind);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                boolean interrupted = false;
                do {
                    if (!this.eventQueue.isEmpty()) {
                        final Message<String> receivedMessage = this.eventQueue.remove(0);
                        final Header header = Header.of((ShardPosition) null, receivedMessage.getHeader().getAll());
                        final Message<String> interceptedMessage = this.intercept(Message.message(receivedMessage.getKey(), header, receivedMessage.getPayload()));
                        // A null result from intercept() means the message is not dispatched.
                        if (interceptedMessage != null) {
                            this.getMessageDispatcher().accept(interceptedMessage);
                        }
                    } else {
                        interrupted = !sleepUntilNextPoll();
                    }
                } while (!this.stopSignal.get() && !interrupted);
                this.publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + this.getChannelName(), durationBehind);
                return null;
            }, executor);
        } finally {
            // The submitted task still runs to completion; afterwards the worker thread is
            // released instead of staying alive forever.
            executor.shutdown();
        }
    }

    /** Signals running consumer loops to terminate after their current step. */
    public void stop() {
        this.stopSignal.set(true);
    }

    /**
     * @param pos index of the most recently read message, or {@code -1} if none
     * @return {@code true} if the list holds a message at index {@code pos + 1}
     */
    private synchronized boolean hasMessageAfter(int pos) {
        return this.eventQueue.size() > pos + 1;
    }

    /** Reads the arrival timestamp that {@link #send(Message)} stored in the message header. */
    private static Instant arrivalOf(Message<String> message) {
        return message.getHeader().getAsInstant(ARRIVAL_TS_ATTR);
    }

    /**
     * Sleeps for one poll interval.
     *
     * @return {@code false} if the thread was interrupted while sleeping; the interrupt flag
     *         is restored in that case so the caller can terminate its loop
     */
    private static boolean sleepUntilNextPoll() {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

