/*
 * Decompiled with CFR 0.152.
 */
package org.pipservices.messaging.queues;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.pipservices.components.auth.CredentialParams;
import org.pipservices.components.connect.ConnectionParams;
import org.pipservices.messaging.queues.IMessageReceiver;
import org.pipservices.messaging.queues.MessageEnvelop;
import org.pipservices.messaging.queues.MessageQueue;
import org.pipservices.messaging.queues.MessagingCapabilities;

/**
 * Message queue that keeps its messages in an in-process list.
 *
 * <p>All access to the message list and to the table of locked messages is
 * guarded by the {@code _lock} monitor inherited from {@link MessageQueue}.
 * A received message is removed from the list and registered in the lock
 * table under an integer token that is stored as the message reference;
 * {@link #complete}, {@link #abandon}, {@link #moveToDeadLetter} and
 * {@link #renewLock} use that token to find the lock entry again.
 */
public class MemoryMessageQueue
extends MessageQueue {
    /** Lock duration in milliseconds given to a message when it is received. */
    private final long _defaultLockTimeout = 30000L;
    /** Time in milliseconds that {@link #listen} waits for a message on each pass. */
    private final long _defaultWaitTimeout = 5000L;
    /** Messages waiting to be received, oldest first. Guarded by {@code _lock}. */
    private final List<MessageEnvelop> _messages = new ArrayList<>();
    /** Next lock token to hand out. Guarded by {@code _lock}. */
    private int _lockTokenSequence = 0;
    /** Lock entries of received messages, keyed by lock token. Guarded by {@code _lock}. */
    private final Map<Integer, LockedMessage> _lockedMessages = new HashMap<>();
    // Both flags are written by one thread (close/endListen) and read by another
    // (the listen loop, isOpen), so they must be volatile to be visible.
    private volatile boolean _listening;
    private volatile boolean _opened = false;

    /**
     * Creates a queue without a name.
     */
    public MemoryMessageQueue() {
        this(null);
    }

    /**
     * Creates a queue with the given name.
     *
     * @param name the queue name passed to the base class.
     */
    public MemoryMessageQueue(String name) {
        super(name);
        this._capabilities = new MessagingCapabilities(true, true, true, true, true, true, true, false, true);
    }

    /**
     * @return {@code true} after {@link #open} and until {@link #close} is called.
     */
    @Override
    public boolean isOpen() {
        return this._opened;
    }

    /**
     * Marks the queue as opened. The connection and credential parameters are
     * not used by this implementation.
     *
     * @param correlationId identifier used for logging.
     * @param connection    ignored.
     * @param credential    ignored.
     */
    @Override
    public void open(String correlationId, ConnectionParams connection, CredentialParams credential) {
        this._logger.trace(correlationId, "Opened queue %s", new Object[]{this});
        this._opened = true;
    }

    /**
     * Marks the queue as closed, stops listening and wakes up every thread
     * currently blocked in {@link #receive}.
     *
     * @param correlationId identifier used for logging.
     */
    @Override
    public void close(String correlationId) {
        synchronized (this._lock) {
            this._listening = false;
            this._opened = false;
            this._lock.notifyAll();
        }
        this._logger.trace(correlationId, "Closed queue %s", new Object[]{this});
    }

    /**
     * Removes all pending messages and all lock entries.
     *
     * @param correlationId identifier used for logging.
     */
    @Override
    public void clear(String correlationId) {
        synchronized (this._lock) {
            this._messages.clear();
            this._lockedMessages.clear();
        }
        this._logger.trace(correlationId, "Cleared queue %s", new Object[]{this});
    }

    /**
     * @return the number of messages waiting in the queue; locked (received)
     *         messages are not counted.
     */
    @Override
    public Long getMessageCount() {
        synchronized (this._lock) {
            // size() is an int; widen explicitly because int cannot be boxed to Long.
            return (long) this._messages.size();
        }
    }

    /**
     * Appends a message to the queue, stamps it with the current UTC time and
     * wakes up one thread waiting in {@link #receive}. A {@code null} message
     * is ignored.
     *
     * @param correlationId identifier used for logging.
     * @param message       the message to enqueue.
     */
    @Override
    public void send(String correlationId, MessageEnvelop message) {
        if (message == null) {
            return;
        }
        synchronized (this._lock) {
            message.setSentTime(ZonedDateTime.now(ZoneOffset.UTC));
            this._messages.add(message);
            this._lock.notify();
        }
        this._counters.incrementOne("queue." + this.getName() + ".sent_messages");
        this._logger.debug(correlationId, "Sent message %s via %s", new Object[]{message, this});
    }

    /**
     * Returns the oldest pending message without removing or locking it.
     *
     * @param correlationId identifier used for logging.
     * @return the first message, or {@code null} when the queue is empty.
     */
    @Override
    public MessageEnvelop peek(String correlationId) {
        MessageEnvelop message = null;
        synchronized (this._lock) {
            if (!this._messages.isEmpty()) {
                message = this._messages.get(0);
            }
        }
        if (message != null) {
            this._logger.trace(correlationId, "Peeked message %s on %s", new Object[]{message, this});
        }
        return message;
    }

    /**
     * Returns up to {@code messageCount} of the oldest pending messages without
     * removing or locking them.
     *
     * @param correlationId identifier used for logging.
     * @param messageCount  maximum number of messages to return; a value of
     *                      zero or less yields an empty list.
     * @return a new list with the peeked messages, possibly empty.
     */
    @Override
    public List<MessageEnvelop> peekBatch(String correlationId, int messageCount) {
        List<MessageEnvelop> messages = new ArrayList<>();
        synchronized (this._lock) {
            for (int index = 0; index < this._messages.size() && index < messageCount; ++index) {
                messages.add(this._messages.get(index));
            }
        }
        this._logger.trace(correlationId, "Peeked %d messages on %s", new Object[]{messages.size(), this});
        return messages;
    }

    /**
     * Removes the oldest pending message and locks it for the default lock
     * timeout. When the queue is empty the call waits once on the queue
     * monitor for up to {@code waitTimeout} milliseconds and then tries again.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is
     * restored and {@code null} is returned.
     *
     * @param correlationId identifier accepted for interface compatibility;
     *                      logging uses the correlation id of the message.
     * @param waitTimeout   value passed to {@link Object#wait(long)}; note that
     *                      {@code 0} therefore waits until notified and a
     *                      negative value is rejected by {@code wait}.
     * @return the received message with its reference set to the lock token,
     *         or {@code null} when no message became available.
     */
    @Override
    public MessageEnvelop receive(String correlationId, long waitTimeout) {
        MessageEnvelop message;
        synchronized (this._lock) {
            message = this.pollFirst();
            if (message == null) {
                try {
                    this._lock.wait(waitTimeout);
                }
                catch (InterruptedException ex) {
                    // Keep the interruption visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return null;
                }
                message = this.pollFirst();
            }
            if (message == null) {
                return null;
            }
            int lockedToken = this._lockTokenSequence++;
            message.setReference(lockedToken);
            LockedMessage lockedMessage = new LockedMessage();
            lockedMessage.lockExpiration = System.currentTimeMillis() + this._defaultLockTimeout;
            this._lockedMessages.put(lockedToken, lockedMessage);
        }
        this._counters.incrementOne("queue." + this.getName() + ".received_messages");
        this._logger.debug(message.getCorrelationId(), "Received message %s via %s", new Object[]{message, this});
        return message;
    }

    /**
     * Extends the lock of a previously received message so that it expires
     * {@code lockTimeout} milliseconds from now. Does nothing when the message
     * is {@code null} or carries no reference; an unknown token leaves the
     * lock table untouched.
     *
     * @param message     the received message.
     * @param lockTimeout new lock duration in milliseconds, counted from now.
     */
    @Override
    public void renewLock(MessageEnvelop message, long lockTimeout) {
        if (message == null || message.getReference() == null) {
            return;
        }
        synchronized (this._lock) {
            int lockedToken = (Integer)message.getReference();
            LockedMessage lockedMessage = this._lockedMessages.get(lockedToken);
            if (lockedMessage != null) {
                lockedMessage.lockExpiration = System.currentTimeMillis() + lockTimeout;
            }
        }
        this._logger.trace(message.getCorrelationId(), "Renewed lock for message %s at %s", new Object[]{message, this});
    }

    /**
     * Releases the lock of a received message and puts the message back into
     * the queue through {@link #send}. The message is re-queued only when a
     * lock entry exists for it and that lock has not expired yet; otherwise
     * the call returns without re-sending.
     *
     * @param message the received message to give back.
     */
    @Override
    public void abandon(MessageEnvelop message) {
        if (message == null || message.getReference() == null) {
            return;
        }
        synchronized (this._lock) {
            int lockedToken = (Integer)message.getReference();
            LockedMessage lockedMessage = this._lockedMessages.remove(lockedToken);
            if (lockedMessage == null) {
                return;
            }
            message.setReference(null);
            if (lockedMessage.lockExpiration <= System.currentTimeMillis()) {
                return;
            }
        }
        this._logger.trace(message.getCorrelationId(), "Abandoned message %s at %s", new Object[]{message, this});
        this.send(message.getCorrelationId(), message);
    }

    /**
     * Confirms processing of a received message: its lock entry is removed and
     * its reference is cleared.
     *
     * @param message the received message.
     */
    @Override
    public void complete(MessageEnvelop message) {
        if (message == null || message.getReference() == null) {
            return;
        }
        this.releaseLock(message);
        this._logger.trace(message.getCorrelationId(), "Completed message %s at %s", new Object[]{message, this});
    }

    /**
     * Discards a received message as dead: its lock entry is removed, its
     * reference is cleared and the dead-message counter is incremented. The
     * message is not stored anywhere by this implementation.
     *
     * @param message the received message.
     */
    @Override
    public void moveToDeadLetter(MessageEnvelop message) {
        if (message == null || message.getReference() == null) {
            return;
        }
        this.releaseLock(message);
        this._counters.incrementOne("queue." + this.getName() + ".dead_messages");
        this._logger.trace(message.getCorrelationId(), "Moved to dead message %s at %s", new Object[]{message, this});
    }

    /**
     * Blocks the calling thread and passes every received message to the
     * receiver until {@link #endListen} or {@link #close} is called, or until
     * the calling thread is interrupted. Exceptions thrown by the receiver are
     * logged and do not stop the loop. A second concurrent call is rejected
     * with an error log entry.
     *
     * @param correlationId identifier used for logging.
     * @param receiver      callback invoked for each received message.
     */
    @Override
    public void listen(String correlationId, IMessageReceiver receiver) {
        boolean alreadyListening;
        // Check and set under the lock so two threads cannot both start listening.
        synchronized (this._lock) {
            alreadyListening = this._listening;
            if (!alreadyListening) {
                this._listening = true;
            }
        }
        if (alreadyListening) {
            this._logger.error(correlationId, "Already listening queue %s", new Object[]{this});
            return;
        }
        this._logger.trace(correlationId, "Started listening messages at %s", new Object[]{this});
        while (this._listening) {
            MessageEnvelop message = this.receive(correlationId, this._defaultWaitTimeout);
            if (message == null) {
                if (Thread.currentThread().isInterrupted()) {
                    // receive() restored the interrupt flag; stop here, otherwise
                    // every following wait would fail immediately and the loop would spin.
                    this._listening = false;
                }
                continue;
            }
            if (!this._listening) {
                // Listening ended while this message was being taken; hand it
                // back so that it is not left locked and lost.
                this.abandon(message);
                continue;
            }
            try {
                receiver.receiveMessage(message, this);
            }
            catch (Exception ex) {
                this._logger.error(correlationId, ex, "Failed to process the message", new Object[0]);
            }
        }
        this._logger.trace(correlationId, "Stopped listening messages at %s", new Object[]{this});
    }

    /**
     * Asks the {@link #listen} loop to stop. The loop notices the request after
     * its current receive attempt returns.
     *
     * @param correlationId not used.
     */
    @Override
    public void endListen(String correlationId) {
        this._listening = false;
    }

    /**
     * @return the queue name in square brackets.
     */
    @Override
    public String toString() {
        return "[" + this.getName() + "]";
    }

    /**
     * Removes and returns the oldest pending message. Must be called while
     * holding {@code _lock}.
     *
     * @return the removed message, or {@code null} when the queue is empty.
     */
    private MessageEnvelop pollFirst() {
        return this._messages.isEmpty() ? null : this._messages.remove(0);
    }

    /**
     * Removes the lock entry of the message and clears its reference. The
     * caller must have checked that the message and its reference are non-null.
     */
    private void releaseLock(MessageEnvelop message) {
        synchronized (this._lock) {
            int lockKey = (Integer)message.getReference();
            this._lockedMessages.remove(lockKey);
            message.setReference(null);
        }
    }

    /**
     * Lock bookkeeping for one received message.
     */
    private static class LockedMessage {
        /** Moment, in {@link System#currentTimeMillis()} time, at which the lock expires. */
        public long lockExpiration;

        private LockedMessage() {
        }
    }
}

