/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.flint.operation;

import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;

/**
 * Template for an operation on a Flint index that is guarded by a state transition.
 *
 * <p>{@link #apply(FlintIndexMetadata)} drives the sequence: load the current state document,
 * check it with {@link #validate(FlintIndexState)}, move it to {@link #transitioningState()},
 * execute {@link #runOp(FlintIndexMetadata, FlintIndexStateModel)} and finally move it to
 * {@link #stableState()}. If the operation or the commit fails, a best-effort attempt is made to
 * restore the state that was read at the beginning.
 */
public abstract class FlintIndexOp {
    private static final Logger LOG = LogManager.getLogger();

    /** Job run state reported once a cancellation has completed. */
    private static final String CANCELLED_STATE = "Cancelled";

    /** Message fragment of the validation error raised for a job that can no longer be cancelled. */
    private static final String NOT_CANCELLABLE_MESSAGE = "Job run is not in a cancellable state";

    /** Number of times the job run state is polled after a cancel request. */
    private static final int CANCEL_POLL_ATTEMPTS = 3;

    /** Pause between two consecutive polls, in seconds. */
    private static final long CANCEL_POLL_INTERVAL_SECONDS = 1L;

    private final StateStore stateStore;
    private final String datasourceName;

    /**
     * Applies this operation to the index described by {@code metadata}.
     *
     * <p>When the metadata carries no latest id, the operation is run directly against a
     * placeholder state model and no state document is read or written. Otherwise the state
     * document is validated, moved to the transitioning state, the operation is run and the
     * result is committed.
     *
     * @param metadata metadata of the Flint index to operate on
     * @throws IllegalStateException if the state document is missing, the current state fails
     *     validation, or a state update (transition or commit) fails
     */
    public void apply(FlintIndexMetadata metadata) {
        Optional<String> latestId = metadata.getLatestId();
        if (latestId.isEmpty()) {
            this.takeActionWithoutOCC(metadata);
            return;
        }

        FlintIndexStateModel initialFlintIndexStateModel = this.getFlintIndexStateModel(latestId.get());
        this.validFlintIndexInitialState(initialFlintIndexStateModel);
        FlintIndexStateModel transitionedFlintIndexStateModel = this.moveToTransitioningState(initialFlintIndexStateModel);
        try {
            this.runOp(metadata, transitionedFlintIndexStateModel);
            this.commit(transitionedFlintIndexStateModel);
        }
        catch (Throwable e) {
            LOG.error("Rolling back transient log due to transaction operation failure", e);
            try {
                // Best effort: put the state recorded before the transition back in place.
                StateStore.updateFlintIndexState(this.stateStore, this.datasourceName)
                    .apply(transitionedFlintIndexStateModel, initialFlintIndexStateModel.getIndexState());
            }
            catch (Exception ex) {
                // The original failure is the one reported to the caller; the rollback failure is only logged.
                LOG.error("Failed to rollback transient log", ex);
            }
            throw e;
        }
    }

    /**
     * Loads the state document with the given id.
     *
     * @param latestId id of the state document
     * @return the stored state model
     * @throws IllegalStateException if no document exists for {@code latestId}
     */
    @NotNull
    private FlintIndexStateModel getFlintIndexStateModel(String latestId) {
        Optional<FlintIndexStateModel> flintIndexOptional =
            StateStore.getFlintIndexState(this.stateStore, this.datasourceName).apply(latestId);
        if (flintIndexOptional.isEmpty()) {
            String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId);
            LOG.error(errorMsg);
            throw new IllegalStateException(errorMsg);
        }
        return flintIndexOptional.get();
    }

    /**
     * Runs the operation for an index without a state document, passing a placeholder model built
     * from the metadata's application id and job id.
     *
     * @param metadata metadata of the Flint index to operate on
     */
    private void takeActionWithoutOCC(FlintIndexMetadata metadata) {
        // NOTE(review): the trailing -2L / 0L are presumably "unassigned" sequence number and
        // primary term placeholders; confirm against FlintIndexStateModel's constructor.
        FlintIndexStateModel fakeModel = new FlintIndexStateModel(FlintIndexState.REFRESHING, metadata.getAppId(), metadata.getJobId(), "", this.datasourceName, System.currentTimeMillis(), "", -2L, 0L);
        this.runOp(metadata, fakeModel);
    }

    /**
     * Checks that the index is in a state from which this operation may start.
     *
     * @param flintIndex state model read before the transaction
     * @throws IllegalStateException if {@link #validate(FlintIndexState)} rejects the current state
     */
    private void validFlintIndexInitialState(FlintIndexStateModel flintIndex) {
        LOG.debug("Validating the state before the transaction.");
        FlintIndexState currentState = flintIndex.getIndexState();
        if (!this.validate(currentState)) {
            String errorMsg = String.format(Locale.ROOT, "validate failed. unexpected state: [%s]", currentState);
            LOG.error(errorMsg);
            throw new IllegalStateException("Transaction failed as flint index is not in a valid state.");
        }
    }

    /**
     * Moves the state document to {@link #transitioningState()}.
     *
     * @param flintIndex state model to update
     * @return the state model returned by the state store update
     * @throws IllegalStateException if the update fails; the original exception is the cause
     */
    private FlintIndexStateModel moveToTransitioningState(FlintIndexStateModel flintIndex) {
        LOG.debug("Moving to transitioning state before committing.");
        // Resolve once so the state that is applied is the same one reported on failure.
        FlintIndexState transitioningState = this.transitioningState();
        try {
            return StateStore.updateFlintIndexState(this.stateStore, this.datasourceName)
                .apply(flintIndex, transitioningState);
        }
        catch (Exception e) {
            String errorMsg = String.format(Locale.ROOT, "Moving to transition state:%s failed.", transitioningState);
            LOG.error(errorMsg, e);
            throw new IllegalStateException(errorMsg, e);
        }
    }

    /**
     * Finishes the transaction by moving the state document to {@link #stableState()}. A stable
     * state of {@link FlintIndexState#NONE} deletes the document instead of updating it.
     *
     * @param flintIndex state model in the transitioning state
     * @throws IllegalStateException if the update or delete fails; the original exception is the cause
     */
    private void commit(FlintIndexStateModel flintIndex) {
        LOG.debug("Committing the transaction and moving to stable state.");
        FlintIndexState stableState = this.stableState();
        try {
            if (stableState == FlintIndexState.NONE) {
                LOG.info("Deleting index state with docId: {}", flintIndex.getLatestId());
                StateStore.deleteFlintIndexState(this.stateStore, this.datasourceName).apply(flintIndex.getLatestId());
            } else {
                StateStore.updateFlintIndexState(this.stateStore, this.datasourceName).apply(flintIndex, stableState);
            }
        }
        catch (Exception e) {
            String errorMsg = String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState);
            LOG.error(errorMsg, e);
            throw new IllegalStateException(errorMsg, e);
        }
    }

    /**
     * Requests cancellation of the job referenced by {@code flintIndexStateModel} and polls the
     * job run state until it reports as cancelled.
     *
     * <p>If the service rejects the request because the job run is not in a cancellable state,
     * the method returns without polling. The state is polled up to
     * {@value #CANCEL_POLL_ATTEMPTS} times with a {@value #CANCEL_POLL_INTERVAL_SECONDS} second
     * pause between polls.
     *
     * @param emrServerlessClient client used to cancel the job run and read its state
     * @param flintIndexStateModel state model supplying the application id and job id
     * @throws RuntimeException if the cancel request fails for any other reason; the original
     *     exception is attached as the cause
     * @throws InterruptedException if the thread is interrupted while waiting between polls
     * @throws TimeoutException if the job run is not reported as cancelled after the last poll
     */
    public void cancelStreamingJob(EMRServerlessClient emrServerlessClient, FlintIndexStateModel flintIndexStateModel) throws InterruptedException, TimeoutException {
        String applicationId = flintIndexStateModel.getApplicationId();
        String jobId = flintIndexStateModel.getJobId();
        try {
            emrServerlessClient.cancelJobRun(applicationId, jobId, true);
        }
        catch (ValidationException e) {
            // getMessage() may be null; guard so the handler itself cannot fail with an NPE.
            String message = e.getMessage();
            if (message != null && message.contains(NOT_CANCELLABLE_MESSAGE)) {
                LOG.error("Job run is not cancellable. Application ID: {}, Job ID: {}", applicationId, jobId, e);
                return;
            }
            LOG.error("Cancel job request rejected. Application ID: {}, Job ID: {}", applicationId, jobId, e);
            throw new RuntimeException("Internal Server Error.", e);
        }
        catch (Exception e) {
            LOG.error("Cancel job request failed. Application ID: {}, Job ID: {}", applicationId, jobId, e);
            throw new RuntimeException("Internal Server Error.", e);
        }

        for (int attempt = 1; attempt <= CANCEL_POLL_ATTEMPTS; attempt++) {
            String jobRunState = emrServerlessClient.getJobRunResult(applicationId, jobId).getJobRun().getState();
            // Constant-first comparison tolerates a null state from the service.
            if (CANCELLED_STATE.equalsIgnoreCase(jobRunState)) {
                return;
            }
            // No point in sleeping after the final poll: nothing is checked afterwards.
            if (attempt < CANCEL_POLL_ATTEMPTS) {
                TimeUnit.SECONDS.sleep(CANCEL_POLL_INTERVAL_SECONDS);
            }
        }

        String errMsg = "Cancel job timeout for Application ID: " + applicationId + ", Job ID: " + jobId;
        LOG.error(errMsg);
        throw new TimeoutException("Cancel job operation timed out.");
    }

    /**
     * Decides whether the operation may start from the given state.
     *
     * @param state state read from the state document before the transaction
     * @return {@code true} if the operation may proceed; {@code false} makes
     *     {@link #apply(FlintIndexMetadata)} fail with an {@link IllegalStateException}
     */
    abstract boolean validate(FlintIndexState state);

    /**
     * @return the state written to the state document before the operation is run
     */
    abstract FlintIndexState transitioningState();

    /**
     * Performs the actual operation.
     *
     * @param flintIndexMetadata metadata of the Flint index passed to {@link #apply(FlintIndexMetadata)}
     * @param flintIndex state model after the move to the transitioning state, or a placeholder
     *     model when the metadata has no latest id
     */
    abstract void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex);

    /**
     * @return the state written on commit; {@link FlintIndexState#NONE} causes the state document
     *     to be deleted instead
     */
    abstract FlintIndexState stableState();

    /**
     * Creates the operation.
     *
     * @param stateStore store used to read, update and delete index state documents
     * @param datasourceName name of the datasource passed to every state store call
     */
    @Generated
    public FlintIndexOp(StateStore stateStore, String datasourceName) {
        this.stateStore = stateStore;
        this.datasourceName = datasourceName;
    }
}

