/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ccr.action;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;

public class ShardFollowTasksExecutor
extends PersistentTasksExecutor<ShardFollowTask> {
    private final Client client;
    private final ThreadPool threadPool;
    private final ClusterService clusterService;
    private final IndexScopedSettings indexScopedSettings;

    public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexScopedSettings indexScopedSettings) {
        super(settings, "xpack/ccr/shard_follow_task", "ccr");
        this.client = client;
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.indexScopedSettings = indexScopedSettings;
    }

    public void validate(ShardFollowTask params, ClusterState clusterState) {
        IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
        if (!routingTable.shard(params.getFollowShardId().id()).primaryShard().started()) {
            throw new IllegalArgumentException("Not all copies of follow shard are started");
        }
    }

    protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress, Map<String, String> headers) {
        final ShardFollowTask params = (ShardFollowTask)taskInProgress.getParams();
        final Client remoteClient = params.getRemoteCluster() != null ? CcrLicenseChecker.wrapClient(this.client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()) : CcrLicenseChecker.wrapClient(this.client, params.getHeaders());
        final Client followerClient = CcrLicenseChecker.wrapClient(this.client, params.getHeaders());
        BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
            try {
                this.threadPool.schedule(delay, "ccr", command);
            }
            catch (EsRejectedExecutionException e) {
                if (e.isExecutorShutdown()) {
                    this.logger.debug("couldn't schedule command, executor is shutting down", (Throwable)e);
                }
                throw e;
            }
        };
        final String recordedLeaderShardHistoryUUID = this.getLeaderShardHistoryUUID(params);
        return new ShardFollowNodeTask(id, type, action, this.getDescription(taskInProgress), parentTaskId, headers, params, scheduler, System::nanoTime){

            @Override
            protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
                Index leaderIndex = params.getLeaderShardId().getIndex();
                Index followIndex = params.getFollowShardId().getIndex();
                ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
                clusterStateRequest.clear();
                clusterStateRequest.metaData(true);
                clusterStateRequest.indices(new String[]{leaderIndex.getName()});
                remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
                    IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
                    if (indexMetaData.getMappings().isEmpty()) {
                        assert (indexMetaData.getMappingVersion() == 1L);
                        handler.accept(indexMetaData.getMappingVersion());
                        return;
                    }
                    assert (indexMetaData.getMappings().size() == 1) : "expected exactly one mapping, but got [" + indexMetaData.getMappings().size() + "]";
                    MappingMetaData mappingMetaData = (MappingMetaData)((ObjectObjectCursor)indexMetaData.getMappings().iterator().next()).value;
                    PutMappingRequest putMappingRequest = new PutMappingRequest(new String[]{followIndex.getName()});
                    putMappingRequest.type(mappingMetaData.type());
                    putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
                    followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), (Consumer)errorHandler));
                }, errorHandler));
            }

            @Override
            protected void innerUpdateSettings(LongConsumer finalHandler, Consumer<Exception> errorHandler) {
                Index leaderIndex = params.getLeaderShardId().getIndex();
                Index followIndex = params.getFollowShardId().getIndex();
                ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
                clusterStateRequest.clear();
                clusterStateRequest.metaData(true);
                clusterStateRequest.indices(new String[]{leaderIndex.getName()});
                CheckedConsumer onResponse = clusterStateResponse -> {
                    Settings settings;
                    IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
                    IndexMetaData followerIMD = ShardFollowTasksExecutor.this.clusterService.state().metaData().getIndexSafe(followIndex);
                    Settings existingSettings = TransportResumeFollowAction.filter(followerIMD.getSettings());
                    if (existingSettings.equals((Object)(settings = TransportResumeFollowAction.filter(leaderIMD.getSettings())))) {
                        finalHandler.accept(leaderIMD.getSettingsVersion());
                    } else {
                        Settings updatedSettings = settings.filter(s -> existingSettings.get(s) == null || !existingSettings.get(s).equals(settings.get(s)));
                        if (updatedSettings.keySet().stream().allMatch(arg_0 -> ((IndexScopedSettings)ShardFollowTasksExecutor.this.indexScopedSettings).isDynamicSetting(arg_0))) {
                            UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(new String[]{followIndex.getName()});
                            updateSettingsRequest.settings(updatedSettings);
                            followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), (Consumer)errorHandler));
                        } else {
                            Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion());
                            this.closeIndexUpdateSettingsAndOpenIndex(followIndex.getName(), updatedSettings, handler, errorHandler);
                        }
                    }
                };
                remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap((CheckedConsumer)onResponse, errorHandler));
            }

            private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, Settings updatedSettings, Runnable handler, Consumer<Exception> onFailure) {
                CloseIndexRequest closeRequest = new CloseIndexRequest(new String[]{followIndex});
                CheckedConsumer onResponse = response -> this.updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure);
                followerClient.admin().indices().close(closeRequest, ActionListener.wrap((CheckedConsumer)onResponse, onFailure));
            }

            private void updateSettingsAndOpenIndex(String followIndex, Settings updatedSettings, Runnable handler, Consumer<Exception> onFailure) {
                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(new String[]{followIndex});
                updateSettingsRequest.settings(updatedSettings);
                CheckedConsumer onResponse = response -> this.openIndex(followIndex, handler, onFailure);
                followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap((CheckedConsumer)onResponse, onFailure));
            }

            private void openIndex(String followIndex, Runnable handler, Consumer<Exception> onFailure) {
                OpenIndexRequest openIndexRequest = new OpenIndexRequest(new String[]{followIndex});
                CheckedConsumer onResponse = response -> handler.run();
                followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap((CheckedConsumer)onResponse, onFailure));
            }

            @Override
            protected void innerSendBulkShardOperationsRequest(String followerHistoryUUID, List<Translog.Operation> operations, long maxSeqNoOfUpdatesOrDeletes, Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler) {
                BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), followerHistoryUUID, operations, maxSeqNoOfUpdatesOrDeletes);
                followerClient.execute((Action)BulkShardOperationsAction.INSTANCE, (ActionRequest)request, ActionListener.wrap(handler::accept, errorHandler));
            }

            @Override
            protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler, Consumer<Exception> errorHandler) {
                ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId(), recordedLeaderShardHistoryUUID);
                request.setFromSeqNo(from);
                request.setMaxOperationCount(maxOperationCount);
                request.setMaxBatchSize(params.getMaxReadRequestSize());
                request.setPollTimeout(params.getReadPollTimeout());
                remoteClient.execute((Action)ShardChangesAction.INSTANCE, (ActionRequest)request, ActionListener.wrap(handler::accept, errorHandler));
            }
        };
    }

    private String getLeaderShardHistoryUUID(ShardFollowTask params) {
        IndexMetaData followIndexMetaData = this.clusterService.state().metaData().index(params.getFollowShardId().getIndex());
        Map ccrIndexMetadata = followIndexMetaData.getCustomData("ccr");
        String[] recordedLeaderShardHistoryUUIDs = TransportResumeFollowAction.extractLeaderShardHistoryUUIDs(ccrIndexMetadata);
        return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()];
    }

    protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, PersistentTaskState state) {
        Client followerClient = CcrLicenseChecker.wrapClient(this.client, params.getHeaders());
        ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask)task;
        this.logger.info("{} Starting to track leader shard {}", (Object)params.getFollowShardId(), (Object)params.getLeaderShardId());
        FollowerStatsInfoHandler handler = (followerHistoryUUID, followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerHistoryUUID, followerGCP, maxSeqNo, followerGCP, maxSeqNo);
        Consumer<Exception> errorHandler = e -> {
            if (shardFollowNodeTask.isStopped()) {
                return;
            }
            if (ShardFollowNodeTask.shouldRetry(e)) {
                this.logger.debug((Message)new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", (Object)shardFollowNodeTask), (Throwable)e);
                this.threadPool.schedule(params.getMaxRetryDelay(), "ccr", () -> this.nodeOperation(task, params, state));
            } else {
                shardFollowNodeTask.markAsFailed((Exception)e);
            }
        };
        this.fetchFollowerShardInfo(followerClient, params.getFollowShardId(), handler, errorHandler);
    }

    private void fetchFollowerShardInfo(Client client, ShardId shardId, FollowerStatsInfoHandler handler, Consumer<Exception> errorHandler) {
        client.admin().indices().stats((IndicesStatsRequest)new IndicesStatsRequest().indices(new String[]{shardId.getIndexName()}), ActionListener.wrap(r -> {
            IndexStats indexStats = r.getIndex(shardId.getIndexName());
            if (indexStats == null) {
                IndexMetaData indexMetaData = this.clusterService.state().metaData().index(shardId.getIndex());
                if (indexMetaData != null) {
                    errorHandler.accept((Exception)new ShardNotFoundException(shardId));
                } else {
                    errorHandler.accept((Exception)new IndexNotFoundException(shardId.getIndex()));
                }
                return;
            }
            Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards()).filter(shardStats -> shardStats.getShardRouting().shardId().equals((Object)shardId)).filter(shardStats -> shardStats.getShardRouting().primary()).findAny();
            if (filteredShardStats.isPresent()) {
                ShardStats shardStats2 = filteredShardStats.get();
                CommitStats commitStats = shardStats2.getCommitStats();
                String historyUUID = (String)commitStats.getUserData().get("history_uuid");
                SeqNoStats seqNoStats = shardStats2.getSeqNoStats();
                long globalCheckpoint = seqNoStats.getGlobalCheckpoint();
                long maxSeqNo = seqNoStats.getMaxSeqNo();
                handler.accept(historyUUID, globalCheckpoint, maxSeqNo);
            } else {
                errorHandler.accept((Exception)new ShardNotFoundException(shardId));
            }
        }, errorHandler));
    }

    static interface FollowerStatsInfoHandler {
        public void accept(String var1, long var2, long var4);
    }
}

