/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.jms.JmsMessageReceiver;
import ch.squaredesk.nova.comm.jms.JmsMessageSender;
import ch.squaredesk.nova.comm.jms.JmsSpecificInfo;
import ch.squaredesk.nova.comm.rpc.RpcClient;
import ch.squaredesk.nova.comm.sending.MessageSendingInfo;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;

public class JmsRpcClient<InternalMessageType>
extends RpcClient<Destination, InternalMessageType, JmsSpecificInfo> {
    private final JmsMessageSender<InternalMessageType> messageSender;
    private final JmsMessageReceiver<InternalMessageType> messageReceiver;

    public JmsRpcClient(String identifier, JmsMessageReceiver<InternalMessageType> messageReceiver, JmsMessageSender<InternalMessageType> messageSender, Metrics metrics) {
        super(identifier, metrics);
        this.messageSender = messageSender;
        this.messageReceiver = messageReceiver;
    }

    public <RequestType extends InternalMessageType, ReplyType extends InternalMessageType> Single<ReplyType> sendRequest(RequestType request, MessageSendingInfo<Destination, JmsSpecificInfo> messageSendingInfo, long timeout, TimeUnit timeUnit) {
        Objects.requireNonNull(timeUnit, "timeUnit must not be null");
        Single replySingle = this.messageReceiver.messages(((JmsSpecificInfo)messageSendingInfo.transportSpecificInfo).replyDestination).filter(incomingMessage -> incomingMessage.details.transportSpecificDetails != null && ((JmsSpecificInfo)messageSendingInfo.transportSpecificInfo).correlationId.equals(((JmsSpecificInfo)incomingMessage.details.transportSpecificDetails).correlationId)).take(1L).map(incomingMessage -> incomingMessage.message).doOnNext(reply -> this.metricsCollector.rpcCompleted(request, reply)).single(new Object());
        Throwable sendError = this.messageSender.sendMessage(messageSendingInfo.destination, request, messageSendingInfo.transportSpecificInfo).blockingGet();
        if (sendError != null) {
            return Single.error((Throwable)sendError);
        }
        Single timeoutSingle = Single.create(s -> this.metricsCollector.rpcTimedOut(request)).timeout(timeout, timeUnit);
        return replySingle.ambWith((SingleSource)timeoutSingle);
    }
}

