package com.github.davidmoten.rx.operators;

import com.github.davidmoten.rx.FileObservable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.StringObservable;
import rx.observers.Subscribers;
import rx.subjects.PublishSubject;

/* loaded from: input_file:com/github/davidmoten/rx/operators/OperatorFileTailer.class */
public class OperatorFileTailer implements Observable.Operator<byte[], Object> {
    private final File file;
    private final AtomicLong currentPosition;
    private final int maxBytesPerEmission;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.github.davidmoten.rx.operators.OperatorFileTailer$1, reason: invalid class name */
    /* loaded from: input_file:com/github/davidmoten/rx/operators/OperatorFileTailer$1.class */
    public static class AnonymousClass1 implements Func1<Object, Observable<byte[]>> {
        final /* synthetic */ AtomicLong val$currentPosition;
        final /* synthetic */ File val$file;
        final /* synthetic */ int val$maxBytesPerEmission;

        AnonymousClass1(AtomicLong atomicLong, File file, int i) {
            this.val$currentPosition = atomicLong;
            this.val$file = file;
            this.val$maxBytesPerEmission = i;
        }

        /* renamed from: call, reason: merged with bridge method [inline-methods] */
        public Observable<byte[]> m2call(Object obj) {
            if ((obj instanceof WatchEvent) && ((WatchEvent) obj).kind().name().equals(StandardWatchEventKinds.ENTRY_CREATE.name())) {
                this.val$currentPosition.set(0L);
            }
            if (this.val$file.length() <= this.val$currentPosition.get()) {
                return Observable.empty();
            }
            try {
                final FileInputStream fileInputStream = new FileInputStream(this.val$file);
                fileInputStream.skip(this.val$currentPosition.get());
                return Observable.using(new Func0<InputStream>() { // from class: com.github.davidmoten.rx.operators.OperatorFileTailer.1.1
                    /* renamed from: call, reason: merged with bridge method [inline-methods] */
                    public InputStream m3call() {
                        return fileInputStream;
                    }
                }, new Func1<InputStream, Observable<byte[]>>() { // from class: com.github.davidmoten.rx.operators.OperatorFileTailer.1.2
                    public Observable<byte[]> call(InputStream inputStream) {
                        return StringObservable.from(fileInputStream, AnonymousClass1.this.val$maxBytesPerEmission).doOnNext(new Action1<byte[]>() { // from class: com.github.davidmoten.rx.operators.OperatorFileTailer.1.2.1
                            public void call(byte[] bArr) {
                                AnonymousClass1.this.val$currentPosition.addAndGet(bArr.length);
                            }
                        });
                    }
                }, new Action1<InputStream>() { // from class: com.github.davidmoten.rx.operators.OperatorFileTailer.1.3
                    public void call(InputStream inputStream) {
                        try {
                            inputStream.close();
                        } catch (IOException e) {
                        }
                    }
                });
            } catch (IOException e) {
                return Observable.error(e);
            }
        }
    }

    public OperatorFileTailer(File file, long j, int i) {
        this.currentPosition = new AtomicLong();
        if (file == null) {
            throw new NullPointerException("file cannot be null");
        }
        this.file = file;
        this.currentPosition.set(j);
        this.maxBytesPerEmission = i;
    }

    public OperatorFileTailer(File file, long j) {
        this(file, j, FileObservable.DEFAULT_MAX_BYTES_PER_EMISSION);
    }

    public Subscriber<? super Object> call(Subscriber<? super byte[]> subscriber) {
        PublishSubject create = PublishSubject.create();
        Subscriber<? super Object> from = Subscribers.from(create);
        subscriber.add(from);
        create.concatMap(reportNewLines(this.file, this.currentPosition, this.maxBytesPerEmission)).unsafeSubscribe(subscriber);
        return from;
    }

    private static Func1<Object, Observable<byte[]>> reportNewLines(File file, AtomicLong atomicLong, int i) {
        return new AnonymousClass1(atomicLong, file, i);
    }
}
