/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.timeseries.ratelimit;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.IndexableResult;
import org.opensearch.timeseries.ratelimit.BatchWorker;
import org.opensearch.timeseries.ratelimit.RequestPriority;
import org.opensearch.timeseries.ratelimit.ResultWriteRequest;
import org.opensearch.timeseries.transport.ResultBulkRequest;
import org.opensearch.timeseries.transport.ResultBulkResponse;
import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler;
import org.opensearch.timeseries.util.ExceptionUtil;

public abstract class ResultWriteWorker<ResultType extends IndexableResult, ResultWriteRequestType extends ResultWriteRequest<ResultType>, BatchRequestType extends ResultBulkRequest<ResultType, ResultWriteRequestType>, IndexType extends Enum<IndexType>, IndexManagementType extends IndexManagement<IndexType>, ResultHandlerType extends IndexMemoryPressureAwareResultHandler<ResultType, ResultWriteRequestType, BatchRequestType, ResultBulkResponse, IndexType, IndexManagementType>>
extends BatchWorker<ResultWriteRequestType, BatchRequestType, ResultBulkResponse> {
    private static final Logger LOG = LogManager.getLogger(ResultWriteWorker.class);
    protected final ResultHandlerType resultHandler;
    protected NamedXContentRegistry xContentRegistry;
    private CheckedFunction<XContentParser, ? extends ResultType, IOException> resultParser;

    public ResultWriteWorker(String queueName, long heapSize, int singleRequestSize, Setting<Float> maxHeapPercentForQueueSetting, ClusterService clusterService, Random random, CircuitBreakerService adCircuitBreakerService, ThreadPool threadPool, String threadPoolName, Settings settings, float maxQueuedTaskRatio, Clock clock, float mediumSegmentPruneRatio, float lowSegmentPruneRatio, int maintenanceFreqConstant, Setting<Integer> concurrencySetting, Duration executionTtl, Setting<Integer> batchSizeSetting, Duration stateTtl, NodeStateManager timeSeriesNodeStateManager, ResultHandlerType resultHandler, NamedXContentRegistry xContentRegistry, CheckedFunction<XContentParser, ? extends ResultType, IOException> resultParser, AnalysisType context) {
        super(queueName, heapSize, singleRequestSize, maxHeapPercentForQueueSetting, clusterService, random, adCircuitBreakerService, threadPool, threadPoolName, settings, maxQueuedTaskRatio, clock, mediumSegmentPruneRatio, lowSegmentPruneRatio, maintenanceFreqConstant, concurrencySetting, executionTtl, batchSizeSetting, stateTtl, timeSeriesNodeStateManager, context);
        this.resultHandler = resultHandler;
        this.xContentRegistry = xContentRegistry;
        this.resultParser = resultParser;
    }

    @Override
    protected void executeBatchRequest(BatchRequestType request, ActionListener<ResultBulkResponse> listener) {
        if (((ResultBulkRequest)((Object)request)).numberOfActions() < 1) {
            listener.onResponse(null);
            return;
        }
        ((IndexMemoryPressureAwareResultHandler)this.resultHandler).flush(request, listener);
    }

    @Override
    protected ActionListener<ResultBulkResponse> getResponseListener(List<ResultWriteRequestType> toProcess, BatchRequestType bulkRequest) {
        return ActionListener.wrap(adResultBulkResponse -> {
            if (adResultBulkResponse == null || !adResultBulkResponse.getRetryRequests().isPresent()) {
                return;
            }
            this.enqueueRetryRequestIteration(adResultBulkResponse.getRetryRequests().get(), 0);
        }, exception -> {
            if (ExceptionUtil.isRetryAble(exception)) {
                super.putAll(toProcess);
            } else if (ExceptionUtil.isOverloaded(exception)) {
                LOG.error("too many get model checkpoint requests or shard not avialble");
                this.setCoolDownStart();
            }
            for (ResultWriteRequest request : toProcess) {
                this.nodeStateManager.setException(request.getConfigId(), (Exception)exception);
            }
            LOG.error("Fail to save results", (Throwable)exception);
        });
    }

    private void enqueueRetryRequestIteration(List<IndexRequest> requestToRetry, int index) {
        if (index >= requestToRetry.size()) {
            return;
        }
        DocWriteRequest currentRequest = (DocWriteRequest)requestToRetry.get(index);
        Optional<ResultType> resultToRetry = this.getResult(currentRequest);
        if (!resultToRetry.isPresent()) {
            this.enqueueRetryRequestIteration(requestToRetry, index + 1);
            return;
        }
        IndexableResult result = (IndexableResult)resultToRetry.get();
        String id = result.getConfigId();
        this.nodeStateManager.getConfig(id, this.context, this.onGetConfig(requestToRetry, index, id, result));
    }

    protected Optional<ResultType> getResult(DocWriteRequest<?> request) {
        Optional<IndexableResult> optional;
        block9: {
            if (!(request instanceof IndexRequest)) {
                LOG.error((Message)new ParameterizedMessage("We should only send IndexRquest, but get [{}].", request));
                return Optional.empty();
            }
            IndexRequest indexRequest = (IndexRequest)request;
            BytesReference indexSource = indexRequest.source();
            MediaType indexContentType = indexRequest.getContentType();
            XContentParser xContentParser = XContentHelper.createParser((NamedXContentRegistry)this.xContentRegistry, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, (BytesReference)indexSource, (MediaType)indexContentType);
            try {
                xContentParser.nextToken();
                optional = Optional.of((IndexableResult)this.resultParser.apply((Object)xContentParser));
                if (xContentParser == null) break block9;
            }
            catch (Throwable throwable) {
                try {
                    if (xContentParser != null) {
                        try {
                            xContentParser.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Exception e) {
                    LOG.error((Message)new ParameterizedMessage("Fail to parse index request [{}]", request), (Throwable)e);
                    return Optional.empty();
                }
            }
            xContentParser.close();
        }
        return optional;
    }

    private ActionListener<Optional<? extends Config>> onGetConfig(List<IndexRequest> requestToRetry, int index, String id, ResultType resultToRetry) {
        return ActionListener.wrap(configOptional -> {
            if (!configOptional.isPresent()) {
                LOG.warn((Message)new ParameterizedMessage("Config [{}] is not available.", (Object)id));
                this.enqueueRetryRequestIteration(requestToRetry, index + 1);
                return;
            }
            Config config = (Config)configOptional.get();
            super.put(this.createResultWriteRequest(resultToRetry.getExecutionStartTime().toEpochMilli() + config.getIntervalInMilliseconds(), id, resultToRetry.isHighPriority() ? RequestPriority.HIGH : RequestPriority.MEDIUM, resultToRetry, config.getCustomResultIndexOrAlias()));
            this.enqueueRetryRequestIteration(requestToRetry, index + 1);
        }, exception -> {
            LOG.error((Message)new ParameterizedMessage("fail to get config [{}]", (Object)id), (Throwable)exception);
            this.enqueueRetryRequestIteration(requestToRetry, index + 1);
        });
    }

    protected abstract ResultWriteRequestType createResultWriteRequest(long var1, String var3, RequestPriority var4, ResultType var5, String var6);
}

