package com.baidu.hugegraph.loader.reader.hdfs;

import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.progress.FileItemProgress;
import com.baidu.hugegraph.loader.progress.InputItemProgress;
import com.baidu.hugegraph.loader.reader.Readable;
import com.baidu.hugegraph.loader.reader.file.FileLineFetcher;
import com.baidu.hugegraph.loader.reader.file.FileReader;
import com.baidu.hugegraph.loader.reader.file.OrcFileLineFetcher;
import com.baidu.hugegraph.loader.reader.file.ParquetFileLineFetcher;
import com.baidu.hugegraph.loader.source.file.Compression;
import com.baidu.hugegraph.loader.source.file.FileFilter;
import com.baidu.hugegraph.loader.source.hdfs.HDFSSource;
import com.baidu.hugegraph.loader.source.hdfs.KerberosConfig;
import com.baidu.hugegraph.util.Log;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/loader/reader/hdfs/HDFSFileReader.class */
public class HDFSFileReader extends FileReader {
    private static final Logger LOG;
    private final FileSystem hdfs;
    private final Configuration conf;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/baidu/hugegraph/loader/reader/hdfs/HDFSFileReader$HDFSFile.class */
    private static class HDFSFile implements Readable {
        private final FileSystem hdfs;
        private final Path path;

        private HDFSFile(FileSystem fileSystem, Path path) {
            this.hdfs = fileSystem;
            this.path = path;
        }

        public FileSystem hdfs() {
            return this.hdfs;
        }

        @Override // com.baidu.hugegraph.loader.reader.Readable
        public String name() {
            return this.path.getName();
        }

        @Override // com.baidu.hugegraph.loader.reader.Readable
        public Path path() {
            return this.path;
        }

        @Override // com.baidu.hugegraph.loader.reader.Readable
        public InputStream open() throws IOException {
            return this.hdfs.open(this.path);
        }

        @Override // com.baidu.hugegraph.loader.reader.Readable
        public InputItemProgress inputItemProgress() {
            String name = this.path.getName();
            try {
                long modificationTime = this.hdfs.getFileStatus(this.path).getModificationTime();
                try {
                    FileChecksum fileChecksum = this.hdfs.getFileChecksum(this.path);
                    if (fileChecksum == null) {
                        throw new LoadException("The checksum of HDFS path '%s' is null", this.path);
                    }
                    return new FileItemProgress(name, modificationTime, new String(fileChecksum.getBytes(), Constants.CHARSET), 0L);
                } catch (IOException e) {
                    throw new LoadException("Failed to calculate checksum for HDFS path '%s'", e, this.path);
                }
            } catch (IOException e2) {
                throw new LoadException("Failed to get last modified time for HDFS path '%s'", e2, this.path);
            }
        }

        public String toString() {
            return "HDFS: " + this.path;
        }
    }

    public HDFSFileReader(HDFSSource hDFSSource) {
        super(hDFSSource);
        this.conf = loadConfiguration();
        try {
            enableKerberos(hDFSSource);
            this.hdfs = FileSystem.get(this.conf);
            checkExist(this.hdfs, new Path(hDFSSource.path()));
        } catch (IOException e) {
            throw new LoadException("Failed to create HDFS file system", e);
        }
    }

    private void enableKerberos(HDFSSource hDFSSource) throws IOException {
        KerberosConfig kerberosConfig = hDFSSource.kerberosConfig();
        if (kerberosConfig == null || !kerberosConfig.enable()) {
            return;
        }
        System.setProperty("java.security.krb5.conf", kerberosConfig.krb5Conf());
        UserGroupInformation.setConfiguration(this.conf);
        UserGroupInformation.loginUserFromKeytab(kerberosConfig.principal(), kerberosConfig.keyTab());
    }

    public FileSystem fileSystem() {
        return this.hdfs;
    }

    @Override // com.baidu.hugegraph.loader.reader.file.FileReader
    public HDFSSource source() {
        return (HDFSSource) super.source();
    }

    @Override // com.baidu.hugegraph.loader.reader.file.FileReader, com.baidu.hugegraph.loader.reader.InputReader, java.lang.AutoCloseable
    public void close() {
        super.close();
        try {
            this.hdfs.close();
        } catch (IOException e) {
            LOG.warn("Failed to close reader for {} with exception {}", source(), e.getMessage(), e);
        }
    }

    @Override // com.baidu.hugegraph.loader.reader.file.FileReader
    protected List<Readable> scanReadables() throws IOException {
        Path path = new Path(source().path());
        FileFilter filter = source().filter();
        ArrayList arrayList = new ArrayList();
        if (this.hdfs.isFile(path)) {
            if (!filter.reserved(path.getName())) {
                throw new LoadException("Please check path name and extensions, ensure that at least one path is available for reading");
            }
            arrayList.add(new HDFSFile(this.hdfs, path));
        } else {
            if (!$assertionsDisabled && !this.hdfs.isDirectory(path)) {
                throw new AssertionError();
            }
            for (Path path2 : FileUtil.stat2Paths(this.hdfs.listStatus(path))) {
                if (filter.reserved(path2.getName())) {
                    arrayList.add(new HDFSFile(this.hdfs, path2));
                }
            }
        }
        return arrayList;
    }

    @Override // com.baidu.hugegraph.loader.reader.file.FileReader
    protected FileLineFetcher createLineFetcher() {
        return Compression.ORC == source().compression() ? new OrcFileLineFetcher(source(), this.conf) : Compression.PARQUET == source().compression() ? new ParquetFileLineFetcher(source(), this.conf) : new FileLineFetcher(source());
    }

    private Configuration loadConfiguration() {
        Configuration configuration = new Configuration();
        configuration.addResource(new Path(source().coreSitePath()));
        if (source().hdfsSitePath() != null) {
            configuration.addResource(new Path(source().hdfsSitePath()));
        }
        return configuration;
    }

    private static void checkExist(FileSystem fileSystem, Path path) {
        try {
            if (fileSystem.exists(path)) {
            } else {
                throw new LoadException("Please ensure the file or directory exists: '%s'", path);
            }
        } catch (IOException e) {
            throw new LoadException("An exception occurred while checking HDFS path: '%s'", e, path);
        }
    }

    static {
        $assertionsDisabled = !HDFSFileReader.class.desiredAssertionStatus();
        LOG = Log.logger((Class<?>) HDFSFileReader.class);
    }
}
