/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.jms.IncomingMessageMetaData;
import ch.squaredesk.nova.comm.jms.MessageReceiver;
import ch.squaredesk.nova.comm.jms.MessageSender;
import ch.squaredesk.nova.comm.jms.OutgoingMessageMetaData;
import ch.squaredesk.nova.comm.jms.RetrieveInfo;
import ch.squaredesk.nova.comm.jms.RpcInvocation;
import ch.squaredesk.nova.comm.jms.SendInfo;
import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Flowable;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.jms.Destination;

public class RpcServer<InternalMessageType>
extends ch.squaredesk.nova.comm.rpc.RpcServer<Destination, RpcInvocation<InternalMessageType>> {
    private final MessageSender<InternalMessageType> messageSender;
    private final MessageReceiver<InternalMessageType> messageReceiver;
    private final Function<Throwable, InternalMessageType> errorReplyFactory;

    RpcServer(String identifier, MessageReceiver<InternalMessageType> messageReceiver, MessageSender<InternalMessageType> messageSender, Function<Throwable, InternalMessageType> errorReplyFactory, Metrics metrics) {
        super(identifier, metrics);
        Objects.requireNonNull(messageSender, "messageSender must not be null");
        Objects.requireNonNull(messageReceiver, "messageReceiver must not be null");
        Objects.requireNonNull(errorReplyFactory, "errorReplyFactory must not be null");
        this.messageSender = messageSender;
        this.messageReceiver = messageReceiver;
        this.errorReplyFactory = errorReplyFactory;
    }

    public Flowable<RpcInvocation<InternalMessageType>> requests(Destination destination) {
        return this.messageReceiver.messages(destination).filter(this::isRpcRequest).map(incomingMessage -> {
            this.metricsCollector.requestReceived(incomingMessage.message);
            Consumer replyConsumer = this.createReplyHandlerFor((IncomingMessage)incomingMessage);
            Consumer<Throwable> errorConsumer = this.createErrorReplyHandlerFor((IncomingMessage<InternalMessageType, IncomingMessageMetaData>)incomingMessage);
            return new RpcInvocation(incomingMessage, reply -> {
                replyConsumer.accept(reply._1);
                this.metricsCollector.requestCompleted(incomingMessage.message, reply);
            }, error -> {
                this.metricsCollector.requestCompletedExceptionally(incomingMessage.message, error);
                errorConsumer.accept((Throwable)error);
            });
        });
    }

    private boolean isRpcRequest(IncomingMessage<InternalMessageType, IncomingMessageMetaData> incomingMessage) {
        return incomingMessage.metaData != null && ((IncomingMessageMetaData)incomingMessage.metaData).details != null && ((RetrieveInfo)((IncomingMessageMetaData)incomingMessage.metaData).details).replyDestination != null && ((RetrieveInfo)((IncomingMessageMetaData)incomingMessage.metaData).details).correlationId != null;
    }

    private <RequestType extends InternalMessageType, ReplyType extends InternalMessageType> Consumer<ReplyType> createReplyHandlerFor(IncomingMessage<RequestType, IncomingMessageMetaData> request) {
        SendInfo sendingInfo = new SendInfo(((RetrieveInfo)((IncomingMessageMetaData)request.metaData).details).correlationId, null, null, null, null, null);
        OutgoingMessageMetaData meta = new OutgoingMessageMetaData(((RetrieveInfo)((IncomingMessageMetaData)request.metaData).details).replyDestination, sendingInfo);
        return reply -> this.messageSender.doSend(reply, meta).subscribe();
    }

    private Consumer<Throwable> createErrorReplyHandlerFor(IncomingMessage<InternalMessageType, IncomingMessageMetaData> request) {
        SendInfo sendingInfo = new SendInfo(((RetrieveInfo)((IncomingMessageMetaData)request.metaData).details).correlationId, null, null, null, null, null);
        OutgoingMessageMetaData meta = new OutgoingMessageMetaData(((RetrieveInfo)((IncomingMessageMetaData)request.metaData).details).replyDestination, sendingInfo);
        return error -> this.messageSender.doSend(this.errorReplyFactory.apply((Throwable)error), meta).subscribe();
    }
}

