/*
 * Decompiled with CFR 0.152.
 */
package org.somda.sdc.dpws.soap.wseventing;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.name.Named;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.somda.sdc.common.logging.InstanceLogger;
import org.somda.sdc.common.util.ExecutorWrapperService;
import org.somda.sdc.dpws.CommunicationLogContext;
import org.somda.sdc.dpws.TransportBinding;
import org.somda.sdc.dpws.factory.TransportBindingFactory;
import org.somda.sdc.dpws.guice.NetworkJobThreadPool;
import org.somda.sdc.dpws.http.HttpException;
import org.somda.sdc.dpws.http.HttpHandler;
import org.somda.sdc.dpws.http.HttpServerRegistry;
import org.somda.sdc.dpws.http.HttpUriBuilder;
import org.somda.sdc.dpws.soap.CommunicationContext;
import org.somda.sdc.dpws.soap.NotificationSink;
import org.somda.sdc.dpws.soap.RequestResponseClient;
import org.somda.sdc.dpws.soap.SoapMarshalling;
import org.somda.sdc.dpws.soap.SoapMessage;
import org.somda.sdc.dpws.soap.SoapUtil;
import org.somda.sdc.dpws.soap.exception.MalformedSoapMessageException;
import org.somda.sdc.dpws.soap.factory.RequestResponseClientFactory;
import org.somda.sdc.dpws.soap.wsaddressing.WsAddressingUtil;
import org.somda.sdc.dpws.soap.wsaddressing.model.EndpointReferenceType;
import org.somda.sdc.dpws.soap.wseventing.EventSink;
import org.somda.sdc.dpws.soap.wseventing.SinkSubscriptionManager;
import org.somda.sdc.dpws.soap.wseventing.SubscribeResult;
import org.somda.sdc.dpws.soap.wseventing.exception.SubscriptionNotFoundException;
import org.somda.sdc.dpws.soap.wseventing.exception.SubscriptionRequestResponseClientNotFoundException;
import org.somda.sdc.dpws.soap.wseventing.factory.SubscriptionManagerFactory;
import org.somda.sdc.dpws.soap.wseventing.model.DeliveryType;
import org.somda.sdc.dpws.soap.wseventing.model.FilterType;
import org.somda.sdc.dpws.soap.wseventing.model.GetStatus;
import org.somda.sdc.dpws.soap.wseventing.model.GetStatusResponse;
import org.somda.sdc.dpws.soap.wseventing.model.ObjectFactory;
import org.somda.sdc.dpws.soap.wseventing.model.Renew;
import org.somda.sdc.dpws.soap.wseventing.model.RenewResponse;
import org.somda.sdc.dpws.soap.wseventing.model.Subscribe;
import org.somda.sdc.dpws.soap.wseventing.model.SubscribeResponse;
import org.somda.sdc.dpws.soap.wseventing.model.Unsubscribe;

/**
 * {@link EventSink} implementation that manages WS-Eventing subscriptions on the client side.
 *
 * <p>For every subscription two HTTP contexts (NotifyTo and EndTo) are registered at the
 * {@link HttpServerRegistry}; messages arriving on either context are forwarded to the
 * {@link NotificationSink} passed to {@link #subscribe}. The Subscribe request is sent with the
 * request-response client given at construction; Renew, GetStatus and Unsubscribe are sent with a
 * dedicated client created for the subscription manager address returned by the event source.
 *
 * <p>The two subscription maps are accessed while holding {@code subscriptionsLock}.
 */
public class EventSinkImpl implements EventSink {
    private static final Logger LOG = LogManager.getLogger(EventSinkImpl.class);

    private static final String EVENT_SINK_CONTEXT_PREFIX = "/EventSink/";
    private static final String EVENT_SINK_NOTIFY_TO_CONTEXT_PREFIX = "/EventSink/NotifyTo/";
    private static final String EVENT_SINK_END_TO_CONTEXT_PREFIX = "/EventSink/EndTo/";

    // WS-Eventing (2004/08) delivery mode and action URIs used by the requests below.
    private static final String WSE_DELIVERY_MODE_PUSH = "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push";
    private static final String WSE_ACTION_SUBSCRIBE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe";
    private static final String WSE_ACTION_RENEW = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew";
    private static final String WSE_ACTION_GET_STATUS = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus";
    private static final String WSE_ACTION_UNSUBSCRIBE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe";

    private final RequestResponseClient requestResponseClient;
    private final TransportBindingFactory transportBindingFactory;
    private final RequestResponseClientFactory requestResponseClientFactory;
    private final String hostAddress;
    private final HttpServerRegistry httpServerRegistry;
    private final ObjectFactory wseFactory;
    private final WsAddressingUtil wsaUtil;
    private final SoapMarshalling marshalling;
    private final SoapUtil soapUtil;
    private final ExecutorWrapperService<ListeningExecutorService> executorService;
    private final SubscriptionManagerFactory subscriptionManagerFactory;
    /** Subscription id to local subscription manager; accessed under {@link #subscriptionsLock}. */
    private final Map<String, SinkSubscriptionManager> subscriptionManagers;
    /** Subscription id to the client that talks to the remote subscription manager. */
    private final Map<String, RequestResponseClient> subscriptionClients;
    private final Lock subscriptionsLock;
    private final Duration maxWaitForFutures;
    @Nullable
    private final CommunicationLogContext communicationLogContext;
    private final Logger instanceLogger;
    private final HttpUriBuilder httpUriBuilder;

    @AssistedInject
    EventSinkImpl(@Assisted RequestResponseClient requestResponseClient, @Assisted String hostAddress, @Assisted @Nullable CommunicationLogContext communicationLogContext, @Named(value="Dpws.MaxWaitForFutures") Duration maxWaitForFutures, HttpServerRegistry httpServerRegistry, ObjectFactory wseFactory, WsAddressingUtil wsaUtil, SoapMarshalling marshalling, SoapUtil soapUtil, @NetworkJobThreadPool ExecutorWrapperService<ListeningExecutorService> executorService, SubscriptionManagerFactory subscriptionManagerFactory, @Named(value="Common.InstanceIdentifier") String frameworkIdentifier, TransportBindingFactory transportBindingFactory, RequestResponseClientFactory requestResponseClientFactory) {
        this.instanceLogger = InstanceLogger.wrapLogger(LOG, frameworkIdentifier);
        this.requestResponseClient = requestResponseClient;
        this.transportBindingFactory = transportBindingFactory;
        this.requestResponseClientFactory = requestResponseClientFactory;
        this.hostAddress = hostAddress;
        this.communicationLogContext = communicationLogContext;
        this.maxWaitForFutures = maxWaitForFutures;
        this.httpServerRegistry = httpServerRegistry;
        this.wseFactory = wseFactory;
        this.wsaUtil = wsaUtil;
        this.marshalling = marshalling;
        this.soapUtil = soapUtil;
        this.executorService = executorService;
        this.subscriptionManagerFactory = subscriptionManagerFactory;
        this.subscriptionManagers = new ConcurrentHashMap<>();
        this.subscriptionClients = new ConcurrentHashMap<>();
        // Must be reentrant: unsubscribe() calls the locking accessors while already holding it.
        this.subscriptionsLock = new ReentrantLock();
        this.httpUriBuilder = new HttpUriBuilder();
    }

    /**
     * Asynchronously registers the NotifyTo/EndTo HTTP contexts, sends a Subscribe request and
     * stores the resulting subscription.
     *
     * <p>If anything fails after a context has been registered, the contexts registered so far
     * are unregistered again before the failure is propagated through the returned future.
     *
     * @param filterDialect    dialect written to the filter element of the request.
     * @param filters          filter content written to the request.
     * @param expires          requested expiration, may be null.
     * @param notificationSink receiver of all messages arriving on the NotifyTo and EndTo contexts.
     * @return future holding the subscription id and the granted expiration.
     */
    @Override
    public ListenableFuture<SubscribeResult> subscribe(String filterDialect, List<Object> filters, @Nullable Duration expires, final NotificationSink notificationSink) {
        return this.networkExecutor().submit(() -> {
            String contextSuffix = UUID.randomUUID().toString();
            String endToUri = null;
            String notifyToUri = null;
            boolean subscribed = false;
            try {
                endToUri = this.httpServerRegistry.registerContext(this.hostAddress, true, EVENT_SINK_END_TO_CONTEXT_PREFIX + contextSuffix, null, this.communicationLogContext, this.createNotificationHandler(notificationSink));
                notifyToUri = this.httpServerRegistry.registerContext(this.hostAddress, true, EVENT_SINK_NOTIFY_TO_CONTEXT_PREFIX + contextSuffix, null, this.communicationLogContext, this.createNotificationHandler(notificationSink));

                Subscribe subscribeBody = this.wseFactory.createSubscribe();

                DeliveryType deliveryType = this.wseFactory.createDeliveryType();
                deliveryType.setMode(WSE_DELIVERY_MODE_PUSH);
                EndpointReferenceType notifyToEpr = this.wsaUtil.createEprWithAddress(notifyToUri);
                deliveryType.setContent(Collections.singletonList(this.wseFactory.createNotifyTo(notifyToEpr)));
                subscribeBody.setDelivery(deliveryType);

                EndpointReferenceType endToEpr = this.wsaUtil.createEprWithAddress(endToUri);
                subscribeBody.setEndTo(endToEpr);

                FilterType filterType = this.wseFactory.createFilterType();
                filterType.setDialect(filterDialect);
                filterType.setContent(filters);
                subscribeBody.setExpires(expires);
                subscribeBody.setFilter(filterType);

                SoapMessage subscribeRequest = this.soapUtil.createMessage(WSE_ACTION_SUBSCRIBE, subscribeBody);
                SoapMessage soapResponse = this.requestResponseClient.sendRequestResponse(subscribeRequest);
                SubscribeResponse responseBody = this.soapUtil.getBody(soapResponse, SubscribeResponse.class).orElseThrow(() -> new MalformedSoapMessageException("Cannot read WS-Eventing Subscribe response"));

                SinkSubscriptionManager sinkSubMan = this.subscriptionManagerFactory.createSinkSubscriptionManager(responseBody.getSubscriptionManager(), responseBody.getExpires(), notifyToEpr, endToEpr, filters, filterDialect);
                TransportBinding tBinding = this.transportBindingFactory.createTransportBinding(responseBody.getSubscriptionManager().getAddress().getValue(), null);
                RequestResponseClient rrClient = this.requestResponseClientFactory.createRequestResponseClient(tBinding);

                this.subscriptionsLock.lock();
                try {
                    this.subscriptionManagers.put(sinkSubMan.getSubscriptionId(), sinkSubMan);
                    this.subscriptionClients.put(sinkSubMan.getSubscriptionId(), rrClient);
                } finally {
                    this.subscriptionsLock.unlock();
                }
                // From here on unsubscribe() owns the clean-up of the HTTP contexts.
                subscribed = true;
                return new SubscribeResult(sinkSubMan.getSubscriptionId(), sinkSubMan.getExpires());
            } finally {
                if (!subscribed) {
                    this.unregisterAfterFailedSubscribe(notifyToUri);
                    this.unregisterAfterFailedSubscribe(endToUri);
                }
            }
        });
    }

    /**
     * Asynchronously sends a Renew request for the given subscription and applies the expiration
     * from the response to the local subscription manager.
     *
     * @param subscriptionId id as returned by {@link #subscribe}.
     * @param expires        requested new expiration.
     * @return future holding the expiration taken from the RenewResponse.
     */
    @Override
    public ListenableFuture<Duration> renew(String subscriptionId, Duration expires) {
        return this.networkExecutor().submit(() -> {
            SinkSubscriptionManager subMan = this.getSubscriptionManagerProxy(subscriptionId);
            RequestResponseClient subscriptionRequestResponseClient = this.getSubscriptionRequestResponseClient(subscriptionId);

            Renew renew = this.wseFactory.createRenew();
            renew.setExpires(expires);
            String subManAddress = this.wsaUtil.getAddressUri(subMan.getSubscriptionManagerEpr()).orElseThrow(() -> new RuntimeException("No subscription manager EPR found"));
            SoapMessage renewMsg = this.soapUtil.createMessage(WSE_ACTION_RENEW, subManAddress, renew, subMan.getSubscriptionManagerEpr().getReferenceParameters());

            SoapMessage renewResMsg = subscriptionRequestResponseClient.sendRequestResponse(renewMsg);
            RenewResponse renewResponse = this.soapUtil.getBody(renewResMsg, RenewResponse.class).orElseThrow(() -> new MalformedSoapMessageException("WS-Eventing RenewResponse message is malformed"));

            Duration newExpires = renewResponse.getExpires();
            subMan.renew(newExpires);
            return newExpires;
        });
    }

    /**
     * Asynchronously sends a GetStatus request for the given subscription.
     *
     * @param subscriptionId id as returned by {@link #subscribe}.
     * @return future holding the expiration taken from the GetStatusResponse.
     */
    @Override
    public ListenableFuture<Duration> getStatus(String subscriptionId) {
        return this.networkExecutor().submit(() -> {
            SinkSubscriptionManager subMan = this.getSubscriptionManagerProxy(subscriptionId);
            RequestResponseClient subscriptionRequestResponseClient = this.getSubscriptionRequestResponseClient(subscriptionId);

            GetStatus getStatus = this.wseFactory.createGetStatus();
            String subManAddress = this.wsaUtil.getAddressUri(subMan.getSubscriptionManagerEpr()).orElseThrow(() -> new RuntimeException("No subscription manager EPR found"));
            SoapMessage getStatusMsg = this.soapUtil.createMessage(WSE_ACTION_GET_STATUS, subManAddress, getStatus, subMan.getSubscriptionManagerEpr().getReferenceParameters());

            SoapMessage getStatusResMsg = subscriptionRequestResponseClient.sendRequestResponse(getStatusMsg);
            GetStatusResponse getStatusResponse = this.soapUtil.getBody(getStatusResMsg, GetStatusResponse.class).orElseThrow(() -> new MalformedSoapMessageException("WS-Eventing GetStatusResponse message is malformed"));
            return getStatusResponse.getExpires();
        });
    }

    /**
     * Removes the subscription from this event sink and asynchronously sends an Unsubscribe
     * request.
     *
     * <p>Lookup and removal happen synchronously on the calling thread inside one critical
     * section, so of several concurrent calls for the same id only one proceeds; the others fail
     * with the lookup exception. The HTTP contexts of the subscription are unregistered once the
     * asynchronous task has run, whether or not the Unsubscribe request succeeded.
     *
     * @param subscriptionId id as returned by {@link #subscribe}.
     * @return future that completes when the request has been answered and the contexts are gone.
     */
    @Override
    public ListenableFuture<?> unsubscribe(String subscriptionId) {
        final SinkSubscriptionManager subMan;
        final RequestResponseClient subscriptionRequestResponseClient;
        // Look up and remove atomically; the accessors re-acquire the (reentrant) lock.
        this.subscriptionsLock.lock();
        try {
            subMan = this.getSubscriptionManagerProxy(subscriptionId);
            subscriptionRequestResponseClient = this.getSubscriptionRequestResponseClient(subscriptionId);
            this.removeSubscriptionManager(subscriptionId);
            this.removeSubscriptionRequestResponseClient(subscriptionId);
        } finally {
            this.subscriptionsLock.unlock();
        }

        Optional<URI> endToUri = subMan.getEndTo().map(it -> it.getAddress().getValue()).map(URI::create);
        URI notifyToUri = URI.create(subMan.getNotifyTo().getAddress().getValue());

        return this.networkExecutor().submit(() -> {
            try {
                // Built inside the try block: the subscription is already removed locally, so the
                // contexts must be released even if the request cannot be created.
                Unsubscribe unsubscribe = this.wseFactory.createUnsubscribe();
                String subManAddress = this.wsaUtil.getAddressUri(subMan.getSubscriptionManagerEpr()).orElseThrow(() -> new RuntimeException("No subscription manager EPR found"));
                SoapMessage unsubscribeMsg = this.soapUtil.createMessage(WSE_ACTION_UNSUBSCRIBE, subManAddress, unsubscribe, subMan.getSubscriptionManagerEpr().getReferenceParameters());
                subscriptionRequestResponseClient.sendRequestResponse(unsubscribeMsg);
            } finally {
                try {
                    endToUri.ifPresent(this::unregisterUri);
                } finally {
                    this.unregisterUri(notifyToUri);
                }
            }
            return new Object();
        });
    }

    /**
     * Unsubscribes from every currently known subscription, waiting up to the configured maximum
     * wait time for each one. Failures and timeouts are logged and otherwise ignored.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored.
     */
    @Override
    public void unsubscribeAll() {
        // Iterate over a snapshot because unsubscribe() removes entries from the map.
        for (SinkSubscriptionManager subscriptionManager : new ArrayList<>(this.subscriptionManagers.values())) {
            String subscriptionId = subscriptionManager.getSubscriptionId();
            ListenableFuture<?> future = this.unsubscribe(subscriptionId);
            try {
                future.get(this.maxWaitForFutures.toSeconds(), TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Do not swallow the interruption; callers further up may need to observe it.
                Thread.currentThread().interrupt();
                this.instanceLogger.warn("Subscription {} could not be unsubscribed. Ignore.", subscriptionId);
                this.instanceLogger.trace("Subscription {} could not be unsubscribed", subscriptionId, e);
            } catch (ExecutionException e) {
                this.instanceLogger.warn("Subscription {} could not be unsubscribed. Ignore.", subscriptionId);
                this.instanceLogger.trace("Subscription {} could not be unsubscribed", subscriptionId, e);
            } catch (TimeoutException e) {
                this.instanceLogger.warn("Subscription {} could not be unsubscribed, timeout after {}s. Ignore.", subscriptionId, this.maxWaitForFutures.toSeconds());
                this.instanceLogger.trace("Subscription {} could not be unsubscribed, timeout after {}s", subscriptionId, this.maxWaitForFutures.toSeconds(), e);
                future.cancel(true);
            }
        }
    }

    /** Returns the executor all network jobs of this event sink are submitted to. */
    private ListeningExecutorService networkExecutor() {
        return (ListeningExecutorService) this.executorService.get();
    }

    /** Creates an HTTP handler that forwards every incoming message to the given sink. */
    private HttpHandler createNotificationHandler(final NotificationSink notificationSink) {
        return new HttpHandler() {
            @Override
            public void handle(InputStream inStream, OutputStream outStream, CommunicationContext communicationContext) throws HttpException {
                EventSinkImpl.this.processIncomingNotification(notificationSink, inStream, outStream, communicationContext);
            }
        };
    }

    /**
     * Best-effort removal of a context that was registered by a subscribe attempt that failed.
     * Errors are logged only, so they cannot mask the exception that made the subscribe fail.
     *
     * @param registeredUri URI returned by the registry, or null if registration never happened.
     */
    private void unregisterAfterFailedSubscribe(@Nullable String registeredUri) {
        if (registeredUri == null) {
            return;
        }
        try {
            this.unregisterUri(URI.create(registeredUri));
        } catch (RuntimeException e) {
            this.instanceLogger.warn("HTTP context {} could not be unregistered after failed subscribe. Ignore.", registeredUri);
            this.instanceLogger.trace("HTTP context {} could not be unregistered after failed subscribe", registeredUri, e);
        }
    }

    /** Splits the URI into base URI (scheme, host, port) and path and unregisters that context. */
    private void unregisterUri(URI fullUri) {
        String uriWithoutPath = this.httpUriBuilder.buildUri(fullUri.getScheme(), fullUri.getHost(), fullUri.getPort());
        this.httpServerRegistry.unregisterContext(uriWithoutPath, fullUri.getPath());
    }

    /**
     * Unmarshals a SOAP message from the input stream and hands it to the notification sink.
     *
     * @throws HttpException with status 500 if unmarshalling or processing fails; the original
     *                       exception is logged because only its message is carried over.
     */
    private void processIncomingNotification(NotificationSink notificationSink, InputStream inputStream, OutputStream outputStream, CommunicationContext communicationContext) throws HttpException {
        try {
            SoapMessage soapMsg = this.soapUtil.createMessage(this.marshalling.unmarshal(inputStream));
            inputStream.close();
            this.instanceLogger.debug("Received incoming notification {}", soapMsg);
            notificationSink.receiveNotification(soapMsg, communicationContext);
            outputStream.close();
        } catch (Exception e) {
            // The streams are deliberately left untouched here, as in the success path they are
            // only closed after their respective step succeeded.
            this.instanceLogger.warn("Incoming notification could not be processed: {}", e.getMessage());
            this.instanceLogger.trace("Incoming notification could not be processed", e);
            throw new HttpException(500, e.getMessage());
        }
    }

    /** Returns the manager of the given subscription or throws if the id is unknown. */
    private SinkSubscriptionManager getSubscriptionManagerProxy(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            SinkSubscriptionManager manager = this.subscriptionManagers.get(subscriptionId);
            if (manager == null) {
                throw new SubscriptionNotFoundException();
            }
            return manager;
        } finally {
            this.subscriptionsLock.unlock();
        }
    }

    /** Forgets the manager of the given subscription; unknown ids are ignored. */
    private void removeSubscriptionManager(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            this.subscriptionManagers.remove(subscriptionId);
        } finally {
            this.subscriptionsLock.unlock();
        }
    }

    /** Returns the client bound to the given subscription or throws if none is stored. */
    private RequestResponseClient getSubscriptionRequestResponseClient(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            RequestResponseClient client = this.subscriptionClients.get(subscriptionId);
            if (client == null) {
                throw new SubscriptionRequestResponseClientNotFoundException();
            }
            return client;
        } finally {
            this.subscriptionsLock.unlock();
        }
    }

    /** Forgets the client of the given subscription; unknown ids are ignored. */
    private void removeSubscriptionRequestResponseClient(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            this.subscriptionClients.remove(subscriptionId);
        } finally {
            this.subscriptionsLock.unlock();
        }
    }
}

