/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.action.support.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChildTaskActionRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse>
extends TransportAction<Request, Response> {
    // Collaborators handed to the constructor and kept for use by the phases below.
    protected final TransportService transportService;
    protected final ClusterService clusterService;
    protected final IndicesService indicesService;
    protected final ShardStateAction shardStateAction;
    // Fallback consistency level, parsed in the constructor from the "action.write_consistency" setting.
    protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
    // Cached result of transportOptions(); passed along with the outgoing requests sent by the phases.
    protected final TransportRequestOptions transportOptions;
    protected final MappingUpdatedAction mappingUpdatedAction;
    // Action name with the "[r]" suffix; the replica handler is registered under this name.
    final String transportReplicaAction;
    // Action name with the "[p]" suffix; the primary handler is registered under this name.
    final String transportPrimaryAction;
    // Executor name used when registering the primary and replica handlers.
    final String executor;
    // Cached result of checkWriteConsistency(), evaluated once in the constructor.
    final boolean checkWriteConsistency;

    /**
     * Stores the collaborators, derives the primary ("[p]") and replica ("[r]") action names
     * from {@code actionName}, and registers three transport request handlers: one under the
     * plain action name, one under the primary action name and one under the replica action name.
     *
     * @param settings    node settings; "action.write_consistency" is read with a default of "quorum"
     * @param actionName  base name of the action; also used to derive the primary/replica names
     * @param request        request class used when registering the entry-point and primary handlers
     * @param replicaRequest request class used when registering the replica handler
     * @param executor    executor name used for the primary and replica handlers
     */
    protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Class<Request> request, Class<ReplicaRequest> replicaRequest, String executor) {
        super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.shardStateAction = shardStateAction;
        this.mappingUpdatedAction = mappingUpdatedAction;
        this.transportPrimaryAction = actionName + "[p]";
        this.transportReplicaAction = actionName + "[r]";
        this.executor = executor;
        // Overridable hook evaluated once here; subclasses therefore see it called during construction.
        this.checkWriteConsistency = this.checkWriteConsistency();
        // The entry-point handler is registered with the "same" executor name, the other two with `executor`.
        transportService.registerRequestHandler(actionName, request, "same", new OperationTransportHandler());
        transportService.registerRequestHandler(this.transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
        // NOTE(review): the two boolean flags are positional arguments of registerRequestHandler — confirm their meaning against its signature.
        transportService.registerRequestHandler(this.transportReplicaAction, replicaRequest, executor, true, true, new ReplicaOperationTransportHandler());
        // Overridable hook, likewise evaluated once and cached.
        this.transportOptions = this.transportOptions();
        this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
    }

    /**
     * Task-less execution is not supported for this action.
     *
     * @throws UnsupportedOperationException always; use the overload that takes a {@link Task}
     */
    @Override
    protected final void doExecute(Request request, ActionListener<Response> listener) {
        throw new UnsupportedOperationException("the task parameter is required for this operation");
    }

    /**
     * Entry point of the action: wraps the request in a reroute phase and runs it right away.
     *
     * @param task     the task tracking this request; cast to {@link ReplicationTask}
     * @param request  the request to route to the primary
     * @param listener the listener handed to the reroute phase
     */
    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        ReroutePhase reroutePhase = new ReroutePhase(this, (ReplicationTask)task, request, listener);
        reroutePhase.run();
    }

    /**
     * Creates an empty response object. The response handler used when forwarding a request
     * returns this from its {@code newInstance()} method.
     */
    protected abstract Response newResponseInstance();

    /**
     * Hook called by the reroute phase after the global and index block checks have passed.
     * The reroute phase asserts that the request's shard id is non-null once this returns.
     * The default implementation does nothing.
     *
     * @param metaData      cluster metadata taken from the observed cluster state
     * @param concreteIndex the index name the reroute phase resolved for this request
     * @param request       the request being routed
     */
    protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
    }

    /**
     * Performs the operation on the primary shard. The primary phase calls this with the
     * metadata of the cluster state it captured and the incoming request.
     *
     * @return a tuple whose first element is the response to send back and whose second
     *         element is the request to forward during the replication phase
     * @throws Throwable any failure; the primary phase catches it and fails the operation
     */
    protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(MetaData var1, Request var2) throws Throwable;

    /**
     * Performs the operation described by the replica request.
     * NOTE(review): the caller is not in this part of the file — presumably the replica
     * transport handler registered in the constructor; confirm.
     */
    protected abstract void shardOperationOnReplica(ReplicaRequest var1);

    /**
     * Whether the primary phase should verify the write consistency level before executing.
     * Evaluated once in the constructor and cached; when {@code false} the primary phase skips
     * the check entirely. Defaults to {@code true}.
     */
    protected boolean checkWriteConsistency() {
        return true;
    }

    /**
     * Block level the reroute phase uses when checking for a global cluster block.
     * Defaults to {@link ClusterBlockLevel#WRITE}.
     */
    protected ClusterBlockLevel globalBlockLevel() {
        return ClusterBlockLevel.WRITE;
    }

    /**
     * Block level the reroute phase uses when checking for a block on the concrete index.
     * Defaults to {@link ClusterBlockLevel#WRITE}.
     */
    protected ClusterBlockLevel indexBlockLevel() {
        return ClusterBlockLevel.WRITE;
    }

    /**
     * Whether the reroute phase should resolve the request's index through the index name
     * expression resolver ({@code true}, the default) or use the request's index as-is.
     */
    protected boolean resolveIndex() {
        return true;
    }

    /**
     * Transport options used for the requests this action sends. Evaluated once in the
     * constructor and cached. Defaults to {@link TransportRequestOptions#EMPTY}.
     */
    protected TransportRequestOptions transportOptions() {
        return TransportRequestOptions.EMPTY;
    }

    /**
     * Decides whether a failure reported for the primary action should lead to a retry.
     *
     * @param e the failure to classify
     * @return {@code true} when {@code e} is exactly a {@code RetryOnPrimaryException} or is
     *         recognised as a shard-not-available failure
     */
    protected boolean retryPrimaryException(Throwable e) {
        if (e.getClass() == RetryOnPrimaryException.class) {
            return true;
        }
        return TransportActions.isShardNotAvailableException(e);
    }

    /**
     * Decides whether a failure seen while replicating can be ignored.
     *
     * @param e the failure to classify
     * @return {@code true} for shard-not-available failures and for conflict failures
     */
    protected boolean ignoreReplicaException(Throwable e) {
        return TransportActions.isShardNotAvailableException(e) || this.isConflictException(e);
    }

    /**
     * Decides whether a replica failure requires reporting the shard as failed.
     *
     * @param e the failure to classify
     * @return the negation of {@link #ignoreReplicaException(Throwable)}
     */
    protected boolean mustFailReplica(Throwable e) {
        boolean ignorable = this.ignoreReplicaException(e);
        return ignorable == false;
    }

    /**
     * Checks whether the unwrapped cause of a failure is a version conflict or a
     * document-already-exists failure.
     *
     * @param e the failure to inspect
     * @return {@code true} when the unwrapped cause is one of the two conflict types
     */
    protected boolean isConflictException(Throwable e) {
        Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
        return unwrapped instanceof VersionConflictEngineException
            || unwrapped instanceof DocumentAlreadyExistsException;
    }

    /**
     * Looks up the shard for {@code shardId} and wraps it in an {@link IndexShardReference},
     * which increments the shard's operation counter until it is closed.
     *
     * @param shardId the shard to look up
     * @return a releasable that decrements the operation counter when closed
     */
    protected Releasable getIndexShardOperationsCounter(ShardId shardId) {
        String indexName = shardId.index().getName();
        IndexShard shard = this.indicesService.indexServiceSafe(indexName).shardSafe(shardId.id());
        return new IndexShardReference(shard);
    }

    /**
     * Tells whether the operation should also be sent to replica copies.
     *
     * @param settings the index settings to inspect
     * @return {@code false} when the index uses shadow replicas, {@code true} otherwise
     */
    protected boolean shouldExecuteReplication(Settings settings) {
        boolean usesShadowReplicas = IndexMetaData.isIndexUsingShadowReplicas(settings);
        return !usesShadowReplicas;
    }

    /**
     * Post-write housekeeping for a shard: optionally refreshes it and, when the shard's
     * translog durability is {@code REQUEST}, syncs the translog up to {@code location}.
     *
     * @param refresh    whether to refresh the shard
     * @param indexShard the shard that was written to
     * @param location   translog location of the write; the sync is skipped when {@code null}
     */
    protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {
        if (refresh) {
            try {
                indexShard.refresh("refresh_flag_index");
            }
            catch (Throwable refreshFailure) {
                // The refresh stays best-effort (a failure must not fail the write),
                // but it is no longer swallowed silently.
                this.logger.debug("failed to refresh shard after write", refreshFailure);
            }
        }
        if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
            indexShard.sync(location);
        }
    }

    /**
     * Records the current phase on the task, tolerating a missing task.
     *
     * @param task  the task to update; nothing happens when it is {@code null}
     * @param phase the phase label to set
     */
    static void setPhase(ReplicationTask task, String phase) {
        if (task == null) {
            return;
        }
        task.setPhase(phase);
    }

    /**
     * Releasable handle on a shard's operation counter: construction increments the counter,
     * the first call to {@link #close()} decrements it, and later calls are no-ops.
     */
    static class IndexShardReference implements Releasable {
        /** Flipped to {@code true} by the first close so the counter is decremented only once. */
        private final AtomicBoolean released = new AtomicBoolean();
        private final IndexShard shard;

        IndexShardReference(IndexShard counter) {
            counter.incrementOperationCounter();
            this.shard = counter;
        }

        @Override
        public void close() {
            if (!this.released.compareAndSet(false, true)) {
                // Already released by an earlier call.
                return;
            }
            this.shard.decrementOperationCounter();
        }
    }

    /**
     * Phase that runs after the primary operation: forwards the replica request to the other
     * shard copies, tracks successes and failures, and finally sends the response (or a
     * failure) back over the transport channel.
     *
     * NOTE(review): this class is decompiler output. {@code this$0} is the synthetic reference
     * to the enclosing TransportReplicationAction and {@code var1_1} in the constructor stands
     * for that same instance; neither is hand-written code.
     */
    static final class ReplicationPhase
    extends AbstractRunnable {
        private final ReplicationTask task;
        private final ReplicaRequest replicaRequest;
        private final Response finalResponse;
        private final TransportChannel channel;
        private final ShardId shardId;
        private final List<ShardRouting> shards;
        private final DiscoveryNodes nodes;
        private final boolean executeOnReplica;
        private final String indexUUID;
        // Guards the single send of the response/failure (see doFinish and forceFinishAsFailed).
        private final AtomicBoolean finished = new AtomicBoolean();
        // Starts at 1 — presumably accounting for the primary operation that already completed; confirm.
        private final AtomicInteger success = new AtomicInteger(1);
        // Recorded replica failures, keyed by node id.
        private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
        // Number of replica requests still outstanding.
        private final AtomicInteger pending;
        private final int totalShards;
        private final Releasable indexShardReference;
        final /* synthetic */ TransportReplicationAction this$0;

        /**
         * Snapshots the current cluster state and computes, for the given shard, how many
         * copies will receive the request (pending) and how many are skipped (ignored).
         */
        public ReplicationPhase(ReplicationTask task, ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId, TransportChannel channel, Releasable indexShardReference) {
            this.this$0 = var1_1;
            this.task = task;
            this.replicaRequest = replicaRequest;
            this.channel = channel;
            this.finalResponse = finalResponse;
            this.indexShardReference = indexShardReference;
            this.shardId = shardId;
            ClusterState state = var1_1.clusterService.state();
            // The routing table or metadata for the index may be missing; fall back to an empty shard list.
            IndexRoutingTable index = state.getRoutingTable().index(shardId.getIndex());
            IndexShardRoutingTable shardRoutingTable = index != null ? index.shard(shardId.id()) : null;
            IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
            this.shards = shardRoutingTable != null ? shardRoutingTable.shards() : Collections.emptyList();
            this.executeOnReplica = indexMetaData == null || var1_1.shouldExecuteReplication(indexMetaData.getSettings());
            this.indexUUID = indexMetaData != null ? indexMetaData.getIndexUUID() : null;
            this.nodes = state.getNodes();
            if (this.shards.isEmpty()) {
                ((TransportReplicationAction)var1_1).logger.debug("replication phase for request [{}] on [{}] is skipped due to index deletion after primary operation", replicaRequest, shardId);
            }
            int numberOfIgnoredShardInstances = 0;
            int numberOfPendingShardInstances = 0;
            for (ShardRouting shard : this.shards) {
                // Any copy that is not STARTED marks the replica request as possibly duplicated.
                if (shard.state() != ShardRoutingState.STARTED) {
                    ((ReplicationRequest)replicaRequest).setCanHaveDuplicates();
                }
                // Non-primary copies are ignored when replication is disabled for this index.
                if (!shard.primary() && !this.executeOnReplica) {
                    ++numberOfIgnoredShardInstances;
                    continue;
                }
                if (shard.unassigned()) {
                    ++numberOfIgnoredShardInstances;
                    continue;
                }
                // One pending request for every copy that lives on another node ...
                if (!shard.currentNodeId().equals(this.nodes.localNodeId())) {
                    ++numberOfPendingShardInstances;
                }
                // ... plus one more for the relocation target of a relocating copy.
                if (!shard.relocating()) continue;
                ++numberOfPendingShardInstances;
            }
            this.totalShards = 1 + numberOfPendingShardInstances + numberOfIgnoredShardInstances;
            this.pending = new AtomicInteger(numberOfPendingShardInstances);
            if (((TransportReplicationAction)var1_1).logger.isTraceEnabled()) {
                ((TransportReplicationAction)var1_1).logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", this.pending.get(), var1_1.transportReplicaAction, replicaRequest, state.version());
            }
        }

        /** Total number of shard copies reported in the response's shard info. */
        int totalShards() {
            return this.totalShards;
        }

        /** Current number of successful copies. */
        int successful() {
            return this.success.get();
        }

        /** Current number of outstanding replica requests. */
        int pending() {
            return this.pending.get();
        }

        /** Logs the unexpected failure and sends it back over the channel. */
        @Override
        public void onFailure(Throwable t) {
            this.this$0.logger.error("unexpected error while replicating for action [{}]. shard [{}]. ", t, this.this$0.actionName, this.shardId);
            this.forceFinishAsFailed(t);
        }

        /**
         * Sends the replica request to every eligible copy, mirroring the counting rules of the
         * constructor; finishes immediately when nothing is pending.
         */
        @Override
        protected void doRun() {
            TransportReplicationAction.setPhase(this.task, "replicating");
            if (this.pending.get() == 0) {
                this.doFinish();
                return;
            }
            for (ShardRouting shard : this.shards) {
                if (!shard.primary() && !this.executeOnReplica || shard.unassigned()) continue;
                if (!this.nodes.localNodeId().equals(shard.currentNodeId())) {
                    this.performOnReplica(shard);
                }
                if (!shard.relocating()) continue;
                this.performOnReplica(shard.buildTargetRelocatingShard());
            }
        }

        /**
         * Sends the replica request to the node currently holding {@code shard}. An unknown
         * node is treated as a failure without a recorded exception.
         */
        void performOnReplica(final ShardRouting shard) {
            final String nodeId = shard.currentNodeId();
            if (!this.nodes.nodeExists(nodeId)) {
                this.this$0.logger.trace("failed to send action [{}] on replica [{}] for request [{}] due to unknown node [{}]", this.this$0.transportReplicaAction, shard.shardId(), this.replicaRequest, nodeId);
                this.onReplicaFailure(nodeId, null);
                return;
            }
            if (this.this$0.logger.isTraceEnabled()) {
                this.this$0.logger.trace("send action [{}] on replica [{}] for request [{}] to [{}]", this.this$0.transportReplicaAction, shard.shardId(), this.replicaRequest, nodeId);
            }
            final DiscoveryNode node = this.nodes.get(nodeId);
            this.this$0.transportService.sendRequest(node, this.this$0.transportReplicaAction, (TransportRequest)this.replicaRequest, this.this$0.transportOptions, new EmptyTransportResponseHandler("same"){

                @Override
                public void handleResponse(TransportResponse.Empty vResponse) {
                    ReplicationPhase.this.onReplicaSuccess();
                }

                @Override
                public void handleException(TransportException exp) {
                    // The failure is accounted for first (which may finish the phase); logging and shard-failure reporting follow.
                    ReplicationPhase.this.onReplicaFailure(nodeId, exp);
                    ReplicationPhase.this.this$0.logger.trace("[{}] transport failure during replica request [{}], action [{}]", exp, node, ReplicationPhase.this.replicaRequest, ReplicationPhase.this.this$0.transportReplicaAction);
                    if (ReplicationPhase.this.this$0.mustFailReplica(exp)) {
                        assert (!ReplicationPhase.this.this$0.ignoreReplicaException(exp));
                        ReplicationPhase.this.this$0.logger.warn("{} failed to perform {} on node {}", exp, ReplicationPhase.this.shardId, ReplicationPhase.this.this$0.transportReplicaAction, node);
                        ReplicationPhase.this.this$0.shardStateAction.shardFailed(shard, ReplicationPhase.this.indexUUID, "failed to perform " + ReplicationPhase.this.this$0.actionName + " on replica on node " + node, exp);
                    }
                }
            });
        }

        /**
         * Accounts for a failed replica request. The exception is recorded only when it is
         * non-null and not ignorable; the pending counter is decremented in every case.
         */
        void onReplicaFailure(String nodeId, @Nullable Throwable e) {
            if (e != null && !this.this$0.ignoreReplicaException(e)) {
                this.shardReplicaFailures.put(nodeId, e);
            }
            this.decPendingAndFinishIfNeeded();
        }

        /** Accounts for a successful replica request. */
        void onReplicaSuccess() {
            this.success.incrementAndGet();
            this.decPendingAndFinishIfNeeded();
        }

        /** Decrements the pending counter and finishes once it reaches zero (or below). */
        private void decPendingAndFinishIfNeeded() {
            if (this.pending.decrementAndGet() <= 0) {
                this.doFinish();
            }
        }

        /**
         * Marks the task as failed and, if the phase has not finished yet, releases the shard
         * reference and sends {@code t} back over the channel.
         */
        private void forceFinishAsFailed(Throwable t) {
            // The phase label is updated before the finished check, i.e. even when already finished.
            TransportReplicationAction.setPhase(this.task, "failed");
            if (this.finished.compareAndSet(false, true)) {
                Releasables.close(this.indexShardReference);
                try {
                    this.channel.sendResponse(t);
                }
                catch (IOException responseException) {
                    this.this$0.logger.warn("failed to send error message back to client for action [{}]", responseException, this.this$0.transportReplicaAction);
                    this.this$0.logger.warn("actual Exception", t, new Object[0]);
                }
            }
        }

        /**
         * Finishes the phase exactly once: releases the shard reference, attaches the shard
         * info (total, successful, recorded failures) to the response and sends it.
         */
        private void doFinish() {
            if (this.finished.compareAndSet(false, true)) {
                ActionWriteResponse.ShardInfo.Failure[] failuresArray;
                TransportReplicationAction.setPhase(this.task, "finished");
                Releasables.close(this.indexShardReference);
                if (!this.shardReplicaFailures.isEmpty()) {
                    int slot = 0;
                    failuresArray = new ActionWriteResponse.ShardInfo.Failure[this.shardReplicaFailures.size()];
                    for (Map.Entry entry : this.shardReplicaFailures.entrySet()) {
                        RestStatus restStatus = ExceptionsHelper.status((Throwable)entry.getValue());
                        failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure(this.shardId.getIndex(), this.shardId.getId(), (String)entry.getKey(), (Throwable)entry.getValue(), restStatus, false);
                    }
                } else {
                    failuresArray = ActionWriteResponse.EMPTY;
                }
                ((ActionWriteResponse)this.finalResponse).setShardInfo(new ActionWriteResponse.ShardInfo(this.totalShards, this.success.get(), failuresArray));
                try {
                    this.channel.sendResponse((TransportResponse)this.finalResponse);
                }
                catch (IOException responseException) {
                    this.this$0.logger.warn("failed to send error message back to client for action [" + this.this$0.transportReplicaAction + "]", responseException, new Object[0]);
                }
                if (this.this$0.logger.isTraceEnabled()) {
                    this.this$0.logger.trace("action [{}] completed on all replicas [{}] for request [{}]", this.this$0.transportReplicaAction, this.shardId, this.replicaRequest);
                }
            }
        }
    }

    /**
     * Phase that executes the operation on the primary shard: checks write consistency,
     * acquires the shard operation counter, runs the primary operation and, on success,
     * hands over to a {@link ReplicationPhase}. Failures are sent back over the channel.
     *
     * NOTE(review): this class is decompiler output. {@code this$0} is the synthetic reference
     * to the enclosing TransportReplicationAction and {@code var1_1} in the constructor stands
     * for that same instance.
     */
    static final class PrimaryPhase
    extends AbstractRunnable {
        private final ReplicationTask task;
        private final Request request;
        private final TransportChannel channel;
        // Cluster state captured once at construction and used for the whole phase.
        private final ClusterState state;
        // Ensures the phase either fails or moves on to replication, but only once.
        private final AtomicBoolean finished = new AtomicBoolean();
        // Set in doRun once the shard operation counter has been acquired; null before that.
        private Releasable indexShardReference;
        final /* synthetic */ TransportReplicationAction this$0;

        PrimaryPhase(ReplicationTask task, Request request, TransportChannel channel) {
            this.this$0 = var1_1;
            this.state = var1_1.clusterService.state();
            this.task = task;
            this.request = request;
            this.channel = channel;
        }

        /** Any failure escaping doRun fails the operation. */
        @Override
        public void onFailure(Throwable e) {
            this.finishAsFailed(e);
        }

        /**
         * Runs the primary operation. The request's shard id must already be set. A write
         * consistency failure or any Throwable from the primary operation fails the phase;
         * otherwise the replication phase is started.
         */
        @Override
        protected void doRun() throws Exception {
            ReplicationPhase replicationPhase;
            TransportReplicationAction.setPhase(this.task, "primary");
            assert (((ReplicationRequest)this.request).shardId() != null) : "request shardID must be set prior to primary phase";
            ShardId shardId = ((ReplicationRequest)this.request).shardId();
            String writeConsistencyFailure = this.checkWriteConsistency(shardId);
            if (writeConsistencyFailure != null) {
                this.finishBecauseUnavailable(shardId, writeConsistencyFailure);
                return;
            }
            try {
                this.indexShardReference = this.this$0.getIndexShardOperationsCounter(shardId);
                Tuple primaryResponse = this.this$0.shardOperationOnPrimary(this.state.metaData(), this.request);
                if (this.this$0.logger.isTraceEnabled()) {
                    this.this$0.logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", this.this$0.transportPrimaryAction, shardId, this.request, this.state.version());
                }
                // v2 is the replica request, v1 the response; the shard reference is handed over to the replication phase.
                replicationPhase = new ReplicationPhase(this.this$0, this.task, (ReplicationRequest)primaryResponse.v2(), (ActionWriteResponse)primaryResponse.v1(), shardId, this.channel, this.indexShardReference);
            }
            catch (Throwable e) {
                ((ReplicationRequest)this.request).setCanHaveDuplicates();
                // Failures with CONFLICT status are logged at trace level, everything else at debug level.
                if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
                    if (this.this$0.logger.isTraceEnabled()) {
                        this.this$0.logger.trace("failed to execute [{}] on [{}]", e, this.request, shardId);
                    }
                } else if (this.this$0.logger.isDebugEnabled()) {
                    this.this$0.logger.debug("failed to execute [{}] on [{}]", e, this.request, shardId);
                }
                this.finishAsFailed(e);
                return;
            }
            this.finishAndMoveToReplication(replicationPhase);
        }

        /**
         * Checks whether enough shard copies are active for the effective consistency level
         * (the request's level, or the action default when the request says DEFAULT).
         * QUORUM requires {@code size / 2 + 1} copies, but only when the shard has more than
         * two copies; ALL requires every copy; every other case requires one. A missing
         * routing table counts as zero active copies with one required.
         *
         * @return a failure description when too few copies are active, or {@code null} when
         *         the check passes or is disabled
         */
        String checkWriteConsistency(ShardId shardId) {
            int requiredNumber;
            int sizeActive;
            if (!this.this$0.checkWriteConsistency) {
                return null;
            }
            WriteConsistencyLevel consistencyLevel = ((ReplicationRequest)this.request).consistencyLevel() != WriteConsistencyLevel.DEFAULT ? ((ReplicationRequest)this.request).consistencyLevel() : this.this$0.defaultWriteConsistencyLevel;
            IndexRoutingTable indexRoutingTable = this.state.getRoutingTable().index(shardId.getIndex());
            if (indexRoutingTable != null) {
                IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
                if (shardRoutingTable != null) {
                    sizeActive = shardRoutingTable.activeShards().size();
                    requiredNumber = consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2 ? shardRoutingTable.getSize() / 2 + 1 : (consistencyLevel == WriteConsistencyLevel.ALL ? shardRoutingTable.getSize() : 1);
                } else {
                    sizeActive = 0;
                    requiredNumber = 1;
                }
            } else {
                sizeActive = 0;
                requiredNumber = 1;
            }
            if (sizeActive < requiredNumber) {
                this.this$0.logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry. action [{}], request [{}]", new Object[]{shardId, consistencyLevel, sizeActive, requiredNumber, this.this$0.transportPrimaryAction, this.request});
                return "Not enough active copies to meet write consistency of [" + (Object)((Object)consistencyLevel) + "] (have " + sizeActive + ", needed " + requiredNumber + ").";
            }
            return null;
        }

        /** Marks this phase finished and runs the replication phase; asserts it was not finished before. */
        void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
            if (this.finished.compareAndSet(false, true)) {
                replicationPhase.run();
            } else assert (false) : "finishAndMoveToReplication called but operation is already finished";
        }

        /**
         * Fails the operation once: releases the shard reference (if any) and sends the
         * failure back over the channel; asserts it was not finished before.
         */
        void finishAsFailed(Throwable failure) {
            if (this.finished.compareAndSet(false, true)) {
                TransportReplicationAction.setPhase(this.task, "failed");
                Releasables.close(this.indexShardReference);
                this.this$0.logger.trace("operation failed", failure, new Object[0]);
                try {
                    this.channel.sendResponse(failure);
                }
                catch (IOException responseException) {
                    this.this$0.logger.warn("failed to send error message back to client for action [{}]", responseException, this.this$0.transportPrimaryAction);
                }
            } else assert (false) : "finishAsFailed called but operation is already finished";
        }

        /** Fails the operation with an {@link UnavailableShardsException} built from the message, the request timeout and the request. */
        void finishBecauseUnavailable(ShardId shardId, String message) {
            this.finishAsFailed(new UnavailableShardsException(shardId, "{} Timeout: [{}], request: [{}]", message, ((ReplicationRequest)this.request).timeout(), this.request));
        }
    }

    static final class ReroutePhase
    extends AbstractRunnable {
        private final ActionListener<Response> listener;
        private final Request request;
        // May be null; the constructor and setPhase both guard against that.
        private final ReplicationTask task;
        // Created in the constructor with the request's timeout; doRun reads the observed cluster state from it.
        private final ClusterStateObserver observer;
        private final AtomicBoolean finished = new AtomicBoolean();
        // Decompiler-generated reference to the enclosing TransportReplicationAction instance.
        final /* synthetic */ TransportReplicationAction this$0;

        /**
         * Stores the request, listener and task, and creates a cluster state observer that
         * uses the request's timeout. When a task is given, the request is tagged with the
         * local node id and the task id as its parent task.
         *
         * NOTE(review): {@code var1_1} is a decompiler artifact standing for the enclosing
         * TransportReplicationAction instance (the value stored in {@code this$0}).
         */
        ReroutePhase(ReplicationTask task, Request request, ActionListener<Response> listener) {
            this.this$0 = var1_1;
            this.request = request;
            if (task != null) {
                ((ChildTaskActionRequest)this.request).setParentTask(var1_1.clusterService.localNode().getId(), task.getId());
            }
            this.listener = listener;
            this.task = task;
            this.observer = new ClusterStateObserver(var1_1.clusterService, ((ReplicationRequest)request).timeout(), ((TransportReplicationAction)var1_1).logger);
        }

        /** Routes any failure escaping {@code doRun} to the unexpected-failure path. */
        @Override
        public void onFailure(Throwable e) {
            this.finishWithUnexpectedFailure(e);
        }

        /**
         * Routes the request to the node holding the primary shard, based on the observed
         * cluster state:
         * 1. handle a global block, then resolve the concrete index and handle an index block;
         * 2. let the action resolve the request (which must set the shard id);
         * 3. retry when the primary is missing, inactive, or assigned to an unknown node;
         * 4. if the primary is on the local node, send the primary action to it; otherwise
         *    forward the plain action to the primary's node, unless the local cluster state is
         *    older than the version the request was last routed on, in which case retry.
         */
        @Override
        protected void doRun() {
            TransportReplicationAction.setPhase(this.task, "routing");
            ClusterState state = this.observer.observedState();
            ClusterBlockException blockException = state.blocks().globalBlockedException(this.this$0.globalBlockLevel());
            if (blockException != null) {
                this.handleBlockException(blockException);
                return;
            }
            // Either resolve the index expression or take the request's index verbatim, depending on resolveIndex().
            String concreteIndex = this.this$0.resolveIndex() ? this.this$0.indexNameExpressionResolver.concreteSingleIndex(state, (IndicesRequest)this.request) : ((ReplicationRequest)this.request).index();
            blockException = state.blocks().indexBlockedException(this.this$0.indexBlockLevel(), concreteIndex);
            if (blockException != null) {
                this.handleBlockException(blockException);
                return;
            }
            this.this$0.resolveRequest(state.metaData(), concreteIndex, this.request);
            assert (((ReplicationRequest)this.request).shardId() != null) : "request shardId must be set in resolveRequest";
            IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(((ReplicationRequest)this.request).shardId().getIndex(), ((ReplicationRequest)this.request).shardId().id());
            ShardRouting primary = indexShard.primaryShard();
            if (primary == null || !primary.active()) {
                this.this$0.logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", ((ReplicationRequest)this.request).shardId(), this.this$0.actionName, this.request, state.version());
                this.retryBecauseUnavailable(((ReplicationRequest)this.request).shardId(), "primary shard is not active");
                return;
            }
            if (!state.nodes().nodeExists(primary.currentNodeId())) {
                this.this$0.logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", ((ReplicationRequest)this.request).shardId(), primary.currentNodeId(), this.this$0.actionName, this.request, state.version());
                this.retryBecauseUnavailable(((ReplicationRequest)this.request).shardId(), "primary shard isn't assigned to a known node.");
                return;
            }
            DiscoveryNode node = state.nodes().get(primary.currentNodeId());
            // NOTE(review): task may be null here (the constructor allows it) — confirm registerChildTask tolerates that.
            this.this$0.taskManager.registerChildTask(this.task, node.getId());
            if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
                // Primary is local: send the primary action to this node.
                TransportReplicationAction.setPhase(this.task, "waiting_on_primary");
                if (this.this$0.logger.isTraceEnabled()) {
                    this.this$0.logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", this.this$0.transportPrimaryAction, ((ReplicationRequest)this.request).shardId(), this.request, state.version(), primary.currentNodeId());
                }
                this.performAction(node, this.this$0.transportPrimaryAction, true);
            } else {
                // Primary is remote. If the request was already routed on a newer cluster state than ours, our view is stale: retry.
                if (state.version() < ((ReplicationRequest)this.request).routedBasedOnClusterVersion()) {
                    this.this$0.logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", ((ReplicationRequest)this.request).shardId(), this.request, state.version(), ((ReplicationRequest)this.request).routedBasedOnClusterVersion());
                    this.retryBecauseUnavailable(((ReplicationRequest)this.request).shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + ((ReplicationRequest)this.request).routedBasedOnClusterVersion() + "]");
                    return;
                }
                // Stamp the request with the cluster state version used for this routing decision.
                ((ReplicationRequest)this.request).routedBasedOnClusterVersion(state.version());
                if (this.this$0.logger.isTraceEnabled()) {
                    this.this$0.logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", this.this$0.actionName, ((ReplicationRequest)this.request).shardId(), this.request, state.version(), primary.currentNodeId());
                }
                TransportReplicationAction.setPhase(this.task, "rerouted");
                this.performAction(node, this.this$0.actionName, false);
            }
        }

        /**
         * Reacts to a cluster block: non-retryable blocks fail the request, retryable ones
         * schedule a retry.
         */
        private void handleBlockException(ClusterBlockException blockException) {
            if (!blockException.retryable()) {
                this.finishAsFailed(blockException);
                return;
            }
            this.this$0.logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
            this.retry(blockException);
        }

        /**
         * Sends this phase's request to {@code node} under the transport action {@code action}
         * and routes the outcome back into the phase.
         * <p>
         * A response completes the phase successfully. A failure whose unwrapped cause is a
         * {@code ConnectTransportException} or a {@code NodeClosedException}, or one that
         * {@code retryPrimaryException} accepts while {@code isPrimaryAction} is set, flags the
         * request as possibly duplicated and schedules a retry. Every other failure fails the
         * phase. Anything thrown while handling the failure is reported through
         * {@code finishWithUnexpectedFailure}.
         *
         * @param node            the node the request is sent to
         * @param action          the transport action name the request is sent under
         * @param isPrimaryAction whether the primary-specific retry check is consulted as well
         */
        private void performAction(final DiscoveryNode node, String action, final boolean isPrimaryAction) {
            final BaseTransportResponseHandler<Response> responseHandler = new BaseTransportResponseHandler<Response>(){

                @Override
                public Response newInstance() {
                    return ReroutePhase.this.this$0.newResponseInstance();
                }

                @Override
                public String executor() {
                    return "same";
                }

                @Override
                public void handleResponse(Response response) {
                    ReroutePhase.this.finishOnSuccess(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    try {
                        final Throwable cause = exp.unwrapCause();
                        final boolean nodeUnreachable = cause instanceof ConnectTransportException || cause instanceof NodeClosedException;
                        final boolean retryable = nodeUnreachable || (isPrimaryAction && ReroutePhase.this.this$0.retryPrimaryException(cause));
                        if (!retryable) {
                            ReroutePhase.this.finishAsFailed(exp);
                            return;
                        }
                        ReroutePhase.this.this$0.logger.trace("received an error from node [{}] for request [{}], scheduling a retry", exp, node.id(), ReroutePhase.this.request);
                        // The remote side may already have processed the request before the error surfaced.
                        ReroutePhase.this.request.setCanHaveDuplicates();
                        ReroutePhase.this.retry(exp);
                    } catch (Throwable unexpected) {
                        ReroutePhase.this.finishWithUnexpectedFailure(unexpected);
                    }
                }
            };
            this.this$0.transportService.sendRequest(node, action, (TransportRequest)this.request, this.this$0.transportOptions, responseHandler);
        }

        /**
         * Schedules another run of this phase after the next cluster state change.
         * <p>
         * If the observer has already timed out, the phase fails with {@code failure} instead.
         * Otherwise the task phase is set to {@code waiting_for_retry} and a listener is
         * registered that re-runs the phase on a new cluster state or on observer timeout, and
         * fails the phase with a {@code NodeClosedException} when the cluster service closes.
         *
         * @param failure the error that triggered the retry; must not be {@code null}
         */
        void retry(Throwable failure) {
            assert (failure != null);
            if (!this.observer.isTimedOut()) {
                TransportReplicationAction.setPhase(this.task, "waiting_for_retry");
                final ClusterStateObserver.Listener rerunOnChange = new ClusterStateObserver.Listener(){

                    @Override
                    public void onNewClusterState(ClusterState state) {
                        ReroutePhase.this.run();
                    }

                    @Override
                    public void onClusterServiceClose() {
                        ReroutePhase.this.finishAsFailed(new NodeClosedException(ReroutePhase.this.this$0.clusterService.localNode()));
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        // The phase is run once more on timeout rather than being failed here.
                        ReroutePhase.this.run();
                    }
                };
                this.observer.waitForNextChange(rerunOnChange);
            } else {
                this.finishAsFailed(failure);
            }
        }

        /**
         * Completes the phase with a failure, at most once.
         * The first caller to flip {@code finished} sets the task phase to {@code failed},
         * traces the failure and notifies the listener. A later call trips an assertion and is
         * otherwise a no-op.
         *
         * @param failure the error handed to the listener
         */
        void finishAsFailed(Throwable failure) {
            if (!this.finished.compareAndSet(false, true)) {
                assert false : "finishAsFailed called but operation is already finished";
                return;
            }
            TransportReplicationAction.setPhase(this.task, "failed");
            this.this$0.logger.trace("operation failed. action [{}], request [{}]", failure, this.this$0.actionName, this.request);
            this.listener.onFailure(failure);
        }

        /**
         * Completes the phase after an unexpected error, at most once.
         * The error is always logged at warn level, even when the phase has already finished.
         * The first caller to flip {@code finished} sets the task phase to {@code failed} and
         * notifies the listener. A later call trips an assertion and is otherwise a no-op.
         *
         * @param failure the unexpected error
         */
        void finishWithUnexpectedFailure(Throwable failure) {
            this.this$0.logger.warn("unexpected error during the primary phase for action [{}], request [{}]", failure, this.this$0.actionName, this.request);
            if (!this.finished.compareAndSet(false, true)) {
                assert false : "finishWithUnexpectedFailure called but operation is already finished";
                return;
            }
            TransportReplicationAction.setPhase(this.task, "failed");
            this.listener.onFailure(failure);
        }

        /**
         * Completes the phase successfully, at most once.
         * The first caller to flip {@code finished} sets the task phase to {@code finished},
         * traces the outcome when trace logging is enabled and hands the response to the
         * listener. A later call trips an assertion and is otherwise a no-op.
         *
         * @param response the response delivered to the listener
         */
        void finishOnSuccess(Response response) {
            if (!this.finished.compareAndSet(false, true)) {
                assert false : "finishOnSuccess called but operation is already finished";
                return;
            }
            TransportReplicationAction.setPhase(this.task, "finished");
            if (this.this$0.logger.isTraceEnabled()) {
                this.this$0.logger.trace("operation succeeded. action [{}],request [{}]", this.this$0.actionName, this.request);
            }
            this.listener.onResponse(response);
        }

        /**
         * Schedules a retry whose failure is an {@code UnavailableShardsException} built from
         * {@code message}, the request timeout and the request itself.
         *
         * @param shardId the shard the exception is created for
         * @param message the reason placed at the start of the exception message
         */
        void retryBecauseUnavailable(ShardId shardId, String message) {
            final UnavailableShardsException unavailable = new UnavailableShardsException(shardId, "{} Timeout: [{}], request: [{}]", message, ((ReplicationRequest)this.request).timeout(), this.request);
            this.retry(unavailable);
        }
    }

    /**
     * Exception that carries the id of the shard it was raised for.
     * NOTE(review): presumably signals that an operation should be retried on the primary;
     * this is inferred from the name only, confirm against its usages.
     */
    public static class RetryOnPrimaryException
    extends ElasticsearchException {
        /**
         * Reads the exception from a stream.
         *
         * @param in the stream to read from
         * @throws IOException if reading from the stream fails
         */
        public RetryOnPrimaryException(StreamInput in) throws IOException {
            super(in);
        }

        /**
         * Creates the exception for a shard.
         *
         * @param shardId the shard recorded on the exception
         * @param msg     the exception message, passed on with an empty argument array
         */
        public RetryOnPrimaryException(ShardId shardId, String msg) {
            super(msg, new Object[0]);
            this.setShard(shardId);
        }
    }

    /**
     * Runs a replica operation for one incoming request and answers on the transport channel.
     * <p>
     * {@link #doRun()} performs the operation and sends an empty response. {@link #onFailure(Throwable)}
     * either re-submits the request after the next cluster state change (for
     * {@code RetryOnReplicaException}) or fails the replica shard when required and reports the
     * error to the caller.
     */
    private static final class AsyncReplicaAction
    extends AbstractRunnable {
        private final ReplicaRequest request;
        private final TransportChannel channel;
        private final ReplicationTask task;
        // Created with a null second argument; the onTimeout handler below treats a timeout as
        // impossible. NOTE(review): presumably null means "no timeout" - confirm.
        private final ClusterStateObserver observer;
        final /* synthetic */ TransportReplicationAction this$0;

        /**
         * @param outer   the enclosing action instance, passed explicitly by the caller
         * @param request the replica request to execute
         * @param channel the channel the response or failure is sent on
         * @param task    the task whose phase is updated while the operation runs
         */
        AsyncReplicaAction(TransportReplicationAction outer, ReplicaRequest request, TransportChannel channel, ReplicationTask task) {
            // The decompiled constructor read an undefined variable here; the call site passes the
            // enclosing instance as the first argument, so it is taken as a real parameter.
            this.this$0 = outer;
            this.observer = new ClusterStateObserver(this.this$0.clusterService, null, this.this$0.logger);
            this.request = request;
            this.channel = channel;
            this.task = task;
        }

        /**
         * Handles a failure of the replica operation.
         * <p>
         * A {@code RetryOnReplicaException} waits for the next cluster state and then sends the
         * same request to the local node again under the replica action name. Any other error
         * goes through {@link #failReplicaIfNeeded(Throwable)} and is always reported back on
         * the channel, even when failing the replica itself throws.
         * <p>
         * NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
         * change" for this method.
         *
         * @param t the error raised while running the replica operation
         */
        @Override
        public void onFailure(Throwable t) {
            if (t instanceof RetryOnReplicaException) {
                this.this$0.logger.trace("Retrying operation on replica, action [{}], request [{}]", t, this.this$0.transportReplicaAction, this.request);
                this.observer.waitForNextChange(new ClusterStateObserver.Listener(){

                    @Override
                    public void onNewClusterState(ClusterState state) {
                        String extraMessage = "action [" + AsyncReplicaAction.this.this$0.transportReplicaAction + "], request[" + AsyncReplicaAction.this.request + "]";
                        // Handler is built from the original channel; presumably it relays the outcome to it.
                        TransportChannelResponseHandler<TransportResponse.Empty> handler = TransportChannelResponseHandler.emptyResponseHandler(AsyncReplicaAction.this.this$0.logger, AsyncReplicaAction.this.channel, extraMessage);
                        AsyncReplicaAction.this.this$0.transportService.sendRequest(AsyncReplicaAction.this.this$0.clusterService.localNode(), AsyncReplicaAction.this.this$0.transportReplicaAction, AsyncReplicaAction.this.request, handler);
                    }

                    @Override
                    public void onClusterServiceClose() {
                        AsyncReplicaAction.this.responseWithFailure(new NodeClosedException(AsyncReplicaAction.this.this$0.clusterService.localNode()));
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        throw new AssertionError((Object)"Cannot happen: there is not timeout");
                    }
                });
            } else {
                try {
                    this.failReplicaIfNeeded(t);
                }
                catch (Throwable unexpected) {
                    this.this$0.logger.error("{} unexpected error while failing replica", unexpected, ((ReplicationRequest)this.request).shardId().id());
                }
                finally {
                    // The caller must get an answer regardless of what happened above.
                    this.responseWithFailure(t);
                }
            }
        }

        /**
         * Fails the local replica shard when {@code mustFailReplica} says the error requires it.
         * Does nothing beyond logging when the index service or the shard is no longer present.
         *
         * @param t the error raised by the replica operation
         */
        private void failReplicaIfNeeded(Throwable t) {
            String index = ((ReplicationRequest)this.request).shardId().getIndex();
            int shardId = ((ReplicationRequest)this.request).shardId().id();
            this.this$0.logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, this.this$0.actionName, this.request);
            if (this.this$0.mustFailReplica(t)) {
                assert (!this.this$0.ignoreReplicaException(t));
                IndexService indexService = this.this$0.indicesService.indexService(index);
                if (indexService == null) {
                    this.this$0.logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
                    return;
                }
                IndexShard indexShard = indexService.shard(shardId);
                if (indexShard == null) {
                    // The index service exists here, so it is the shard that is missing.
                    this.this$0.logger.debug("ignoring failed replica [{}][{}] because shard was already removed.", index, shardId);
                    return;
                }
                indexShard.failShard(this.this$0.actionName + " failed on replica", t);
            }
        }

        /**
         * Sends {@code t} back on the channel. An {@code IOException} while sending is logged
         * together with the original error instead of being propagated.
         *
         * @param t the error to report to the caller
         */
        protected void responseWithFailure(Throwable t) {
            try {
                this.channel.sendResponse(t);
            }
            catch (IOException responseException) {
                this.this$0.logger.warn("failed to send error message back to client for action [" + this.this$0.transportReplicaAction + "]", responseException, new Object[0]);
                this.this$0.logger.warn("actual Exception", t, new Object[0]);
            }
        }

        /**
         * Executes the replica operation while holding the shard's operation counter, then
         * marks the task as finished and sends an empty response.
         *
         * @throws Exception whatever the replica operation or sending the response throws
         */
        @Override
        protected void doRun() throws Exception {
            TransportReplicationAction.setPhase(this.task, "replica");
            assert (((ReplicationRequest)this.request).shardId() != null) : "request shardId must be set";
            // The counter is released before the response is sent.
            try (Releasable ignored = this.this$0.getIndexShardOperationsCounter(((ReplicationRequest)this.request).shardId());){
                this.this$0.shardOperationOnReplica(this.request);
                if (this.this$0.logger.isTraceEnabled()) {
                    this.this$0.logger.trace("action [{}] completed on shard [{}] for request [{}]", this.this$0.transportReplicaAction, ((ReplicationRequest)this.request).shardId(), this.request);
                }
            }
            TransportReplicationAction.setPhase(this.task, "finished");
            this.channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

    /**
     * Exception that carries the id of the shard it was raised for.
     * {@code AsyncReplicaAction.onFailure} treats this type specially: instead of reporting the
     * failure, it re-submits the replica request after the next cluster state change.
     */
    public static class RetryOnReplicaException
    extends ElasticsearchException {
        /**
         * Reads the exception from a stream.
         *
         * @param in the stream to read from
         * @throws IOException if reading from the stream fails
         */
        public RetryOnReplicaException(StreamInput in) throws IOException {
            super(in);
        }

        /**
         * Creates the exception for a shard.
         *
         * @param shardId the shard recorded on the exception
         * @param msg     the exception message, passed on with an empty argument array
         */
        public RetryOnReplicaException(ShardId shardId, String msg) {
            super(msg, new Object[0]);
            this.setShard(shardId);
        }
    }

    /**
     * Transport handler for replica requests. Each request received together with a task is
     * executed synchronously through a new {@code AsyncReplicaAction}.
     */
    class ReplicaOperationTransportHandler
    extends TransportRequestHandler<ReplicaRequest> {
        ReplicaOperationTransportHandler() {
        }

        /**
         * Runs the replica operation for {@code request} on the calling thread.
         *
         * @param task must be a {@code ReplicationTask}; it is cast without a check
         */
        @Override
        public void messageReceived(ReplicaRequest request, TransportChannel channel, Task task) throws Exception {
            final ReplicationTask replicationTask = (ReplicationTask)task;
            final AsyncReplicaAction replicaAction = new AsyncReplicaAction(TransportReplicationAction.this, request, channel, replicationTask);
            replicaAction.run();
        }

        /**
         * Task-less variant, not supported by this handler.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void messageReceived(ReplicaRequest request, TransportChannel channel) throws Exception {
            throw new UnsupportedOperationException("the task parameter is required for this operation");
        }
    }

    /**
     * Transport handler for primary requests. Each request received together with a task is
     * executed synchronously through a new {@code PrimaryPhase}.
     */
    class PrimaryOperationTransportHandler
    extends TransportRequestHandler<Request> {
        PrimaryOperationTransportHandler() {
        }

        /**
         * Runs the primary phase for {@code request} on the calling thread.
         *
         * @param task must be a {@code ReplicationTask}; it is cast without a check
         */
        @Override
        public void messageReceived(Request request, TransportChannel channel, Task task) throws Exception {
            final ReplicationTask replicationTask = (ReplicationTask)task;
            final PrimaryPhase primaryPhase = new PrimaryPhase(TransportReplicationAction.this, replicationTask, request, channel);
            primaryPhase.run();
        }

        /**
         * Task-less variant, not supported by this handler.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void messageReceived(Request request, TransportChannel channel) throws Exception {
            throw new UnsupportedOperationException("the task parameter is required for this operation");
        }
    }

    /**
     * Transport handler for the top-level action. It executes the request through the enclosing
     * action and relays the result, or the failure, back on the transport channel.
     */
    class OperationTransportHandler
    extends TransportRequestHandler<Request> {
        OperationTransportHandler() {
        }

        /**
         * Task-less variant, not supported by this handler.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void messageReceived(Request request, TransportChannel channel) throws Exception {
            throw new UnsupportedOperationException("the task parameter is required for this operation");
        }

        /**
         * Executes {@code request} and answers on {@code channel}. A failure to send the
         * response is converted into a failure reply; a failure to send that reply is only logged.
         */
        @Override
        public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception {
            final ActionListener<Response> replyOnChannel = new ActionListener<Response>(){

                @Override
                public void onResponse(Response result) {
                    try {
                        channel.sendResponse((TransportResponse)result);
                    } catch (Throwable sendError) {
                        // Fall back to reporting the send problem to the caller.
                        this.onFailure(sendError);
                    }
                }

                @Override
                public void onFailure(Throwable e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Throwable sendError) {
                        TransportReplicationAction.this.logger.warn("Failed to send response for " + TransportReplicationAction.this.actionName, sendError, new Object[0]);
                    }
                }
            };
            TransportReplicationAction.this.execute(task, request, replyOnChannel);
        }
    }

    /**
     * Pairs the response of a write operation with a translog location.
     * NOTE(review): the location presumably marks where the operation was written in the
     * translog - confirm against the code that creates instances.
     *
     * @param <T> the concrete write response type held by this result
     */
    protected static class WriteResult<T extends ActionWriteResponse> {
        public final T response;
        public final Translog.Location location;

        /**
         * @param response the response produced by the write operation
         * @param location the translog location associated with the operation
         */
        public WriteResult(T response, Translog.Location location) {
            this.response = response;
            this.location = location;
        }

        /**
         * Returns the held response after replacing its shard info with a freshly constructed
         * {@code ShardInfo}. The replacement happens on every call.
         * <p>
         * The return type is inferred at the call site, so the cast is unchecked; a caller that
         * asks for a type other than the stored one gets a {@code ClassCastException} at its
         * assignment.
         *
         * @param <R> the response type expected by the caller
         * @return the stored response, viewed as {@code R}
         */
        @SuppressWarnings("unchecked")
        public <R extends ActionWriteResponse> R response() {
            this.response.setShardInfo(new ActionWriteResponse.ShardInfo());
            // The method's own type parameter is unrelated to the class-level T, so a cast is required.
            return (R) this.response;
        }
    }
}

