/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ml.model;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import java.io.File;
import java.nio.file.Path;
import java.security.PrivilegedActionException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.TokenBucket;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.connector.Connector;
import org.opensearch.ml.common.controller.MLController;
import org.opensearch.ml.common.controller.MLRateLimiter;
import org.opensearch.ml.common.exception.MLException;
import org.opensearch.ml.common.exception.MLLimitExceededException;
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
import org.opensearch.ml.common.exception.MLValidationException;
import org.opensearch.ml.common.model.Guardrails;
import org.opensearch.ml.common.model.MLGuard;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.deploy.MLDeployModelAction;
import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest;
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.ml.common.transport.upload_chunk.MLRegisterModelMetaInput;
import org.opensearch.ml.common.utils.StringUtils;
import org.opensearch.ml.engine.MLEngine;
import org.opensearch.ml.engine.MLExecutable;
import org.opensearch.ml.engine.ModelHelper;
import org.opensearch.ml.engine.Predictable;
import org.opensearch.ml.engine.indices.MLIndicesHandler;
import org.opensearch.ml.engine.utils.FileUtils;
import org.opensearch.ml.model.MLModelCacheHelper;
import org.opensearch.ml.profile.MLModelProfile;
import org.opensearch.ml.settings.MLCommonsSettings;
import org.opensearch.ml.stats.ActionName;
import org.opensearch.ml.stats.MLActionLevelStat;
import org.opensearch.ml.stats.MLNodeLevelStat;
import org.opensearch.ml.stats.MLStats;
import org.opensearch.ml.task.MLTaskManager;
import org.opensearch.ml.utils.MLExceptionUtils;
import org.opensearch.ml.utils.MLNodeUtils;
import org.opensearch.script.ScriptService;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.threadpool.ThreadPool;

public class MLModelManager {
    @Generated
    private static final Logger log = LogManager.getLogger(MLModelManager.class);
    public static final int TIMEOUT_IN_MILLIS = 5000;
    public static final long MODEL_FILE_SIZE_LIMIT = 0x100000000L;
    private final Client client;
    private final ClusterService clusterService;
    private final ScriptService scriptService;
    private ThreadPool threadPool;
    private NamedXContentRegistry xContentRegistry;
    private ModelHelper modelHelper;
    private final MLModelCacheHelper modelCacheHelper;
    private final MLStats mlStats;
    private final MLCircuitBreakerService mlCircuitBreakerService;
    private final MLIndicesHandler mlIndicesHandler;
    private final MLTaskManager mlTaskManager;
    private final MLEngine mlEngine;
    private final DiscoveryNodeHelper nodeHelper;
    private volatile Integer maxModelPerNode;
    private volatile Integer maxRegisterTasksPerNode;
    private volatile Integer maxDeployTasksPerNode;
    public static final ImmutableSet MODEL_DONE_STATES = ImmutableSet.of((Object)MLModelState.TRAINED, (Object)MLModelState.REGISTERED, (Object)MLModelState.DEPLOYED, (Object)MLModelState.PARTIALLY_DEPLOYED, (Object)MLModelState.DEPLOY_FAILED, (Object)MLModelState.UNDEPLOYED, (Object[])new MLModelState[0]);

    public MLModelManager(ClusterService clusterService, ScriptService scriptService, Client client, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, ModelHelper modelHelper, Settings settings, MLStats mlStats, MLCircuitBreakerService mlCircuitBreakerService, MLIndicesHandler mlIndicesHandler, MLTaskManager mlTaskManager, MLModelCacheHelper modelCacheHelper, MLEngine mlEngine, DiscoveryNodeHelper nodeHelper) {
        this.client = client;
        this.threadPool = threadPool;
        this.xContentRegistry = xContentRegistry;
        this.modelHelper = modelHelper;
        this.clusterService = clusterService;
        this.scriptService = scriptService;
        this.modelCacheHelper = modelCacheHelper;
        this.mlStats = mlStats;
        this.mlCircuitBreakerService = mlCircuitBreakerService;
        this.mlIndicesHandler = mlIndicesHandler;
        this.mlTaskManager = mlTaskManager;
        this.mlEngine = mlEngine;
        this.nodeHelper = nodeHelper;
        this.maxModelPerNode = (Integer)MLCommonsSettings.ML_COMMONS_MAX_MODELS_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_MODELS_PER_NODE, it -> {
            this.maxModelPerNode = it;
        });
        this.maxRegisterTasksPerNode = (Integer)MLCommonsSettings.ML_COMMONS_MAX_REGISTER_MODEL_TASKS_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_REGISTER_MODEL_TASKS_PER_NODE, it -> {
            this.maxRegisterTasksPerNode = it;
        });
        this.maxDeployTasksPerNode = (Integer)MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE, it -> {
            this.maxDeployTasksPerNode = it;
        });
    }

    public void registerModelMeta(MLRegisterModelMetaInput mlRegisterModelMetaInput, ActionListener<String> listener) {
        block11: {
            try {
                FunctionName functionName = mlRegisterModelMetaInput.getFunctionName();
                this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
                this.mlStats.createCounterStatIfAbsent(functionName, ActionName.REGISTER, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                String modelGroupId = mlRegisterModelMetaInput.getModelGroupId();
                if (Strings.isBlank((String)modelGroupId)) {
                    this.uploadMLModelMeta(mlRegisterModelMetaInput, "1", listener);
                    break block11;
                }
                try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
                    ActionListener wrappedListener = ActionListener.runBefore(listener, () -> context.restore());
                    GetRequest getModelGroupRequest = new GetRequest(".plugins-ml-model-group").id(modelGroupId);
                    this.client.get(getModelGroupRequest, ActionListener.wrap(modelGroup -> {
                        if (modelGroup.isExists()) {
                            Map modelGroupSource = modelGroup.getSourceAsMap();
                            int updatedVersion = this.incrementLatestVersion(modelGroupSource);
                            UpdateRequest updateModelGroupRequest = this.createUpdateModelGroupRequest(modelGroupSource, modelGroupId, modelGroup.getSeqNo(), modelGroup.getPrimaryTerm(), updatedVersion);
                            this.client.update(updateModelGroupRequest, ActionListener.wrap(r -> this.uploadMLModelMeta(mlRegisterModelMetaInput, "" + updatedVersion, (ActionListener<String>)wrappedListener), e -> {
                                log.error("Failed to update model group", (Throwable)e);
                                wrappedListener.onFailure(e);
                            }));
                        } else {
                            log.error("Model group not found");
                            wrappedListener.onFailure((Exception)new MLResourceNotFoundException("Fail to find model group"));
                        }
                    }, e -> {
                        if (e instanceof IndexNotFoundException) {
                            wrappedListener.onFailure((Exception)new MLResourceNotFoundException("Fail to find model group"));
                        } else {
                            log.error("Failed to get model group", (Throwable)e);
                            wrappedListener.onFailure((Exception)new MLValidationException("Failed to get model group"));
                        }
                    }));
                }
                catch (Exception e2) {
                    log.error("Failed to register model", (Throwable)e2);
                    listener.onFailure(e2);
                }
            }
            catch (Exception e3) {
                log.error("Failed to init model index", (Throwable)e3);
                listener.onFailure(e3);
            }
        }
    }

    private void uploadMLModelMeta(MLRegisterModelMetaInput mlRegisterModelMetaInput, String version, ActionListener<String> listener) {
        FunctionName functionName = mlRegisterModelMetaInput.getFunctionName();
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            ActionListener wrappedListener = ActionListener.runBefore(listener, () -> context.restore());
            String modelName = mlRegisterModelMetaInput.getName();
            this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> {
                Instant now = Instant.now();
                MLModel mlModelMeta = MLModel.builder().name(modelName).algorithm(functionName).version(version).modelGroupId(mlRegisterModelMetaInput.getModelGroupId()).description(mlRegisterModelMetaInput.getDescription()).isEnabled(mlRegisterModelMetaInput.getIsEnabled()).rateLimiter(mlRegisterModelMetaInput.getRateLimiter()).modelFormat(mlRegisterModelMetaInput.getModelFormat()).modelState(MLModelState.REGISTERING).modelConfig(mlRegisterModelMetaInput.getModelConfig()).deploySetting(mlRegisterModelMetaInput.getDeploySetting()).totalChunks(mlRegisterModelMetaInput.getTotalChunks()).modelContentHash(mlRegisterModelMetaInput.getModelContentHashValue()).modelContentSizeInBytes(mlRegisterModelMetaInput.getModelContentSizeInBytes()).isHidden(mlRegisterModelMetaInput.getIsHidden()).modelInterface(mlRegisterModelMetaInput.getModelInterface()).createdTime(now).lastUpdateTime(now).build();
                IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                if (mlRegisterModelMetaInput.getIsHidden() != null && mlRegisterModelMetaInput.getIsHidden().booleanValue()) {
                    indexRequest.id(modelName);
                }
                indexRequest.source(mlModelMeta.toXContent(XContentBuilder.builder((XContent)XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                this.client.index(indexRequest, ActionListener.wrap(response -> {
                    log.debug("Index model meta doc successfully {}", (Object)modelName);
                    wrappedListener.onResponse((Object)response.getId());
                }, e -> {
                    this.deleteOrUpdateModelGroup(mlRegisterModelMetaInput.getModelGroupId(), mlRegisterModelMetaInput.getDoesVersionCreateModelGroup(), version);
                    log.error("Failed to index model meta doc", (Throwable)e);
                    wrappedListener.onFailure(e);
                }));
            }, ex -> {
                log.error("Failed to init model index", (Throwable)ex);
                wrappedListener.onFailure(ex);
            }));
        }
        catch (Exception e) {
            log.error("Failed to register model", (Throwable)e);
            listener.onFailure(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerMLRemoteModel(MLRegisterModelInput mlRegisterModelInput, MLTask mlTask, ActionListener<MLRegisterModelResponse> listener) {
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            this.checkAndAddRunningTask(mlTask, this.maxRegisterTasksPerNode);
            this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
            this.mlStats.createCounterStatIfAbsent(mlTask.getFunctionName(), ActionName.REGISTER, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).increment();
            String modelGroupId = mlRegisterModelInput.getModelGroupId();
            GetRequest getModelGroupRequest = new GetRequest(".plugins-ml-model-group").id(modelGroupId);
            this.client.get(getModelGroupRequest, ActionListener.wrap(getModelGroupResponse -> {
                if (getModelGroupResponse.isExists()) {
                    Map modelGroupSourceMap = getModelGroupResponse.getSourceAsMap();
                    int updatedVersion = this.incrementLatestVersion(modelGroupSourceMap);
                    UpdateRequest updateModelGroupRequest = this.createUpdateModelGroupRequest(modelGroupSourceMap, modelGroupId, getModelGroupResponse.getSeqNo(), getModelGroupResponse.getPrimaryTerm(), updatedVersion);
                    this.client.update(updateModelGroupRequest, ActionListener.wrap(r -> this.indexRemoteModel(mlRegisterModelInput, mlTask, "" + updatedVersion, listener), e -> {
                        log.error("Failed to update model group " + modelGroupId, (Throwable)e);
                        this.handleException(mlRegisterModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)e);
                        listener.onFailure(e);
                    }));
                } else {
                    log.error("Model group response is empty");
                    this.handleException(mlRegisterModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)new MLValidationException("Model group not found"));
                    listener.onFailure((Exception)new MLResourceNotFoundException("Model Group Response is empty for " + modelGroupId));
                }
            }, error -> {
                if (error instanceof IndexNotFoundException) {
                    log.error("Model group Index is missing");
                    this.handleException(mlRegisterModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)new MLResourceNotFoundException("Failed to get model group due to index missing"));
                    listener.onFailure(error);
                } else {
                    log.error("Failed to get model group", (Throwable)error);
                    this.handleException(mlRegisterModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)error);
                    listener.onFailure(error);
                }
            }));
        }
        catch (Exception e) {
            log.error("Failed to register remote model", (Throwable)e);
            this.handleException(mlRegisterModelInput.getFunctionName(), mlTask.getTaskId(), e);
            listener.onFailure(e);
        }
        finally {
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerMLModel(MLRegisterModelInput registerModelInput, MLTask mlTask) {
        this.checkAndAddRunningTask(mlTask, this.maxRegisterTasksPerNode);
        try {
            this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
            this.mlStats.createCounterStatIfAbsent(mlTask.getFunctionName(), ActionName.REGISTER, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).increment();
            String modelGroupId = registerModelInput.getModelGroupId();
            GetRequest getModelGroupRequest = new GetRequest(".plugins-ml-model-group").id(modelGroupId);
            try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
                this.client.get(getModelGroupRequest, ActionListener.runBefore((ActionListener)ActionListener.wrap(modelGroup -> {
                    if (modelGroup.isExists()) {
                        Map modelGroupSourceMap = modelGroup.getSourceAsMap();
                        int updatedVersion = this.incrementLatestVersion(modelGroupSourceMap);
                        UpdateRequest updateModelGroupRequest = this.createUpdateModelGroupRequest(modelGroupSourceMap, modelGroupId, modelGroup.getSeqNo(), modelGroup.getPrimaryTerm(), updatedVersion);
                        try (ThreadContext.StoredContext threadContext = this.client.threadPool().getThreadContext().stashContext();){
                            this.client.update(updateModelGroupRequest, ActionListener.wrap(r -> this.uploadModel(registerModelInput, mlTask, "" + updatedVersion), e -> {
                                log.error("Failed to update model group", (Throwable)e);
                                this.handleException(registerModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)e);
                            }));
                        }
                    } else {
                        log.error("Model group not found");
                        this.handleException(registerModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)new MLValidationException("Model group not found"));
                    }
                }, e -> {
                    if (e instanceof IndexNotFoundException) {
                        this.handleException(registerModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)new MLResourceNotFoundException("Failed to get model group"));
                    } else {
                        log.error("Failed to get model group", (Throwable)e);
                        this.handleException(registerModelInput.getFunctionName(), mlTask.getTaskId(), (Exception)e);
                    }
                }), () -> context.restore()));
            }
            catch (Exception e2) {
                log.error("Failed to register model", (Throwable)e2);
                this.handleException(registerModelInput.getFunctionName(), mlTask.getTaskId(), e2);
            }
        }
        catch (Exception e3) {
            this.handleException(registerModelInput.getFunctionName(), mlTask.getTaskId(), e3);
        }
        finally {
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
        }
    }

    private UpdateRequest createUpdateModelGroupRequest(Map<String, Object> modelGroupSourceMap, String modelGroupId, long seqNo, long primaryTerm, int updatedVersion) {
        modelGroupSourceMap.put("latest_version", updatedVersion);
        modelGroupSourceMap.put("last_updated_time", Instant.now().toEpochMilli());
        UpdateRequest updateModelGroupRequest = new UpdateRequest();
        ((UpdateRequest)updateModelGroupRequest.index(".plugins-ml-model-group")).id(modelGroupId).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).doc(modelGroupSourceMap);
        return updateModelGroupRequest;
    }

    private int incrementLatestVersion(Map<String, Object> modelGroupSourceMap) {
        return (Integer)modelGroupSourceMap.get("latest_version") + 1;
    }

    private void indexRemoteModel(MLRegisterModelInput registerModelInput, MLTask mlTask, String modelVersion, ActionListener<MLRegisterModelResponse> listener) {
        String taskId = mlTask.getTaskId();
        FunctionName functionName = mlTask.getFunctionName();
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            String modelName = registerModelInput.getModelName();
            String version = modelVersion == null ? registerModelInput.getVersion() : modelVersion;
            Instant now = Instant.now();
            if (registerModelInput.getConnector() != null) {
                registerModelInput.getConnector().encrypt(arg_0 -> ((MLEngine)this.mlEngine).encrypt(arg_0));
            }
            this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(boolResponse -> {
                MLModel mlModelMeta = MLModel.builder().name(modelName).algorithm(functionName).modelGroupId(registerModelInput.getModelGroupId()).version(version).description(registerModelInput.getDescription()).rateLimiter(registerModelInput.getRateLimiter()).modelFormat(registerModelInput.getModelFormat()).modelState(MLModelState.REGISTERED).connector(registerModelInput.getConnector()).connectorId(registerModelInput.getConnectorId()).modelConfig(registerModelInput.getModelConfig()).deploySetting(registerModelInput.getDeploySetting()).createdTime(now).lastUpdateTime(now).isHidden(registerModelInput.getIsHidden()).guardrails(registerModelInput.getGuardrails()).modelInterface(registerModelInput.getModelInterface()).build();
                IndexRequest indexModelMetaRequest = new IndexRequest(".plugins-ml-model");
                if (registerModelInput.getIsHidden() != null && registerModelInput.getIsHidden().booleanValue()) {
                    indexModelMetaRequest.id(modelName);
                }
                indexModelMetaRequest.source(mlModelMeta.toXContent(XContentBuilder.builder((XContent)XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexModelMetaRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                ActionListener indexListener = ActionListener.wrap(modelMetaRes -> {
                    String modelId = modelMetaRes.getId();
                    mlTask.setModelId(modelId);
                    log.info("create new model meta doc {} for upload task {}", (Object)modelId, (Object)taskId);
                    this.mlTaskManager.updateMLTask(taskId, Map.of("model_id", modelId, "state", MLTaskState.COMPLETED), 5000L, true);
                    if (registerModelInput.isDeployModel()) {
                        this.deployModelAfterRegistering(registerModelInput, modelId);
                    }
                    listener.onResponse((Object)new MLRegisterModelResponse(taskId, MLTaskState.CREATED.name(), modelId));
                }, e -> {
                    log.error("Failed to index model meta doc", (Throwable)e);
                    this.handleException(functionName, taskId, (Exception)e);
                    listener.onFailure(e);
                });
                this.client.index(indexModelMetaRequest, this.threadedActionListener("opensearch_ml_register", indexListener));
            }, error -> {
                log.error("Failed to init model index", (Throwable)error);
                this.handleException(functionName, taskId, (Exception)error);
                listener.onFailure(error);
            }));
        }
    }

    @VisibleForTesting
    void indexRemoteModel(MLRegisterModelInput registerModelInput, MLTask mlTask, String modelVersion) {
        String taskId = mlTask.getTaskId();
        FunctionName functionName = mlTask.getFunctionName();
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            String modelName = registerModelInput.getModelName();
            String version = modelVersion == null ? registerModelInput.getVersion() : modelVersion;
            Instant now = Instant.now();
            if (registerModelInput.getConnector() != null) {
                registerModelInput.getConnector().encrypt(arg_0 -> ((MLEngine)this.mlEngine).encrypt(arg_0));
            }
            this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore((ActionListener)ActionListener.wrap(res -> {
                MLModel mlModelMeta = MLModel.builder().name(modelName).algorithm(functionName).modelGroupId(registerModelInput.getModelGroupId()).version(version).description(registerModelInput.getDescription()).rateLimiter(registerModelInput.getRateLimiter()).modelFormat(registerModelInput.getModelFormat()).modelState(MLModelState.REGISTERED).connector(registerModelInput.getConnector()).connectorId(registerModelInput.getConnectorId()).modelConfig(registerModelInput.getModelConfig()).deploySetting(registerModelInput.getDeploySetting()).createdTime(now).lastUpdateTime(now).isHidden(registerModelInput.getIsHidden()).guardrails(registerModelInput.getGuardrails()).modelInterface(registerModelInput.getModelInterface()).build();
                IndexRequest indexModelMetaRequest = new IndexRequest(".plugins-ml-model");
                if (registerModelInput.getIsHidden() != null && registerModelInput.getIsHidden().booleanValue()) {
                    indexModelMetaRequest.id(modelName);
                }
                indexModelMetaRequest.source(mlModelMeta.toXContent(XContentBuilder.builder((XContent)XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexModelMetaRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                ActionListener indexListener = ActionListener.wrap(modelMetaRes -> {
                    String modelId = modelMetaRes.getId();
                    mlTask.setModelId(modelId);
                    log.info("create new model meta doc {} for upload task {}", (Object)modelId, (Object)taskId);
                    this.mlTaskManager.updateMLTask(taskId, Map.of("model_id", modelId, "state", MLTaskState.COMPLETED), 5000L, true);
                    if (registerModelInput.isDeployModel()) {
                        this.deployModelAfterRegistering(registerModelInput, modelId);
                    }
                }, e -> {
                    log.error("Failed to index model meta doc", (Throwable)e);
                    this.handleException(functionName, taskId, (Exception)e);
                });
                this.client.index(indexModelMetaRequest, this.threadedActionListener("opensearch_ml_register", indexListener));
            }, e -> {
                log.error("Failed to init model index", (Throwable)e);
                this.handleException(functionName, taskId, (Exception)e);
            }), () -> context.restore()));
        }
        catch (Exception e2) {
            MLExceptionUtils.logException("Failed to upload model", e2, log);
            this.handleException(functionName, taskId, e2);
        }
    }

    private void uploadModel(MLRegisterModelInput registerModelInput, MLTask mlTask, String modelVersion) throws PrivilegedActionException {
        if (registerModelInput.getUrl() != null) {
            this.registerModelFromUrl(registerModelInput, mlTask, modelVersion);
        } else if (registerModelInput.getFunctionName() == FunctionName.REMOTE || registerModelInput.getConnectorId() != null) {
            this.indexRemoteModel(registerModelInput, mlTask, modelVersion);
        } else {
            this.registerPrebuiltModel(registerModelInput, mlTask, modelVersion);
        }
    }

    private void registerModelFromUrl(MLRegisterModelInput registerModelInput, MLTask mlTask, String modelVersion) {
        String taskId = mlTask.getTaskId();
        FunctionName functionName = mlTask.getFunctionName();
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            String modelName = registerModelInput.getModelName();
            String version = modelVersion == null ? registerModelInput.getVersion() : modelVersion;
            String modelGroupId = registerModelInput.getModelGroupId();
            Instant now = Instant.now();
            this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore((ActionListener)ActionListener.wrap(res -> {
                MLModel mlModelMeta = MLModel.builder().name(modelName).modelGroupId(modelGroupId).algorithm(functionName).version(version).description(registerModelInput.getDescription()).rateLimiter(registerModelInput.getRateLimiter()).modelFormat(registerModelInput.getModelFormat()).modelState(MLModelState.REGISTERING).modelConfig(registerModelInput.getModelConfig()).deploySetting(registerModelInput.getDeploySetting()).createdTime(now).lastUpdateTime(now).isHidden(registerModelInput.getIsHidden()).guardrails(registerModelInput.getGuardrails()).modelInterface(registerModelInput.getModelInterface()).build();
                IndexRequest indexModelMetaRequest = new IndexRequest(".plugins-ml-model");
                if (functionName == FunctionName.METRICS_CORRELATION) {
                    indexModelMetaRequest.id(functionName.name());
                }
                if (registerModelInput.getIsHidden() != null && registerModelInput.getIsHidden().booleanValue()) {
                    indexModelMetaRequest.id(modelName);
                }
                indexModelMetaRequest.source(mlModelMeta.toXContent(XContentBuilder.builder((XContent)XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexModelMetaRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                ActionListener listener = ActionListener.wrap(modelMetaRes -> {
                    String modelId = modelMetaRes.getId();
                    mlTask.setModelId(modelId);
                    log.info("create new model meta doc {} for register model task {}", (Object)modelId, (Object)taskId);
                    this.registerModel(registerModelInput, taskId, functionName, modelName, version, modelId);
                }, e -> {
                    log.error("Failed to index model meta doc", (Throwable)e);
                    this.handleException(functionName, taskId, (Exception)e);
                });
                this.client.index(indexModelMetaRequest, this.threadedActionListener("opensearch_ml_register", listener));
            }, e -> {
                log.error("Failed to init model index", (Throwable)e);
                this.handleException(functionName, taskId, (Exception)e);
            }), () -> context.restore()));
        }
        catch (Exception e2) {
            MLExceptionUtils.logException("Failed to register model", e2, log);
            this.handleException(functionName, taskId, e2);
        }
    }

    private void registerModel(MLRegisterModelInput registerModelInput, String taskId, FunctionName functionName, String modelName, String version, String modelId) {
        this.modelHelper.downloadAndSplit(registerModelInput.getModelFormat(), modelId, modelName, version, registerModelInput.getUrl(), registerModelInput.getHashValue(), functionName, ActionListener.wrap(result -> {
            Long modelSizeInBytes = (Long)result.get("model_size_in_bytes");
            if (modelSizeInBytes >= 0x100000000L) {
                throw new MLException("Model file size exceeds the limit of 4GB: " + modelSizeInBytes);
            }
            List chunkFiles = (List)result.get("chunk_files");
            String hashValue = (String)result.get("model_file_hash");
            Semaphore semaphore = new Semaphore(1);
            AtomicInteger uploaded = new AtomicInteger(0);
            AtomicBoolean failedToUploadChunk = new AtomicBoolean(false);
            for (String name : chunkFiles) {
                semaphore.tryAcquire(10L, TimeUnit.SECONDS);
                if (failedToUploadChunk.get()) {
                    throw new MLException("Failed to save model chunk");
                }
                File file = new File(name);
                byte[] bytes = Files.toByteArray((File)file);
                int chunkNum = Integer.parseInt(file.getName());
                Instant now = Instant.now();
                MLModel mlModel = MLModel.builder().modelId(modelId).name(modelName).algorithm(functionName).version(version).modelFormat(registerModelInput.getModelFormat()).rateLimiter(registerModelInput.getRateLimiter()).isEnabled(registerModelInput.getIsEnabled()).chunkNumber(Integer.valueOf(chunkNum)).totalChunks(Integer.valueOf(chunkFiles.size())).content(Base64.getEncoder().encodeToString(bytes)).createdTime(now).lastUpdateTime(now).isHidden(registerModelInput.getIsHidden()).guardrails(registerModelInput.getGuardrails()).modelInterface(registerModelInput.getModelInterface()).build();
                IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                if (registerModelInput.getIsHidden() != null && registerModelInput.getIsHidden().booleanValue()) {
                    indexRequest.id(modelName);
                }
                String chunkId = this.getModelChunkId(modelId, chunkNum);
                indexRequest.id(chunkId);
                indexRequest.source(mlModel.toXContent(XContentBuilder.builder((XContent)XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                this.client.index(indexRequest, ActionListener.wrap(r -> {
                    uploaded.getAndIncrement();
                    if (uploaded.get() == chunkFiles.size()) {
                        this.updateModelRegisterStateAsDone(registerModelInput, taskId, modelId, modelSizeInBytes, chunkFiles, hashValue, version);
                    } else {
                        FileUtils.deleteFileQuietly((File)file);
                    }
                    semaphore.release();
                }, e -> {
                    log.error("Failed to index model chunk " + chunkId, (Throwable)e);
                    failedToUploadChunk.set(true);
                    this.handleException(functionName, taskId, (Exception)e);
                    FileUtils.deleteFileQuietly((File)file);
                    this.deleteModel(modelId, registerModelInput, version);
                    semaphore.release();
                    FileUtils.deleteFileQuietly((Path)this.mlEngine.getRegisterModelPath(modelId));
                }));
            }
        }, e -> {
            log.error("Failed to index chunk file", (Throwable)e);
            FileUtils.deleteFileQuietly((Path)this.mlEngine.getRegisterModelPath(modelId));
            this.deleteModel(modelId, registerModelInput, version);
            this.handleException(functionName, taskId, (Exception)e);
        }));
    }

    private void registerPrebuiltModel(MLRegisterModelInput registerModelInput, MLTask mlTask, String modelVersion) throws PrivilegedActionException {
        String taskId = mlTask.getTaskId();
        List modelMetaList = this.modelHelper.downloadPrebuiltModelMetaList(taskId, registerModelInput);
        if (!this.modelHelper.isModelAllowed(registerModelInput, modelMetaList)) {
            throw new IllegalArgumentException("This model is not in the pre-trained model list, please check your parameters.");
        }
        this.modelHelper.downloadPrebuiltModelConfig(taskId, registerModelInput, ActionListener.wrap(mlRegisterModelInput -> {
            mlTask.setFunctionName(mlRegisterModelInput.getFunctionName());
            this.mlTaskManager.updateMLTask(taskId, Map.of("function_name", mlRegisterModelInput.getFunctionName()), 5000L, false);
            this.registerModelFromUrl((MLRegisterModelInput)mlRegisterModelInput, mlTask, modelVersion);
        }, e -> {
            log.error("Failed to register prebuilt model", (Throwable)e);
            this.handleException(registerModelInput.getFunctionName(), taskId, (Exception)e);
        }));
    }

    private <T> ThreadedActionListener<T> threadedActionListener(String threadPoolName, ActionListener<T> listener) {
        return new ThreadedActionListener(log, this.threadPool, threadPoolName, listener, false);
    }

    public void checkAndAddRunningTask(MLTask mlTask, Integer runningTaskLimit) {
        MLNodeUtils.checkOpenCircuitBreaker(this.mlCircuitBreakerService, this.mlStats);
        this.mlTaskManager.checkLimitAndAddRunningTask(mlTask, runningTaskLimit);
    }

    private void updateModelRegisterStateAsDone(MLRegisterModelInput registerModelInput, String taskId, String modelId, Long modelSizeInBytes, List<String> chunkFiles, String hashValue, String version) {
        FunctionName functionName = registerModelInput.getFunctionName();
        FileUtils.deleteFileQuietly((Path)this.mlEngine.getRegisterModelPath(modelId));
        Map<String, Object> updatedFields = Map.of("model_state", MLModelState.REGISTERED, "last_registered_time", Instant.now().toEpochMilli(), "total_chunks", chunkFiles.size(), "model_content_hash_value", hashValue, "model_content_size_in_bytes", modelSizeInBytes);
        log.info("Model registered successfully, model id: {}, task id: {}", (Object)modelId, (Object)taskId);
        this.updateModel(modelId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(updateResponse -> {
            this.mlTaskManager.updateMLTask(taskId, Map.of("state", MLTaskState.COMPLETED, "model_id", modelId), 5000L, true);
            if (registerModelInput.isDeployModel()) {
                this.deployModelAfterRegistering(registerModelInput, modelId);
            }
        }, e -> {
            log.error("Failed to update model", (Throwable)e);
            this.handleException(functionName, taskId, (Exception)e);
            this.deleteModel(modelId, registerModelInput, version);
        }));
    }

    @VisibleForTesting
    void deployModelAfterRegistering(MLRegisterModelInput registerModelInput, String modelId) {
        Object[] modelNodeIds = registerModelInput.getModelNodeIds();
        log.debug("start deploying model after registering, modelId: {} on nodes: {}", (Object)modelId, (Object)Arrays.toString(modelNodeIds));
        MLDeployModelRequest request = new MLDeployModelRequest(modelId, (String[])modelNodeIds, false, true, true);
        ActionListener listener = ActionListener.wrap(r -> log.debug("model deployed, response {}", r), e -> log.error("Failed to deploy model", (Throwable)e));
        this.client.execute((ActionType)MLDeployModelAction.INSTANCE, (ActionRequest)request, listener);
    }

    private void deleteModel(String modelId, MLRegisterModelInput registerModelInput, String modelVersion) {
        DeleteRequest deleteRequest = new DeleteRequest();
        ((DeleteRequest)deleteRequest.index(".plugins-ml-model")).id(modelId).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.delete(deleteRequest);
        DeleteByQueryRequest deleteChunksRequest = (DeleteByQueryRequest)new DeleteByQueryRequest(new String[]{".plugins-ml-model"}).setQuery((QueryBuilder)new TermQueryBuilder("model_id", modelId)).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setAbortOnVersionConflict(false);
        this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)deleteChunksRequest);
        this.deleteOrUpdateModelGroup(registerModelInput.getModelGroupId(), registerModelInput.getDoesVersionCreateModelGroup(), modelVersion);
    }

    private void deleteOrUpdateModelGroup(String modelGroupID, Boolean doesVersionCreateModelGroup, String modelVersion) {
        if (doesVersionCreateModelGroup.booleanValue()) {
            DeleteRequest deleteModelGroupRequest = new DeleteRequest();
            ((DeleteRequest)deleteModelGroupRequest.index(".plugins-ml-model-group")).id(modelGroupID).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            this.client.delete(deleteModelGroupRequest);
        } else {
            this.updateLatestVersionInModelGroup(modelGroupID, Integer.parseInt(modelVersion) - 1, (ActionListener<UpdateResponse>)ActionListener.wrap(r -> log.debug("model group updated, response {}", r), e -> log.error("Failed to update model group", (Throwable)e)));
        }
    }

    private void updateLatestVersionInModelGroup(String modelGroupID, Integer latestVersion, ActionListener<UpdateResponse> listener) {
        HashMap<String, Number> updatedFields = new HashMap<String, Number>();
        updatedFields.put("latest_version", latestVersion);
        updatedFields.put("last_updated_time", Instant.now().toEpochMilli());
        UpdateRequest updateRequest = new UpdateRequest(".plugins-ml-model-group", modelGroupID);
        updateRequest.doc(updatedFields);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            this.client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    private void handleException(FunctionName functionName, String taskId, Exception e) {
        if (!(e instanceof MLLimitExceededException || e instanceof MLResourceNotFoundException || e instanceof IllegalArgumentException)) {
            this.mlStats.createCounterStatIfAbsent(functionName, ActionName.REGISTER, MLActionLevelStat.ML_ACTION_FAILURE_COUNT).increment();
            this.mlStats.getStat(MLNodeLevelStat.ML_FAILURE_COUNT).increment();
        }
        Map<String, MLTaskState> updated = Map.of("error", MLExceptionUtils.getRootCauseMessage(e), "state", MLTaskState.FAILED);
        this.mlTaskManager.updateMLTask(taskId, updated, 5000L, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deployModel(String modelId, String modelContentHash, FunctionName functionName, boolean deployToAllNodes, boolean autoDeployModel, MLTask mlTask, ActionListener<String> listener) {
        this.mlStats.createCounterStatIfAbsent(functionName, ActionName.DEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
        this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).increment();
        this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
        this.mlStats.createModelCounterStatIfAbsent(modelId, ActionName.DEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
        List workerNodes = mlTask.getWorkerNodes();
        if (this.modelCacheHelper.isModelDeployed(modelId)) {
            if (!autoDeployModel && workerNodes != null && workerNodes.size() > 0) {
                log.info("Set new target node ids {} for model {}", (Object)Arrays.toString(workerNodes.toArray(new String[0])), (Object)modelId);
                this.modelCacheHelper.setDeployToAllNodes(modelId, deployToAllNodes);
                this.modelCacheHelper.setTargetWorkerNodes(modelId, workerNodes);
                this.modelCacheHelper.refreshLastAccessTime(modelId);
            }
            listener.onResponse((Object)"successful");
            return;
        }
        if (this.modelCacheHelper.getLocalDeployedModels().length >= this.maxModelPerNode) {
            listener.onFailure((Exception)new IllegalArgumentException("Exceed max local model per node limit"));
            return;
        }
        int eligibleNodeCount = workerNodes.size();
        if (!autoDeployModel) {
            this.modelCacheHelper.initModelState(modelId, MLModelState.DEPLOYING, functionName, workerNodes, deployToAllNodes);
        } else {
            this.modelCacheHelper.initModelStateAutoDeploy(modelId, MLModelState.DEPLOYING, functionName, workerNodes);
        }
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            ActionListener wrappedListener = ActionListener.runBefore(listener, () -> {
                context.restore();
                this.modelCacheHelper.removeAutoDeployModel(modelId);
                this.modelCacheHelper.setIsAutoDeploying(modelId, false);
            });
            if (!autoDeployModel) {
                this.checkAndAddRunningTask(mlTask, this.maxDeployTasksPerNode);
            }
            this.getModel(modelId, (ActionListener<MLModel>)this.threadedActionListener("opensearch_ml_deploy", ActionListener.wrap(mlModel -> {
                this.modelCacheHelper.setIsModelEnabled(modelId, mlModel.getIsEnabled());
                this.modelCacheHelper.setModelInfo(modelId, (MLModel)mlModel);
                if (FunctionName.REMOTE == mlModel.getAlgorithm() || !FunctionName.isDLModel((FunctionName)mlModel.getAlgorithm()) && mlModel.getAlgorithm() != FunctionName.METRICS_CORRELATION) {
                    if (BooleanUtils.isTrue((Boolean)mlModel.getIsControllerEnabled())) {
                        this.getController(modelId, (ActionListener<MLController>)ActionListener.wrap(controller -> {
                            this.setupUserRateLimiterMap(modelId, eligibleNodeCount, controller.getUserRateLimiter());
                            log.info("Successfully redeployed model controller for model " + modelId);
                            log.info("Trying to deploy remote model with model controller configured.");
                            this.deployRemoteOrBuiltInModel((MLModel)mlModel, eligibleNodeCount, (ActionListener<String>)wrappedListener);
                        }, e -> {
                            log.error("Trying to deploy remote model with exceptions in re-deploying its model controller. Model ID: " + modelId, (Throwable)e);
                            this.deployRemoteOrBuiltInModel((MLModel)mlModel, eligibleNodeCount, (ActionListener<String>)wrappedListener);
                        }));
                        return;
                    }
                    log.info("Trying to deploy remote or built-in model without model controller configured.");
                    this.deployRemoteOrBuiltInModel((MLModel)mlModel, eligibleNodeCount, (ActionListener<String>)wrappedListener);
                    return;
                }
                this.setupRateLimiter(modelId, eligibleNodeCount, mlModel.getRateLimiter());
                this.setupMLGuard(modelId, mlModel.getGuardrails());
                this.setupModelInterface(modelId, mlModel.getModelInterface());
                this.deployControllerWithDeployingModel((MLModel)mlModel, eligibleNodeCount);
                MLNodeUtils.checkOpenCircuitBreaker(this.mlCircuitBreakerService, this.mlStats);
                this.retrieveModelChunks((MLModel)mlModel, (ActionListener<File>)ActionListener.wrap(modelZipFile -> {
                    String hash = FileUtils.calculateFileHash((File)modelZipFile);
                    if (modelContentHash != null && !modelContentHash.equals(hash)) {
                        log.error("Model content hash can't match original hash value");
                        this.removeModel(modelId);
                        wrappedListener.onFailure((Exception)new IllegalArgumentException("model content changed"));
                        return;
                    }
                    log.debug("Model content matches original hash value, continue deploying");
                    Map<String, MLEngine> params = Map.of("model_zip_file", modelZipFile, "model_helper", this.modelHelper, "ml_engine", this.mlEngine);
                    if (FunctionName.METRICS_CORRELATION.equals((Object)mlModel.getAlgorithm())) {
                        MLExecutable mlExecutable = this.mlEngine.deployExecute(mlModel, params);
                        try {
                            this.modelCacheHelper.setMLExecutor(modelId, mlExecutable);
                            this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
                            this.modelCacheHelper.setModelState(modelId, MLModelState.DEPLOYED);
                            this.modelCacheHelper.refreshLastAccessTime(modelId);
                            wrappedListener.onResponse((Object)"successful");
                        }
                        catch (Exception e) {
                            log.error("Failed to add predictor to cache", (Throwable)e);
                            mlExecutable.close();
                            wrappedListener.onFailure(e);
                        }
                    } else {
                        Predictable predictable = this.mlEngine.deploy(mlModel, params);
                        try {
                            this.modelCacheHelper.setPredictor(modelId, predictable);
                            this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
                            this.modelCacheHelper.setModelState(modelId, MLModelState.DEPLOYED);
                            this.modelCacheHelper.refreshLastAccessTime(modelId);
                            Long modelContentSizeInBytes = mlModel.getModelContentSizeInBytes();
                            long contentSize = modelContentSizeInBytes == null ? (long)(mlModel.getTotalChunks() * 10000000) : modelContentSizeInBytes;
                            this.modelCacheHelper.setMemSizeEstimation(modelId, mlModel.getModelFormat(), contentSize);
                            wrappedListener.onResponse((Object)"successful");
                        }
                        catch (Exception e) {
                            log.error("Failed to add predictor to cache", (Throwable)e);
                            predictable.close();
                            wrappedListener.onFailure(e);
                        }
                    }
                }, e -> {
                    log.error("Failed to retrieve model " + modelId, (Throwable)e);
                    this.handleDeployModelException(modelId, functionName, (ActionListener<String>)wrappedListener, (Exception)e);
                }));
            }, e -> {
                log.error("Failed to deploy model " + modelId, (Throwable)e);
                this.handleDeployModelException(modelId, functionName, (ActionListener<String>)wrappedListener, (Exception)e);
            })));
        }
        catch (Exception e2) {
            this.handleDeployModelException(modelId, functionName, listener, e2);
        }
        finally {
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
        }
    }

    public void deployRemoteModelToLocal(String modelId, MLModel mlModel, ActionListener<String> listener) {
        if (this.modelCacheHelper.isModelDeployed(modelId)) {
            listener.onResponse((Object)"Success");
            return;
        }
        this.modelCacheHelper.initModelState(modelId, MLModelState.DEPLOYING, FunctionName.REMOTE, new ArrayList<String>(), mlModel.isDeployToAllNodes());
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            ActionListener wrappedListener = ActionListener.runBefore(listener, () -> ((ThreadContext.StoredContext)context).restore());
            this.modelCacheHelper.setIsModelEnabled(modelId, mlModel.getIsEnabled());
            this.deployRemoteOrBuiltInModel(mlModel, 1, (ActionListener<String>)wrappedListener);
        }
        catch (Exception e) {
            log.error("Failed to deploy model to local node" + modelId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    private void deployRemoteOrBuiltInModel(MLModel mlModel, Integer eligibleNodeCount, ActionListener<String> wrappedListener) {
        String modelId = mlModel.getModelId();
        this.setupRateLimiter(modelId, eligibleNodeCount, mlModel.getRateLimiter());
        this.setupMLGuard(modelId, mlModel.getGuardrails());
        this.setupModelInterface(modelId, mlModel.getModelInterface());
        if (mlModel.getConnector() != null || FunctionName.REMOTE != mlModel.getAlgorithm()) {
            this.setupParamsAndPredictable(modelId, mlModel);
            this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
            this.modelCacheHelper.setModelState(modelId, MLModelState.DEPLOYED);
            this.modelCacheHelper.refreshLastAccessTime(modelId);
            wrappedListener.onResponse((Object)"successful");
            return;
        }
        log.info("Set connector {} for the model: {}", (Object)mlModel.getConnectorId(), (Object)modelId);
        this.getConnector(mlModel.getConnectorId(), (ActionListener<Connector>)ActionListener.wrap(connector -> {
            mlModel.setConnector(connector);
            this.setupParamsAndPredictable(modelId, mlModel);
            this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
            this.modelCacheHelper.setModelState(modelId, MLModelState.DEPLOYED);
            this.modelCacheHelper.refreshLastAccessTime(modelId);
            wrappedListener.onResponse((Object)"successful");
            log.info("Completed setting connector {} in the model {}", (Object)mlModel.getConnectorId(), (Object)modelId);
        }, arg_0 -> wrappedListener.onFailure(arg_0)));
    }

    private void setupParamsAndPredictable(String modelId, MLModel mlModel) {
        Map<String, Object> params = this.setUpParameterMap(modelId);
        Predictable predictable = this.mlEngine.deploy(mlModel, params);
        this.modelCacheHelper.setPredictor(modelId, predictable);
    }

    private Map<String, Object> setUpParameterMap(String modelId) {
        TokenBucket rateLimiter = this.getRateLimiter(modelId);
        Map<String, TokenBucket> userRateLimiterMap = this.getUserRateLimiterMap(modelId);
        MLGuard mlGuard = this.getMLGuard(modelId);
        HashMap<String, Object> params = new HashMap<String, Object>();
        params.put("ml_engine", this.mlEngine);
        params.put("script_service", this.scriptService);
        params.put("client", this.client);
        params.put("xcontent_registry", this.xContentRegistry);
        params.put("cluster_service", this.clusterService);
        if (rateLimiter == null && userRateLimiterMap == null) {
            log.info("Setting up basic ML predictor parameters.");
        } else if (rateLimiter != null && userRateLimiterMap == null) {
            params.put("rate_limiter", rateLimiter);
            log.info("Setting up basic ML predictor parameters with model level throttling.");
        } else if (rateLimiter == null) {
            params.put("user_rate_limiter_map", userRateLimiterMap);
            log.info("Setting up basic ML predictor parameters with user level throttling.");
        } else {
            params.put("rate_limiter", rateLimiter);
            params.put("user_rate_limiter_map", userRateLimiterMap);
            log.info("Setting up basic ML predictor parameters with both model and user level throttling.");
        }
        if (mlGuard != null) {
            params.put("guardrails", mlGuard);
            log.info("Setting up ML guard parameter for ML predictor.");
        }
        return Collections.unmodifiableMap(params);
    }

    private void handleDeployModelException(String modelId, FunctionName functionName, ActionListener<String> listener, Exception e) {
        if (!(e instanceof MLLimitExceededException || e instanceof MLResourceNotFoundException || e instanceof IllegalArgumentException)) {
            this.mlStats.createCounterStatIfAbsent(functionName, ActionName.DEPLOY, MLActionLevelStat.ML_ACTION_FAILURE_COUNT).increment();
            this.mlStats.getStat(MLNodeLevelStat.ML_FAILURE_COUNT).increment();
        }
        this.removeModel(modelId);
        listener.onFailure(e);
    }

    public synchronized void updateModelCache(String modelId, ActionListener<String> listener) {
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            ActionListener wrappedListener = ActionListener.runBefore(listener, () -> ((ThreadContext.StoredContext)context).restore());
            this.getModel(modelId, (ActionListener<MLModel>)ActionListener.wrap(mlModel -> {
                int eligibleNodeCount = this.getWorkerNodes(modelId, mlModel.getAlgorithm()).length;
                this.modelCacheHelper.setIsModelEnabled(modelId, mlModel.getIsEnabled());
                this.setupRateLimiter(modelId, eligibleNodeCount, mlModel.getRateLimiter());
                this.setupMLGuard(modelId, mlModel.getGuardrails());
                this.setupModelInterface(modelId, mlModel.getModelInterface());
                if (mlModel.getAlgorithm() == FunctionName.REMOTE) {
                    if (mlModel.getConnector() != null) {
                        this.setupParamsAndPredictable(modelId, (MLModel)mlModel);
                        wrappedListener.onResponse((Object)("Successfully updated model cache for the remote model " + modelId));
                        log.info("Completed the model cache update for the remote model {}", (Object)modelId);
                    } else {
                        this.getConnector(mlModel.getConnectorId(), (ActionListener<Connector>)ActionListener.wrap(connector -> {
                            mlModel.setConnector(connector);
                            this.setupParamsAndPredictable(modelId, (MLModel)mlModel);
                            wrappedListener.onResponse((Object)("Successfully updated model cache for the remote model " + modelId));
                            log.info("Completed the model cache update for the remote model {}", (Object)modelId);
                        }, arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0)));
                    }
                }
                wrappedListener.onResponse((Object)("Successfully updated model cache for the model " + modelId));
                log.info("Completed the model cache update for the model {}", (Object)modelId);
            }, arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0)));
        }
        catch (Exception e) {
            log.error("Failed to updated model cache for the model " + modelId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    public synchronized void deployControllerWithDeployedModel(String modelId, ActionListener<String> listener) {
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            if (!this.modelCacheHelper.isModelDeployed(modelId)) {
                throw new OpenSearchStatusException("The model of this model controller has not deployed yet, please deploy the model first.", RestStatus.CONFLICT, new Object[0]);
            }
            ActionListener wrappedListener = ActionListener.runBefore(listener, () -> ((ThreadContext.StoredContext)context).restore());
            this.getModel(modelId, (ActionListener<MLModel>)ActionListener.wrap(mlModel -> this.getController(modelId, (ActionListener<MLController>)ActionListener.wrap(controller -> {
                int eligibleNodeCount = this.getWorkerNodes(modelId, mlModel.getAlgorithm()).length;
                this.setupUserRateLimiterMap(modelId, eligibleNodeCount, controller.getUserRateLimiter());
                if (mlModel.getAlgorithm() == FunctionName.REMOTE) {
                    if (mlModel.getConnector() != null) {
                        this.setupParamsAndPredictable(modelId, (MLModel)mlModel);
                        wrappedListener.onResponse((Object)("Successfully deployed model controller for the remote model " + modelId));
                        log.info("Deployed model controller for the remote model {}", (Object)modelId);
                    } else {
                        this.getConnector(mlModel.getConnectorId(), (ActionListener<Connector>)ActionListener.wrap(connector -> {
                            mlModel.setConnector(connector);
                            this.setupParamsAndPredictable(modelId, (MLModel)mlModel);
                            wrappedListener.onResponse((Object)("Successfully deployed model controller for the remote model " + modelId));
                            log.info("Deployed model controller for the remote model {}", (Object)modelId);
                        }, arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0)));
                    }
                    return;
                }
                wrappedListener.onResponse((Object)("Successfully deployed model controller for the model " + modelId));
                log.info("Deployed model controller for the model {}", (Object)modelId);
            }, arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0))), arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0)));
        }
        catch (Exception e) {
            log.error("Failed to deploy model controller for the model " + modelId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    public synchronized void undeployController(String modelId, ActionListener<String> listener) {
        if (this.modelCacheHelper.isModelDeployed(modelId)) {
            try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
                ActionListener wrappedListener = ActionListener.runBefore(listener, () -> ((ThreadContext.StoredContext)context).restore());
                this.getModel(modelId, (ActionListener<MLModel>)ActionListener.wrap(mlModel -> {
                    this.removeUserRateLimiterMap(modelId);
                    if (mlModel.getAlgorithm() == FunctionName.REMOTE) {
                        if (mlModel.getConnector() != null) {
                            this.setupParamsAndPredictable(modelId, (MLModel)mlModel);
                            wrappedListener.onResponse((Object)("Successfully undeployed model controller for the remote model " + modelId));
                            log.info("Undeployed model controller for the remote model {}", (Object)modelId);
                        } else {
                            this.getConnector(mlModel.getConnectorId(), (ActionListener<Connector>)ActionListener.wrap(connector -> {
                                mlModel.setConnector(connector);
                                this.setupParamsAndPredictable(modelId, (MLModel)mlModel);
                                wrappedListener.onResponse((Object)("Successfully undeployed model controller for the remote model " + modelId));
                                log.info("Undeployed model controller for the remote model {}", (Object)modelId);
                            }, arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0)));
                        }
                        return;
                    }
                    wrappedListener.onResponse((Object)("Successfully undeployed model controller for the model " + modelId));
                    log.info("Undeployed model controller for the model {}", (Object)modelId);
                }, arg_0 -> ((ActionListener)wrappedListener).onFailure(arg_0)));
            }
            catch (Exception e) {
                log.error("Failed to undeploy model controller for the model " + modelId, (Throwable)e);
                listener.onFailure(e);
            }
        } else if (this.isModelRunningOnNode(modelId)) {
            log.error("Failed to undeploy model controller because model is in ML cache but with a state other than deployed. Please check model: " + modelId, (Throwable)new RuntimeException());
            listener.onFailure((Exception)new RuntimeException("Failed to undeploy model controller because model is in ML cache but with a state other than deployed. Please check model: " + modelId));
        } else {
            log.info("Successfully deployed model controller from cache due to model not exist in cache. Model ID: " + modelId);
            listener.onResponse((Object)("Successfully deployed model controller from cache due to model not exist in cache. Model ID: " + modelId));
        }
    }

    private synchronized void deployControllerWithDeployingModel(MLModel mlModel, Integer eligibleNodeCount, ActionListener<String> listener) {
        String modelId = mlModel.getModelId();
        FetchSourceContext fetchContext = new FetchSourceContext(true);
        GetRequest getRequest = new GetRequest(".plugins-ml-controller").id(modelId).fetchSourceContext(fetchContext);
        this.client.get(getRequest, ActionListener.wrap(r -> {
            if (r != null && r.isExists()) {
                try (XContentParser parser = MLNodeUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    MLController controller = MLController.parse((XContentParser)parser);
                    this.setupUserRateLimiterMap(modelId, eligibleNodeCount, controller.getUserRateLimiter());
                    log.info("Successfully redeployed model controller for model " + modelId);
                    listener.onResponse((Object)("Successfully redeployed model controller for model " + modelId));
                }
                catch (Exception e) {
                    log.error("Failed to parse ml task" + r.getId(), (Throwable)e);
                    listener.onFailure(e);
                }
            } else if (!BooleanUtils.isTrue((Boolean)mlModel.getIsControllerEnabled())) {
                listener.onResponse((Object)("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller. Please use the create controller api to create one if this is unexpected."));
                log.debug("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller.");
            } else {
                listener.onFailure((Exception)new OpenSearchStatusException("Failed to find model controller", RestStatus.NOT_FOUND, new Object[0]));
            }
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                if (!BooleanUtils.isTrue((Boolean)mlModel.getIsControllerEnabled())) {
                    listener.onResponse((Object)("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller. Please use the create model controller api to create one if this is unexpected."));
                    log.debug("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller.");
                } else {
                    listener.onFailure((Exception)new OpenSearchStatusException("Failed to find model controller", RestStatus.NOT_FOUND, new Object[0]));
                }
            } else {
                log.error("Failed to re-deploy the model controller for model: " + modelId, (Throwable)e);
                listener.onFailure(e);
            }
        }));
    }

    public void deployControllerWithDeployingModel(MLModel mlModel, Integer eligibleNodeCount) {
        if (mlModel.getModelState() != MLModelState.DEPLOYING) {
            throw new OpenSearchStatusException("This method should only be called when model is in DEPLOYING state, but the model is in state: " + mlModel.getModelState(), RestStatus.CONFLICT, new Object[0]);
        }
        this.deployControllerWithDeployingModel(mlModel, eligibleNodeCount, (ActionListener<String>)ActionListener.wrap(response -> {
            if (response.startsWith("Successfully")) {
                log.debug(response, (Object)mlModel.getModelId());
            } else if (response.endsWith("is expected not having a model controller. Please use the create model controller api to create one if this is unexpected.")) {
                log.warn(response);
            } else {
                log.error(response);
            }
        }, e -> log.error("Failed to re-deploy the model controller for model: " + mlModel.getModelId(), (Throwable)e)));
    }

    private void setupRateLimiter(String modelId, Integer eligibleNodeCount, MLRateLimiter rateLimiter) {
        if (rateLimiter != null) {
            this.modelCacheHelper.setRateLimiter(modelId, this.createTokenBucket(eligibleNodeCount, rateLimiter));
        } else {
            this.modelCacheHelper.removeRateLimiter(modelId);
        }
    }

    private void setupUserRateLimiterMap(String modelId, Integer eligibleNodeCount, Map<String, MLRateLimiter> userRateLimiter) {
        if (userRateLimiter != null && !userRateLimiter.isEmpty()) {
            HashMap<String, TokenBucket> userRateLimiterMap = new HashMap<String, TokenBucket>();
            userRateLimiter.forEach((user, rateLimiter) -> userRateLimiterMap.put((String)user, this.createTokenBucket(eligibleNodeCount, (MLRateLimiter)rateLimiter)));
            this.modelCacheHelper.setUserRateLimiterMap(modelId, userRateLimiterMap);
        } else {
            this.modelCacheHelper.removeUserRateLimiterMap(modelId);
        }
    }

    private void removeUserRateLimiterMap(String modelId) {
        this.modelCacheHelper.removeUserRateLimiterMap(modelId);
    }

    private TokenBucket createTokenBucket(Integer eligibleNodeCount, MLRateLimiter rateLimiter) {
        if (rateLimiter.isValid()) {
            double limit = Double.parseDouble(rateLimiter.getLimit());
            TimeUnit unit = rateLimiter.getUnit();
            log.info("Initializing the rate limiter with setting {} per {} (TPS limit {}), evenly distributed on {} nodes", (Object)limit, (Object)unit, (Object)(limit / (double)unit.toSeconds(1L)), (Object)eligibleNodeCount);
            return new TokenBucket(System::nanoTime, limit / (double)unit.toNanos(1L) / (double)eligibleNodeCount.intValue(), Math.max(limit / (double)eligibleNodeCount.intValue(), 1.0), Math.max(limit / (double)eligibleNodeCount.intValue(), 1.0));
        }
        return null;
    }

    public TokenBucket getRateLimiter(String modelId) {
        return this.modelCacheHelper.getRateLimiter(modelId);
    }

    public Map<String, TokenBucket> getUserRateLimiterMap(String modelId) {
        return this.modelCacheHelper.getUserRateLimiterMap(modelId);
    }

    private void setupModelInterface(String modelId, Map<String, String> modelInterface) {
        log.debug("Model interface for model: {} loaded into cache.", (Object)modelId);
        if (modelInterface != null) {
            this.modelCacheHelper.setModelInterface(modelId, modelInterface);
        } else {
            this.modelCacheHelper.removeModelInterface(modelId);
        }
    }

    public Map<String, String> getModelInterface(String modelId) {
        return this.modelCacheHelper.getModelInterface(modelId);
    }

    private void setupMLGuard(String modelId, Guardrails guardrails) {
        if (guardrails != null) {
            this.modelCacheHelper.setMLGuard(modelId, this.createMLGuard(guardrails, this.xContentRegistry, this.client));
        } else {
            this.modelCacheHelper.removeMLGuard(modelId);
        }
    }

    private MLGuard createMLGuard(Guardrails guardrails, NamedXContentRegistry xContentRegistry, Client client) {
        return new MLGuard(guardrails, xContentRegistry, client);
    }

    public MLGuard getMLGuard(String modelId) {
        return this.modelCacheHelper.getMLGuard(modelId);
    }

    public void getModel(String modelId, ActionListener<MLModel> listener) {
        this.getModel(modelId, null, null, listener);
    }

    public void getModel(String modelId, String[] includes, String[] excludes, ActionListener<MLModel> listener) {
        GetRequest getRequest = new GetRequest();
        FetchSourceContext fetchContext = new FetchSourceContext(true, includes, excludes);
        ((GetRequest)getRequest.index(".plugins-ml-model")).id(modelId).fetchSourceContext(fetchContext);
        this.client.get(getRequest, ActionListener.wrap(r -> {
            if (r != null && r.isExists()) {
                try (XContentParser parser = MLNodeUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    String algorithmName = r.getSource().get("algorithm").toString();
                    MLModel mlModel = MLModel.parse((XContentParser)parser, (String)algorithmName);
                    mlModel.setModelId(modelId);
                    listener.onResponse((Object)mlModel);
                }
                catch (Exception e) {
                    log.error("Failed to parse ml task" + r.getId(), (Throwable)e);
                    listener.onFailure(e);
                }
            } else {
                listener.onFailure((Exception)new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND, new Object[0]));
            }
        }, arg_0 -> listener.onFailure(arg_0)));
    }

    public void getController(String modelId, ActionListener<MLController> listener) {
        FetchSourceContext fetchContext = new FetchSourceContext(true);
        GetRequest getRequest = new GetRequest(".plugins-ml-controller").id(modelId).fetchSourceContext(fetchContext);
        this.client.get(getRequest, ActionListener.wrap(r -> {
            if (r != null && r.isExists()) {
                try (XContentParser parser = MLNodeUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    MLController controller = MLController.parse((XContentParser)parser);
                    listener.onResponse((Object)controller);
                }
                catch (Exception e) {
                    log.error("Failed to parse ml task" + r.getId(), (Throwable)e);
                    listener.onFailure(e);
                }
            } else {
                listener.onFailure((Exception)new OpenSearchStatusException("Failed to find model controller", RestStatus.NOT_FOUND, new Object[0]));
            }
        }, arg_0 -> listener.onFailure(arg_0)));
    }

    private void getConnector(String connectorId, ActionListener<Connector> listener) {
        GetRequest getRequest = ((GetRequest)new GetRequest().index(".plugins-ml-connector")).id(connectorId);
        this.client.get(getRequest, ActionListener.wrap(r -> {
            if (r != null && r.isExists()) {
                try (XContentParser parser = MLNodeUtils.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    Connector connector = Connector.createConnector((XContentParser)parser);
                    listener.onResponse((Object)connector);
                }
                catch (Exception e) {
                    log.error("Failed to parse connector:" + connectorId);
                    listener.onFailure(e);
                }
            } else {
                listener.onFailure((Exception)new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND, new Object[0]));
            }
        }, e -> {
            log.error("Failed to get connector", (Throwable)e);
            listener.onFailure((Exception)new OpenSearchStatusException("Failed to get connector:" + connectorId, RestStatus.NOT_FOUND, new Object[0]));
        }));
    }

    private void retrieveModelChunks(MLModel mlModelMeta, ActionListener<File> listener) throws InterruptedException {
        String modelId = mlModelMeta.getModelId();
        String modelName = mlModelMeta.getName();
        Integer totalChunks = mlModelMeta.getTotalChunks();
        GetRequest getRequest = new GetRequest();
        getRequest.index(".plugins-ml-model");
        getRequest.id();
        Semaphore semaphore = new Semaphore(1);
        AtomicBoolean stopNow = new AtomicBoolean(false);
        String modelZip = this.mlEngine.getDeployModelZipPath(modelId, modelName);
        ConcurrentLinkedDeque chunkFiles = new ConcurrentLinkedDeque();
        AtomicInteger retrievedChunks = new AtomicInteger(0);
        int i = 0;
        while (i < totalChunks) {
            semaphore.tryAcquire(10L, TimeUnit.SECONDS);
            if (stopNow.get()) {
                throw new MLException("Failed to deploy model");
            }
            String modelChunkId = this.getModelChunkId(modelId, i);
            int currentChunk = i++;
            this.getModel(modelChunkId, (ActionListener<MLModel>)this.threadedActionListener("opensearch_ml_deploy", ActionListener.wrap(model -> {
                Path chunkPath = this.mlEngine.getDeployModelChunkPath(modelId, Integer.valueOf(currentChunk));
                FileUtils.write((byte[])Base64.getDecoder().decode(model.getContent()), (String)chunkPath.toString());
                chunkFiles.add(new File(chunkPath.toUri()));
                retrievedChunks.getAndIncrement();
                if (retrievedChunks.get() == totalChunks.intValue()) {
                    File modelZipFile = new File(modelZip);
                    FileUtils.mergeFiles((Queue)chunkFiles, (File)modelZipFile);
                    listener.onResponse((Object)modelZipFile);
                }
                semaphore.release();
            }, e -> {
                stopNow.set(true);
                semaphore.release();
                log.error("Failed to retrieve model chunk " + modelChunkId, (Throwable)e);
                if (retrievedChunks.get() == totalChunks - 1) {
                    listener.onFailure((Exception)new MLResourceNotFoundException("Fail to find model chunk " + modelChunkId));
                }
            })));
        }
    }

    public void updateModel(String modelId, Boolean isHidden, Map<String, Object> updatedFields) {
        this.updateModel(modelId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(response -> {
            if (response.status() == RestStatus.OK) {
                log.debug(StringUtils.getErrorMessage((String)"Updated ML model successfully: {}", (String)modelId, (Boolean)isHidden), (Object)response.status());
            } else {
                log.error(StringUtils.getErrorMessage((String)"Failed to update provided ML model, status: {}", (String)modelId, (Boolean)isHidden), (Object)response.status());
            }
        }, e -> log.error(StringUtils.getErrorMessage((String)"Failed to update the provided ML model", (String)modelId, (Boolean)isHidden), (Throwable)e)));
    }

    public void updateModel(String modelId, Map<String, Object> updatedFields, ActionListener<UpdateResponse> listener) {
        if (updatedFields == null || updatedFields.size() == 0) {
            listener.onFailure((Exception)new IllegalArgumentException("Updated fields is null or empty"));
            return;
        }
        HashMap<String, Object> newUpdatedFields = new HashMap<String, Object>();
        newUpdatedFields.putAll(updatedFields);
        newUpdatedFields.put("last_updated_time", Instant.now().toEpochMilli());
        UpdateRequest updateRequest = new UpdateRequest(".plugins-ml-model", modelId);
        updateRequest.doc(newUpdatedFields);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        if (newUpdatedFields.containsKey("model_state") && MODEL_DONE_STATES.contains(newUpdatedFields.get("model_state"))) {
            updateRequest.retryOnConflict(3);
        }
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            this.client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    public String getModelChunkId(String modelId, Integer chunkNumber) {
        return modelId + "_" + chunkNumber;
    }

    public void addModelWorkerNode(String modelId, String ... nodeIds) {
        if (nodeIds != null) {
            for (String nodeId : nodeIds) {
                this.modelCacheHelper.addWorkerNode(modelId, nodeId);
            }
        }
    }

    public void addModelWorkerNodes(List<String> nodeIds) {
        if (nodeIds != null) {
            String[] modelIds = this.getAllModelIds();
            for (String nodeId : nodeIds) {
                Arrays.stream(modelIds).forEach(x -> this.modelCacheHelper.addWorkerNode((String)x, nodeId));
            }
        }
    }

    public void removeModelWorkerNode(String modelId, boolean isFromUndeploy, String ... nodeIds) {
        if (nodeIds != null) {
            for (String nodeId : nodeIds) {
                this.modelCacheHelper.removeWorkerNode(modelId, nodeId, isFromUndeploy);
            }
        }
    }

    public void removeWorkerNodes(Set<String> removedNodes, boolean isFromUndeploy) {
        this.modelCacheHelper.removeWorkerNodes(removedNodes, isFromUndeploy);
    }

    public synchronized Map<String, String> undeployModel(String[] modelIds) {
        HashMap<String, String> modelUndeployStatus = new HashMap<String, String>();
        if (modelIds != null && modelIds.length > 0) {
            log.debug("undeploy models {}", (Object)Arrays.toString(modelIds));
            for (String modelId : modelIds) {
                if (this.modelCacheHelper.isModelDeployed(modelId)) {
                    modelUndeployStatus.put(modelId, "undeployed");
                    this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).decrement();
                    this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
                    this.mlStats.createCounterStatIfAbsent(this.getModelFunctionName(modelId), ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                    this.mlStats.createModelCounterStatIfAbsent(modelId, ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                } else {
                    modelUndeployStatus.put(modelId, "not_found");
                }
                this.removeModel(modelId);
            }
        } else {
            log.debug("undeploy all models {}", (Object)Arrays.toString(this.getLocalDeployedModels()));
            for (String modelId : this.getLocalDeployedModels()) {
                modelUndeployStatus.put(modelId, "undeployed");
                this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).decrement();
                this.mlStats.createCounterStatIfAbsent(this.getModelFunctionName(modelId), ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                this.mlStats.createModelCounterStatIfAbsent(modelId, ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                this.removeModel(modelId);
            }
        }
        return modelUndeployStatus;
    }

    private void removeModel(String modelId) {
        this.modelCacheHelper.removeModel(modelId);
        this.modelHelper.deleteFileCache(modelId);
    }

    public String[] getWorkerNodes(String modelId, FunctionName functionName, boolean onlyEligibleNode) {
        String[] workerNodeIds = this.modelCacheHelper.getWorkerNodes(modelId);
        if (!onlyEligibleNode) {
            return workerNodeIds;
        }
        if (workerNodeIds == null || workerNodeIds.length == 0) {
            return workerNodeIds;
        }
        String[] eligibleNodeIds = this.nodeHelper.filterEligibleNodes(functionName, workerNodeIds);
        if (eligibleNodeIds == null || eligibleNodeIds.length == 0) {
            throw new IllegalArgumentException("No eligible worker node found");
        }
        return eligibleNodeIds;
    }

    public int getWorkerNodesSize(String modelId, FunctionName functionName, boolean onlyEligibleNode) {
        return this.getWorkerNodes(modelId, functionName, onlyEligibleNode).length;
    }

    public String[] getWorkerNodes(String modelId, FunctionName functionName) {
        return this.getWorkerNodes(modelId, functionName, false);
    }

    public int getWorkerNodesSize(String modelId, FunctionName functionName) {
        return this.getWorkerNodes(modelId, functionName, false).length;
    }

    public Predictable getPredictor(String modelId) {
        return this.modelCacheHelper.getPredictor(modelId);
    }

    public String[] getAllModelIds() {
        return this.modelCacheHelper.getAllModels();
    }

    public String[] getLocalDeployedModels() {
        return this.modelCacheHelper.getDeployedModels();
    }

    public String[] getExpiredModels() {
        return this.modelCacheHelper.getExpiredModels();
    }

    public synchronized void syncModelWorkerNodes(Map<String, Set<String>> modelWorkerNodes) {
        this.modelCacheHelper.syncWorkerNodes(modelWorkerNodes);
    }

    public void clearRoutingTable() {
        this.modelCacheHelper.clearWorkerNodes();
    }

    public MLModelProfile getModelProfile(String modelId) {
        return this.modelCacheHelper.getModelProfile(modelId);
    }

    public <T> T trackPredictDuration(String modelId, Supplier<T> supplier) {
        long start = System.nanoTime();
        T t = supplier.get();
        long end = System.nanoTime();
        double durationInMs = (double)(end - start) / 1000000.0;
        this.modelCacheHelper.addModelInferenceDuration(modelId, durationInMs);
        return t;
    }

    public void trackPredictDuration(String modelId, long startTime) {
        long end = System.nanoTime();
        double durationInMs = (double)(end - startTime) / 1000000.0;
        this.modelCacheHelper.addModelInferenceDuration(modelId, durationInMs);
    }

    public FunctionName getModelFunctionName(String modelId) {
        return this.modelCacheHelper.getFunctionName(modelId);
    }

    public Optional<FunctionName> getOptionalModelFunctionName(String modelId) {
        return this.modelCacheHelper.getOptionalFunctionName(modelId);
    }

    public boolean isModelRunningOnNode(String modelId) {
        return this.modelCacheHelper.isModelRunningOnNode(modelId);
    }

    public boolean isModelDeployed(String modelId) {
        return this.modelCacheHelper.isModelDeployed(modelId);
    }

    public boolean isNodeEligible(String nodeId, FunctionName functionName) {
        Set allEligibleNodeIds = Arrays.stream(this.nodeHelper.getEligibleNodes(functionName)).map(DiscoveryNode::getId).collect(Collectors.toSet());
        return allEligibleNodeIds.contains(nodeId);
    }

    public MLModel addModelToAutoDeployCache(String modelId, MLModel model) {
        return this.modelCacheHelper.addModelToAutoDeployCache(modelId, model);
    }

    public void removeAutoDeployModel(String modelId) {
        this.modelCacheHelper.removeAutoDeployModel(modelId);
    }
}

