/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.filesystem.streaming;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MetadataLog;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;
import org.opensearch.sql.filesystem.streaming.FileMetaData;
import org.opensearch.sql.storage.split.Split;

public class FileSystemStreamSource
implements StreamingSource {
    private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class);
    private final MetadataLog<FileMetaData> fileMetaDataLog;
    private Set<Path> seenFiles;
    private final FileSystem fs;
    private final Path basePath;

    public FileSystemStreamSource(FileSystem fs, Path basePath) {
        this.fs = fs;
        this.basePath = basePath;
        this.fileMetaDataLog = new DefaultMetadataLog();
        this.seenFiles = new HashSet<Path>();
    }

    public Optional<Offset> getLatestOffset() {
        Set allFiles = Arrays.stream(this.fs.listStatus(this.basePath)).filter(status -> !status.isDirectory()).map(FileStatus::getPath).collect(Collectors.toSet());
        log.debug("all files {}", allFiles);
        Sets.SetView unread = Sets.difference(allFiles, this.seenFiles);
        this.seenFiles = allFiles;
        log.debug("seen files {}", this.seenFiles);
        Optional<Long> latestBatchIdOptional = this.fileMetaDataLog.getLatest().map(Pair::getKey);
        if (!unread.isEmpty()) {
            long latestBatchId = latestBatchIdOptional.map(id -> id + 1L).orElse(0L);
            this.fileMetaDataLog.add(Long.valueOf(latestBatchId), (Object)new FileMetaData(latestBatchId, (Set<Path>)unread));
            log.debug("latestBatchId {}", (Object)latestBatchId);
            return Optional.of(new Offset(Long.valueOf(latestBatchId)));
        }
        log.debug("no unread data");
        Optional<Offset> offset = latestBatchIdOptional.isEmpty() ? Optional.empty() : Optional.of(new Offset(latestBatchIdOptional.get()));
        log.debug("return empty offset {}", offset);
        return offset;
    }

    public Batch getBatch(Optional<Offset> start, Offset end) {
        Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1L).orElse(0L);
        Long endBatchId = end.getOffset();
        Set<Path> paths = this.fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream().map(FileMetaData::getPaths).flatMap(Collection::stream).collect(Collectors.toSet());
        log.debug("fetch files {} with id from: {} to: {}.", paths, start, (Object)end);
        return new Batch((Split)new FileSystemSplit(paths));
    }
}

