/*
 * Decompiled with CFR 0.152.
 */
package no.sysco.middleware.alpakka.files.javadsl;

import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.Graph;
import akka.stream.Outlet;
import akka.stream.Shape;
import akka.stream.SourceShape;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.javadsl.Source;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import io.methvin.watcher.DirectoryChangeEvent;
import io.methvin.watcher.DirectoryChangeListener;
import io.methvin.watcher.DirectoryWatcher;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

public class RecursiveDirectoryChangesSource {
    public static Source<Pair<Path, DirectoryChange>, NotUsed> create(Path directoryPath, Duration pollInterval, int maxBufferSize) {
        return Source.fromGraph((Graph)new DirectoryWatcherSourceStage(directoryPath, pollInterval, maxBufferSize));
    }

    static class DirectoryWatcherSourceStage
    extends GraphStage<SourceShape<Pair<Path, DirectoryChange>>> {
        private final Path directoryPath;
        private final Duration pollInterval;
        private final int maxBufferSize;
        private final Outlet<Pair<Path, DirectoryChange>> outlet = Outlet.create((String)"RecursiveDirectoryChangesSource.out");
        private final SourceShape<Pair<Path, DirectoryChange>> shape = new SourceShape(this.outlet);

        DirectoryWatcherSourceStage(Path directoryPath, Duration pollInterval, int maxBufferSize) {
            this.directoryPath = directoryPath;
            this.pollInterval = pollInterval;
            this.maxBufferSize = maxBufferSize;
        }

        public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
            return new TimerGraphStageLogic((Shape)this.shape){
                private final Queue<Pair<Path, DirectoryChange>> buffer;
                private final DirectoryWatcher watcher;
                {
                    super(arg0);
                    this.buffer = new ArrayDeque<Pair<Path, DirectoryChange>>();
                    this.watcher = DirectoryWatcher.builder().path(directoryPath).listener(this.getDirectoryChangeListener()).fileHashing(false).build();
                    this.setHandler(outlet, (OutHandler)new AbstractOutHandler(){

                        public void onPull() throws Exception {
                            if (!buffer.isEmpty()) {
                                this.pushHead();
                            } else {
                                this.schedulePoll();
                            }
                        }
                    });
                }

                private DirectoryChangeListener getDirectoryChangeListener() {
                    return new DirectoryChangeListener(){

                        public void onException(Exception e) {
                            try {
                                watcher.close();
                            }
                            catch (IOException iOException) {
                                // empty catch block
                            }
                            this.failStage(e);
                        }

                        public void onEvent(DirectoryChangeEvent event) {
                            if (DirectoryChangeEvent.EventType.OVERFLOW.equals((Object)event.eventType())) {
                                throw new RuntimeException("Overflow from watch service: '" + directoryPath + "'");
                            }
                            Path path = event.path();
                            Path absolutePath = directoryPath.resolve(path);
                            DirectoryChangeEvent.EventType change = event.eventType();
                            buffer.add(Pair.create((Object)absolutePath, (Object)this.mapChange(change)));
                            if (buffer.size() > maxBufferSize) {
                                throw new RuntimeException("Max event buffer size " + maxBufferSize + " reached for path: " + absolutePath);
                            }
                        }
                    };
                }

                public void onTimer(Object timerKey) throws Exception {
                    if (!this.isClosed(outlet)) {
                        if (!this.buffer.isEmpty()) {
                            this.pushHead();
                        } else {
                            this.schedulePoll();
                        }
                    }
                }

                public void preStart() throws Exception {
                    this.watcher.watchAsync();
                }

                public void postStop() throws Exception {
                    this.watcher.close();
                }

                private void pushHead() {
                    Pair<Path, DirectoryChange> head = this.buffer.poll();
                    if (head != null) {
                        this.push(outlet, head);
                    }
                }

                private void schedulePoll() {
                    this.scheduleOnce("poll", pollInterval);
                }

                private DirectoryChange mapChange(DirectoryChangeEvent.EventType change) {
                    switch (change) {
                        case CREATE: {
                            return DirectoryChange.Creation;
                        }
                        case DELETE: {
                            return DirectoryChange.Deletion;
                        }
                        case MODIFY: {
                            return DirectoryChange.Modification;
                        }
                    }
                    throw new IllegalStateException("This " + change + " is not supported");
                }
            };
        }

        public SourceShape<Pair<Path, DirectoryChange>> shape() {
            return this.shape;
        }
    }
}

