/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ml.autoredeploy;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.deploy.MLDeployModelAction;
import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest;
import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesResponse;
import org.opensearch.ml.model.MLModelManager;
import org.opensearch.ml.settings.MLCommonsSettings;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;

public class MLModelAutoReDeployer {
    @Generated
    private static final Logger log = LogManager.getLogger(MLModelAutoReDeployer.class);
    // Collaborators handed in through the constructor.
    private final ClusterService clusterService;
    private final Client client;
    private final Settings settings;
    // Cached setting values: each is read once from the constructor's Settings argument and then
    // overwritten by the settings-update consumer the constructor registers for it.
    // NOTE(review): these are plain, non-volatile fields; if the update consumers run on a
    // different thread than the readers, visibility of new values is not guaranteed — confirm.
    private boolean enableAutoReDeployModel;
    private boolean onlyRunOnMlNode;
    private int autoDeployMaxRetryTimes;
    private boolean allowCustomDeploymentPlan;
    private final MLModelManager mlModelManager;
    // Pending redeploy work: filled by triggerAutoDeployModels, drained one entry per call by
    // redeployAModel.
    private final Queue<ModelAutoRedeployArrangement> modelAutoRedeployArrangements = new ConcurrentLinkedQueue<ModelAutoRedeployArrangement>();
    // Creates the SearchRequestBuilder used by queryRunningModels.
    private final SearchRequestBuilderFactory searchRequestBuilderFactory;
    // Installed through setStartCronJobListener; completed with true and reset to null by
    // startCronjobAndClearListener.
    private ActionListener<Boolean> startCronJobListener;

    /**
     * Wires up the collaborators, caches the current values of the auto-redeploy related
     * settings, and registers update consumers so the cached values follow later setting changes.
     *
     * @param clusterService cluster service used for node lookups and for registering the
     *                       settings-update consumers
     * @param client client used to run searches and deploy/undeploy actions
     * @param settings settings the initial values are read from
     * @param mlModelManager manager used to update the retry counter of a model
     * @param searchRequestBuilderFactory factory producing the search request builder
     */
    public MLModelAutoReDeployer(ClusterService clusterService, Client client, Settings settings, MLModelManager mlModelManager, SearchRequestBuilderFactory searchRequestBuilderFactory) {
        this.clusterService = clusterService;
        this.client = client;
        this.settings = settings;
        this.mlModelManager = mlModelManager;
        this.searchRequestBuilderFactory = searchRequestBuilderFactory;

        // Initial snapshot of the setting values.
        this.enableAutoReDeployModel = MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE.get(settings);
        this.onlyRunOnMlNode = MLCommonsSettings.ML_COMMONS_ONLY_RUN_ON_ML_NODE.get(settings);
        this.autoDeployMaxRetryTimes = MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES.get(settings);
        this.allowCustomDeploymentPlan = MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN.get(settings);

        // Keep the snapshot in sync with setting updates. The "only run on ML node" consumer
        // additionally undeploys models from data nodes when the flag becomes true.
        clusterService.getClusterSettings().addSettingsUpdateConsumer(
            MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE,
            enabled -> this.enableAutoReDeployModel = enabled
        );
        clusterService.getClusterSettings().addSettingsUpdateConsumer(
            MLCommonsSettings.ML_COMMONS_ONLY_RUN_ON_ML_NODE,
            this.undeployModelsOnDataNodesConsumer()
        );
        clusterService.getClusterSettings().addSettingsUpdateConsumer(
            MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES,
            maxRetries -> this.autoDeployMaxRetryTimes = maxRetries
        );
        clusterService.getClusterSettings().addSettingsUpdateConsumer(
            MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN,
            allowed -> this.allowCustomDeploymentPlan = allowed
        );
    }

    /**
     * Gathers the ids of every data node in the current cluster state and, if there is at least
     * one, starts undeploying running models from those nodes.
     */
    private void undeployModelsOnDataNodes() {
        List<String> dataNodeIds = this.clusterService.state()
            .nodes()
            .getDataNodes()
            .values()
            .stream()
            .map(node -> node.getId())
            .collect(Collectors.toList());
        if (dataNodeIds.isEmpty()) {
            return;
        }
        this.triggerUndeployModelsOnDataNodes(dataNodeIds);
    }

    /**
     * Builds the consumer registered for updates of the "only run on ML node" setting.
     *
     * @return a consumer that stores the new flag value and, when the value is {@code true},
     *         undeploys models from the data nodes
     */
    @VisibleForTesting
    Consumer<Boolean> undeployModelsOnDataNodesConsumer() {
        return restrictToMlNodes -> {
            this.onlyRunOnMlNode = restrictToMlNodes;
            if (!this.onlyRunOnMlNode) {
                return;
            }
            this.undeployModelsOnDataNodes();
        };
    }

    /**
     * Entry point for auto redeploy after nodes were added.
     * <p>
     * When auto redeploy is disabled the pending cron-job listener is completed and nothing else
     * happens. Otherwise the redeploy is started only if the local node id is non-empty and equals
     * {@code clusterManagerNodeId}.
     *
     * @param addedNodes ids of the nodes that were added
     * @param clusterManagerNodeId id of the cluster manager node
     */
    public void buildAutoReloadArrangement(List<String> addedNodes, String clusterManagerNodeId) {
        if (this.enableAutoReDeployModel) {
            String localNodeId = this.clusterService.localNode().getId();
            boolean localNodeIsClusterManager = !Strings.isNullOrEmpty(localNodeId) && localNodeId.equals(clusterManagerNodeId);
            if (localNodeIsClusterManager) {
                this.triggerAutoDeployModels(addedNodes);
            } else {
                log.info("model auto reloading should be initialized by cluster manager node only, current node id is empty or current node not cluster manager!");
            }
            return;
        }
        log.info("Model auto reload configuration is false, not performing auto reloading!");
        this.startCronjobAndClearListener();
    }

    /**
     * Redeploys the next queued model, if any.
     * <p>
     * When auto redeploy is disabled, or when the queue holds no arrangement, the pending cron-job
     * listener is completed instead and no deployment is triggered.
     */
    public void redeployAModel() {
        if (!this.enableAutoReDeployModel) {
            log.info("Model auto reload configuration is false, not performing auto reloading!");
            this.startCronjobAndClearListener();
            return;
        }
        // Poll once and test for null instead of size() followed by poll(): the two-step form can
        // hand a null arrangement to triggerModelRedeploy when another thread drains the queue in
        // between, and size() on a ConcurrentLinkedQueue walks the whole queue.
        ModelAutoRedeployArrangement nextArrangement = this.modelAutoRedeployArrangements.poll();
        if (nextArrangement == null) {
            log.info("No models needs to be auto redeployed!");
            this.startCronjobAndClearListener();
            return;
        }
        this.triggerModelRedeploy(nextArrangement);
    }

    /**
     * Searches for running models and queues a redeploy arrangement for every hit whose
     * {@code auto_redeploy_retry_times} is still below the configured maximum, then starts
     * redeploying from the queue.
     * <p>
     * If the search returns no hits or fails, the pending cron-job listener is completed so the
     * caller is not left waiting.
     *
     * @param addedNodes ids of the nodes that were added; stored in every queued arrangement
     */
    private void triggerAutoDeployModels(List<String> addedNodes) {
        ActionListener<SearchResponse> listener = ActionListener.wrap(res -> {
            boolean hasHits = res != null
                && res.getHits() != null
                && res.getHits().getTotalHits() != null
                && res.getHits().getTotalHits().value > 0L;
            if (!hasHits) {
                // Nothing to redeploy: finish the same way the "queue is empty" and failure
                // paths do, otherwise the cron-job listener would never be completed.
                this.startCronjobAndClearListener();
                return;
            }
            for (SearchHit hit : res.getHits().getHits()) {
                if (hit == null || hit.getSourceAsMap() == null) {
                    continue;
                }
                // A missing retry counter counts as zero retries.
                Object storedRetryTimes = hit.getSourceAsMap().get("auto_redeploy_retry_times");
                int usedRetries = storedRetryTimes == null ? 0 : (Integer) storedRetryTimes;
                if (usedRetries >= this.autoDeployMaxRetryTimes) {
                    continue;
                }
                ModelAutoRedeployArrangement arrangement = ModelAutoRedeployArrangement.builder()
                    .addedNodes(addedNodes)
                    .searchResponse(hit)
                    .build();
                // Skip arrangements that are already queued.
                if (!this.modelAutoRedeployArrangements.contains(arrangement)) {
                    this.modelAutoRedeployArrangements.add(arrangement);
                }
            }
            this.redeployAModel();
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                log.info("Index not found, not performing auto reloading!");
            } else if (e instanceof ClusterBlockException) {
                log.info("Cluster status not ready, not performing auto reloading now!");
            } else {
                log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e);
            }
            this.startCronjobAndClearListener();
        });
        this.queryRunningModels(listener);
    }

    /**
     * Searches for running models and sends one undeploy request that removes all of them from
     * the given data nodes. Does nothing when the search returns no hits.
     *
     * @param dataNodeIds ids of the data nodes the models should be undeployed from
     */
    private void triggerUndeployModelsOnDataNodes(List<String> dataNodeIds) {
        ActionListener<SearchResponse> listener = ActionListener.wrap(res -> {
            boolean hasHits = res != null
                && res.getHits() != null
                && res.getHits().getTotalHits() != null
                && res.getHits().getTotalHits().value > 0L;
            if (!hasHits) {
                return;
            }
            String[] modelIds = Arrays.stream(res.getHits().getHits()).map(SearchHit::getId).toArray(String[]::new);
            if (modelIds.length == 0) {
                return;
            }
            ActionListener<MLUndeployModelNodesResponse> undeployModelListener = ActionListener.wrap(
                r -> log.info("Undeploy models on data nodes successfully!"),
                e -> log.error("Failed to undeploy models on data nodes, error is: {}", e.getMessage(), e)
            );
            MLUndeployModelNodesRequest undeployModelNodesRequest = new MLUndeployModelNodesRequest(dataNodeIds.toArray(new String[0]), modelIds);
            this.client.execute(MLUndeployModelAction.INSTANCE, undeployModelNodesRequest, undeployModelListener);
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                // Same treatment as the auto-deploy query: a missing index is not an error.
                log.info("Index not found, no models to undeploy on data nodes!");
            } else {
                // Pass the exception along so the cause of the failed query is not lost.
                log.error("Failed to query need undeploy models, no action will be performed", e);
            }
        });
        this.queryRunningModels(listener);
    }

    /**
     * Runs a search on the {@code .plugins-ml-model} index for documents whose
     * {@code model_state} is one of the loading/loaded/deploying/deployed states, sorted by
     * {@code last_deployed_time} ascending, and hands the response to {@code listener}.
     * Only the fields needed for redeploy planning are fetched; at most 10000 hits are requested.
     *
     * @param listener receives the search response or the failure
     */
    private void queryRunningModels(ActionListener<SearchResponse> listener) {
        List<String> runningStates = Arrays.asList(
            MLModelState.LOADING.name(),
            MLModelState.PARTIALLY_LOADED.name(),
            MLModelState.LOADED.name(),
            MLModelState.DEPLOYING.name(),
            MLModelState.PARTIALLY_DEPLOYED.name(),
            MLModelState.DEPLOYED.name()
        );
        String[] includedFields = {"auto_redeploy_retry_times", "planning_worker_nodes", "deploy_to_all_nodes"};
        String[] excludedFields = {"model_content", "content"};

        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(new TermsQueryBuilder("model_state", runningStates));
        source.sort(SortBuilders.fieldSort("last_deployed_time").order(SortOrder.ASC));
        source.fetchSource(new FetchSourceContext(true, includedFields, excludedFields));

        this.searchRequestBuilderFactory
            .getSearchRequestBuilder(this.client, SearchAction.INSTANCE)
            .setIndices(".plugins-ml-model")
            .setSource(source)
            .setSize(10000)
            .execute(listener);
    }

    /**
     * Redeploys the model described by the given arrangement.
     * <p>
     * Target nodes are chosen as follows:
     * <ul>
     *   <li>{@code deploy_to_all_nodes} is true or custom deployment plans are not allowed:
     *       an empty node id array is sent;</li>
     *   <li>otherwise, if at least one added node is part of {@code planning_worker_nodes}:
     *       the full planning worker node list is sent;</li>
     *   <li>otherwise the model is skipped and the next queued model is processed.</li>
     * </ul>
     * Before the deploy request is sent, the model's {@code auto_redeploy_retry_times} is
     * incremented through the model manager.
     *
     * @param modelAutoRedeployArrangement the search hit of the model plus the added node ids
     */
    private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeployArrangement) {
        SearchHit modelHit = modelAutoRedeployArrangement.getSearchResponse();
        String modelId = modelHit.getId();
        List<String> addedNodes = modelAutoRedeployArrangement.getAddedNodes();
        Map<String, Object> source = modelHit.getSourceAsMap();
        @SuppressWarnings("unchecked") // the field is read back as a list of node id strings
        List<String> planningWorkerNodes = (List<String>) source.get("planning_worker_nodes");
        Integer autoRedeployRetryTimes = (Integer) source.get("auto_redeploy_retry_times");
        Boolean deployToAllNodes = (Boolean) Optional.ofNullable(source.get("deploy_to_all_nodes")).orElse(false);

        String[] nodeIds = null;
        if (deployToAllNodes || !this.allowCustomDeploymentPlan) {
            nodeIds = new String[0];
        } else if (planningWorkerNodes != null && !planningWorkerNodes.isEmpty()) {
            boolean planContainsAddedNode = planningWorkerNodes.stream().anyMatch(addedNodes::contains);
            if (planContainsAddedNode) {
                nodeIds = planningWorkerNodes.toArray(new String[0]);
            }
        }
        if (nodeIds == null) {
            log.info("Allow custom deployment plan is true and deploy to all nodes is false and added nodes are not in planning worker nodes list, not to auto redeploy the model to the new nodes!");
            // Move on to the next queued model, as the failure handler below does. Returning
            // here without advancing would leave the remaining arrangements in the queue and
            // the cron-job listener uncompleted.
            this.redeployAModel();
            return;
        }

        ActionListener<MLDeployModelResponse> listener = ActionListener.wrap(
            res -> log.info("Triggered model auto redeploy, task id is: {}, task status is: {}", res.getTaskId(), res.getStatus()),
            e -> {
                log.error("Exception occurred when auto redeploying the model, model id is: {}, exception is: {}, skipping current model auto redeploy and starting next model redeploy!", modelId, e.getMessage(), e);
                this.redeployAModel();
            }
        );
        // A missing retry counter counts as zero retries so far.
        int nextRetryTimes = Optional.ofNullable(autoRedeployRetryTimes).orElse(0) + 1;
        this.mlModelManager.updateModel(modelId, ImmutableMap.<String, Object>of("auto_redeploy_retry_times", nextRetryTimes));
        MLDeployModelRequest deployModelRequest = new MLDeployModelRequest(modelId, nodeIds, false, true, false);
        this.client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener);
    }

    /**
     * Completes the pending cron-job listener with {@code true} and forgets it, provided a
     * listener is set and the local node is a cluster manager node. Otherwise does nothing.
     */
    private void startCronjobAndClearListener() {
        // Work on a snapshot so a concurrent change of the field cannot turn the null check
        // into a NullPointerException.
        ActionListener<Boolean> pendingListener = this.startCronJobListener;
        if (pendingListener == null || !this.clusterService.localNode().isClusterManagerNode()) {
            return;
        }
        // Clear before notifying so the listener is fired at most once, even if onResponse throws.
        this.startCronJobListener = null;
        pendingListener.onResponse(true);
    }

    /**
     * Installs the listener that is completed with {@code true} by
     * {@code startCronjobAndClearListener}.
     *
     * @param startCronJobListener the listener to store; replaces any previous one
     */
    @Generated
    public void setStartCronJobListener(final ActionListener<Boolean> startCronJobListener) {
        this.startCronJobListener = startCronJobListener;
    }

    /**
     * Small factory around the {@link SearchRequestBuilder} constructor.
     */
    public static class SearchRequestBuilderFactory {
        /**
         * Creates a new search request builder.
         *
         * @param client client the builder is bound to
         * @param action search action the builder is bound to
         * @return a freshly constructed builder
         */
        public SearchRequestBuilder getSearchRequestBuilder(final OpenSearchClient client, final SearchAction action) {
            return new SearchRequestBuilder(client, action);
        }
    }

    /**
     * One unit of redeploy work: the search hit of a model together with the ids of the nodes
     * that were added. Equality, hash code and string form are based on both fields.
     */
    static class ModelAutoRedeployArrangement {
        private List<String> addedNodes;
        private SearchHit searchResponse;

        @Generated
        ModelAutoRedeployArrangement(List<String> addedNodes, SearchHit searchResponse) {
            this.addedNodes = addedNodes;
            this.searchResponse = searchResponse;
        }

        /** Returns a new, empty builder. */
        @Generated
        public static ModelAutoRedeployArrangementBuilder builder() {
            return new ModelAutoRedeployArrangementBuilder();
        }

        @Generated
        public List<String> getAddedNodes() {
            return this.addedNodes;
        }

        @Generated
        public SearchHit getSearchResponse() {
            return this.searchResponse;
        }

        @Generated
        public void setAddedNodes(List<String> addedNodes) {
            this.addedNodes = addedNodes;
        }

        @Generated
        public void setSearchResponse(SearchHit searchResponse) {
            this.searchResponse = searchResponse;
        }

        /**
         * Two arrangements are equal when both accept each other via {@code canEqual} and their
         * added nodes and search hits are equal (or both null).
         */
        @Generated
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ModelAutoRedeployArrangement)) {
                return false;
            }
            ModelAutoRedeployArrangement that = (ModelAutoRedeployArrangement) o;
            if (!that.canEqual(this)) {
                return false;
            }
            return nullSafeEquals(this.getAddedNodes(), that.getAddedNodes())
                && nullSafeEquals(this.getSearchResponse(), that.getSearchResponse());
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof ModelAutoRedeployArrangement;
        }

        /** Combines the hash codes of both fields; a null field contributes 43. */
        @Generated
        public int hashCode() {
            final int prime = 59;
            int hash = prime + nullSafeHash(this.getAddedNodes());
            return hash * prime + nullSafeHash(this.getSearchResponse());
        }

        @Generated
        public String toString() {
            return "MLModelAutoReDeployer.ModelAutoRedeployArrangement(addedNodes=" + this.getAddedNodes() + ", searchResponse=" + this.getSearchResponse() + ")";
        }

        /** True when both references are null or {@code left.equals(right)}. */
        private static boolean nullSafeEquals(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash code of {@code value}, or 43 when it is null. */
        private static int nullSafeHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        /**
         * Fluent builder for {@link ModelAutoRedeployArrangement}.
         */
        @Generated
        public static class ModelAutoRedeployArrangementBuilder {
            @Generated
            private List<String> addedNodes;
            @Generated
            private SearchHit searchResponse;

            @Generated
            ModelAutoRedeployArrangementBuilder() {
            }

            @Generated
            public ModelAutoRedeployArrangementBuilder addedNodes(List<String> addedNodes) {
                this.addedNodes = addedNodes;
                return this;
            }

            @Generated
            public ModelAutoRedeployArrangementBuilder searchResponse(SearchHit searchResponse) {
                this.searchResponse = searchResponse;
                return this;
            }

            /** Creates the arrangement from the values collected so far. */
            @Generated
            public ModelAutoRedeployArrangement build() {
                return new ModelAutoRedeployArrangement(this.addedNodes, this.searchResponse);
            }

            @Generated
            public String toString() {
                return "MLModelAutoReDeployer.ModelAutoRedeployArrangement.ModelAutoRedeployArrangementBuilder(addedNodes=" + this.addedNodes + ", searchResponse=" + this.searchResponse + ")";
            }
        }
    }
}

