/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.integration.platform.engine.service.debugger.sessions;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.qubership.integration.platform.engine.model.Session;
import org.qubership.integration.platform.engine.model.opensearch.QueueElement;
import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic;
import org.qubership.integration.platform.engine.opensearch.OpenSearchClientSupplier;
import org.qubership.integration.platform.engine.service.ExecutionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
 * Buffers session elements in a bounded in-memory queue and writes them to OpenSearch from a
 * dedicated background thread, and keeps in-memory caches of sessions and their elements.
 *
 * <p>Producers call {@link #scheduleElementToLog(SessionElementElastic)} or
 * {@link #scheduleElementToLogAndCache(SessionElementElastic)}; the writer thread drains the queue
 * in {@link #run()} and sends the elements either one by one or as bulk requests.
 */
@Component
public class OpenSearchWriter implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(OpenSearchWriter.class);

    /** Pause (ms) of the writer loop after an unexpected failure. */
    private static final int EXCEPTION_COOLDOWN_DELAY = 10000;
    /** Factor applied to the write timeout after every failed write attempt. */
    private static final int WRITE_TIMEOUT_MULTIPLIER = 2;
    /** Maximum number of per-item error reasons included in one log message. */
    private static final int ERROR_MESSAGE_COUNT_THRESHOLD = 3;
    /** Number of additional attempts made for an element/batch after a failed write. */
    private static final int RETRY_COUNT_ON_WRITE_ERROR = 5;
    /** Multiplier applied to the configured queue size limit. */
    private static final double REPEATED_ELEMENTS_RATIO = 2.2;

    private final long queueMaxSizeBytes;
    private final int bulkRequestMaxSizeBytes;
    private final int bulkRequestPayloadSizeThresholdBytes;
    private final int bulkRequestElementsCountThreshold;
    private final OpenSearchClientSupplier openSearchClientSupplier;
    private final ObjectMapper mapper;
    private final BlockingQueue<QueueElement> sessionElementsQueue;
    /** Sum of the calculated payload sizes of the elements currently waiting in the queue. */
    private final AtomicLong queueTotalPayloadSize = new AtomicLong(0L);
    /** Session id -> (lock guarding cache updates of the session, session). */
    private final ConcurrentMap<String, Pair<ReadWriteLock, Session>> sessionsCache = new ConcurrentHashMap<>();
    /** Session id -> (element id -> element). */
    private final ConcurrentMap<String, ConcurrentMap<String, SessionElementElastic>> sessionElementsCache = new ConcurrentHashMap<>();
    /** Session id -> single pending element. */
    private final ConcurrentMap<String, SessionElementElastic> singleElementCache = new ConcurrentHashMap<>();
    /** Delay (ms) applied before every write request; only touched by the writer thread. */
    private long currentWriteTimeout = 0L;

    @Value(value="${qip.opensearch.write.batch.count}")
    private int queueDrainThreshold;
    @Value(value="${qip.opensearch.write.retry.timeout.minimum}")
    private long writeTimeoutDefaultMin;
    @Value(value="${qip.opensearch.write.retry.timeout.maximum}")
    private long writeTimeoutDefaultMax;
    @Value(value="${qip.opensearch.index.elements.name}-session-elements")
    private String indexName;

    /**
     * Creates the writer and starts its background thread.
     *
     * @param sessionBufferCapacity maximum number of elements the queue can hold
     * @param queueMaxSizeMb limit of the total queued payload size, in megabytes (scaled by
     *        {@link #REPEATED_ELEMENTS_RATIO})
     * @param bulkRequestMaxSizeKb accumulated payload size, in kilobytes, that triggers a bulk flush
     * @param bulkRequestPayloadSizeThresholdKb payload size, in kilobytes, from which an element is
     *        sent in its own request instead of being batched
     * @param bulkRequestElementsCountThreshold drained sets of at most this size are sent element by
     *        element instead of being batched
     * @param openSearchClientSupplier provider of the OpenSearch client and index name normalization
     * @param mapper JSON mapper used to measure the serialized size of elements
     */
    @Autowired
    public OpenSearchWriter(@Value(value="${qip.sessions.queue.capacity}") int sessionBufferCapacity, @Value(value="${qip.sessions.queue.max-size-mb}") int queueMaxSizeMb, @Value(value="${qip.sessions.bulk-request.max-size-kb}") int bulkRequestMaxSizeKb, @Value(value="${qip.sessions.bulk-request.payload-size-threshold-kb}") int bulkRequestPayloadSizeThresholdKb, @Value(value="${qip.sessions.bulk-request.elements-count-threshold}") int bulkRequestElementsCountThreshold, OpenSearchClientSupplier openSearchClientSupplier, @Qualifier(value="jsonMapper") ObjectMapper mapper) {
        this.sessionElementsQueue = new LinkedBlockingQueue<>(sessionBufferCapacity);
        // Computed in long/double arithmetic: the int product overflows for limits of 2048 MB and
        // more, and an int result cannot represent scaled limits above ~2 GB at all.
        this.queueMaxSizeBytes = (long) (queueMaxSizeMb * 1024L * 1024L * REPEATED_ELEMENTS_RATIO);
        this.bulkRequestMaxSizeBytes = bulkRequestMaxSizeKb * 1024;
        this.bulkRequestPayloadSizeThresholdBytes = bulkRequestPayloadSizeThresholdKb * 1024;
        this.bulkRequestElementsCountThreshold = bulkRequestElementsCountThreshold;
        this.openSearchClientSupplier = openSearchClientSupplier;
        this.mapper = mapper;
        // NOTE(review): the thread starts before the @Value-annotated fields are injected, so its
        // very first reads may observe default values - consider starting it from a lifecycle hook.
        new Thread(this).start();
    }

    /**
     * Writer loop: blocks until at least one element is queued, drains up to
     * {@code queueDrainThreshold} elements, removes duplicates and saves them.
     *
     * <p>The loop never terminates on its own. After an unexpected failure it logs the error, drops
     * the current drained batch and pauses for {@link #EXCEPTION_COOLDOWN_DELAY} ms.
     */
    @Override
    public void run() {
        List<QueueElement> elementsToSave = new ArrayList<>(this.queueDrainThreshold);
        resetWriteTimeout();
        while (true) {
            try {
                try {
                    elementsToSave.add(this.sessionElementsQueue.take());
                } catch (InterruptedException ignored) {
                    // The writer has no shutdown path; keep waiting for elements.
                    continue;
                }
                this.sessionElementsQueue.drainTo(elementsToSave, this.queueDrainThreshold - 1);
                elementsToSave.forEach(element -> this.queueTotalPayloadSize.addAndGet(-element.getCalculatedPayloadSize()));
                // LinkedHashSet drops repeated elements while keeping the queue order.
                LinkedHashSet<QueueElement> filteredElements = new LinkedHashSet<>(elementsToSave);
                if (!filteredElements.isEmpty()) {
                    saveElements(filteredElements);
                }
            } catch (Exception e) {
                log.error("Failed to commit sessions to opensearch", e);
                try {
                    Thread.sleep(EXCEPTION_COOLDOWN_DELAY);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ex);
                }
            } finally {
                // Always start the next round from an empty buffer: elements left over from a failed
                // round would otherwise be subtracted from queueTotalPayloadSize a second time and
                // re-processed on every following iteration.
                elementsToSave.clear();
            }
        }
    }

    /**
     * Sends the given elements to OpenSearch.
     *
     * <p>An element is sent in its own request when its serialized size reaches the payload
     * threshold or when the whole set is not larger than the elements-count threshold; otherwise it
     * is appended to a batch that is flushed once the accumulated size reaches the bulk limit or the
     * last element of the set has been processed. Failed writes are retried up to
     * {@link #RETRY_COUNT_ON_WRITE_ERROR} times with a growing delay; after that the batch being
     * flushed is dropped.
     *
     * @param sessionElements de-duplicated elements in queue order
     */
    private void saveElements(LinkedHashSet<QueueElement> sessionElements) {
        int currentRetry = 0;
        int bulkRequestSize = 0;
        boolean needToExecuteBulk = false;
        List<BulkOperation> updateRequests = new ArrayList<>();
        Iterator<QueueElement> iterator = sessionElements.iterator();
        while (iterator.hasNext()) {
            SessionElementElastic element = iterator.next().getElement();
            byte[] payload = null;
            try {
                payload = this.mapper.writeValueAsBytes(element);
            } catch (JsonProcessingException e) {
                log.error("Failed to parse sessions write request. Element skipped", e);
                resetWriteTimeout();
                if (iterator.hasNext() || updateRequests.isEmpty()) {
                    continue;
                }
                // This was the last element and a batch is still pending: fall through without a
                // request so that the accumulated batch is flushed instead of being lost.
            }
            int payloadSize = payload == null ? 0 : payload.length;
            BulkOperation request = payload == null ? null : buildIndexOperation(element);
            do {
                try {
                    if (request != null) {
                        if (payloadSize >= this.bulkRequestPayloadSizeThresholdBytes || sessionElements.size() <= this.bulkRequestElementsCountThreshold) {
                            waitBeforeRequest();
                            // executeBulk clears the list it is given, hence the mutable copy.
                            executeBulk(new ArrayList<>(List.of(request)));
                        } else if (currentRetry == 0) {
                            // Only on the first attempt, so that retries do not add it twice.
                            updateRequests.add(request);
                            bulkRequestSize += payloadSize;
                        }
                    }
                    needToExecuteBulk = bulkRequestSize >= this.bulkRequestMaxSizeBytes || (!iterator.hasNext() && !updateRequests.isEmpty());
                    if (needToExecuteBulk) {
                        waitBeforeRequest();
                        if (executeBulk(updateRequests)) {
                            // Routed through the retry handling below; the batch is still intact.
                            throw new RuntimeException("Bulk response contains failed items");
                        }
                        bulkRequestSize = 0;
                        needToExecuteBulk = false;
                    }
                } catch (Exception e) {
                    log.error("While sessions writing an error has occurred", e);
                    increaseWriteTimeout();
                    if (currentRetry < RETRY_COUNT_ON_WRITE_ERROR) {
                        ++currentRetry;
                        // Jumps to the loop condition, which is true now -> another attempt.
                        continue;
                    }
                    if (needToExecuteBulk) {
                        // Retries exhausted while flushing: give up on the current batch.
                        bulkRequestSize = 0;
                        updateRequests.clear();
                        needToExecuteBulk = false;
                    }
                }
                if (currentRetry < RETRY_COUNT_ON_WRITE_ERROR) {
                    resetWriteTimeout();
                }
                currentRetry = 0;
            } while (currentRetry > 0);
        }
    }

    /** Builds the index operation that stores {@code element} under its own id. */
    private BulkOperation buildIndexOperation(SessionElementElastic element) {
        return new BulkOperation.Builder()
                .index(IndexOperation.<SessionElementElastic>of(io -> io
                        .index(this.openSearchClientSupplier.normalize(this.indexName))
                        .id(element.getId())
                        .requireAlias(true)
                        .document(element)))
                .build();
    }

    /**
     * Sends the given operations as one bulk request.
     *
     * <p>The list is cleared only when the response reports no failed items, so a caller that
     * retries after a reported failure still has the operations to resend.
     *
     * @param updateRequests operations to send; must be a mutable list
     * @return {@code true} if the response contains at least one failed item
     * @throws IOException if the client call fails
     */
    private boolean executeBulk(List<BulkOperation> updateRequests) throws IOException {
        BulkRequest bulkRequest = new BulkRequest.Builder()
                .index(this.openSearchClientSupplier.normalize(this.indexName))
                .requireAlias(true)
                .operations(updateRequests)
                .build();
        BulkResponse bulk = this.openSearchClientSupplier.getClient().bulk(bulkRequest);
        boolean hasFailures = checkAndLogFailedElements(bulk);
        if (!hasFailures) {
            updateRequests.clear();
        }
        return hasFailures;
    }

    /**
     * Logs the reasons of the failed items of a bulk response (at most
     * {@link #ERROR_MESSAGE_COUNT_THRESHOLD} reasons, followed by the number of remaining ones).
     *
     * @param response bulk response to inspect
     * @return {@code true} if at least one item failed
     */
    private boolean checkAndLogFailedElements(BulkResponse response) {
        int errCount = 0;
        String separator = System.lineSeparator();
        StringBuilder errorMessages = new StringBuilder("Some sessions elements can't be saved to opensearch:").append(separator);
        for (BulkResponseItem bulkItemResponse : response.items()) {
            if (bulkItemResponse.error() == null) {
                continue;
            }
            if (errCount < ERROR_MESSAGE_COUNT_THRESHOLD) {
                errorMessages.append(bulkItemResponse.error().reason()).append(separator);
            }
            ++errCount;
        }
        if (errCount > 0) {
            if (errCount > ERROR_MESSAGE_COUNT_THRESHOLD) {
                errorMessages.append("...and ").append(errCount - ERROR_MESSAGE_COUNT_THRESHOLD).append(" more");
            }
            // The text is passed as an argument, never as the format string: server-provided
            // reasons may themselves contain "{}" placeholders.
            log.error("{}", errorMessages);
        }
        return errCount > 0;
    }

    /** Sets the write delay back to the configured minimum. */
    private void resetWriteTimeout() {
        this.currentWriteTimeout = this.writeTimeoutDefaultMin;
        log.trace("OpenSearch write timeout has been reset to {}", this.currentWriteTimeout);
    }

    /** Multiplies the write delay by {@link #WRITE_TIMEOUT_MULTIPLIER}, capped at the configured maximum. */
    private void increaseWriteTimeout() {
        if (this.currentWriteTimeout == this.writeTimeoutDefaultMax) {
            return;
        }
        this.currentWriteTimeout = Math.max(this.writeTimeoutDefaultMin, this.currentWriteTimeout);
        this.currentWriteTimeout *= WRITE_TIMEOUT_MULTIPLIER;
        this.currentWriteTimeout = Math.min(this.writeTimeoutDefaultMax, this.currentWriteTimeout);
        log.info("OpenSearch write timeout has been increased to {}", this.currentWriteTimeout);
    }

    /** Sleeps for the current write delay before a request is sent. */
    private void waitBeforeRequest() {
        try {
            Thread.sleep(this.currentWriteTimeout);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: an interrupt only shortens the delay, the request is still sent
            // (run() likewise ignores interrupts while waiting on the queue).
        }
    }

    /**
     * Queues the element for writing to OpenSearch without caching it.
     *
     * @param element element to write
     */
    public void scheduleElementToLog(SessionElementElastic element) {
        this.scheduleElementToLog(element, false);
    }

    /**
     * Queues the element for writing and optionally stores it in the session elements cache.
     * If the queue is full (by element count or by total payload size) the element is not queued
     * and an error is logged; caching happens regardless.
     */
    private void scheduleElementToLog(SessionElementElastic element, boolean addToCache) {
        long payloadSize = calculatePayloadSizeInBytes(element);
        boolean queued = this.queueTotalPayloadSize.get() < this.queueMaxSizeBytes
                && this.sessionElementsQueue.offer(QueueElement.builder().element(element).calculatedPayloadSize(payloadSize).build());
        if (queued) {
            this.queueTotalPayloadSize.addAndGet(payloadSize);
        } else {
            log.error("Queue of opensearch elements is full, element is not added");
        }
        if (addToCache) {
            putSessionElementToCache(element);
        }
    }

    /**
     * Queues the element for writing and, if its session is present in the sessions cache, also
     * stores it in the session elements cache. If the session is not cached, the element is marked
     * as {@link ExecutionStatus#CANCELLED_OR_UNKNOWN} and only queued.
     *
     * @param element element to write
     */
    public void scheduleElementToLogAndCache(SessionElementElastic element) {
        Pair<ReadWriteLock, Session> sessionPair = this.sessionsCache.get(element.getSessionId());
        if (sessionPair != null) {
            sessionPair.getLeft().readLock().lock();
            try {
                // Re-check under the lock: the session may have been removed in the meantime.
                if (this.sessionsCache.containsKey(element.getSessionId())) {
                    this.scheduleElementToLog(element, true);
                } else {
                    element.setExecutionStatus(ExecutionStatus.CANCELLED_OR_UNKNOWN);
                    this.scheduleElementToLog(element, false);
                }
            } finally {
                sessionPair.getLeft().readLock().unlock();
            }
        } else {
            element.setExecutionStatus(ExecutionStatus.CANCELLED_OR_UNKNOWN);
            this.scheduleElementToLog(element, false);
        }
    }

    /**
     * Stores the session in the sessions cache together with a new read/write lock, replacing any
     * entry previously stored for the same id.
     *
     * @param session session to cache
     */
    public void putSessionToCache(Session session) {
        String sessionId = session.getId();
        this.sessionsCache.put(sessionId, Pair.<ReadWriteLock, Session>of(new ReentrantReadWriteLock(), session));
    }

    /**
     * Returns the cached lock/session pair, logging a warning when the session is missing.
     *
     * @param sessionId id of the session
     * @return the cached pair, or {@code null} if the session is not cached
     */
    @Nullable
    public Pair<ReadWriteLock, Session> getSessionFromCache(String sessionId) {
        Pair<ReadWriteLock, Session> sessionPair = this.sessionsCache.get(sessionId);
        if (sessionPair == null || sessionPair.getRight() == null) {
            log.warn("Unable to get session from cache {}", sessionId);
        }
        return sessionPair;
    }

    /** Stores the element in the per-session elements cache, creating the session entry if needed. */
    private void putSessionElementToCache(SessionElementElastic sessionElement) {
        // computeIfAbsent makes "create the per-session map if missing" atomic, so concurrent callers
        // cannot replace each other's map and a concurrent clearSessionCache cannot cause an NPE.
        this.sessionElementsCache
                .computeIfAbsent(sessionElement.getSessionId(), id -> new ConcurrentHashMap<>())
                .put(sessionElement.getId(), sessionElement);
    }

    /**
     * Looks up a cached element of a session.
     *
     * @param sessionId id of the session
     * @param elementId id of the element
     * @return the cached element, or {@code null} if the session or the element is not cached
     */
    @Nullable
    public SessionElementElastic getSessionElementFromCache(String sessionId, String elementId) {
        Map<String, SessionElementElastic> elements = this.sessionElementsCache.get(sessionId);
        return elements != null ? elements.get(elementId) : null;
    }

    /**
     * Returns the cached elements of a session.
     *
     * @param sessionId id of the session
     * @return a view of the cached elements, or an empty collection if the session has none
     */
    public Collection<SessionElementElastic> getSessionElementsFromCache(String sessionId) {
        Map<String, SessionElementElastic> elements = this.sessionElementsCache.get(sessionId);
        if (elements == null) {
            return Collections.emptyList();
        }
        return elements.values();
    }

    /**
     * Stores the element in the single-element cache of the session; does nothing if the session is
     * not present in the sessions cache.
     *
     * @param sessionId id of the session
     * @param sessionElement element to store
     */
    public void putToSingleElementCache(String sessionId, SessionElementElastic sessionElement) {
        this.runWithSessionReadLock(sessionId, () -> this.singleElementCache.put(sessionId, sessionElement));
    }

    /**
     * Removes the element stored in the single-element cache of the session and puts it into the
     * session elements cache; does nothing if the session is not present in the sessions cache.
     *
     * @param sessionId id of the session
     * @return the moved element, or {@code null} if nothing was moved
     */
    public SessionElementElastic moveFromSingleElementCacheToElementCache(String sessionId) {
        AtomicReference<SessionElementElastic> elementRef = new AtomicReference<>();
        this.runWithSessionReadLock(sessionId, () -> {
            SessionElementElastic removed = this.singleElementCache.remove(sessionId);
            elementRef.set(removed);
            if (removed != null) {
                putSessionElementToCache(removed);
            }
        });
        return elementRef.get();
    }

    /**
     * Removes the session and all of its cached elements from every cache.
     *
     * @param sessionId id of the session
     */
    public void clearSessionCache(String sessionId) {
        this.sessionsCache.remove(sessionId);
        this.sessionElementsCache.remove(sessionId);
        this.singleElementCache.remove(sessionId);
    }

    /**
     * Runs the action while holding the read lock of the session, provided the session is still
     * present in the sessions cache; otherwise only logs a debug message.
     */
    private void runWithSessionReadLock(String sessionId, Runnable runnable) {
        Pair<ReadWriteLock, Session> sessionPair = this.sessionsCache.get(sessionId);
        if (sessionPair != null) {
            sessionPair.getLeft().readLock().lock();
            try {
                // Re-check under the lock: the session may have been removed in the meantime.
                if (this.sessionsCache.containsKey(sessionId)) {
                    runnable.run();
                    return;
                }
            } finally {
                sessionPair.getLeft().readLock().unlock();
            }
        }
        log.debug("Session {} is not alive, skip sessions cache update", sessionId);
    }

    /**
     * Estimates the payload size of an element as the combined length of its "before" and "after"
     * bodies. The value is a character count ({@link String#length()}), i.e. an approximation of
     * the size in bytes.
     */
    private long calculatePayloadSizeInBytes(SessionElementElastic element) {
        long size = 0L;
        String bodyBefore = element.getBodyBefore();
        String bodyAfter = element.getBodyAfter();
        if (bodyBefore != null) {
            size += bodyBefore.length();
        }
        if (bodyAfter != null) {
            size += bodyAfter.length();
        }
        return size;
    }
}

