/*
 * Decompiled with CFR 0.152.
 */
package org.vatplanner.commons.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vatplanner.commons.amqp.AmqpSubscriptionCreator;
import org.vatplanner.commons.amqp.ExchangeParameters;
import org.vatplanner.commons.amqp.Message;
import org.vatplanner.commons.amqp.MessageCodec;
import org.vatplanner.commons.amqp.MessageSupplements;
import org.vatplanner.commons.crypto.CryptoFailure;
import org.vatplanner.commons.crypto.Cryptor;

public class MessageSubscriptionCreator
extends AmqpSubscriptionCreator {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageSubscriptionCreator.class);
    private final Set<Class<? extends Message>> ignoredMessageClasses = new HashSet<Class<? extends Message>>();
    private final Map<Class<? extends Message>, MessageHandler<?>> messageHandlersByClass = new HashMap();
    private final MessageCodec messageCodec = new MessageCodec();
    private Cryptor cryptor;
    private Duration messageTimeout = Duration.ofMinutes(30L);
    private AtomicReference<Instant> minimumMessageTimestampRef;
    private Instant maximumMessageTimestamp;
    private AmqpSubscriptionCreator.ReceiptAction ignoreReceiptAction = AmqpSubscriptionCreator.ReceiptAction.ACKNOWLEDGE;
    private static final Duration CLOCK_DRIFT_ALLOWANCE = Duration.ofSeconds(5L);
    private static final String SIGNATURE_HEADER = "pgpSignature";
    private static final String MIME_PGP_ENCRYPTED = "application/pgp-encrypted";
    private static final String MESSAGE_HEADER_UNENCRYPTED = "jsonMessage";
    private static final String MESSAGE_HEADER_PGP_ENCRYPTED = "pgpJsonMessage";

    private MessageSubscriptionCreator(Channel channel) {
        super(channel);
        this.onMessage(this::handleDelivery);
    }

    public static MessageSubscriptionCreator usingChannel(Channel channel) {
        return new MessageSubscriptionCreator(channel);
    }

    public MessageSubscriptionCreator withCryptor(Cryptor cryptor) {
        this.cryptor = cryptor;
        return this;
    }

    public MessageSubscriptionCreator notOlderThan(Duration messageTimeout) {
        this.messageTimeout = messageTimeout;
        return this;
    }

    public MessageSubscriptionCreator notSentBefore(Instant minimumMessageTimestamp) {
        this.minimumMessageTimestampRef = new AtomicReference<Instant>(minimumMessageTimestamp);
        return this;
    }

    public MessageSubscriptionCreator notSentBefore(AtomicReference<Instant> minimumMessageTimestamp) {
        this.minimumMessageTimestampRef = minimumMessageTimestamp;
        return this;
    }

    public MessageSubscriptionCreator notSentAfter(Instant maximumMessageTimestamp) {
        this.maximumMessageTimestamp = maximumMessageTimestamp;
        return this;
    }

    public <M extends Message> MessageSubscriptionCreator onMessage(Class<M> messageClass, MessageHandler<M> messageHandler) {
        if (this.messageHandlersByClass.put(messageClass, messageHandler) != null) {
            LOGGER.warn("{}Message handler for {} had been set before and is now overridden", (Object)this.logPrefix, messageClass);
        }
        if (this.ignoredMessageClasses.contains(messageClass)) {
            throw new IllegalArgumentException(this.logPrefix + "Message class is configured to be ignored: " + messageClass.getCanonicalName());
        }
        return this;
    }

    @SafeVarargs
    public final MessageSubscriptionCreator ignoringMessages(Class<? extends Message> ... messageClasses) {
        return this.ignoringMessages(Arrays.asList(messageClasses));
    }

    public MessageSubscriptionCreator ignoringMessages(Collection<Class<? extends Message>> messageClasses) {
        for (Class<? extends Message> messageClass : messageClasses) {
            if (this.messageHandlersByClass.containsKey(messageClass)) {
                throw new IllegalArgumentException(this.logPrefix + "Message class is configured to be handled: " + messageClass);
            }
            this.ignoredMessageClasses.add(messageClass);
        }
        return this;
    }

    public MessageSubscriptionCreator onIgnore(AmqpSubscriptionCreator.ReceiptAction receiptAction) {
        this.ignoreReceiptAction = receiptAction;
        return this;
    }

    private <M extends Message> AmqpSubscriptionCreator.ReceiptAction handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Object signatureHeader;
            Object encryptedMessageHeader;
            Instant minimumMessageTimestamp = null;
            if (this.minimumMessageTimestampRef != null && (minimumMessageTimestamp = this.minimumMessageTimestampRef.get()) == null) {
                LOGGER.warn("{}Received message before minimum allowed timestamp was available; ignoring: {}", (Object)this.logPrefix, (Object)body);
                return this.ignoreReceiptAction;
            }
            String bodyContentType = properties.getContentType();
            LOGGER.trace("{}Message body is indicated to have content type {}", (Object)this.logPrefix, (Object)bodyContentType);
            byte[] signedData = body;
            boolean isEncryptedBody = MIME_PGP_ENCRYPTED.equals(bodyContentType);
            if (isEncryptedBody) {
                if (body.length == 0) {
                    LOGGER.debug("{}Message body has zero length, nothing to decrypt.", (Object)this.logPrefix);
                } else {
                    if (this.cryptor == null) {
                        LOGGER.warn("{}Received an encrypted message body although no Cryptor instance was provided, unable to decrypt!", (Object)this.logPrefix);
                        return this.errorReceiptAction;
                    }
                    try {
                        LOGGER.debug("{}Decrypting message body ({} bytes)", (Object)this.logPrefix, (Object)body.length);
                        body = this.cryptor.decrypt(body);
                    }
                    catch (Exception ex) {
                        LOGGER.warn("{}Failed to decrypt message body", (Object)this.logPrefix, (Object)ex);
                        return this.errorReceiptAction;
                    }
                }
            }
            Map headers = properties.getHeaders();
            byte[] serializedMessage = body;
            Object unencryptedMessageHeader = headers != null ? (Object)headers.get(MESSAGE_HEADER_UNENCRYPTED) : null;
            Object object = encryptedMessageHeader = headers != null ? (Object)headers.get(MESSAGE_HEADER_PGP_ENCRYPTED) : null;
            if (unencryptedMessageHeader != null && encryptedMessageHeader != null) {
                LOGGER.warn("{}AMQP message contains an encrypted and an unencrypted message in properties header, unable to decide on message: {}", (Object)this.logPrefix, (Object)headers);
                return this.errorReceiptAction;
            }
            if (unencryptedMessageHeader != null) {
                LOGGER.debug("{}AMQP message contains an unencrypted message header; overriding deserialization input", (Object)this.logPrefix);
                signedData = serializedMessage = unencryptedMessageHeader.toString().getBytes(StandardCharsets.UTF_8);
            } else if (encryptedMessageHeader != null) {
                if (this.cryptor == null) {
                    LOGGER.warn("{}Received an encrypted message although no Cryptor instance was provided, unable to decrypt!", (Object)this.logPrefix);
                    return this.errorReceiptAction;
                }
                LOGGER.debug("{}AMQP message contains an encrypted message header; overriding deserialization input", (Object)this.logPrefix);
                byte[] encryptedMessage = encryptedMessageHeader.toString().getBytes(StandardCharsets.UTF_8);
                signedData = encryptedMessage;
                try {
                    serializedMessage = this.cryptor.decrypt(encryptedMessage);
                }
                catch (Exception ex) {
                    LOGGER.warn("{}Failed to decrypt message from header", (Object)this.logPrefix, (Object)ex);
                    return this.errorReceiptAction;
                }
            }
            Message message = this.messageCodec.deserialize(properties, serializedMessage).orElse(null);
            if (message == null) {
                LOGGER.warn("{}Failed to deserialize message: {}", (Object)this.logPrefix, (Object)serializedMessage);
                return this.errorReceiptAction;
            }
            if (this.ignoredMessageClasses.contains(message.getClass())) {
                LOGGER.trace("{}Ignoring message: {}", (Object)this.logPrefix, (Object)message);
                return this.ignoreReceiptAction;
            }
            Class<?> messageClass = message.getClass();
            MessageHandler<?> messageHandler = this.messageHandlersByClass.get(messageClass);
            if (messageHandler == null) {
                LOGGER.warn("{}Unhandled message: {}", (Object)this.logPrefix, (Object)message);
                return this.errorReceiptAction;
            }
            Set verifiedKeyIds = Collections.emptySet();
            Object object2 = signatureHeader = headers != null ? (Object)headers.get(SIGNATURE_HEADER) : null;
            if (signatureHeader != null) {
                try {
                    verifiedKeyIds = this.cryptor.verify(signedData, Cryptor.Signature.asciiArmored((String)signatureHeader.toString()));
                }
                catch (CryptoFailure ex) {
                    LOGGER.warn("{}Message signature could not be verified", (Object)this.logPrefix, (Object)ex);
                }
            }
            Instant messageTimestamp = message.getTimestamp();
            if (minimumMessageTimestamp != null && messageTimestamp.isBefore(minimumMessageTimestamp.minus(CLOCK_DRIFT_ALLOWANCE))) {
                LOGGER.warn("{}Received message with timestamp {} but expected minimum timestamp is {}; ignoring: {}, AMQP properties: {}", new Object[]{this.logPrefix, messageTimestamp, minimumMessageTimestamp, message, properties});
                return this.ignoreReceiptAction;
            }
            if (this.maximumMessageTimestamp != null && messageTimestamp.isAfter(this.maximumMessageTimestamp.plus(CLOCK_DRIFT_ALLOWANCE))) {
                LOGGER.warn("{}Received message with timestamp {} but expected maximum timestamp is {}; ignoring: {}, AMQP properties: {}", new Object[]{this.logPrefix, messageTimestamp, this.maximumMessageTimestamp, message, properties});
                return this.ignoreReceiptAction;
            }
            Duration messageAge = Duration.between(messageTimestamp, Instant.now());
            if (messageAge.isNegative()) {
                LOGGER.warn("{}Real-time clocks are out of sync: Received message {} with timestamp {} before current local clock (diff {}).", new Object[]{this.logPrefix, message, messageTimestamp, messageAge});
            } else if (messageAge.compareTo(this.messageTimeout.plus(CLOCK_DRIFT_ALLOWANCE)) > 0) {
                LOGGER.warn("{}Ignoring outdated message {} from network (age {} > timeout {})", new Object[]{this.logPrefix, message, messageAge, this.messageTimeout});
                return this.ignoreReceiptAction;
            }
            MessageSupplements.Builder supplementsBuilder = MessageSupplements.builder().setAmqpEnvelope(envelope).setAmqpProperties(properties).setVerifiedKeyIds(verifiedKeyIds);
            supplementsBuilder.onAcknowledge(() -> {
                try {
                    LOGGER.trace("{}Acknowledging message via MessageSupplements callback: {}", (Object)this.logPrefix, (Object)message);
                    this.channel.basicAck(envelope.getDeliveryTag(), false);
                }
                catch (IOException ex) {
                    LOGGER.warn("{}Failed to acknowledge message from MessageSupplements callback: {}", new Object[]{this.logPrefix, message, ex});
                    return false;
                }
                return true;
            });
            if (body != serializedMessage) {
                LOGGER.trace("{}Message contains extra payload ({} bytes)", (Object)body.length);
                supplementsBuilder.setPayload(body);
            }
            LOGGER.trace("{}Dispatch to message handler: {}", (Object)this.logPrefix, (Object)message);
            return messageHandler.handleMessage(message, supplementsBuilder.build());
        }
        catch (Exception ex) {
            LOGGER.warn("{}Failed to process message", (Object)this.logPrefix, (Object)ex);
            return this.errorReceiptAction;
        }
    }

    @Override
    public MessageSubscriptionCreator forExistingQueue(String queueName) {
        super.forExistingQueue(queueName);
        return this;
    }

    @Override
    public MessageSubscriptionCreator onError(AmqpSubscriptionCreator.ReceiptAction receiptAction) {
        super.onError(receiptAction);
        return this;
    }

    @Override
    public MessageSubscriptionCreator withAutoAck(boolean autoAck) {
        super.withAutoAck(autoAck);
        return this;
    }

    @Override
    public MessageSubscriptionCreator withExchangeParameters(ExchangeParameters exchangeParameters) {
        super.withExchangeParameters(exchangeParameters);
        return this;
    }

    @Override
    public MessageSubscriptionCreator withLogPrefix(String logPrefix) {
        super.withLogPrefix(logPrefix);
        return this;
    }

    @Override
    public MessageSubscriptionCreator withQueueRoutingKeys(String ... queueRoutingKeys) {
        super.withQueueRoutingKeys(queueRoutingKeys);
        return this;
    }

    @Override
    public MessageSubscriptionCreator withQueueRoutingKeys(Collection<String> queueRoutingKeys) {
        super.withQueueRoutingKeys(queueRoutingKeys);
        return this;
    }

    @FunctionalInterface
    public static interface MessageHandler<M extends Message> {
        public AmqpSubscriptionCreator.ReceiptAction handleMessage(M var1, MessageSupplements var2);
    }
}

