/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.index.translog;

import com.carrotsearch.hppc.LongArrayList;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.bytes.ReleasableBytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.Channels;
import org.opensearch.common.io.DiskIoBufferPool;
import org.opensearch.common.io.stream.ReleasableBytesStreamOutput;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.Assertions;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.BaseTranslogReader;
import org.opensearch.index.translog.BufferedChecksumStreamInput;
import org.opensearch.index.translog.ChannelFactory;
import org.opensearch.index.translog.Checkpoint;
import org.opensearch.index.translog.TragicExceptionHolder;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.TranslogHeader;
import org.opensearch.index.translog.TranslogReader;
import org.opensearch.index.translog.TranslogSnapshot;

public class TranslogWriter
extends BaseTranslogReader
implements Closeable {
    // Shard this writer belongs to; passed to TranslogException when a snapshot sync fails.
    private final ShardId shardId;
    // Channel and path of the checkpoint file; every successful sync rewrites the checkpoint through them.
    private final FileChannel checkpointChannel;
    private final Path checkpointPath;
    // Allocator handed to the in-memory operation buffer (see add()).
    private final BigArrays bigArrays;
    // Last checkpoint written by syncUpTo(); assigned in the constructor and while holding syncLock.
    private volatile Checkpoint lastSyncedCheckpoint;
    // Number of operations added so far; incremented inside synchronized (this) in add().
    private volatile int operationCounter;
    // Records the exception that caused this writer to be closed (see closeWithTragicEvent()).
    private final TragicExceptionHolder tragedy;
    // Logical size of this generation: initial checkpoint offset plus the length of every added operation,
    // including operations that are still buffered in memory.
    private volatile long totalOffset;
    // Smallest / largest sequence number added so far; both start from the initial checkpoint's values.
    private volatile long minSeqNo;
    private volatile long maxSeqNo;
    // Suppliers sampled whenever a checkpoint is built (getCheckpoint()) or compared (syncNeeded()).
    private final LongSupplier globalCheckpointSupplier;
    private final LongSupplier minTranslogGenerationSupplier;
    // Invoked once per sequence number after the checkpoint covering it has been written (see syncUpTo()).
    private final LongConsumer persistedSequenceNumberConsumer;
    // Flipped to true exactly once, by close() or closeIntoReader().
    protected final AtomicBoolean closed = new AtomicBoolean(false);
    // Held while buffered bytes are written to the translog channel.
    private final ReleasableLock writeLock = new ReleasableLock(new ReentrantLock());
    // Monitor taken first by syncUpTo(), closeIntoReader() and newSnapshot(), before writeLock and this.
    private final Object syncLock = new Object();
    // Sequence numbers added since the last sync; swapped for a fresh list inside syncUpTo().
    private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64);
    // Buffered-byte count (derived from the configured buffer size) at which add() flushes the buffer to the channel.
    private final int forceWriteThreshold;
    // Size of 'buffer' in bytes, readable without taking the monitor.
    private volatile long bufferedBytes;
    // In-memory buffer of operations not yet written to the channel; null when nothing is buffered.
    private ReleasableBytesStreamOutput buffer;
    // Assertion-only bookkeeping of every seqNo added (with its bytes and a stack capture);
    // null when Assertions.ENABLED is false.
    private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
    // When TRUE, the fsync flag passed to header/checkpoint writes is false and channel.force() is skipped.
    private final Boolean remoteTranslogEnabled;

    /**
     * Creates a writer over already-opened translog and checkpoint channels.
     * Callers are expected to go through {@link #create}; this constructor only wires up state and
     * checks (via assertions) that {@code initialCheckpoint} describes an empty generation positioned
     * at the channel's current offset.
     *
     * @param shardId                         shard owning this translog generation
     * @param initialCheckpoint               checkpoint already written for the (empty) generation
     * @param channel                         channel of the translog file, positioned after the header
     * @param checkpointChannel               channel of the checkpoint file
     * @param path                            path of the translog file
     * @param checkpointPath                  path of the checkpoint file
     * @param bufferSize                      buffered-byte threshold at which {@link #add} flushes to the channel
     * @param globalCheckpointSupplier        supplies the global checkpoint recorded in each checkpoint
     * @param minTranslogGenerationSupplier   supplies the minimum translog generation recorded in each checkpoint
     * @param header                          header already written to {@code channel}
     * @param tragedy                         holder that records the exception which force-closed the writer
     * @param persistedSequenceNumberConsumer notified of each sequence number once it has been synced
     * @param bigArrays                       allocator for the in-memory operation buffer
     * @param remoteTranslogEnabled           when {@code TRUE}, local fsyncs are skipped
     * @throws IOException           if the channel position cannot be read while assertions are enabled
     * @throws ArithmeticException   if {@code bufferSize} does not fit in an {@code int}
     */
    private TranslogWriter(
        ShardId shardId,
        Checkpoint initialCheckpoint,
        FileChannel channel,
        FileChannel checkpointChannel,
        Path path,
        Path checkpointPath,
        ByteSizeValue bufferSize,
        LongSupplier globalCheckpointSupplier,
        LongSupplier minTranslogGenerationSupplier,
        TranslogHeader header,
        TragicExceptionHolder tragedy,
        LongConsumer persistedSequenceNumberConsumer,
        BigArrays bigArrays,
        Boolean remoteTranslogEnabled
    ) throws IOException {
        super(initialCheckpoint.generation, channel, path, header);
        assert initialCheckpoint.offset == channel.position()
            : "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position [" + channel.position() + "]";
        this.forceWriteThreshold = Math.toIntExact(bufferSize.getBytes());
        this.shardId = shardId;
        this.checkpointChannel = checkpointChannel;
        this.checkpointPath = checkpointPath;
        this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
        this.lastSyncedCheckpoint = initialCheckpoint;
        this.totalOffset = initialCheckpoint.offset;
        // A fresh generation must not carry any sequence numbers yet
        // (-1 / -2 are presumably the "no ops performed" / "unassigned" sentinels of SequenceNumbers — confirm).
        assert initialCheckpoint.minSeqNo == -1L : initialCheckpoint.minSeqNo;
        this.minSeqNo = initialCheckpoint.minSeqNo;
        assert initialCheckpoint.maxSeqNo == -1L : initialCheckpoint.maxSeqNo;
        this.maxSeqNo = initialCheckpoint.maxSeqNo;
        assert initialCheckpoint.trimmedAboveSeqNo == -2L : initialCheckpoint.trimmedAboveSeqNo;
        this.globalCheckpointSupplier = globalCheckpointSupplier;
        this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
        this.bigArrays = bigArrays;
        // Only track seen operations when assertions run; typed map instead of a raw HashMap.
        this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
        this.tragedy = tragedy;
        this.remoteTranslogEnabled = remoteTranslogEnabled;
    }

    /**
     * Opens the translog file and the {@code translog.ckp} file next to it, writes the header and an empty
     * checkpoint, and returns a writer positioned right after the header.
     * <p>
     * If anything fails after the translog channel has been opened, both channels are closed (suppressing
     * secondary failures) and the original exception is rethrown.
     *
     * @param remoteTranslogEnabled when {@code TRUE}, header and checkpoint are written without requesting an fsync
     * @return a new writer for generation {@code fileGeneration}
     * @throws IOException if opening or writing either file fails
     */
    public static TranslogWriter create(
        ShardId shardId,
        String translogUUID,
        long fileGeneration,
        Path file,
        ChannelFactory channelFactory,
        ByteSizeValue bufferSize,
        long initialMinTranslogGen,
        long initialGlobalCheckpoint,
        LongSupplier globalCheckpointSupplier,
        LongSupplier minTranslogGenerationSupplier,
        long primaryTerm,
        TragicExceptionHolder tragedy,
        LongConsumer persistedSequenceNumberConsumer,
        BigArrays bigArrays,
        Boolean remoteTranslogEnabled
    ) throws IOException {
        final Path checkpointFile = file.getParent().resolve("translog.ckp");
        final FileChannel channel = channelFactory.open(file);
        FileChannel checkpointChannel = null;
        try {
            checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE);
            final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm);
            header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled));
            final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(
                header.sizeInBytes(),
                fileGeneration,
                initialGlobalCheckpoint,
                initialMinTranslogGen
            );
            TranslogWriter.writeCheckpoint(checkpointChannel, checkpointFile, checkpoint, remoteTranslogEnabled);

            final LongSupplier writerGlobalCheckpointSupplier;
            if (Assertions.ENABLED) {
                // Wrap the supplier so every read verifies the global checkpoint never moves backwards.
                writerGlobalCheckpointSupplier = () -> {
                    final long gcp = globalCheckpointSupplier.getAsLong();
                    // Value comparison: a reference check against Boolean.TRUE would miss
                    // a Boolean instance that is true but not the canonical constant.
                    assert gcp >= initialGlobalCheckpoint || Boolean.TRUE.equals(remoteTranslogEnabled)
                        : "global checkpoint [" + gcp + "] lower than initial gcp [" + initialGlobalCheckpoint + "]";
                    return gcp;
                };
            } else {
                writerGlobalCheckpointSupplier = globalCheckpointSupplier;
            }
            return new TranslogWriter(
                shardId,
                checkpoint,
                channel,
                checkpointChannel,
                file,
                checkpointFile,
                bufferSize,
                writerGlobalCheckpointSupplier,
                minTranslogGenerationSupplier,
                header,
                tragedy,
                persistedSequenceNumberConsumer,
                bigArrays,
                remoteTranslogEnabled
            );
        } catch (Exception exception) {
            // Do not leak the channels opened above; checkpointChannel may still be null here.
            IOUtils.closeWhileHandlingException(new Closeable[] { channel, checkpointChannel });
            throw exception;
        }
    }

    /**
     * Records {@code ex} as the tragic exception and then closes this writer.
     * A failure while closing is attached to {@code ex} as a suppressed exception instead of being thrown.
     *
     * @param ex the exception that made the writer unusable
     */
    private synchronized void closeWithTragicEvent(Exception ex) {
        tragedy.setTragicException(ex);
        try {
            close();
        } catch (IOException | RuntimeException closeFailure) {
            // Keep the original failure as the primary one.
            ex.addSuppressed(closeFailure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends the serialized operation {@code data} to the in-memory buffer and returns its location.
     * <p>
     * If the buffer already holds at least {@code forceWriteThreshold} bytes, the buffered operations are
     * first written to the channel; the call waits for a concurrent writer only once the buffer has
     * reached four times the threshold.
     *
     * @param data  the serialized operation
     * @param seqNo the sequence number of the operation
     * @return the location (generation, offset, length) the operation will occupy in this generation
     * @throws IOException            if writing buffered operations or buffering {@code data} fails
     * @throws AlreadyClosedException if the writer is closed
     */
    public Translog.Location add(BytesReference data, long seqNo) throws IOException {
        final long bufferedBytesBeforeAdd = this.bufferedBytes;
        if (bufferedBytesBeforeAdd >= (long) this.forceWriteThreshold) {
            // Multiply as long: forceWriteThreshold * 4 in int arithmetic overflows for thresholds
            // above Integer.MAX_VALUE / 4 and would make this comparison wrong.
            final boolean blockOnExistingWriter = bufferedBytesBeforeAdd >= 4L * this.forceWriteThreshold;
            this.writeBufferedOps(Long.MAX_VALUE, blockOnExistingWriter);
        }

        final Translog.Location location;
        synchronized (this) {
            this.ensureOpen();
            if (this.buffer == null) {
                this.buffer = new ReleasableBytesStreamOutput(this.bigArrays);
            }
            assert this.bufferedBytes == (long) this.buffer.size();
            final long offset = this.totalOffset;
            this.totalOffset += (long) data.length();
            data.writeTo(this.buffer);

            // min/max may only still be at their initial value (-1) before the first operation.
            assert this.minSeqNo != -1L || this.operationCounter == 0;
            assert this.maxSeqNo != -1L || this.operationCounter == 0;
            this.minSeqNo = SequenceNumbers.min(this.minSeqNo, seqNo);
            this.maxSeqNo = SequenceNumbers.max(this.maxSeqNo, seqNo);

            this.nonFsyncedSequenceNumbers.add(seqNo);
            ++this.operationCounter;

            assert this.assertNoSeqNumberConflict(seqNo, data);

            location = new Translog.Location(this.generation, offset, data.length());
            this.bufferedBytes = this.buffer.size();
        }
        return location;
    }

    /**
     * Assertion helper: verifies that an operation with the same {@code seqNo} was not previously added to
     * this generation with different content. The first occurrence of a sequence number is remembered
     * together with a stack capture so a later conflict can report where the first one came from.
     * <p>
     * Only invoked from {@code assert} statements, i.e. when {@code seenSequenceNumbers} is non-null.
     *
     * @param seqNo sequence number of the operation being added; {@code -2} is skipped
     * @param data  serialized operation being added
     * @return always {@code true}, so it can be used inside an {@code assert}
     * @throws IOException    if either operation cannot be deserialized for comparison
     * @throws AssertionError if the sequence number was seen before with a different operation
     */
    private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException {
        if (seqNo == -2L) {
            // presumably the "unassigned" sequence number — nothing to compare against
            return true;
        }
        final Tuple<BytesReference, Exception> previous = this.seenSequenceNumbers.get(seqNo);
        if (previous == null) {
            // First time we see this seqNo: remember a copy of the bytes and where it was added from.
            this.seenSequenceNumbers.put(
                seqNo,
                new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op"))
            );
            return true;
        }
        if (previous.v1().equals(data)) {
            return true;
        }

        // Bytes differ: fall back to comparing the fields that identify the operation.
        final Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
        final Translog.Operation prvOp = Translog.readOperation(
            new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion")
        );
        final boolean sameOp;
        if (newOp instanceof Translog.Index && prvOp instanceof Translog.Index) {
            final Translog.Index o1 = (Translog.Index) prvOp;
            final Translog.Index o2 = (Translog.Index) newOp;
            sameOp = Objects.equals(o1.id(), o2.id())
                && Objects.equals(o1.source(), o2.source())
                && Objects.equals(o1.routing(), o2.routing())
                && o1.primaryTerm() == o2.primaryTerm()
                && o1.seqNo() == o2.seqNo()
                && o1.version() == o2.version();
        } else if (newOp instanceof Translog.Delete && prvOp instanceof Translog.Delete) {
            final Translog.Delete o1 = (Translog.Delete) newOp;
            final Translog.Delete o2 = (Translog.Delete) prvOp;
            sameOp = Objects.equals(o1.id(), o2.id())
                && o1.primaryTerm() == o2.primaryTerm()
                && o1.seqNo() == o2.seqNo()
                && o1.version() == o2.version();
        } else {
            sameOp = false;
        }
        if (!sameOp) {
            throw new AssertionError(
                "seqNo [" + seqNo + "] was processed twice in generation [" + this.generation + "], with different data. prvOp [" + prvOp + "], newOp [" + newOp + "]",
                previous.v2()
            );
        }
        return true;
    }

    /**
     * Assertion helper: checks every remembered operation whose sequence number is greater than
     * {@code aboveSeqNo} and fails if any of them has a primary term lower than {@code belowTerm}.
     *
     * @param belowTerm  primary term that operations above {@code aboveSeqNo} must not be below
     * @param aboveSeqNo only operations with a sequence number strictly greater than this are checked
     * @return always {@code true}, so it can be used inside an {@code assert}
     * @throws AssertionError   if an offending operation is found
     * @throws RuntimeException wrapping the {@link IOException} if a remembered operation cannot be read
     */
    synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
        for (Map.Entry<Long, Tuple<BytesReference, Exception>> seen : this.seenSequenceNumbers.entrySet()) {
            if (seen.getKey() <= aboveSeqNo) {
                continue;
            }
            final Translog.Operation op;
            try {
                op = Translog.readOperation(new BufferedChecksumStreamInput(seen.getValue().v1().streamInput(), "assertion"));
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            final long seqNo = op.seqNo();
            final long primaryTerm = op.primaryTerm();
            if (primaryTerm < belowTerm) {
                throw new AssertionError(
                    "current should not have any operations with seq#:primaryTerm [" + seqNo + ":" + primaryTerm + "] > " + aboveSeqNo + ":" + belowTerm
                );
            }
        }
        return true;
    }

    /**
     * Syncs everything added so far by delegating to {@link #syncUpTo(long)} with the largest possible offset.
     *
     * @return {@code true} if this call performed a sync, {@code false} if none was needed
     * @throws IOException if writing or syncing fails
     */
    public boolean sync() throws IOException {
        final long everything = Long.MAX_VALUE;
        return syncUpTo(everything);
    }

    /**
     * Tells whether the last synced checkpoint is out of date with respect to the current write offset,
     * the global checkpoint, or the minimum translog generation.
     *
     * @return {@code true} if any of the three values differs from the last synced checkpoint
     */
    public boolean syncNeeded() {
        if (this.totalOffset != this.lastSyncedCheckpoint.offset) {
            return true;
        }
        if (this.globalCheckpointSupplier.getAsLong() != this.lastSyncedCheckpoint.globalCheckpoint) {
            return true;
        }
        return this.minTranslogGenerationSupplier.getAsLong() != this.lastSyncedCheckpoint.minTranslogGeneration;
    }

    /**
     * @return the number of operations added to this writer so far, including buffered ones
     */
    @Override
    public int totalOperations() {
        final int operationsSoFar = this.operationCounter;
        return operationsSoFar;
    }

    /**
     * Builds a checkpoint describing the current in-memory state of this writer
     * (offset, operation count, generation, seqNo range) plus the current values of both suppliers.
     *
     * @return a new checkpoint; the last constructor argument is always {@code -2}
     */
    @Override
    synchronized Checkpoint getCheckpoint() {
        final long offset = this.totalOffset;
        final int numOps = this.operationCounter;
        final long gen = this.generation;
        final long lowestSeqNo = this.minSeqNo;
        final long highestSeqNo = this.maxSeqNo;
        final long globalCheckpoint = this.globalCheckpointSupplier.getAsLong();
        final long minTranslogGeneration = this.minTranslogGenerationSupplier.getAsLong();
        return new Checkpoint(offset, numOps, gen, lowestSeqNo, highestSeqNo, globalCheckpoint, minTranslogGeneration, -2L);
    }

    /**
     * @return the current total offset of this generation, which also counts operations still buffered in memory
     */
    @Override
    public long sizeInBytes() {
        final long currentSize = this.totalOffset;
        return currentSize;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Syncs all pending operations, marks this writer closed, closes the checkpoint channel and hands the
     * translog channel over to a new {@link TranslogReader} built from the last synced checkpoint.
     * <p>
     * Locks are taken in the order {@code syncLock} -> {@code writeLock} -> {@code this}, the same order
     * used by {@link #syncUpTo(long)}.
     *
     * @return a reader over the same file and channel
     * @throws IOException            if syncing or closing the checkpoint channel fails; the writer is then
     *                                closed with that exception recorded as tragic
     * @throws AlreadyClosedException if the writer was already closed
     */
    public TranslogReader closeIntoReader() throws IOException {
        synchronized (this.syncLock) {
            try (ReleasableLock heldWriteLock = this.writeLock.acquire()) {
                synchronized (this) {
                    try {
                        sync();
                    } catch (final Exception syncFailure) {
                        closeWithTragicEvent(syncFailure);
                        throw syncFailure;
                    }
                    // A successful sync must have flushed every buffered operation.
                    assert this.buffer == null;
                    assert checkChannelPositionWhileHandlingException(this.totalOffset);
                    assert this.totalOffset == this.lastSyncedCheckpoint.offset;

                    if (this.closed.compareAndSet(false, true) == false) {
                        throw new AlreadyClosedException(
                            "translog [" + getGeneration() + "] is already closed (path [" + this.path + "]",
                            (Throwable) this.tragedy.get()
                        );
                    }
                    try {
                        this.checkpointChannel.close();
                    } catch (final Exception closeFailure) {
                        closeWithTragicEvent(closeFailure);
                        throw closeFailure;
                    }
                    return new TranslogReader(getLastSyncedCheckpoint(), this.channel, this.path, this.header);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Syncs all pending operations and then creates a snapshot via the superclass.
     * Takes {@code syncLock}, {@code writeLock} and the monitor of {@code this}, in that order.
     *
     * @return a snapshot created by {@code super.newSnapshot()}
     * @throws TranslogException      wrapping the {@link IOException} if the sync fails
     * @throws AlreadyClosedException if the writer is closed
     */
    @Override
    public TranslogSnapshot newSnapshot() {
        synchronized (this.syncLock) {
            try (ReleasableLock heldWriteLock = this.writeLock.acquire()) {
                synchronized (this) {
                    ensureOpen();
                    try {
                        sync();
                    } catch (IOException syncFailure) {
                        throw new TranslogException(this.shardId, "exception while syncing before creating a snapshot", syncFailure);
                    }
                    // After a successful sync nothing may remain buffered.
                    assert this.buffer == null;
                    assert checkChannelPositionWhileHandlingException(this.totalOffset);
                    assert this.totalOffset == this.lastSyncedCheckpoint.offset;
                    return super.newSnapshot();
                }
            }
        }
    }

    /**
     * @return the current position of the translog channel, i.e. how many bytes have been written to it
     * @throws IOException if the channel position cannot be read
     */
    private long getWrittenOffset() throws IOException {
        final long channelPosition = this.channel.position();
        return channelPosition;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Makes sure everything up to {@code offset} is written, forced to disk (unless the remote translog is
     * enabled) and covered by a freshly written checkpoint.
     * <p>
     * The "is a sync needed" condition is checked once without a lock and again after acquiring
     * {@code syncLock}, so concurrent callers do not repeat work another thread already did.
     * Any failure while writing, forcing or checkpointing closes the writer with that exception
     * recorded as tragic before it is rethrown.
     *
     * @param offset sync is skipped if the last synced checkpoint already covers this offset
     * @return {@code true} if this call performed a sync, {@code false} otherwise
     * @throws IOException            if writing, forcing or checkpointing fails
     * @throws AlreadyClosedException if the writer is closed
     */
    final boolean syncUpTo(long offset) throws IOException {
        if (this.lastSyncedCheckpoint.offset < offset && this.syncNeeded()) {
            synchronized (this.syncLock) {
                // Re-check under the lock: another thread may have synced far enough in the meantime.
                if (this.lastSyncedCheckpoint.offset < offset && this.syncNeeded()) {
                    final Checkpoint checkpointToSync;
                    final LongArrayList flushedSequenceNumbers;
                    try (ReleasableLock toClose = this.writeLock.acquire()) {
                        final ReleasableBytesReference toWrite;
                        synchronized (this) {
                            this.ensureOpen();
                            // Capture checkpoint, buffered bytes and pending seqNos atomically w.r.t. add().
                            checkpointToSync = this.getCheckpoint();
                            toWrite = this.pollOpsToWrite();
                            flushedSequenceNumbers = this.nonFsyncedSequenceNumbers;
                            this.nonFsyncedSequenceNumbers = new LongArrayList(64);
                        }
                        try {
                            // Writing also releases the buffer backing toWrite.
                            this.writeAndReleaseOps(toWrite);
                        } catch (final Exception ex) {
                            this.closeWithTragicEvent(ex);
                            throw ex;
                        }
                    }
                    // The force and checkpoint write run without writeLock or this held, only syncLock.
                    try {
                        if (!Boolean.TRUE.equals(this.remoteTranslogEnabled)) {
                            this.channel.force(false);
                        }
                        TranslogWriter.writeCheckpoint(this.checkpointChannel, this.checkpointPath, checkpointToSync, this.remoteTranslogEnabled);
                    } catch (final Exception ex) {
                        this.closeWithTragicEvent(ex);
                        throw ex;
                    }
                    // Plain index loop: passing a method reference to LongArrayList.forEach is ambiguous
                    // between its LongProcedure and LongPredicate overloads.
                    final int flushedCount = flushedSequenceNumbers.size();
                    for (int i = 0; i < flushedCount; i++) {
                        this.persistedSequenceNumberConsumer.accept(flushedSequenceNumbers.get(i));
                    }
                    assert this.lastSyncedCheckpoint.offset <= checkpointToSync.offset
                        : "illegal state: " + this.lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
                    this.lastSyncedCheckpoint = checkpointToSync; // written only while holding syncLock
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Writes the buffered operations to the channel if {@code offset} lies beyond what has been written.
     *
     * @param offset                only write if this offset is greater than the channel's current position
     * @param blockOnExistingWriter if {@code true}, wait for the write lock; otherwise only try to take it
     *                              and do nothing when it is not available
     * @throws IOException if reading the channel position or writing fails; the writer is then closed
     *                     with that exception recorded as tragic
     */
    private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws IOException {
        // tryAcquire() may yield null; try-with-resources tolerates a null resource.
        try (ReleasableLock locked = blockOnExistingWriter ? this.writeLock.acquire() : this.writeLock.tryAcquire()) {
            try {
                if (locked == null || offset <= getWrittenOffset()) {
                    return;
                }
                writeAndReleaseOps(pollOpsToWrite());
            } catch (Exception failure) {
                closeWithTragicEvent(failure);
                throw failure;
            }
        }
    }

    /**
     * Detaches the current in-memory buffer, resets the buffered byte count, and returns the buffered bytes.
     *
     * @return the buffered bytes tied to the detached buffer as their releasable, or an empty reference
     *         when nothing is buffered
     * @throws AlreadyClosedException if the writer is closed
     */
    private synchronized ReleasableBytesReference pollOpsToWrite() {
        ensureOpen();
        final ReleasableBytesStreamOutput pending = this.buffer;
        if (pending == null) {
            return ReleasableBytesReference.wrap((BytesReference) BytesArray.EMPTY);
        }
        this.buffer = null;
        this.bufferedBytes = 0L;
        return new ReleasableBytesReference(pending.bytes(), pending);
    }

    /**
     * Copies {@code toWrite} chunk by chunk through the shared I/O buffer into the translog channel and
     * closes {@code toWrite} afterwards, whether or not the write succeeded.
     * Must be called with {@code writeLock} held by the current thread.
     *
     * @param toWrite the bytes to write; closed by this method
     * @throws IOException if writing to the channel fails
     */
    private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
        try (ReleasableBytesReference toClose = toWrite) {
            assert this.writeLock.isHeldByCurrentThread();
            final ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();
            final BytesRefIterator chunks = toWrite.iterator();
            for (BytesRef chunk = chunks.next(); chunk != null; chunk = chunks.next()) {
                int consumed = 0;
                while (consumed != chunk.length) {
                    final int nBytesToWrite = Math.min(chunk.length - consumed, ioBuffer.remaining());
                    ioBuffer.put(chunk.bytes, chunk.offset + consumed, nBytesToWrite);
                    consumed += nBytesToWrite;
                    if (ioBuffer.hasRemaining() == false) {
                        // I/O buffer is full: drain it to the file and start filling it again.
                        ioBuffer.flip();
                        writeToFile(ioBuffer);
                        ioBuffer.clear();
                    }
                }
            }
            // Drain whatever is left in the partially filled buffer.
            ioBuffer.flip();
            writeToFile(ioBuffer);
        }
    }

    /**
     * Writes the remaining bytes of {@code ioBuffer} to the translog channel, looping because a single
     * {@code write} call may consume only part of the buffer.
     *
     * @param ioBuffer buffer in read mode (already flipped)
     * @throws IOException if the channel write fails
     */
    @SuppressForbidden(reason="Channel#write")
    private void writeToFile(ByteBuffer ioBuffer) throws IOException {
        while (ioBuffer.hasRemaining()) {
            this.channel.write(ioBuffer);
        }
    }

    /**
     * Reads {@code targetBuffer.remaining()} bytes starting at {@code position} from the translog channel.
     * If the requested range extends past what has been written to the channel, the buffered operations
     * are written out first (waiting for the write lock).
     *
     * @param targetBuffer destination buffer
     * @param position     absolute file position to read from
     * @throws IOException if flushing buffered operations fails (the writer is then closed with that
     *                     exception recorded as tragic) or the read itself fails
     */
    @Override
    protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
        try {
            final long requiredOffset = position + (long) targetBuffer.remaining();
            if (requiredOffset > getWrittenOffset()) {
                writeBufferedOps(requiredOffset, true);
            }
        } catch (final Exception flushFailure) {
            closeWithTragicEvent(flushFailure);
            throw flushFailure;
        }
        Channels.readFromFileChannelWithEofException(this.channel, position, targetBuffer);
    }

    /**
     * Writes {@code checkpoint} to the checkpoint file via {@code Checkpoint.write}.
     *
     * @param remoteTranslogEnabled when {@code TRUE}, {@code false} is passed as the last argument of
     *                              {@code Checkpoint.write}; otherwise (including {@code null}) {@code true}
     * @throws IOException if the write fails
     */
    private static void writeCheckpoint(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint, Boolean remoteTranslogEnabled) throws IOException {
        final boolean remoteDisabled = !Boolean.TRUE.equals(remoteTranslogEnabled);
        Checkpoint.write(fileChannel, checkpointFile, checkpoint, remoteDisabled);
    }

    /**
     * @return the checkpoint most recently stored by a successful sync, or the initial checkpoint if no
     *         sync has happened yet
     */
    Checkpoint getLastSyncedCheckpoint() {
        final Checkpoint latest = this.lastSyncedCheckpoint;
        return latest;
    }

    /**
     * Fails if this writer has been closed.
     *
     * @throws AlreadyClosedException if closed; the recorded tragic exception (if any) is attached as the cause
     */
    protected final void ensureOpen() {
        if (isClosed() == false) {
            return;
        }
        throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", (Throwable) this.tragedy.get());
    }

    /**
     * Assertion helper comparing the channel position with {@code expectedOffset}.
     *
     * @param expectedOffset the position the channel is expected to be at
     * @return {@code true} if the positions match, or if the position cannot be read
     */
    private boolean checkChannelPositionWhileHandlingException(long expectedOffset) {
        final long actualOffset;
        try {
            actualOffset = this.channel.position();
        } catch (IOException ignored) {
            // The position is unavailable; deliberately treated as a match.
            return true;
        }
        return expectedOffset == actualOffset;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes this writer once: releases the in-memory buffer (discarding anything still buffered) and
     * closes the checkpoint and translog channels. Subsequent calls do nothing.
     *
     * @throws IOException if closing either channel fails
     */
    @Override
    public final void close() throws IOException {
        if (this.closed.compareAndSet(false, true) == false) {
            return; // already closed by an earlier call
        }
        synchronized (this) {
            Releasables.closeWhileHandlingException(new Releasable[] { this.buffer });
            this.buffer = null;
            this.bufferedBytes = 0L;
        }
        IOUtils.close(new Closeable[] { this.checkpointChannel, this.channel });
    }

    /**
     * @return {@code true} once {@link #close()} or {@link #closeIntoReader()} has marked this writer closed
     */
    protected final boolean isClosed() {
        final boolean alreadyClosed = this.closed.get();
        return alreadyClosed;
    }
}

