/*
 * Decompiled with CFR 0.152.
 */
package org.piangles.gateway.requests.processors;

import java.util.Optional;
import org.piangles.backbone.services.Locator;
import org.piangles.backbone.services.logging.LoggingService;
import org.piangles.core.stream.PassThruStreamProcessor;
import org.piangles.core.stream.Stream;
import org.piangles.core.stream.StreamProcessor;
import org.piangles.core.util.coding.JSON;
import org.piangles.gateway.client.ClientDetails;
import org.piangles.gateway.requests.ResponseSender;
import org.piangles.gateway.requests.dto.Request;
import org.piangles.gateway.requests.dto.Response;
import org.piangles.gateway.requests.dto.SimpleResponse;
import org.piangles.gateway.requests.dto.StatusCode;
import org.piangles.gateway.requests.processors.AbstractRequestProcessor;
import org.piangles.gateway.requests.processors.StreamAcquirer;

public final class DefaultStreamRequestProcessor<AppReq, SI, SO>
extends AbstractRequestProcessor<AppReq, SimpleResponse> {
    private LoggingService logger = Locator.getInstance().getLoggingService();
    private StreamAcquirer<AppReq, SI> srp = null;
    private StreamProcessor<SI, SO> processor = null;

    public DefaultStreamRequestProcessor(StreamAcquirer<AppReq, SI> srp) {
        this(srp, (StreamProcessor<SI, SO>)new PassThruStreamProcessor());
    }

    public DefaultStreamRequestProcessor(StreamAcquirer<AppReq, SI> srp, StreamProcessor<SI, SO> processor) {
        super(srp.getEndpoint(), srp.getRequestClass(), SimpleResponse.class);
        this.srp = srp;
        this.processor = processor;
    }

    @Override
    protected SimpleResponse processRequest(ClientDetails clientDetails, Request request, AppReq appRequest) throws Exception {
        Stream<SI> stream = this.srp.acquireStream(appRequest);
        stream.processAsync(payload -> {
            try {
                Optional output = this.processor.process(payload);
                String appResponseAsStr = null;
                if (output.isPresent()) {
                    appResponseAsStr = new String(JSON.getEncoder().encode(output.get()));
                } else {
                    this.logger.info((Object)("Reached EndOfStream for Request : " + request.getTraceId().toString()));
                }
                Response response = new Response(request.getTraceId(), request.getEndpoint(), request.getReceiptTime(), request.getTransitTime(), StatusCode.Success, appResponseAsStr);
                ResponseSender.sendResponse(clientDetails, response);
            }
            catch (Exception e) {
                this.logger.error((Object)("Error while processing stream data because of : " + e.getMessage()), (Throwable)e);
            }
            return null;
        });
        return new SimpleResponse(true);
    }
}

