/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.indices.recovery;

import java.io.IOException;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.DelayRecoveryException;
import org.elasticsearch.indices.recovery.RecoveriesCollection;
import org.elasticsearch.indices.recovery.RecoveryCleanFilesRequest;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
import org.elasticsearch.indices.recovery.RecoveryFinalizeRecoveryRequest;
import org.elasticsearch.indices.recovery.RecoveryHandoffPrimaryContextRequest;
import org.elasticsearch.indices.recovery.RecoveryPrepareForTranslogOperationsRequest;
import org.elasticsearch.indices.recovery.RecoveryResponse;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTranslogOperationsRequest;
import org.elasticsearch.indices.recovery.RecoveryTranslogOperationsResponse;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

public class PeerRecoveryTargetService
implements IndexEventListener {
    private static final Logger logger = LogManager.getLogger(PeerRecoveryTargetService.class);
    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final RecoverySettings recoverySettings;
    private final ClusterService clusterService;
    private final RecoveriesCollection onGoingRecoveries;

    public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.recoverySettings = recoverySettings;
        this.clusterService = clusterService;
        this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
        transportService.registerRequestHandler("internal:index/shard/recovery/filesInfo", RecoveryFilesInfoRequest::new, "generic", new FilesInfoRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/recovery/file_chunk", RecoveryFileChunkRequest::new, "generic", new FileChunkTransportRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/recovery/clean_files", "generic", RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/recovery/prepare_translog", "generic", RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/recovery/translog_ops", "generic", RecoveryTranslogOperationsRequest::new, new TranslogOperationsRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/recovery/finalize", RecoveryFinalizeRecoveryRequest::new, "generic", new FinalizeRecoveryRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/recovery/handoff_primary_context", RecoveryHandoffPrimaryContextRequest::new, "generic", new HandoffPrimaryContextRequestHandler());
    }

    @Override
    public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
        if (indexShard != null) {
            this.onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
        }
    }

    public void startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryListener listener) {
        long recoveryId = this.onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, this.recoverySettings.activityTimeout());
        this.threadPool.generic().execute(new RecoveryRunner(recoveryId));
    }

    protected void retryRecovery(long recoveryId, Throwable reason, TimeValue retryAfter, TimeValue activityTimeout) {
        logger.trace(() -> new ParameterizedMessage("will retry recovery with id [{}] in [{}]", (Object)recoveryId, (Object)retryAfter), reason);
        this.retryRecovery(recoveryId, retryAfter, activityTimeout);
    }

    protected void retryRecovery(long recoveryId, String reason, TimeValue retryAfter, TimeValue activityTimeout) {
        logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", (Object)recoveryId, (Object)retryAfter, (Object)reason);
        this.retryRecovery(recoveryId, retryAfter, activityTimeout);
    }

    private void retryRecovery(long recoveryId, TimeValue retryAfter, TimeValue activityTimeout) {
        RecoveryTarget newTarget = this.onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
        if (newTarget != null) {
            this.threadPool.schedule(new RecoveryRunner(newTarget.recoveryId()), retryAfter, "generic");
        }
    }

    private void doRecovery(final long recoveryId) {
        StartRecoveryRequest request;
        CancellableThreads cancellableThreads;
        RecoveryState.Timer timer;
        try (RecoveriesCollection.RecoveryRef recoveryRef = this.onGoingRecoveries.getRecovery(recoveryId);){
            if (recoveryRef == null) {
                logger.trace("not running recovery with id [{}] - can not find it (probably finished)", (Object)recoveryId);
                return;
            }
            RecoveryTarget recoveryTarget = recoveryRef.target();
            timer = recoveryTarget.state().getTimer();
            cancellableThreads = recoveryTarget.cancellableThreads();
            try {
                assert (recoveryTarget.sourceNode() != null) : "can not do a recovery without a source node";
                request = this.getStartRecoveryRequest(recoveryTarget);
                logger.trace("{} preparing shard for peer recovery", (Object)recoveryTarget.shardId());
                recoveryTarget.indexShard().prepareForIndexRecovery();
            }
            catch (Exception e2) {
                logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", (Throwable)e2);
                this.onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", (Throwable)e2), true);
                if (recoveryRef != null) {
                    recoveryRef.close();
                }
                return;
            }
        }
        final Consumer<Exception> handleException = e -> {
            Throwable cause;
            if (logger.isTraceEnabled()) {
                logger.trace(() -> new ParameterizedMessage("[{}][{}] Got exception on recovery", (Object)request.shardId().getIndex().getName(), (Object)request.shardId().id()), (Throwable)e);
            }
            if ((cause = ExceptionsHelper.unwrapCause(e)) instanceof CancellableThreads.ExecutionCancelledException) {
                this.onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source has canceled the recovery", cause), false);
                return;
            }
            if (cause instanceof RecoveryEngineException) {
                cause = cause.getCause();
            }
            if ((cause = ExceptionsHelper.unwrapCause(cause)) instanceof RecoveryEngineException) {
                cause = cause.getCause();
            }
            if (cause instanceof IllegalIndexShardStateException || cause instanceof org.elasticsearch.index.IndexNotFoundException || cause instanceof ShardNotFoundException) {
                this.retryRecovery(recoveryId, "remote shard not ready", this.recoverySettings.retryDelayStateSync(), this.recoverySettings.activityTimeout());
                return;
            }
            if (cause instanceof DelayRecoveryException) {
                this.retryRecovery(recoveryId, cause, this.recoverySettings.retryDelayStateSync(), this.recoverySettings.activityTimeout());
                return;
            }
            if (cause instanceof ConnectTransportException) {
                logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", (Object)request.shardId(), (Object)this.recoverySettings.retryDelayNetwork(), (Object)cause.getMessage());
                this.retryRecovery(recoveryId, cause.getMessage(), this.recoverySettings.retryDelayNetwork(), this.recoverySettings.activityTimeout());
                return;
            }
            if (cause instanceof AlreadyClosedException) {
                this.onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false);
                return;
            }
            this.onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, (Throwable)e), true);
        };
        try {
            logger.trace("{} starting recovery from {}", (Object)request.shardId(), (Object)request.sourceNode());
            cancellableThreads.executeIO(() -> this.transportService.submitRequest(request.sourceNode(), "internal:index/shard/recovery/start_recovery", request, new TransportResponseHandler<RecoveryResponse>(){

                @Override
                public void handleResponse(RecoveryResponse recoveryResponse) {
                    TimeValue recoveryTime = new TimeValue(timer.time());
                    PeerRecoveryTargetService.this.onGoingRecoveries.markRecoveryAsDone(recoveryId);
                    if (logger.isTraceEnabled()) {
                        StringBuilder sb = new StringBuilder();
                        sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id()).append("] ");
                        sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n");
                        sb.append("   phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]").append(", took [").append(TimeValue.timeValueMillis((long)recoveryResponse.phase1Time)).append("], throttling_wait [").append(TimeValue.timeValueMillis((long)recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n");
                        sb.append("         : reusing_files   [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
                        sb.append("   phase2: start took [").append(TimeValue.timeValueMillis((long)recoveryResponse.startTime)).append("]\n");
                        sb.append("         : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations").append(", took [").append(TimeValue.timeValueMillis((long)recoveryResponse.phase2Time)).append("]").append("\n");
                        logger.trace("{}", (Object)sb);
                    } else {
                        logger.debug("{} recovery done from [{}], took [{}]", (Object)request.shardId(), (Object)request.sourceNode(), (Object)recoveryTime);
                    }
                }

                @Override
                public void handleException(TransportException e) {
                    handleException.accept(e);
                }

                @Override
                public String executor() {
                    return "generic";
                }

                @Override
                public RecoveryResponse read(StreamInput in) throws IOException {
                    return new RecoveryResponse(in);
                }
            }));
        }
        catch (CancellableThreads.ExecutionCancelledException e3) {
            logger.trace("recovery cancelled", (Throwable)e3);
        }
        catch (Exception e4) {
            handleException.accept(e4);
        }
    }

    private Store.MetadataSnapshot getStoreMetadataSnapshot(RecoveryTarget recoveryTarget) {
        try {
            return recoveryTarget.indexShard().snapshotStoreMetadata();
        }
        catch (IndexNotFoundException e) {
            logger.trace("{} shard folder empty, recovering all files", (Object)recoveryTarget);
            return Store.MetadataSnapshot.EMPTY;
        }
        catch (IOException e) {
            logger.warn("error while listing local files, recovering as if there are none", (Throwable)e);
            return Store.MetadataSnapshot.EMPTY;
        }
    }

    private StartRecoveryRequest getStartRecoveryRequest(RecoveryTarget recoveryTarget) {
        logger.trace("{} collecting local files for [{}]", (Object)recoveryTarget.shardId(), (Object)recoveryTarget.sourceNode());
        Store.MetadataSnapshot metadataSnapshot = this.getStoreMetadataSnapshot(recoveryTarget);
        logger.trace("{} local file count [{}]", (Object)recoveryTarget.shardId(), (Object)metadataSnapshot.size());
        long startingSeqNo = metadataSnapshot.size() > 0 ? PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget) : -2L;
        if (startingSeqNo == -2L) {
            logger.trace("{} preparing for file-based recovery from [{}]", (Object)recoveryTarget.shardId(), (Object)recoveryTarget.sourceNode());
        } else {
            logger.trace("{} preparing for sequence-number-based recovery starting at sequence number [{}] from [{}]", (Object)recoveryTarget.shardId(), (Object)startingSeqNo, (Object)recoveryTarget.sourceNode());
        }
        StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), this.clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId(), startingSeqNo);
        return request;
    }

    public static long getStartingSeqNo(Logger logger, RecoveryTarget recoveryTarget) {
        try {
            Store store = recoveryTarget.store();
            String translogUUID = (String)store.readLastCommittedSegmentsInfo().getUserData().get("translog_uuid");
            long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
            List existingCommits = DirectoryReader.listCommits((Directory)store.directory());
            IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
            SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
            if (logger.isTraceEnabled()) {
                StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
                for (IndexCommit commit : existingCommits) {
                    descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
                }
                logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]", (Object)globalCheckpoint, (Object)CombinedDeletionPolicy.commitDescription(safeCommit), (Object)descriptionOfExistingCommits);
            }
            if (seqNoStats.maxSeqNo <= globalCheckpoint) {
                assert (seqNoStats.localCheckpoint <= globalCheckpoint);
                return seqNoStats.localCheckpoint + 1L;
            }
            return -2L;
        }
        catch (IOException | TranslogCorruptedException e) {
            return -2L;
        }
    }

    class RecoveryRunner
    extends AbstractRunnable {
        final long recoveryId;

        RecoveryRunner(long recoveryId) {
            this.recoveryId = recoveryId;
        }

        @Override
        public void onFailure(Exception e) {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecovery(this.recoveryId);){
                if (recoveryRef != null) {
                    logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", (Object)this.recoveryId), (Throwable)e);
                    PeerRecoveryTargetService.this.onGoingRecoveries.failRecovery(this.recoveryId, new RecoveryFailedException(recoveryRef.target().state(), "unexpected error", (Throwable)e), true);
                } else {
                    logger.debug(() -> new ParameterizedMessage("unexpected error during recovery, but recovery id [{}] is finished", (Object)this.recoveryId), (Throwable)e);
                }
            }
        }

        @Override
        public void doRun() {
            PeerRecoveryTargetService.this.doRecovery(this.recoveryId);
        }
    }

    class FileChunkTransportRequestHandler
    implements TransportRequestHandler<RecoveryFileChunkRequest> {
        final AtomicLong bytesSinceLastPause = new AtomicLong();

        FileChunkTransportRequestHandler() {
        }

        @Override
        public void messageReceived(RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                long bytes;
                RateLimiter rateLimiter;
                RecoveryTarget recoveryTarget = recoveryRef.target();
                RecoveryState.Index indexState = recoveryTarget.state().getIndex();
                if (request.sourceThrottleTimeInNanos() != -1L) {
                    indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
                }
                if ((rateLimiter = PeerRecoveryTargetService.this.recoverySettings.rateLimiter()) != null && (bytes = this.bytesSinceLastPause.addAndGet(request.content().length())) > rateLimiter.getMinPauseCheckBytes()) {
                    this.bytesSinceLastPause.addAndGet(-bytes);
                    long throttleTimeInNanos = rateLimiter.pause(bytes);
                    indexState.addTargetThrottling(throttleTimeInNanos);
                    recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
                }
                ChannelActionListener listener = new ChannelActionListener(channel, "internal:index/shard/recovery/file_chunk", request);
                recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(), request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    }

    class CleanFilesRequestHandler
    implements TransportRequestHandler<RecoveryCleanFilesRequest> {
        CleanFilesRequestHandler() {
        }

        @Override
        public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                ChannelActionListener listener = new ChannelActionListener(channel, "internal:index/shard/recovery/clean_files", request);
                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    }

    class FilesInfoRequestHandler
    implements TransportRequestHandler<RecoveryFilesInfoRequest> {
        FilesInfoRequestHandler() {
        }

        @Override
        public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                ChannelActionListener listener = new ChannelActionListener(channel, "internal:index/shard/recovery/filesInfo", request);
                recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes, request.totalTranslogOps, ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    }

    class TranslogOperationsRequestHandler
    implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
        TranslogOperationsRequestHandler() {
        }

        @Override
        public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel, final Task task) throws IOException {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                ClusterStateObserver observer = new ClusterStateObserver(PeerRecoveryTargetService.this.clusterService, null, logger, PeerRecoveryTargetService.this.threadPool.getThreadContext());
                RecoveryTarget recoveryTarget = recoveryRef.target();
                final ChannelActionListener listener = new ChannelActionListener(channel, "internal:index/shard/recovery/translog_ops", request);
                Consumer<Exception> retryOnMappingException = exception -> {
                    logger.debug("delaying recovery due to missing mapping changes", (Throwable)exception);
                    observer.waitForNextChange(new ClusterStateObserver.Listener(){

                        @Override
                        public void onNewClusterState(ClusterState state) {
                            try {
                                TranslogOperationsRequestHandler.this.messageReceived(request, channel, task);
                            }
                            catch (Exception e) {
                                listener.onFailure(e);
                            }
                        }

                        @Override
                        public void onClusterServiceClose() {
                            listener.onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates", new Object[0]));
                        }

                        @Override
                        public void onTimeout(TimeValue timeout) {
                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout + "])", new Object[0]));
                        }
                    });
                };
                IndexMetaData indexMetaData = PeerRecoveryTargetService.this.clusterService.state().metaData().index(request.shardId().getIndex());
                long mappingVersionOnTarget = indexMetaData != null ? indexMetaData.getMappingVersion() : 0L;
                recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), request.retentionLeases(), request.mappingVersionOnPrimary(), ActionListener.wrap(checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse((long)checkpoint)), e -> {
                    if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
                        retryOnMappingException.accept((Exception)e);
                    } else {
                        listener.onFailure((Exception)e);
                    }
                }));
            }
        }
    }

    class HandoffPrimaryContextRequestHandler
    implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
        HandoffPrimaryContextRequestHandler() {
        }

        @Override
        public void messageReceived(RecoveryHandoffPrimaryContextRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                recoveryRef.target().handoffPrimaryContext(request.primaryContext());
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

    class FinalizeRecoveryRequestHandler
    implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
        FinalizeRecoveryRequestHandler() {
        }

        @Override
        public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                ChannelActionListener listener = new ChannelActionListener(channel, "internal:index/shard/recovery/finalize", request);
                recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    }

    class PrepareForTranslogOperationsRequestHandler
    implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
        PrepareForTranslogOperationsRequestHandler() {
        }

        @Override
        public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
            try (RecoveriesCollection.RecoveryRef recoveryRef = PeerRecoveryTargetService.this.onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());){
                ChannelActionListener listener = new ChannelActionListener(channel, "internal:index/shard/recovery/prepare_translog", request);
                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    }

    public static interface RecoveryListener {
        public void onRecoveryDone(RecoveryState var1);

        public void onRecoveryFailure(RecoveryState var1, RecoveryFailedException var2, boolean var3);
    }

    public static class Actions {
        public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo";
        public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk";
        public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files";
        public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
        public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
        public static final String FINALIZE = "internal:index/shard/recovery/finalize";
        public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
    }
}

