/*
 * Decompiled with CFR 0.152.
 */
package de.galan.dmsexchange.exchange.read;

import de.galan.commons.logging.Logr;
import de.galan.dmsexchange.exchange.DefaultExchange;
import de.galan.dmsexchange.exchange.DmsReader;
import de.galan.dmsexchange.exchange.container.ContainerDeserializer;
import de.galan.dmsexchange.exchange.read.CountingDocumentConsumer;
import de.galan.dmsexchange.exchange.read.DocumentReadInvalidEvent;
import de.galan.dmsexchange.exchange.read.WrappingDocumentConsumer;
import de.galan.dmsexchange.meta.Document;
import de.galan.dmsexchange.util.DmsExchangeException;
import de.galan.verjson.util.ReadException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.function.Consumer;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.logging.log4j.Logger;

/**
 * {@link DmsReader} that reads an export-archive from an {@link InputStream}.
 *
 * <p>The stream is decoded as a gzip-compressed tar archive. Every regular entry whose name ends
 * with {@code .tar} is handed to a {@link ContainerDeserializer}; each resulting {@link Document}
 * is posted as an event to the registered listeners. Entries that cannot be deserialized, or that
 * are not recognized, are reported with a {@link DocumentReadInvalidEvent} carrying the entry name.
 */
public class DefaultDmsReader extends DefaultExchange implements DmsReader {
    private static final Logger LOG = Logr.get();

    /** File-name suffix that identifies a document container inside the export-archive. */
    private static final String DOCUMENT_SUFFIX = ".tar";

    private final InputStream inputstream;
    private final ContainerDeserializer deserializer;

    /**
     * Creates a reader for the given stream.
     *
     * @param inputstream source of the gzip-compressed tar archive; it is not validated here and is
     *        first accessed when one of the {@code readDocuments} methods is called
     */
    public DefaultDmsReader(InputStream inputstream) {
        this.inputstream = inputstream;
        this.deserializer = new ContainerDeserializer();
    }

    /**
     * Registers every given object as a listener for the events posted while reading.
     *
     * @param listeners the listeners to register
     */
    @Override
    public void registerSubscriber(Object ... listeners) {
        Arrays.stream(listeners).forEach(this::registerListener);
    }

    /**
     * Reads the archive and passes the documents to the given consumer.
     *
     * <p>The consumer is wrapped in a {@link WrappingDocumentConsumer} that is registered as a
     * listener only for the duration of this call.
     *
     * @param consumer receiver of the documents, must not be {@code null}
     * @throws NullPointerException if {@code consumer} is {@code null}
     * @throws DmsExchangeException if the archive cannot be read
     */
    @Override
    public void readDocuments(Consumer<Document> consumer) throws DmsExchangeException {
        if (consumer == null) {
            throw new NullPointerException("Consumer is null");
        }
        WrappingDocumentConsumer wrapper = new WrappingDocumentConsumer(consumer);
        this.registerListener(wrapper);
        try {
            this.readDocuments();
        }
        finally {
            // Always detach the temporary listener, even if reading failed.
            this.unregisterListener(wrapper);
        }
    }

    /**
     * Reads the archive and posts its content as events to the registered listeners.
     *
     * <p>A {@link CountingDocumentConsumer} is registered for the duration of the call; its count is
     * logged once the archive has been read without an exception.
     *
     * @throws DmsExchangeException if the archive cannot be read
     */
    @Override
    public void readDocuments() throws DmsExchangeException {
        CountingDocumentConsumer counter = new CountingDocumentConsumer();
        this.registerListener(counter);
        try {
            this.readArchive();
        }
        finally {
            this.unregisterListener(counter);
        }
        LOG.info("Finished reading export-archive with {} documents", counter.getCountedDocuments());
    }

    /**
     * Walks all entries of the gzip-compressed tar archive and dispatches them.
     *
     * <p>Directories are skipped, document containers are deserialized, and anything else is logged
     * and reported as invalid. The tar stream is closed when this method returns, which presumably
     * closes the underlying input stream as well.
     *
     * @throws DmsExchangeException if an {@link IOException} occurs while reading the archive
     */
    private void readArchive() throws DmsExchangeException {
        try (TarArchiveInputStream tar = new TarArchiveInputStream(new GzipCompressorInputStream(this.inputstream))) {
            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                if (entry.isFile() && entry.getName().endsWith(DOCUMENT_SUFFIX)) {
                    this.readDocument(tar, entry);
                }
                else {
                    LOG.warn("Unrecognized element: {}", entry.getName());
                    this.postEvent(new DocumentReadInvalidEvent(entry.getName()));
                }
            }
        }
        catch (IOException ex) {
            throw new DmsExchangeException("Unable to read container tar", ex);
        }
    }

    /**
     * Deserializes the document container at the current position of the tar stream and posts it.
     *
     * <p>A failure to deserialize does not abort the whole read: the cause is logged and a
     * {@link DocumentReadInvalidEvent} with the entry name is posted instead.
     *
     * @param tar the archive stream, positioned at {@code entry}
     * @param entry the entry describing the document container
     * @throws DmsExchangeException if posting the invalid-event fails with it
     * @throws IOException if an I/O error escapes the deserializer
     */
    private void readDocument(TarArchiveInputStream tar, TarArchiveEntry entry) throws DmsExchangeException, IOException {
        try {
            // NOTE(review): the 'false' flag presumably keeps the deserializer from closing the
            // shared tar stream - confirm against ContainerDeserializer.
            Document document = this.deserializer.unarchive((InputStream)tar, false);
            this.postEvent(document);
        }
        catch (DmsExchangeException ex) {
            // Keep the cause visible in the log; the event itself only carries the entry name.
            LOG.warn("Unable to read document from element: {}", entry.getName(), ex);
            this.postEvent(new DocumentReadInvalidEvent(entry.getName()));
        }
        catch (ReadException ex) {
            LOG.warn("Unable to read document from element: {}", entry.getName(), ex);
            this.postEvent(new DocumentReadInvalidEvent(entry.getName()));
        }
    }
}

