/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.timeseries.ml;

import com.google.gson.Gson;
import io.protostuff.LinkedBuffer;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkAction;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.get.GetAction;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetAction;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.ml.ModelState;
import org.opensearch.timeseries.ml.Sample;
import org.opensearch.timeseries.util.ClientUtil;

public abstract class CheckpointDao<RCFModelType, IndexType extends Enum<IndexType>, IndexManagementType extends IndexManagement<IndexType>> {
    /** Logger shared by every method of this class. */
    private static final Logger logger = LogManager.getLogger(CheckpointDao.class);
    /** Prefix of the warning logged by {@code logFailure} when a delete-by-query response timed out. */
    public static final String TIMEOUT_LOG_MSG = "Timeout while deleting checkpoints of";
    /** Prefix of the warning logged by {@code logFailure} when a delete-by-query response carries bulk failures. */
    public static final String BULK_FAILURE_LOG_MSG = "Bulk failure while deleting checkpoints of";
    /** Prefix of the warning logged by {@code logFailure} when neither a timeout nor bulk failures were reported. */
    public static final String SEARCH_FAILURE_LOG_MSG = "Search failure while deleting checkpoints of";
    /** Suffix of the info message that reports how many checkpoint documents a delete-by-query removed. */
    public static final String DOC_GOT_DELETED_LOG_MSG = "checkpoints docs get deleted";
    /** Prefix of the info message logged when a delete-by-query fails with an index-not-found error. */
    public static final String INDEX_DELETED_LOG_MSG = "Checkpoint index has been deleted.  Has nothing to do:";
    /** Client used to issue update, delete and delete-by-query requests. */
    protected final Client client;
    /** Helper through which update, delete, bulk, get and multi-get requests are dispatched. */
    protected final ClientUtil clientUtil;
    /** Name of the index targeted by the update and delete requests built in this class. */
    protected final String indexName;
    /** Supplied at construction; not read in this class. Presumably used by subclasses for JSON handling - confirm. */
    protected Gson gson;
    /** Supplied at construction; not read in this class. Presumably a size limit for a serialized checkpoint - confirm. */
    protected final int maxCheckpointBytes;
    /** Pool from which {@code checkoutOrNewBuffer} tries to borrow a {@link LinkedBuffer}. */
    protected final GenericObjectPool<LinkedBuffer> serializeRCFBufferPool;
    /** Size passed to {@code LinkedBuffer.allocate} when no buffer could be borrowed from the pool. */
    protected final int serializeRCFBufferSize;
    /** Used to check whether the checkpoint index exists and to create it when it does not. */
    protected final IndexManagement<IndexType> indexUtil;
    /** Supplied at construction; not read in this class ({@code shouldSave} takes its own clock argument). */
    protected final Clock clock;
    /** Error message logged when a delete-by-query fails for a reason other than a missing index. */
    public static final String NOT_ABLE_TO_DELETE_CHECKPOINT_MSG = "Cannot delete all checkpoints of detector";

    /**
     * Stores the collaborators and sizing parameters used by this DAO.
     *
     * @param client                 client used to issue update, delete and delete-by-query requests
     * @param clientUtil             helper through which requests are dispatched
     * @param indexName              name of the checkpoint index
     * @param gson                   Gson instance kept for later use
     * @param maxCheckpointBytes     stored as-is; not interpreted by this constructor
     * @param serializeRCFBufferPool pool of reusable serialization buffers
     * @param serializeRCFBufferSize size used when a buffer has to be allocated outside the pool
     * @param indexUtil              index manager used to check for and create the checkpoint index
     * @param clock                  clock kept for later use
     */
    public CheckpointDao(
        Client client,
        ClientUtil clientUtil,
        String indexName,
        Gson gson,
        int maxCheckpointBytes,
        GenericObjectPool<LinkedBuffer> serializeRCFBufferPool,
        int serializeRCFBufferSize,
        IndexManagementType indexUtil,
        Clock clock
    ) {
        // Index access.
        this.indexName = indexName;
        this.indexUtil = indexUtil;
        this.client = client;
        this.clientUtil = clientUtil;

        // Serialization support.
        this.gson = gson;
        this.serializeRCFBufferPool = serializeRCFBufferPool;
        this.serializeRCFBufferSize = serializeRCFBufferSize;
        this.maxCheckpointBytes = maxCheckpointBytes;

        this.clock = clock;
    }

    /**
     * Writes a model checkpoint, creating the checkpoint index first when it does not exist yet.
     *
     * @param modelId  id of the checkpoint document
     * @param source   document content
     * @param listener handed on to the save or index-creation path
     */
    protected void putModelCheckpoint(String modelId, Map<String, Object> source, ActionListener<Void> listener) {
        if (!indexUtil.doesCheckpointIndexExist()) {
            // Index is missing: create it, then save.
            onCheckpointNotExist(source, modelId, listener);
            return;
        }
        saveModelCheckpointAsync(source, modelId, listener);
    }

    /**
     * Upserts the given document into the checkpoint index under {@code modelId}.
     *
     * @param source   document content; used as both the partial update and the upsert document
     * @param modelId  id of the checkpoint document
     * @param listener receives {@code null} on success, or the failure reported by the update
     */
    protected void saveModelCheckpointAsync(Map<String, Object> source, String modelId, ActionListener<Void> listener) {
        UpdateRequest upsert = new UpdateRequest(indexName, modelId).doc(source).docAsUpsert(true);
        clientUtil.asyncRequest(
            upsert,
            (request, responseListener) -> client.update(request, responseListener),
            ActionListener.wrap(ignored -> listener.onResponse(null), failure -> listener.onFailure(failure))
        );
    }

    /**
     * Creates the checkpoint index and then saves the checkpoint.
     *
     * <p>If index creation fails because the index already exists, the checkpoint is saved anyway.
     * Any other failure, including a creation call that was not acknowledged, is logged and
     * reported to {@code listener} so that the caller is always notified of the outcome.
     *
     * @param source   document content to save once the index is available
     * @param modelId  id of the checkpoint document
     * @param listener notified of the save result or of the index-creation failure
     */
    protected void onCheckpointNotExist(Map<String, Object> source, String modelId, ActionListener<Void> listener) {
        indexUtil.initCheckpointIndex(ActionListener.wrap(initResponse -> {
            if (!initResponse.isAcknowledged()) {
                // ActionListener.wrap forwards an exception thrown here to the failure handler below,
                // which logs it and notifies the caller.
                throw new RuntimeException("Creating checkpoint with mappings call not acknowledged.");
            }
            saveModelCheckpointAsync(source, modelId, listener);
        }, exception -> {
            if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
                // The index showed up while the create request was in flight; saving can proceed.
                saveModelCheckpointAsync(source, modelId, listener);
            } else {
                logger.error(String.format(Locale.ROOT, "Unexpected error creating index %s", indexName), exception);
                // Without this the caller would never learn that the checkpoint was not written.
                listener.onFailure(exception);
            }
        }));
    }

    /**
     * Obtains a serialization buffer, preferring the pool and falling back to a fresh allocation.
     *
     * @return an entry whose key is the buffer and whose value is {@code true} when the buffer was
     *         borrowed from the pool, {@code false} when it was newly allocated
     */
    protected Map.Entry<LinkedBuffer, Boolean> checkoutOrNewBuffer() {
        LinkedBuffer borrowed = null;
        try {
            borrowed = serializeRCFBufferPool.borrowObject();
        } catch (Exception e) {
            logger.warn("Failed to borrow a buffer from pool", e);
        }

        if (borrowed != null) {
            return new AbstractMap.SimpleImmutableEntry<>(borrowed, Boolean.TRUE);
        }
        // Pool gave nothing back (failure or null): allocate a buffer outside the pool.
        LinkedBuffer allocated = LinkedBuffer.allocate(serializeRCFBufferSize);
        return new AbstractMap.SimpleImmutableEntry<>(allocated, Boolean.FALSE);
    }

    /**
     * Deletes the checkpoint document with the given id from the checkpoint index.
     *
     * @param modelId  id of the checkpoint document
     * @param listener receives {@code null} on success, or the failure reported by the delete
     */
    public void deleteModelCheckpoint(String modelId, ActionListener<Void> listener) {
        DeleteRequest deleteRequest = new DeleteRequest(indexName, modelId);
        clientUtil.asyncRequest(
            deleteRequest,
            (request, responseListener) -> client.delete(request, responseListener),
            ActionListener.wrap(ignored -> listener.onResponse(null), failure -> listener.onFailure(failure))
        );
    }

    /**
     * Logs, at warn level, why a delete-by-query did not fully succeed.
     *
     * <p>Exactly one category is reported, in this order of precedence: timeout, bulk failures,
     * search failures.
     *
     * @param response delete-by-query response to inspect
     * @param id       identifier included in the log message
     */
    protected void logFailure(BulkByScrollResponse response, String id) {
        if (response.isTimedOut()) {
            logger.warn(TIMEOUT_LOG_MSG + " {}", id);
            return;
        }

        if (!response.getBulkFailures().isEmpty()) {
            logger.warn(BULK_FAILURE_LOG_MSG + " {}", id);
            for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
                logger.warn((Object) failure);
            }
            return;
        }

        // Neither timed out nor bulk failures: report whatever search failures are attached.
        logger.warn(SEARCH_FAILURE_LOG_MSG + " {}", id);
        for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) {
            logger.warn((Object) failure);
        }
    }

    /**
     * Decides whether a model state should be checkpointed now.
     *
     * <p>A state is saved only when it holds a model or at least one sample, and either
     * {@code forceWrite} is set or the last checkpoint time (when it is neither {@code null} nor
     * {@link Instant#MIN}) plus {@code checkpointInterval} lies before the current instant of the
     * {@code clock} argument.
     *
     * @param modelState         state to examine; {@code null} yields {@code false}
     * @param forceWrite         when {@code true}, the elapsed-interval requirement is skipped
     * @param checkpointInterval minimum time between two checkpoints
     * @param clock              source of the current instant
     * @return {@code true} when the state should be written
     */
    public boolean shouldSave(ModelState<RCFModelType> modelState, boolean forceWrite, Duration checkpointInterval, Clock clock) {
        if (modelState == null) {
            return false;
        }

        final Instant lastSaved = modelState.getLastCheckpointTime();
        boolean intervalElapsed = false;
        if (lastSaved != null && !Instant.MIN.equals(lastSaved)) {
            intervalElapsed = lastSaved.plus(checkpointInterval).isBefore(clock.instant());
        }

        boolean hasSamples = !(modelState.getSamples() == null || modelState.getSamples().isEmpty());
        boolean hasContent = modelState.getModel().isPresent() || hasSamples;

        return (intervalElapsed || forceWrite) && hasContent;
    }

    /**
     * Executes a bulk request against the checkpoint index, creating the index first if needed.
     *
     * <p>When index creation fails because the index already exists, the bulk request is still
     * executed. A creation call that is not acknowledged, or any other creation failure, is
     * reported through {@code listener.onFailure}.
     *
     * @param request  bulk request to execute
     * @param listener receives the bulk response or the failure
     */
    public void batchWrite(BulkRequest request, ActionListener<BulkResponse> listener) {
        if (indexUtil.doesCheckpointIndexExist()) {
            clientUtil.execute(BulkAction.INSTANCE, request, listener);
            return;
        }

        indexUtil.initCheckpointIndex(ActionListener.wrap(initResponse -> {
            if (!initResponse.isAcknowledged()) {
                listener.onFailure(new TimeSeriesException("Creating checkpoint with mappings call not acknowledged."));
                return;
            }
            clientUtil.execute(BulkAction.INSTANCE, request, listener);
        }, exception -> {
            boolean indexAlreadyThere = ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException;
            if (!indexAlreadyThere) {
                logger.error("Unexpected error creating checkpoint index", exception);
                listener.onFailure(exception);
                return;
            }
            // The index appeared in the meantime, so the write can go ahead.
            clientUtil.execute(BulkAction.INSTANCE, request, listener);
        }));
    }

    /**
     * Converts a sample queue into an array.
     *
     * @param samples queue to convert; may be {@code null}
     * @return the samples in queue iteration order, or an empty {@code Optional} when the queue
     *         is {@code null} or empty
     */
    protected Optional<Sample[]> toCheckpoint(Queue<Sample> samples) {
        boolean hasSamples = samples != null && !samples.isEmpty();
        return hasSamples ? Optional.of(samples.toArray(new Sample[0])) : Optional.empty();
    }

    /**
     * Executes a multi-get request through {@code clientUtil}.
     *
     * @param request  multi-get request to execute
     * @param listener receives the multi-get response or the failure
     */
    public void batchRead(MultiGetRequest request, ActionListener<MultiGetResponse> listener) {
        this.clientUtil.execute(MultiGetAction.INSTANCE, request, listener);
    }

    /**
     * Executes a single get request through {@code clientUtil}.
     *
     * @param request  get request to execute
     * @param listener receives the get response or the failure
     */
    public void read(GetRequest request, ActionListener<GetResponse> listener) {
        this.clientUtil.execute(GetAction.INSTANCE, request, listener);
    }

    /**
     * Deletes the checkpoints matched by the delete-by-query request built for {@code configId}.
     *
     * <p>The call is fire-and-forget: outcomes are only logged. A missing checkpoint index is
     * treated as "nothing to delete" and logged at info level; the failure is unwrapped first so
     * that an index-not-found error carried inside a wrapper exception is recognized as well.
     *
     * @param configId id of the config whose checkpoints should be removed
     */
    public void deleteModelCheckpointByConfigId(String configId) {
        DeleteByQueryRequest deleteRequest = createDeleteCheckpointRequest(configId);
        logger.info("Delete checkpoints of config {}", configId);
        client.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> {
            if (response.isTimedOut() || !response.getBulkFailures().isEmpty() || !response.getSearchFailures().isEmpty()) {
                logFailure(response, configId);
            }
            // Logged even after a partial failure so the number of removed documents is always visible.
            logger.info("{} " + DOC_GOT_DELETED_LOG_MSG, (Object) response.getDeleted());
        }, exception -> {
            // Unwrap first, as the other failure handlers in this class do, so a wrapped
            // IndexNotFoundException is not misreported as an error.
            if (ExceptionsHelper.unwrapCause(exception) instanceof IndexNotFoundException) {
                logger.info(INDEX_DELETED_LOG_MSG + " {}", configId);
            } else {
                logger.error(NOT_ABLE_TO_DELETE_CHECKPOINT_MSG, exception);
            }
        }));
    }

    /**
     * Extracts the document source from a get response.
     *
     * @param response get response; may be {@code null}
     * @return the source map, or an empty {@code Optional} when the response is {@code null}, the
     *         document does not exist, the source is {@code null}, or reading it throws
     */
    protected Optional<Map<String, Object>> processRawCheckpoint(GetResponse response) {
        try {
            if (response == null || !response.isExists()) {
                return Optional.empty();
            }
            return Optional.ofNullable(response.getSource());
        } catch (Exception e) {
            logger.error("Error processing raw checkpoint", e);
            return Optional.empty();
        }
    }

    /**
     * Turns a get response into a model state via {@code fromEntityModelCheckpoint}.
     *
     * @param response get response holding the checkpoint document
     * @param modelId  model id passed on to the conversion
     * @param configId config id passed on to the conversion
     * @return the converted model state, or {@code null} when the response yields no checkpoint source
     */
    public ModelState<RCFModelType> processHCGetResponse(GetResponse response, String modelId, String configId) {
        return processRawCheckpoint(response)
            .map(checkpoint -> fromEntityModelCheckpoint(checkpoint, modelId, configId))
            .orElse(null);
    }

    /**
     * Turns a get response into a model state via {@code fromSingleStreamModelCheckpoint}.
     *
     * @param response get response holding the checkpoint document
     * @param modelId  model id passed on to the conversion
     * @param configId config id passed on to the conversion
     * @return the converted model state, or {@code null} when the response yields no checkpoint source
     */
    public ModelState<RCFModelType> processSingleStreamGetResponse(GetResponse response, String modelId, String configId) {
        return processRawCheckpoint(response)
            .map(checkpoint -> fromSingleStreamModelCheckpoint(checkpoint, modelId, configId))
            .orElse(null);
    }

    /**
     * Builds a model state from a checkpoint document; invoked by {@code processHCGetResponse}.
     *
     * @param var1 checkpoint source map read from the get response
     * @param var2 model id
     * @param var3 config id
     * @return the model state; whether {@code null} may be returned is up to the implementation
     */
    protected abstract ModelState<RCFModelType> fromEntityModelCheckpoint(Map<String, Object> var1, String var2, String var3);

    /**
     * Builds a model state from a checkpoint document; invoked by {@code processSingleStreamGetResponse}.
     *
     * @param var1 checkpoint source map read from the get response
     * @param var2 model id
     * @param var3 config id
     * @return the model state; whether {@code null} may be returned is up to the implementation
     */
    protected abstract ModelState<RCFModelType> fromSingleStreamModelCheckpoint(Map<String, Object> var1, String var2, String var3);

    /**
     * Produces a map from a model state.
     * NOTE(review): judging by the name, the map is the document source to index as a checkpoint - confirm in subclasses.
     *
     * @param var1 model state to convert
     * @return map representation of the model state
     * @throws IOException declared for implementations; this class does not show when it is raised
     */
    public abstract Map<String, Object> toIndexSource(ModelState<RCFModelType> var1) throws IOException;

    /**
     * Builds the delete-by-query request executed by {@code deleteModelCheckpointByConfigId}.
     *
     * @param var1 config id whose checkpoints the request should target
     * @return the request to execute
     */
    protected abstract DeleteByQueryRequest createDeleteCheckpointRequest(String var1);

    /**
     * Rebuilds the sample queue stored under the {@code "samples"} key of a checkpoint document.
     *
     * <p>Loading is best-effort. An entry that cannot be turned into a sample (wrong type,
     * extraction failure, or a {@code null} extraction result) is skipped, with a warning for the
     * failure cases. A {@code "samples"} value that is not a list is logged and treated as absent.
     *
     * @param checkpoint checkpoint source map
     * @param modelId    model id, used in log messages only
     * @return the successfully extracted samples in stored order; never {@code null}
     */
    protected Deque<Sample> loadSampleQueue(Map<String, Object> checkpoint, String modelId) {
        Deque<Sample> sampleQueue = new ArrayDeque<>();
        Object rawSamples = checkpoint.get("samples");
        if (rawSamples == null) {
            return sampleQueue;
        }
        if (!(rawSamples instanceof List)) {
            logger.warn("Unexpected type {} for samples of {}", rawSamples.getClass().getName(), modelId);
            return sampleQueue;
        }

        for (Object rawSample : (List<?>) rawSamples) {
            try {
                // The cast sits inside the try so a malformed entry is skipped like any other
                // per-sample failure instead of aborting the whole load.
                @SuppressWarnings("unchecked")
                Map<String, Object> sampleMap = (Map<String, Object>) rawSample;
                Sample sample = Sample.extractSample(sampleMap);
                if (sample != null) {
                    sampleQueue.add(sample);
                }
            } catch (Exception e) {
                logger.warn("Exception while deserializing samples for " + modelId, e);
            }
        }
        return sampleQueue;
    }
}

