/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FederationInterceptor
extends AbstractRequestInterceptor {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptor.class);
    // Key prefix shared by the entries this class keeps in the NM state store (NMSS).
    public static final String NMSS_CLASS_PREFIX = "FederationInterceptor/";
    // NMSS key under which the serialized AM registration request is stored and recovered.
    public static final String NMSS_REG_REQUEST_KEY = "FederationInterceptor/registerRequest";
    // NMSS key under which the serialized AM registration response is stored and recovered.
    public static final String NMSS_REG_RESPONSE_KEY = "FederationInterceptor/registerResponse";
    // NMSS key prefix for per-sub-cluster entries; the sub-cluster id is appended to it.
    public static final String NMSS_SECONDARY_SC_PREFIX = "FederationInterceptor/secondarySC/";
    // Charset name used in recover() to decode the stored token bytes into a URL string.
    public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
    // Factory used to create empty AllocateResponse records.
    private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null);
    // Last response handed back to the AM; returned again when the AM repeats a heartbeat
    // that is one response id behind. Accessed under lastAllocateResponseLock.
    private AllocateResponse lastAllocateResponse;
    // Monitor guarding reads and writes of lastAllocateResponse.
    private final Object lastAllocateResponseLock = new Object();
    // Application attempt served by this interceptor; set in init().
    private ApplicationAttemptId attemptId;
    // Relayer wrapping the proxy to the home sub-cluster RM; created in init().
    private AMRMClientRelayer homeRMRelayer;
    // Id of the home sub-cluster, taken from the cluster id in the configuration.
    private SubClusterId homeSubClusterId;
    // Heartbeat handler thread for the home RM; started as a daemon in init() and
    // shut down in finishApplicationMaster() and shutdown().
    private AMHeartbeatRequestHandler homeHeartbeartHandler;
    // Pool manager for the unmanaged AMs (UAMs); created in the constructor on top of threadpool.
    private UnmanagedAMPoolManager uamPool;
    // Relayers of the UAMs, keyed by sub-cluster id string.
    private Map<String, AMRMClientRelayer> secondaryRelayers;
    // Per-sub-cluster allocate responses; allocate() waits on this map's monitor for a
    // notification. NOTE(review): presumably filled by async heartbeat callbacks defined
    // outside this section - confirm.
    private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
    // Per-sub-cluster AllocateResponse map. NOTE(review): presumably the latest response
    // per sub-cluster; it is not used in the visible section - confirm.
    private Map<SubClusterId, AllocateResponse> lastSCResponse;
    // UAM registration responses not yet reported to the AM; allocate() drains this map
    // and merges its content into the next heartbeat response.
    private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;
    // Futures keyed by sub-cluster. NOTE(review): presumably in-flight UAM registrations;
    // it is not used in the visible section - confirm.
    private Map<SubClusterId, Future<?>> uamRegisterFutures;
    // Cached thread pool handed to the UAM pool manager and used for the parallel
    // finish and re-attach tasks; shut down and nulled in shutdown().
    private ExecutorService threadpool;
    // Set by recover() and cleared by registerApplicationMaster(); while set, allocate()
    // rejects heartbeats with ApplicationMasterNotRegisteredException.
    private volatile boolean justRecovered = false;
    // Set once finishApplicationMaster() has been called; allocate() then returns an
    // empty response without processing the heartbeat.
    private volatile boolean finishAMCalled = false;
    // Maps each known container to the sub-cluster it was reported from.
    private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;
    // Registration request sent by the AM; cached on first registration or on recovery.
    private RegisterApplicationMasterRequest amRegistrationRequest = null;
    // Registration response from the home RM; returned as-is on repeated registrations.
    private RegisterApplicationMasterResponse amRegistrationResponse = null;
    // Federation state store facade singleton; obtained in init().
    private FederationStateStoreFacade federationFacade;
    // Sub-cluster resolver obtained from federationFacade in init().
    private SubClusterResolver subClusterResolver;
    // Per-sub-cluster timestamp; seeded with (now - subClusterTimeOut) when a UAM is re-attached.
    private Map<SubClusterId, Long> lastSCResponseTime;
    // Value of "yarn.federation.amrmproxy.subcluster.timeout.ms" (default 60000);
    // non-positive values are replaced by the default in init().
    private long subClusterTimeOut;
    // Clock time of the most recent allocate() call (initially the construction time).
    private long lastAMHeartbeatTime;
    // AMRMProxy policy loaded for the application's queue; allocate() requires it to be non-null.
    private FederationAMRMProxyPolicy policyInterpreter;
    // Registry client; only created when the application context provides a registry
    // client, otherwise left null.
    private FederationRegistryClient registryClient;
    // Value of "yarn.federation.amrmproxy.hb.maximum.wait.ms" (default 5000); upper bound
    // of the wait on asyncResponseSink in allocate().
    private long heartbeatMaxWaitTimeMs;
    // Clock used for all timestamps taken by this class.
    private MonotonicClock clock = new MonotonicClock();

    /**
     * Creates the interceptor with empty concurrent bookkeeping maps, a cached
     * thread pool and the UAM pool manager built on top of that thread pool.
     * Everything that depends on the application context is set up later in
     * {@link #init(AMRMProxyApplicationContext)}.
     */
    public FederationInterceptor() {
        containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
        asyncResponseSink = new ConcurrentHashMap<>();
        lastSCResponse = new ConcurrentHashMap<>();
        uamRegistrations = new ConcurrentHashMap<>();
        uamRegisterFutures = new ConcurrentHashMap<>();

        // The executor has to exist before the pool manager that is created from it.
        threadpool = Executors.newCachedThreadPool();
        uamPool = createUnmanagedAMPoolManager(threadpool);

        secondaryRelayers = new ConcurrentHashMap<>();
        lastSCResponseTime = new ConcurrentHashMap<>();
        lastAMHeartbeatTime = clock.getTime();
    }

    /**
     * Initializes the interceptor for one application attempt: resolves the
     * configuration, builds the proxy user, creates the home RM relayer and
     * starts its heartbeat handler, starts the UAM pool and reads the heartbeat
     * and sub-cluster timeout settings.
     *
     * @param appContext context of the application this interceptor serves
     * @throws YarnRuntimeException if the proxy user cannot be created
     */
    @Override
    public void init(AMRMProxyApplicationContext appContext) {
        super.init(appContext);
        LOG.info("Initializing Federation Interceptor");

        // Use the context's configuration when it has one, otherwise keep our own.
        Configuration conf = appContext.getConf();
        if (conf != null) {
            setConf(conf);
        } else {
            conf = getConf();
        }

        final UserGroupInformation appOwner;
        try {
            appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), UserGroupInformation.getCurrentUser());
        } catch (Exception ex) {
            throw new YarnRuntimeException(ex);
        }

        // The registry client (and the credentials on the proxy user) are only
        // set up when the context supplies a registry client.
        if (appContext.getRegistryClient() != null) {
            registryClient = new FederationRegistryClient(conf, appContext.getRegistryClient(), appOwner);
            if (appContext.getCredentials() != null) {
                appOwner.addCredentials(appContext.getCredentials());
            }
        }

        attemptId = appContext.getApplicationAttemptId();
        final ApplicationId appId = attemptId.getApplicationId();
        homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));

        // Home RM connection plus the daemon thread that heartbeats to it.
        final ApplicationMasterProtocol homeRMProxy = createHomeRMProxy(appContext, ApplicationMasterProtocol.class, appOwner);
        homeRMRelayer = new AMRMClientRelayer(homeRMProxy, appId, homeSubClusterId.toString());
        homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId, homeRMRelayer);
        homeHeartbeartHandler.setUGI(appOwner);
        homeHeartbeartHandler.setDaemon(true);
        homeHeartbeartHandler.start();

        // Start from an empty "last response" carrying response id -1.
        lastAllocateResponse = RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
        lastAllocateResponse.setResponseId(-1);

        federationFacade = FederationStateStoreFacade.getInstance();
        subClusterResolver = federationFacade.getSubClusterResolver();
        // The policy is loaded later, once the application's queue is known.
        policyInterpreter = null;

        uamPool.init(conf);
        uamPool.start();

        heartbeatMaxWaitTimeMs = conf.getLong("yarn.federation.amrmproxy.hb.maximum.wait.ms", 5000L);
        subClusterTimeOut = conf.getLong("yarn.federation.amrmproxy.subcluster.timeout.ms", 60000L);
        if (subClusterTimeOut <= 0L) {
            // A non-positive timeout is rejected in favour of the default.
            LOG.info("{} configured to be {}, should be positive. Using default of {}.", "yarn.federation.amrmproxy.subcluster.timeout.ms", subClusterTimeOut, 60000L);
            subClusterTimeOut = 60000L;
        }
    }

    /**
     * Restores the interceptor state after an AMRMProxy restart.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>marks the interceptor as just recovered, so that heartbeats are
     *       rejected until the AM registers again;</li>
     *   <li>restores the AM registration request and response from the
     *       recovered state store entries, when present;</li>
     *   <li>loads the UAM tokens from the registry when a registry client is
     *       configured, otherwise from the recovered entries whose key starts
     *       with {@link #NMSS_SECONDARY_SC_PREFIX};</li>
     *   <li>re-attaches and re-registers each UAM and records the containers
     *       it reports; a failure for one UAM is logged and does not stop the
     *       others;</li>
     *   <li>asks the home RM for the containers of this attempt and records
     *       them under the home sub-cluster;</li>
     *   <li>reloads the AMRMProxy policy when a registration response was
     *       recovered.</li>
     * </ol>
     *
     * @param recoveredDataMap entries recovered for this application, may be
     *        {@code null}, in which case only the just-recovered flag is set
     * @throws YarnRuntimeException wrapping any {@link IOException} or
     *         {@link YarnException} raised while recovering
     */
    @Override
    public void recover(Map<String, byte[]> recoveredDataMap) {
        super.recover(recoveredDataMap);
        LOG.info("Recovering data for FederationInterceptor for {}", this.attemptId);
        this.justRecovered = true;
        if (recoveredDataMap == null) {
            return;
        }
        try {
            // Request and response use different proto types, so each gets its own local.
            if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
                YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto = YarnServiceProtos.RegisterApplicationMasterRequestProto.parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
                this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(requestProto);
                LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
                // Hand the recovered request to the home relayer as well.
                this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
            }
            if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
                YarnServiceProtos.RegisterApplicationMasterResponseProto responseProto = YarnServiceProtos.RegisterApplicationMasterResponseProto.parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
                this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(responseProto);
                LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
            }

            // Sub-cluster id -> AMRM token of the UAM running there.
            Map<String, Token<AMRMTokenIdentifier>> uamMap;
            if (this.registryClient != null) {
                uamMap = this.registryClient.loadStateFromRegistry(this.attemptId.getApplicationId());
                LOG.info("Found {} existing UAMs for application {} in Yarn Registry", uamMap.size(), this.attemptId.getApplicationId());
            } else {
                uamMap = new HashMap<>();
                for (Map.Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
                    if (!entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) {
                        continue;
                    }
                    // The key suffix is the sub-cluster id, the value the URL-encoded token.
                    String scId = entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length());
                    Token<AMRMTokenIdentifier> amrmToken = new Token<>();
                    amrmToken.decodeFromUrlString(new String(entry.getValue(), STRING_TO_BYTE_FORMAT));
                    uamMap.put(scId, amrmToken);
                    LOG.debug("Recovered UAM in {} from NMSS", scId);
                }
                LOG.info("Found {} existing UAMs for application {} in NMStateStore", uamMap.size(), this.attemptId.getApplicationId());
            }

            // Running total of recovered containers across all UAMs and the home RM.
            int containers = 0;
            for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
                SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey());
                YarnConfiguration config = new YarnConfiguration(this.getConf());
                FederationProxyProviderUtil.updateConfForFederation(config, subClusterId.getId());
                try {
                    // NOTE(review): when no registration response was recovered,
                    // getQueue() below throws a NullPointerException, which the catch
                    // at the end of this loop body logs for each UAM.
                    this.uamPool.reAttachUAM(subClusterId.getId(), config, this.attemptId.getApplicationId(), this.amRegistrationResponse.getQueue(), this.getApplicationContext().getUser(), this.homeSubClusterId.getId(), entry.getValue(), subClusterId.toString());
                    this.secondaryRelayers.put(subClusterId.getId(), this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
                    RegisterApplicationMasterResponse response = this.uamPool.registerApplicationMaster(subClusterId.getId(), this.amRegistrationRequest);
                    // Seed the last-response time one full timeout in the past.
                    this.lastSCResponseTime.put(subClusterId, this.clock.getTime() - this.subClusterTimeOut);

                    // Same null guard as the re-attach path used on registration.
                    List<Container> previous = response == null ? null : response.getContainersFromPreviousAttempts();
                    int recoveredFromUam = 0;
                    if (previous != null) {
                        for (Container container : previous) {
                            this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
                            ++containers;
                            ++recoveredFromUam;
                            LOG.debug("  From subcluster {} running container {}", subClusterId, container.getId());
                        }
                    }
                    LOG.info("Recovered {} running containers from UAM in {}", recoveredFromUam, subClusterId);
                } catch (Exception e) {
                    // Best effort: a UAM that cannot be re-attached is logged and skipped.
                    LOG.error("Error reattaching UAM to " + subClusterId + " for " + this.attemptId, e);
                }
            }

            // Containers known to the home RM for this attempt.
            UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser(this.getApplicationContext().getUser());
            ApplicationClientProtocol rmClient = this.createHomeRMProxy(this.getApplicationContext(), ApplicationClientProtocol.class, appSubmitter);
            GetContainersResponse response = rmClient.getContainers(GetContainersRequest.newInstance(this.attemptId));
            for (ContainerReport container : response.getContainerList()) {
                this.containerIdToSubClusterIdMap.put(container.getContainerId(), this.homeSubClusterId);
                ++containers;
                LOG.debug("  From home RM {} running container {}", this.homeSubClusterId, container.getContainerId());
            }
            LOG.info("{} running containers including AM recovered from home RM {}", response.getContainerList().size(), this.homeSubClusterId);
            LOG.info("In all {} UAMs {} running containers including AM recovered for {}", uamMap.size(), containers, this.attemptId);

            // The policy depends on the queue, which is only known from the response.
            if (this.amRegistrationResponse != null) {
                String queue = this.amRegistrationResponse.getQueue();
                this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, this.getConf(), this.federationFacade, this.homeSubClusterId);
            }
        } catch (IOException | YarnException e) {
            throw new YarnRuntimeException(e);
        }
    }

    /**
     * Registers the AM with the home RM and prepares the interceptor for
     * heartbeats.
     *
     * <p>The heartbeat response id is reset to zero and the just-recovered flag
     * is cleared on every call. The first request seen is cached and stored in
     * the NM state store when one is available; a later call with a different
     * request is rejected. When a registration response is already cached it is
     * returned directly. Otherwise the request is sent to the home RM, the
     * containers it reports are cached, existing UAMs are re-attached, the
     * response is stored and the AMRMProxy policy for the application's queue
     * is loaded.
     *
     * @param request registration request from the AM
     * @return the registration response cached for this application
     * @throws YarnException if the AM re-registers with a different request, or
     *         the home RM registration fails
     * @throws IOException if the home RM registration fails
     * @throws YarnRuntimeException if the AMRMProxy policy cannot be initialized
     */
    public synchronized RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnException, IOException {
        // A (re-)registration restarts the heartbeat response id sequence.
        synchronized (this.lastAllocateResponseLock) {
            this.lastAllocateResponse.setResponseId(0);
        }
        this.justRecovered = false;

        if (this.amRegistrationRequest == null) {
            // First registration: remember the request and persist it if possible.
            this.amRegistrationRequest = request;
            if (getNMStateStore() != null) {
                try {
                    RegisterApplicationMasterRequestPBImpl requestPb = (RegisterApplicationMasterRequestPBImpl) this.amRegistrationRequest;
                    getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, NMSS_REG_REQUEST_KEY, requestPb.getProto().toByteArray());
                } catch (Exception e) {
                    // Persistence is best effort; registration carries on.
                    LOG.error("Error storing AMRMProxy application context entry for " + this.attemptId, e);
                }
            }
        } else if (!this.amRegistrationRequest.equals(request)) {
            throw new YarnException("AM should not call registerApplicationMaster with a different request body");
        }

        // Already registered: answer with the cached response.
        if (this.amRegistrationResponse != null) {
            return this.amRegistrationResponse;
        }

        this.amRegistrationResponse = this.homeRMRelayer.registerApplicationMaster(request);
        List<Container> previousContainers = this.amRegistrationResponse.getContainersFromPreviousAttempts();
        if (previousContainers != null) {
            cacheAllocatedContainers(previousContainers, this.homeSubClusterId);
        }

        final ApplicationId appId = this.attemptId.getApplicationId();
        reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);

        if (getNMStateStore() != null) {
            try {
                RegisterApplicationMasterResponsePBImpl responsePb = (RegisterApplicationMasterResponsePBImpl) this.amRegistrationResponse;
                getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, NMSS_REG_RESPONSE_KEY, responsePb.getProto().toByteArray());
            } catch (Exception e) {
                LOG.error("Error storing AMRMProxy application context entry for " + this.attemptId, e);
            }
        }

        // The queue selects which AMRMProxy policy gets loaded below.
        final String queue = this.amRegistrationResponse.getQueue();
        if (queue != null) {
            LOG.info("Application " + appId + " belongs to queue " + queue);
        } else {
            LOG.warn("Received null queue for application " + appId + " from home subcluster. Will use default queue name " + "default" + " for getting AMRMProxyPolicy");
        }

        try {
            this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, getConf(), this.federationFacade, this.homeSubClusterId);
        } catch (FederationPolicyInitializationException e) {
            throw new YarnRuntimeException(e);
        }
        return this.amRegistrationResponse;
    }

    /**
     * Handles one AM heartbeat.
     *
     * <p>After validating the interceptor state and the request's response id,
     * the request is split per sub-cluster and sent to the resource managers.
     * The method then waits on the response sink for at most the configured
     * heartbeat wait time, sleeps for as long again as that wait actually took,
     * and finally merges the collected allocate responses and any new UAM
     * registrations into a single response whose response id is the successor
     * of the previous one.
     *
     * <p>If the calling thread is interrupted while waiting, processing
     * continues and the thread's interrupt status is restored before the
     * method exits.
     *
     * @param request heartbeat request from the AM
     * @return the merged response; the previous response when the AM repeats a
     *         heartbeat that is one response id old; an empty response when
     *         finishApplicationMaster has already been called
     * @throws IllegalArgumentException if no policy has been loaded yet, i.e.
     *         the AM has not registered
     * @throws ApplicationMasterNotRegisteredException if the interceptor was
     *         just recovered and the AM has not re-registered
     * @throws InvalidApplicationMasterRequestException if the response id is
     *         neither the expected one nor its predecessor
     * @throws YarnException wrapping any failure while processing the heartbeat
     * @throws IOException declared by the protocol
     */
    public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException {
        Preconditions.checkArgument(this.policyInterpreter != null, "Allocate should be called after registerApplicationMaster");
        this.lastAMHeartbeatTime = this.clock.getTime();
        if (this.justRecovered) {
            throw new ApplicationMasterNotRegisteredException("AMRMProxy just restarted and recovered for " + this.attemptId + ". AM should re-register and full re-send pending requests.");
        }
        if (this.finishAMCalled) {
            // The attempt id is passed as a logging argument so that the "{}"
            // placeholder in the message is actually substituted.
            LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat processing and return dummy response", this.attemptId);
            return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
        }
        synchronized (this.lastAllocateResponseLock) {
            LOG.info("Heartbeat from " + this.attemptId + " with responseId " + request.getResponseId() + " when we are expecting " + this.lastAllocateResponse.getResponseId());
            // A heartbeat exactly one step behind is a retransmission: replay the last response.
            if (AMRMClientUtils.getNextResponseId(request.getResponseId()) == this.lastAllocateResponse.getResponseId()) {
                return this.lastAllocateResponse;
            }
            // Anything else that is not the expected id is invalid.
            if (request.getResponseId() != this.lastAllocateResponse.getResponseId()) {
                throw new InvalidApplicationMasterRequestException(AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(this.attemptId, this.lastAllocateResponse.getResponseId(), request.getResponseId()));
            }
        }

        // Remembers an interruption so it can be re-asserted once processing is done.
        boolean interrupted = false;
        try {
            Map<SubClusterId, AllocateRequest> requests = this.splitAllocateRequest(request);
            this.sendRequestsToResourceManagers(requests);

            // Wait on the sink's monitor, bounded by the maximum heartbeat wait time.
            long startTime = this.clock.getTime();
            synchronized (this.asyncResponseSink) {
                try {
                    this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            long firstResponseTime = this.clock.getTime() - startTime;

            // Extra wait of the same length as the first one before merging.
            try {
                Thread.sleep(firstResponseTime);
            } catch (InterruptedException e) {
                interrupted = true;
            }

            AllocateResponse response = this.generateBaseAllocationResponse();
            this.mergeAllocateResponses(response);

            if (!FederationInterceptor.isNullOrEmpty(this.uamRegistrations)) {
                Map<SubClusterId, RegisterApplicationMasterResponse> newRegistrations;
                // Snapshot and clear under the map's monitor, merge outside of it.
                synchronized (this.uamRegistrations) {
                    newRegistrations = new HashMap<SubClusterId, RegisterApplicationMasterResponse>(this.uamRegistrations);
                    this.uamRegistrations.clear();
                }
                this.mergeRegistrationResponses(response, newRegistrations);
            }

            synchronized (this.lastAllocateResponseLock) {
                response.setResponseId(AMRMClientUtils.getNextResponseId(this.lastAllocateResponse.getResponseId()));
                this.lastAllocateResponse = response;
            }
            return response;
        } catch (Throwable ex) {
            LOG.error("Exception encountered while processing heart beat for " + this.attemptId, ex);
            throw new YarnException(ex);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Unregisters the application from every sub-cluster.
     *
     * <p>The finish request is submitted to the thread pool once per UAM,
     * then sent to the home RM on the calling thread, after which the home
     * heartbeat handler is shut down. The method then waits for every
     * submitted UAM task. If any UAM did not confirm the unregistration, the
     * home response is marked as not unregistered; otherwise the UAM pool is
     * stopped and the application is removed from the registry (when a
     * registry client is configured).
     *
     * <p>If the calling thread is interrupted while waiting for a UAM result,
     * that result counts as failed and the interrupt status is restored once
     * all submitted tasks have been waited for.
     *
     * @param request finish request from the AM
     * @return the home RM response, with {@code isUnregistered} forced to
     *         {@code false} when any UAM failed to unregister
     * @throws YarnException if the home RM call fails
     * @throws IOException if the home RM call fails
     */
    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnException, IOException {
        this.finishAMCalled = true;
        boolean failedToUnRegister = false;
        ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc = null;
        // Number of tasks actually submitted; the wait loop below is driven by this
        // count rather than by re-reading the size of the id set.
        int submitted = 0;
        Set<String> subClusterIds = this.uamPool.getAllUAMIds();
        if (!subClusterIds.isEmpty()) {
            final FinishApplicationMasterRequest finishRequest = request;
            compSvc = new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(this.threadpool);
            LOG.info("Sending finish application request to {} sub-cluster RMs", subClusterIds.size());
            for (final String subClusterId : subClusterIds) {
                compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>(){

                    @Override
                    public FinishApplicationMasterResponseInfo call() throws Exception {
                        LOG.info("Sending finish application request to RM {}", subClusterId);
                        FinishApplicationMasterResponse uamResponse = null;
                        try {
                            uamResponse = FederationInterceptor.this.uamPool.finishApplicationMaster(subClusterId, finishRequest);
                            if (uamResponse.getIsUnregistered()) {
                                // Drop the relayer and the persisted entry of a UAM that is gone.
                                FederationInterceptor.this.secondaryRelayers.remove(subClusterId);
                                if (FederationInterceptor.this.getNMStateStore() != null) {
                                    FederationInterceptor.this.getNMStateStore().removeAMRMProxyAppContextEntry(FederationInterceptor.this.attemptId, FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + subClusterId);
                                }
                            }
                        }
                        catch (Throwable e) {
                            // A failure leaves uamResponse null, which the caller counts as failed.
                            LOG.warn("Failed to finish unmanaged application master: RM address: " + subClusterId + " ApplicationId: " + FederationInterceptor.this.attemptId, e);
                        }
                        return new FinishApplicationMasterResponseInfo(uamResponse, subClusterId);
                    }
                });
                ++submitted;
            }
        }

        // The home RM is contacted on this thread while the UAM tasks run in the pool.
        FinishApplicationMasterResponse homeResponse = this.homeRMRelayer.finishApplicationMaster(request);
        this.homeHeartbeartHandler.shutdown();

        if (submitted > 0) {
            LOG.info("Waiting for finish application response from {} sub-cluster RMs", submitted);
            boolean interrupted = false;
            for (int i = 0; i < submitted; ++i) {
                try {
                    Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
                    FinishApplicationMasterResponseInfo uamResponse = future.get();
                    LOG.debug("Received finish application response from RM: {}", uamResponse.getSubClusterId());
                    if (uamResponse.getResponse() == null || !uamResponse.getResponse().getIsUnregistered()) {
                        failedToUnRegister = true;
                    }
                }
                catch (Throwable e) {
                    if (e instanceof InterruptedException) {
                        interrupted = true;
                    }
                    failedToUnRegister = true;
                    LOG.warn("Failed to finish unmanaged application master:  ApplicationId: " + this.attemptId, e);
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (failedToUnRegister) {
            homeResponse.setIsUnregistered(false);
        } else {
            // Everything unregistered cleanly: stop the UAM pool and clear the registry entry.
            this.uamPool.stop();
            if (this.registryClient != null) {
                this.registryClient.removeAppFromRegistry(this.attemptId.getApplicationId());
            }
        }
        return homeResponse;
    }

    /**
     * Always fails: this interceptor has to be the last one in the chain, so
     * it never accepts a successor.
     *
     * @param next ignored
     * @throws YarnRuntimeException on every call
     */
    @Override
    public void setNextInterceptor(RequestInterceptor next) {
        final String message = "setNextInterceptor is being called on FederationInterceptor. It should always be used as the last interceptor in the chain";
        throw new YarnRuntimeException(message);
    }

    /**
     * Releases the resources held by this interceptor: shuts down the UAM
     * client connections, the thread pool, the home heartbeat handler and the
     * home RM relayer, then delegates to the superclass.
     *
     * <p>Failures while shutting down the UAM connections or the thread pool
     * are logged and do not prevent the remaining steps. The home heartbeat
     * handler and relayer are skipped when they were never created, so that
     * the superclass shutdown still runs.
     */
    @Override
    public void shutdown() {
        LOG.info("Shutting down FederationInterceptor for {}", this.attemptId);
        try {
            this.uamPool.shutDownConnections();
        }
        catch (YarnException e) {
            LOG.error("Error shutting down all UAM clients without killing them", e);
        }
        if (this.threadpool != null) {
            try {
                this.threadpool.shutdown();
            }
            catch (Throwable t) {
                // Still best effort, but no longer silent.
                LOG.warn("Error shutting down thread pool for " + this.attemptId, t);
            }
            this.threadpool = null;
        }
        // Both are assigned in init(); guard against an init() that failed before reaching them.
        if (this.homeHeartbeartHandler != null) {
            this.homeHeartbeartHandler.shutdown();
        }
        if (this.homeRMRelayer != null) {
            this.homeRMRelayer.shutdown();
        }
        super.shutdown();
    }

    /**
     * Test hook: asks the registry client to clean all applications. Does
     * nothing when no registry client is configured.
     */
    @VisibleForTesting
    protected void cleanupRegistry() {
        if (registryClient == null) {
            return;
        }
        registryClient.cleanAllApplications();
    }

    /**
     * Test accessor for the registry client.
     *
     * @return the registry client, or {@code null} when none was configured
     */
    @VisibleForTesting
    protected FederationRegistryClient getRegistryClient() {
        return registryClient;
    }

    /**
     * Test accessor for the application attempt served by this interceptor.
     *
     * @return the attempt id assigned in {@code init}
     */
    @VisibleForTesting
    protected ApplicationAttemptId getAttemptId() {
        return attemptId;
    }

    /**
     * Test accessor for the heartbeat handler of the home RM.
     *
     * @return the handler created in {@code init}
     */
    @VisibleForTesting
    protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() {
        return homeHeartbeartHandler;
    }

    /**
     * Factory for the UAM pool manager; overridable so tests can substitute
     * their own implementation.
     *
     * @param threadPool executor handed to the pool manager
     * @return a new pool manager backed by {@code threadPool}
     */
    @VisibleForTesting
    protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(ExecutorService threadPool) {
        UnmanagedAMPoolManager poolManager = new UnmanagedAMPoolManager(threadPool);
        return poolManager;
    }

    /**
     * Factory for the home RM heartbeat handler; overridable so tests can
     * substitute their own implementation.
     *
     * @param conf configuration passed to the handler
     * @param appId application the handler heartbeats for
     * @param rmProxyRelayer relayer the handler sends its heartbeats through
     * @return a new, not yet started, heartbeat handler
     */
    @VisibleForTesting
    protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(Configuration conf, ApplicationId appId, AMRMClientRelayer rmProxyRelayer) {
        AMHeartbeatRequestHandler handler = new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer);
        return handler;
    }

    /**
     * Creates a proxy of the given protocol towards the home sub-cluster RM,
     * using the configuration and AMRM token of the application context.
     *
     * @param appContext application context supplying configuration and token
     * @param protocol protocol interface the proxy should implement
     * @param user user the proxy is created for
     * @param <T> protocol type
     * @return the proxy instance
     * @throws YarnRuntimeException wrapping any failure to create the proxy
     */
    protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext, Class<T> protocol, UserGroupInformation user) {
        final T proxy;
        try {
            proxy = (T) FederationProxyProviderUtil.createRMProxy(appContext.getConf(), protocol, this.homeSubClusterId, user, appContext.getAMRMToken());
        } catch (Exception ex) {
            throw new YarnRuntimeException(ex);
        }
        return proxy;
    }

    /**
     * Folds the previous-attempt containers and NM tokens of
     * {@code otherResponse} into {@code homeResponse}. For each of the two
     * lists: nothing happens when the other response has none; the entries are
     * appended when the home response already has some; otherwise the other
     * response's list is set on the home response.
     *
     * @param homeResponse response that receives the merged content
     * @param otherResponse response whose content is merged in
     */
    private void mergeRegisterResponse(RegisterApplicationMasterResponse homeResponse, RegisterApplicationMasterResponse otherResponse) {
        // Containers carried over from previous attempts.
        List<Container> otherContainers = otherResponse.getContainersFromPreviousAttempts();
        if (!isNullOrEmpty(otherContainers)) {
            List<Container> homeContainers = homeResponse.getContainersFromPreviousAttempts();
            if (isNullOrEmpty(homeContainers)) {
                homeResponse.setContainersFromPreviousAttempts(otherContainers);
            } else {
                homeContainers.addAll(otherContainers);
            }
        }

        // NM tokens carried over from previous attempts, same rules.
        if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) {
            if (isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) {
                homeResponse.setNMTokensFromPreviousAttempts(otherResponse.getNMTokensFromPreviousAttempts());
            } else {
                homeResponse.getNMTokensFromPreviousAttempts().addAll(otherResponse.getNMTokensFromPreviousAttempts());
            }
        }
    }

    /**
     * Re-attaches to every UAM recorded for the application in the Yarn Registry
     * and merges each successful registration response into {@code homeResponse}.
     *
     * <p>Returns early (after logging) when no registry client is configured or
     * when the registry holds no UAM for {@code appId}. Otherwise one task per
     * registry entry is submitted to the shared thread pool. Each task re-attaches
     * the UAM, records its relayer in {@code secondaryRelayers}, re-sends the saved
     * registration request, back-dates {@code lastSCResponseTime} by the
     * sub-cluster timeout and caches any containers reported from previous
     * attempts. A task that fails logs the error and yields {@code null}, which is
     * skipped when merging.
     *
     * <p>If the calling thread is interrupted while waiting for the tasks, the
     * interrupt status is restored and the remaining results are not awaited.
     *
     * @param homeResponse registration response that the UAM responses are merged into
     * @param appId application whose UAMs should be re-attached
     */
    protected void reAttachUAMAndMergeRegisterResponse(RegisterApplicationMasterResponse homeResponse, final ApplicationId appId) {
        if (this.registryClient == null) {
            LOG.warn("registryClient is null, skip attaching existing UAM if any");
            return;
        }
        Map<?, ?> uamMap = this.registryClient.loadStateFromRegistry(appId);
        if (uamMap.isEmpty()) {
            LOG.info("No existing UAM for application {} found in Yarn Registry", (Object)appId);
            return;
        }
        LOG.info("Found {} existing UAMs for application {} in Yarn Registry. Reattaching in parallel", (Object)uamMap.size(), (Object)appId);
        ExecutorCompletionService<RegisterApplicationMasterResponse> completionService = new ExecutorCompletionService<RegisterApplicationMasterResponse>(this.threadpool);
        for (Map.Entry<?, ?> entry : uamMap.entrySet()) {
            final SubClusterId subClusterId = SubClusterId.newInstance((String)entry.getKey());
            final Token amrmToken = (Token)entry.getValue();
            completionService.submit(new Callable<RegisterApplicationMasterResponse>(){

                @Override
                public RegisterApplicationMasterResponse call() throws Exception {
                    RegisterApplicationMasterResponse response = null;
                    try {
                        YarnConfiguration config = new YarnConfiguration(FederationInterceptor.this.getConf());
                        FederationProxyProviderUtil.updateConfForFederation(config, subClusterId.getId());
                        FederationInterceptor.this.uamPool.reAttachUAM(subClusterId.getId(), config, appId, FederationInterceptor.this.amRegistrationResponse.getQueue(), FederationInterceptor.this.getApplicationContext().getUser(), FederationInterceptor.this.homeSubClusterId.getId(), amrmToken, subClusterId.toString());
                        FederationInterceptor.this.secondaryRelayers.put(subClusterId.getId(), FederationInterceptor.this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
                        response = FederationInterceptor.this.uamPool.registerApplicationMaster(subClusterId.getId(), FederationInterceptor.this.amRegistrationRequest);
                        // Back-date the last response time by the sub-cluster timeout.
                        FederationInterceptor.this.lastSCResponseTime.put(subClusterId, FederationInterceptor.this.clock.getTime() - FederationInterceptor.this.subClusterTimeOut);
                        if (response != null && response.getContainersFromPreviousAttempts() != null) {
                            FederationInterceptor.this.cacheAllocatedContainers(response.getContainersFromPreviousAttempts(), subClusterId);
                        }
                        LOG.info("UAM {} reattached for {}", (Object)subClusterId, (Object)appId);
                    }
                    catch (Throwable e) {
                        // Best effort: a failed re-attach is logged and reported as null.
                        LOG.error("Reattaching UAM " + subClusterId + " failed for " + appId, e);
                    }
                    return response;
                }
            });
        }
        // One result is expected per submitted task.
        final int pending = uamMap.size();
        for (int i = 0; i < pending; ++i) {
            try {
                Future<RegisterApplicationMasterResponse> future = completionService.take();
                RegisterApplicationMasterResponse registerResponse = future.get();
                if (registerResponse == null) {
                    continue;
                }
                LOG.info("Merging register response for {}", (Object)appId);
                this.mergeRegisterResponse(homeResponse, registerResponse);
            }
            catch (InterruptedException e) {
                LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, (Throwable)e);
                // Preserve the interrupt for callers and stop blocking on the
                // remaining tasks instead of silently clearing the flag.
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, (Throwable)e);
            }
        }
    }

    /**
     * Asks the sub-cluster resolver which sub-cluster owns the given node.
     *
     * @param nodeName name of the node to resolve
     * @return the resolved sub-cluster id, or {@code null} (after logging an
     *         error) when the resolver throws or returns {@code null}
     */
    private SubClusterId getSubClusterForNode(String nodeName) {
        SubClusterId resolved;
        try {
            resolved = this.subClusterResolver.getSubClusterForNode(nodeName);
        }
        catch (YarnException e) {
            LOG.error("Failed to resolve sub-cluster for node " + nodeName + ", skipping this node", (Throwable)e);
            return null;
        }
        if (resolved != null) {
            return resolved;
        }
        LOG.error("Failed to resolve sub-cluster for node {}, skipping this node", (Object)nodeName);
        return null;
    }

    /**
     * Splits one AM allocate request into a request per sub-cluster.
     *
     * <p>A request is always created for the home sub-cluster and for every UAM
     * currently in the pool. Asks are distributed via
     * {@link #splitResourceRequests}; blacklist additions and removals go to the
     * sub-cluster that owns the named node (unresolvable nodes are skipped);
     * releases and container updates go to the sub-cluster recorded for the
     * container (unknown containers are skipped after an error is logged).
     *
     * @param request the original allocate request from the AM
     * @return map from sub-cluster id to the request destined for it
     * @throws YarnException if splitting the resource requests fails
     */
    private Map<SubClusterId, AllocateRequest> splitAllocateRequest(AllocateRequest request) throws YarnException {
        Map<SubClusterId, AllocateRequest> requestMap = new HashMap<SubClusterId, AllocateRequest>();

        // Every known sub-cluster gets a (possibly empty) request.
        findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request, requestMap);
        for (String uamId : this.uamPool.getAllUAMIds()) {
            findOrCreateAllocateRequestForSubCluster(SubClusterId.newInstance(uamId), request, requestMap);
        }

        // Resource asks.
        if (!isNullOrEmpty(request.getAskList())) {
            Map<SubClusterId, List<ResourceRequest>> asksBySubCluster = this.splitResourceRequests(request.getAskList());
            for (Map.Entry<SubClusterId, List<ResourceRequest>> ask : asksBySubCluster.entrySet()) {
                AllocateRequest target = findOrCreateAllocateRequestForSubCluster(ask.getKey(), request, requestMap);
                target.getAskList().addAll(ask.getValue());
            }
        }

        // Blacklist additions and removals, routed by the node's sub-cluster.
        if (request.getResourceBlacklistRequest() != null) {
            if (!isNullOrEmpty(request.getResourceBlacklistRequest().getBlacklistAdditions())) {
                for (String node : request.getResourceBlacklistRequest().getBlacklistAdditions()) {
                    SubClusterId owner = this.getSubClusterForNode(node);
                    if (owner != null) {
                        AllocateRequest target = findOrCreateAllocateRequestForSubCluster(owner, request, requestMap);
                        target.getResourceBlacklistRequest().getBlacklistAdditions().add(node);
                    }
                }
            }
            if (!isNullOrEmpty(request.getResourceBlacklistRequest().getBlacklistRemovals())) {
                for (String node : request.getResourceBlacklistRequest().getBlacklistRemovals()) {
                    SubClusterId owner = this.getSubClusterForNode(node);
                    if (owner != null) {
                        AllocateRequest target = findOrCreateAllocateRequestForSubCluster(owner, request, requestMap);
                        target.getResourceBlacklistRequest().getBlacklistRemovals().add(node);
                    }
                }
            }
        }

        // Container releases, routed by the container's recorded sub-cluster.
        if (!isNullOrEmpty(request.getReleaseList())) {
            for (ContainerId cid : request.getReleaseList()) {
                if (this.warnIfNotExists(cid, "release")) {
                    SubClusterId owner = this.containerIdToSubClusterIdMap.get(cid);
                    AllocateRequest target = requestMap.get(owner);
                    target.getReleaseList().add(cid);
                }
            }
        }

        // Container updates, routed the same way as releases.
        if (!isNullOrEmpty(request.getUpdateRequests())) {
            for (UpdateContainerRequest ucr : request.getUpdateRequests()) {
                if (this.warnIfNotExists(ucr.getContainerId(), "update")) {
                    SubClusterId owner = this.containerIdToSubClusterIdMap.get(ucr.getContainerId());
                    AllocateRequest target = requestMap.get(owner);
                    target.getUpdateRequests().add(ucr);
                }
            }
        }
        return requestMap;
    }

    /**
     * Dispatches the per-sub-cluster allocate requests asynchronously.
     *
     * <p>Sub-clusters that have no UAM yet are handed to
     * {@link #registerAndAllocateWithNewSubClusters}, which also sends their
     * request, so they are skipped here. The home sub-cluster's request goes
     * through the home heartbeat handler; all others go through the UAM pool.
     *
     * @param requests allocate request per sub-cluster
     * @throws YarnException if a non-home, non-new sub-cluster has no UAM in the pool
     * @throws IOException declared by the registration step
     */
    private void sendRequestsToResourceManagers(Map<SubClusterId, AllocateRequest> requests) throws YarnException, IOException {
        List<SubClusterId> justRegistered = this.registerAndAllocateWithNewSubClusters(requests);
        for (Map.Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
            SubClusterId target = entry.getKey();
            if (justRegistered.contains(target)) {
                continue;
            }
            if (target.equals(this.homeSubClusterId)) {
                this.homeHeartbeartHandler.allocateAsync(entry.getValue(), new HeartbeatCallBack(this.homeSubClusterId, false));
            } else if (this.uamPool.hasUAMId(target.getId())) {
                this.uamPool.allocateAsync(target.getId(), entry.getValue(), new HeartbeatCallBack(target, true));
            } else {
                throw new YarnException("UAM not found for " + this.attemptId + " in sub-cluster " + target);
            }
        }
    }

    /**
     * Launches and registers a UAM in every sub-cluster of {@code requests} that
     * is neither the home sub-cluster nor already present in the UAM pool, and
     * sends that sub-cluster's allocate request once registration succeeds.
     *
     * <p>The work for each new sub-cluster is submitted to the thread pool; this
     * method does not wait for it. The submitted futures are recorded in
     * {@code uamRegisterFutures}, which is cleared first.
     *
     * @param requests allocate request per sub-cluster
     * @return the sub-clusters for which a UAM launch was submitted
     * @throws IOException declared in the signature; no statement visible in this
     *         method throws it directly
     */
    private List<SubClusterId> registerAndAllocateWithNewSubClusters(final Map<SubClusterId, AllocateRequest> requests) throws IOException {
        ArrayList<SubClusterId> newSubClusters = new ArrayList<SubClusterId>();
        for (SubClusterId subClusterId : requests.keySet()) {
            // Skip home and any sub-cluster that already has a UAM.
            if (subClusterId.equals((Object)this.homeSubClusterId) || this.uamPool.hasUAMId(subClusterId.getId())) continue;
            newSubClusters.add(subClusterId);
            // Seed the last response time, back-dated by the sub-cluster timeout.
            this.lastSCResponseTime.put(subClusterId, this.clock.getTime() - this.subClusterTimeOut);
        }
        this.uamRegisterFutures.clear();
        for (final SubClusterId scId : newSubClusters) {
            Future<?> future = this.threadpool.submit(new Runnable(){

                @Override
                public void run() {
                    String subClusterId = scId.getId();
                    YarnConfiguration config = new YarnConfiguration(FederationInterceptor.this.getConf());
                    FederationProxyProviderUtil.updateConfForFederation((Configuration)config, (String)subClusterId);
                    RegisterApplicationMasterResponse uamResponse = null;
                    Token token = null;
                    try {
                        // Launch the UAM, remember its relayer, then register with the saved request.
                        token = FederationInterceptor.this.uamPool.launchUAM(subClusterId, (Configuration)config, FederationInterceptor.this.attemptId.getApplicationId(), FederationInterceptor.this.amRegistrationResponse.getQueue(), FederationInterceptor.this.getApplicationContext().getUser(), FederationInterceptor.this.homeSubClusterId.toString(), true, subClusterId);
                        FederationInterceptor.this.secondaryRelayers.put(subClusterId, FederationInterceptor.this.uamPool.getAMRMClientRelayer(subClusterId));
                        uamResponse = FederationInterceptor.this.uamPool.registerApplicationMaster(subClusterId, FederationInterceptor.this.amRegistrationRequest);
                    }
                    catch (Throwable e) {
                        // Launch/registration failed: log and give up on this sub-cluster for now.
                        LOG.error("Failed to register application master: " + subClusterId + " Application: " + FederationInterceptor.this.attemptId, e);
                        return;
                    }
                    FederationInterceptor.this.uamRegistrations.put(scId, uamResponse);
                    LOG.info("Successfully registered unmanaged application master: " + subClusterId + " ApplicationId: " + FederationInterceptor.this.attemptId);
                    try {
                        // Send the allocate request that triggered this registration.
                        FederationInterceptor.this.uamPool.allocateAsync(subClusterId, (AllocateRequest)requests.get(scId), (AsyncCallback)new HeartbeatCallBack(scId, true));
                    }
                    catch (Throwable e) {
                        LOG.error("Failed to allocate async to " + subClusterId + " Application: " + FederationInterceptor.this.attemptId, e);
                    }
                    try {
                        // Persist the UAM token: registry if available, otherwise the NM state store.
                        if (FederationInterceptor.this.registryClient != null) {
                            FederationInterceptor.this.registryClient.writeAMRMTokenForUAM(FederationInterceptor.this.attemptId.getApplicationId(), subClusterId, token);
                        } else if (FederationInterceptor.this.getNMStateStore() != null) {
                            FederationInterceptor.this.getNMStateStore().storeAMRMProxyAppContextEntry(FederationInterceptor.this.attemptId, FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + subClusterId, token.encodeToUrlString().getBytes(FederationInterceptor.STRING_TO_BYTE_FORMAT));
                        }
                    }
                    catch (Throwable e) {
                        LOG.error("Failed to persist UAM token from " + subClusterId + " Application: " + FederationInterceptor.this.attemptId, e);
                    }
                }
            });
            this.uamRegisterFutures.put(scId, future);
        }
        return newSubClusters;
    }

    /**
     * Builds the starting allocate response by summing the available resources
     * and cluster node counts of the last response from each sub-cluster that is
     * not currently timed out.
     *
     * @return a new response carrying the aggregated resources and node count
     */
    protected AllocateResponse generateBaseAllocationResponse() {
        AllocateResponse aggregate = (AllocateResponse)RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
        aggregate.setAvailableResources(Resource.newInstance(0, 0));
        aggregate.setNumClusterNodes(0);

        Set<SubClusterId> timedOut = this.getTimedOutSCs(false);
        for (Map.Entry<SubClusterId, AllocateResponse> last : this.lastSCResponse.entrySet()) {
            if (!timedOut.contains(last.getKey())) {
                AllocateResponse lastResponse = last.getValue();
                if (lastResponse.getAvailableResources() != null) {
                    aggregate.setAvailableResources(Resources.add(aggregate.getAvailableResources(), lastResponse.getAvailableResources()));
                }
                aggregate.setNumClusterNodes(aggregate.getNumClusterNodes() + lastResponse.getNumClusterNodes());
            }
        }
        return aggregate;
    }

    /**
     * Drains all queued asynchronous allocate responses into
     * {@code mergedResponse}, holding the monitor of {@code asyncResponseSink}
     * for the whole pass.
     *
     * <p>For each queued response the finished containers are dropped from the
     * container cache, newly allocated containers are cached against their
     * sub-cluster, and the response is merged. Each sub-cluster's queue is
     * cleared once processed.
     *
     * @param mergedResponse response that accumulates the merged content
     */
    private void mergeAllocateResponses(AllocateResponse mergedResponse) {
        synchronized (this.asyncResponseSink) {
            for (Map.Entry<SubClusterId, List<AllocateResponse>> queued : this.asyncResponseSink.entrySet()) {
                List<AllocateResponse> pending = queued.getValue();
                if (pending.isEmpty()) {
                    continue;
                }
                SubClusterId source = queued.getKey();
                for (AllocateResponse response : pending) {
                    this.removeFinishedContainersFromCache(response.getCompletedContainersStatuses());
                    this.cacheAllocatedContainers(response.getAllocatedContainers(), source);
                    this.mergeAllocateResponse(mergedResponse, response, source);
                }
                pending.clear();
            }
        }
    }

    /**
     * Removes the given completed containers from the container-to-sub-cluster
     * cache; containers that are not cached are ignored.
     *
     * @param finishedContainers statuses of the containers that have completed
     */
    private void removeFinishedContainersFromCache(List<ContainerStatus> finishedContainers) {
        for (ContainerStatus finished : finishedContainers) {
            LOG.debug("Completed container {}", (Object)finished);
            if (this.containerIdToSubClusterIdMap.containsKey(finished.getContainerId())) {
                this.containerIdToSubClusterIdMap.remove(finished.getContainerId());
            }
        }
    }

    /**
     * Folds the "previous attempt" data of UAM registration responses into an
     * allocate response: containers from previous attempts are added to the
     * allocated containers (and cached against their sub-cluster), and NM tokens
     * from previous attempts are added to the NM tokens.
     *
     * @param homeResponse allocate response to extend
     * @param registrations registration response per sub-cluster
     */
    private void mergeRegistrationResponses(AllocateResponse homeResponse, Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
        for (Map.Entry<SubClusterId, RegisterApplicationMasterResponse> entry : registrations.entrySet()) {
            RegisterApplicationMasterResponse registration = entry.getValue();

            if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
                List allocated = homeResponse.getAllocatedContainers();
                if (isNullOrEmpty(allocated)) {
                    homeResponse.setAllocatedContainers(registration.getContainersFromPreviousAttempts());
                } else {
                    allocated.addAll(registration.getContainersFromPreviousAttempts());
                    homeResponse.setAllocatedContainers(allocated);
                }
                this.cacheAllocatedContainers(registration.getContainersFromPreviousAttempts(), entry.getKey());
            }

            if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) {
                List nmTokens = homeResponse.getNMTokens();
                if (isNullOrEmpty(nmTokens)) {
                    homeResponse.setNMTokens(registration.getNMTokensFromPreviousAttempts());
                } else {
                    nmTokens.addAll(registration.getNMTokensFromPreviousAttempts());
                    homeResponse.setNMTokens(nmTokens);
                }
            }
        }
    }

    /**
     * Merges one sub-cluster's allocate response into the home response.
     *
     * <p>List-valued fields (allocated containers, completed container statuses,
     * updated nodes, NM tokens, updated containers, update errors) are appended
     * when the home response already has entries and set otherwise. The cluster
     * node count is added; a non-null application priority overwrites the home
     * value; preemption messages are combined contract by contract. An AMRM
     * token is only accepted from the home sub-cluster.
     *
     * @param homeResponse response being accumulated into
     * @param otherResponse response to merge in
     * @param otherRMAddress sub-cluster that produced {@code otherResponse}
     * @throws YarnRuntimeException if a non-home sub-cluster supplies an AMRM token
     */
    @VisibleForTesting
    protected void mergeAllocateResponse(AllocateResponse homeResponse, AllocateResponse otherResponse, SubClusterId otherRMAddress) {
        // AMRM token: only the home RM may hand one back.
        if (otherResponse.getAMRMToken() != null) {
            if (!otherRMAddress.equals(this.homeSubClusterId)) {
                throw new YarnRuntimeException("amrmToken from UAM " + otherRMAddress + " should be null here");
            }
            homeResponse.setAMRMToken(otherResponse.getAMRMToken());
        }

        if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
            if (isNullOrEmpty(homeResponse.getAllocatedContainers())) {
                homeResponse.setAllocatedContainers(otherResponse.getAllocatedContainers());
            } else {
                homeResponse.getAllocatedContainers().addAll(otherResponse.getAllocatedContainers());
            }
        }

        if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
            if (isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
                homeResponse.setCompletedContainersStatuses(otherResponse.getCompletedContainersStatuses());
            } else {
                homeResponse.getCompletedContainersStatuses().addAll(otherResponse.getCompletedContainersStatuses());
            }
        }

        if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) {
            if (isNullOrEmpty(homeResponse.getUpdatedNodes())) {
                homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
            } else {
                homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
            }
        }

        if (otherResponse.getApplicationPriority() != null) {
            homeResponse.setApplicationPriority(otherResponse.getApplicationPriority());
        }

        homeResponse.setNumClusterNodes(homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());

        // Preemption: adopt the other message if home has none, else merge contracts.
        PreemptionMessage homeMessage = homeResponse.getPreemptionMessage();
        PreemptionMessage otherMessage = otherResponse.getPreemptionMessage();
        if (otherMessage != null) {
            if (homeMessage == null) {
                homeResponse.setPreemptionMessage(otherMessage);
            } else {
                PreemptionContract homeContract = homeMessage.getContract();
                PreemptionContract otherContract = otherMessage.getContract();
                if (otherContract != null) {
                    if (homeContract == null) {
                        homeMessage.setContract(otherContract);
                    } else {
                        homeContract.getResourceRequest().addAll(otherContract.getResourceRequest());
                        homeContract.getContainers().addAll(otherContract.getContainers());
                    }
                }

                StrictPreemptionContract homeStrict = homeMessage.getStrictContract();
                StrictPreemptionContract otherStrict = otherMessage.getStrictContract();
                if (otherStrict != null) {
                    if (homeStrict == null) {
                        homeMessage.setStrictContract(otherStrict);
                    } else {
                        homeStrict.getContainers().addAll(otherStrict.getContainers());
                    }
                }
            }
        }

        if (!isNullOrEmpty(otherResponse.getNMTokens())) {
            if (isNullOrEmpty(homeResponse.getNMTokens())) {
                homeResponse.setNMTokens(otherResponse.getNMTokens());
            } else {
                homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
            }
        }

        if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
            if (isNullOrEmpty(homeResponse.getUpdatedContainers())) {
                homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
            } else {
                homeResponse.getUpdatedContainers().addAll(otherResponse.getUpdatedContainers());
            }
        }

        if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
            if (isNullOrEmpty(homeResponse.getUpdateErrors())) {
                homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
            } else {
                homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
            }
        }
    }

    /**
     * Records which sub-cluster each container came from.
     *
     * <p>A container id that is already cached for the same sub-cluster is
     * logged as a duplicate and re-recorded; one cached for a different
     * sub-cluster is treated as a fatal configuration problem.
     *
     * @param containers containers to record
     * @param subClusterId sub-cluster that allocated them
     * @throws YarnRuntimeException if a container id is already cached for a
     *         different sub-cluster
     */
    private void cacheAllocatedContainers(List<Container> containers, SubClusterId subClusterId) {
        for (Container container : containers) {
            LOG.debug("Adding container {}", (Object)container);
            if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
                SubClusterId previousOwner = this.containerIdToSubClusterIdMap.get(container.getId());
                if (!previousOwner.equals(subClusterId)) {
                    throw new YarnRuntimeException("Duplicate containerID found in the allocated containers. This can happen if the RM epoch is not configured properly. ContainerId: " + container.getId().toString() + " ApplicationId: " + this.attemptId + " From RM: " + subClusterId + " . Previous container was from sub-cluster: " + previousOwner);
                }
                LOG.warn("Duplicate containerID: {} found in the allocated containers from same sub-cluster: {}, so ignoring.", (Object)container.getId(), (Object)subClusterId);
            }
            this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
        }
    }

    /**
     * Returns the request already mapped to the sub-cluster, or creates, maps
     * and returns a new empty one that copies the response id and progress of
     * the original AM request.
     *
     * @param subClusterId sub-cluster the request is destined for
     * @param originalAMRequest AM request supplying response id and progress
     * @param requestMap per-sub-cluster requests built so far; updated on creation
     * @return the existing or newly created request for the sub-cluster
     */
    private static AllocateRequest findOrCreateAllocateRequestForSubCluster(SubClusterId subClusterId, AllocateRequest originalAMRequest, Map<SubClusterId, AllocateRequest> requestMap) {
        if (requestMap.containsKey(subClusterId)) {
            return requestMap.get(subClusterId);
        }
        AllocateRequest created = createAllocateRequest();
        created.setResponseId(originalAMRequest.getResponseId());
        created.setProgress(originalAMRequest.getProgress());
        requestMap.put(subClusterId, created);
        return created;
    }

    /**
     * Creates an allocate request whose ask list, release list, update requests
     * and blacklist additions/removals are all initialised to empty lists.
     *
     * @return the new, empty allocate request
     */
    private static AllocateRequest createAllocateRequest() {
        ResourceBlacklistRequest emptyBlacklist = ResourceBlacklistRequest.newInstance(null, null);
        emptyBlacklist.setBlacklistAdditions(new ArrayList<>());
        emptyBlacklist.setBlacklistRemovals(new ArrayList<>());

        AllocateRequest fresh = (AllocateRequest)RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
        fresh.setAskList(new ArrayList<>());
        fresh.setReleaseList(new ArrayList<>());
        fresh.setResourceBlacklistRequest(emptyBlacklist);
        fresh.setUpdateRequests(new ArrayList<>());
        return fresh;
    }

    /**
     * Collects the sub-clusters considered timed out: those whose last recorded
     * response time is not later than the last AM heartbeat time and lies more
     * than {@code subClusterTimeOut} before the current clock time.
     *
     * @param verbose whether to log a warning for each timed-out sub-cluster
     * @return the set of timed-out sub-clusters (possibly empty)
     */
    protected Set<SubClusterId> getTimedOutSCs(boolean verbose) {
        Set<SubClusterId> timedOut = new HashSet<SubClusterId>();
        for (Map.Entry<SubClusterId, Long> last : this.lastSCResponseTime.entrySet()) {
            if (last.getValue() > this.lastAMHeartbeatTime) {
                // The sub-cluster responded after the last AM heartbeat.
                continue;
            }
            long silentFor = this.clock.getTime() - last.getValue();
            if (silentFor > this.subClusterTimeOut) {
                if (verbose) {
                    LOG.warn("Subcluster {} doesn't have a successful heartbeat for {} seconds for {}", new Object[]{last.getKey(), (double)silentFor / 1000.0, this.attemptId});
                }
                timedOut.add(last.getKey());
            }
        }
        return timedOut;
    }

    /**
     * Checks whether the container is present in the container-to-sub-cluster
     * cache, logging an error when it is not.
     *
     * @param containerId container the AM is acting on
     * @param actionName name of the attempted action, used in the log message
     * @return {@code true} if the container is cached, {@code false} otherwise
     */
    private boolean warnIfNotExists(ContainerId containerId, String actionName) {
        boolean known = this.containerIdToSubClusterIdMap.containsKey(containerId);
        if (!known) {
            LOG.error("AM is trying to {} a container {} that does not exist. Might happen shortly after NM restart when NM recovery is enabled", (Object)actionName, (Object)containerId.toString());
        }
        return known;
    }

    /**
     * Delegates to the policy interpreter to split the ask list across
     * sub-clusters, passing it the currently timed-out sub-clusters (which are
     * logged verbosely).
     *
     * @param askList resource requests from the AM
     * @return resource requests grouped by target sub-cluster
     * @throws YarnException if the policy interpreter fails to split the requests
     */
    protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(List<ResourceRequest> askList) throws YarnException {
        Set<SubClusterId> timedOut = this.getTimedOutSCs(true);
        return this.policyInterpreter.splitResourceRequests(askList, timedOut);
    }

    /**
     * Returns how many UAM ids the UAM pool currently reports.
     *
     * @return number of UAM ids in the pool
     */
    @VisibleForTesting
    protected int getUnmanagedAMPoolSize() {
        final int uamCount = this.uamPool.getAllUAMIds().size();
        return uamCount;
    }

    /**
     * Exposes the UAM pool manager used by this interceptor.
     *
     * @return the UAM pool (the same instance, not a copy)
     */
    @VisibleForTesting
    protected UnmanagedAMPoolManager getUnmanagedAMPool() {
        return uamPool;
    }

    /**
     * Exposes the futures of the UAM registration tasks, keyed by sub-cluster.
     *
     * @return the internal map of registration futures (not a copy)
     */
    @VisibleForTesting
    protected Map<SubClusterId, Future<?>> getUamRegisterFutures() {
        return uamRegisterFutures;
    }

    /**
     * Exposes the sink in which asynchronous allocate responses are queued per
     * sub-cluster.
     *
     * @return the internal response sink (not a copy)
     */
    @VisibleForTesting
    public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
        return asyncResponseSink;
    }

    /**
     * Tells whether a collection is {@code null} or has no elements.
     *
     * @param c collection to test, may be {@code null}
     * @param <T> element type
     * @return {@code true} if {@code c} is {@code null} or empty
     */
    public static <T> boolean isNullOrEmpty(Collection<T> c) {
        if (c == null) {
            return true;
        }
        return c.isEmpty();
    }

    /**
     * Tells whether a map is {@code null} or has no entries.
     *
     * @param c map to test, may be {@code null}
     * @param <T1> key type
     * @param <T2> value type
     * @return {@code true} if {@code c} is {@code null} or empty
     */
    public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
        if (c == null) {
            return true;
        }
        return c.isEmpty();
    }

    /**
     * Pairs a finish-application-master response with the id of the sub-cluster
     * it belongs to.
     */
    private static class FinishApplicationMasterResponseInfo {
        private FinishApplicationMasterResponse response;
        private String subClusterId;

        /**
         * @param response the finish response
         * @param subClusterId id of the sub-cluster associated with the response
         */
        FinishApplicationMasterResponseInfo(FinishApplicationMasterResponse response, String subClusterId) {
            this.subClusterId = subClusterId;
            this.response = response;
        }

        /** @return the finish response given at construction */
        public FinishApplicationMasterResponse getResponse() {
            return response;
        }

        /** @return the sub-cluster id given at construction */
        public String getSubClusterId() {
            return subClusterId;
        }
    }

    /**
     * Callback that receives the asynchronous allocate responses of one
     * sub-cluster and hands them to the enclosing interceptor.
     */
    private class HeartbeatCallBack
    implements AsyncCallback<AllocateResponse> {
        private SubClusterId subClusterId;
        private boolean isUAM;

        /**
         * @param subClusterId sub-cluster whose responses this callback handles
         * @param isUAM whether the responses come from a UAM rather than the home RM
         */
        HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
            this.subClusterId = subClusterId;
            this.isUAM = isUAM;
        }

        /**
         * Queues the response in the interceptor's response sink and notifies
         * waiters, records it as the sub-cluster's latest response and response
         * time, and informs the policy interpreter.
         *
         * <p>When the response comes from a UAM and carries a new AMRM token,
         * the token is removed from the response and persisted: through the
         * registry client if one exists, otherwise in the NM state store if one
         * is available. Failures while persisting or while decoding the token
         * for logging are logged and do not propagate.
         *
         * @param response allocate response received from the sub-cluster
         */
        public void callback(AllocateResponse response) {
            synchronized (FederationInterceptor.this.asyncResponseSink) {
                List<AllocateResponse> responses = FederationInterceptor.this.asyncResponseSink.get(this.subClusterId);
                if (responses == null) {
                    responses = new ArrayList<AllocateResponse>();
                    FederationInterceptor.this.asyncResponseSink.put(this.subClusterId, responses);
                }
                responses.add(response);
                // Wake any thread waiting on the sink's monitor.
                FederationInterceptor.this.asyncResponseSink.notifyAll();
            }
            FederationInterceptor.this.lastSCResponse.put(this.subClusterId, response);
            FederationInterceptor.this.lastSCResponseTime.put(this.subClusterId, FederationInterceptor.this.clock.getTime());
            try {
                FederationInterceptor.this.policyInterpreter.notifyOfResponse(this.subClusterId, response);
            }
            catch (YarnException e) {
                LOG.warn("notifyOfResponse for policy failed for sub-cluster " + this.subClusterId, (Throwable)e);
            }
            if (!this.isUAM || response.getAMRMToken() == null) {
                return;
            }
            // A UAM handed back a new AMRM token: strip it from the response and persist it.
            Token<AMRMTokenIdentifier> newToken = ConverterUtils.convertFromYarn(response.getAMRMToken(), (Text)null);
            response.setAMRMToken(null);
            if (FederationInterceptor.this.registryClient != null) {
                if (FederationInterceptor.this.registryClient.writeAMRMTokenForUAM(FederationInterceptor.this.attemptId.getApplicationId(), this.subClusterId.getId(), newToken)) {
                    try {
                        AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
                        identifier.readFields((DataInput)new DataInputStream(new ByteArrayInputStream(newToken.getIdentifier())));
                        LOG.info("Received new UAM amrmToken with keyId {} and service {} from {} for {}, written to Registry", new Object[]{identifier.getKeyId(), newToken.getService(), this.subClusterId, FederationInterceptor.this.attemptId});
                    }
                    catch (IOException e) {
                        // The token is already in the registry; only the log details
                        // could not be decoded. Report it instead of hiding it.
                        LOG.warn("New UAM amrmToken from " + this.subClusterId + " for " + FederationInterceptor.this.attemptId + " was written to Registry, but its identifier could not be parsed", (Throwable)e);
                    }
                }
            } else if (FederationInterceptor.this.getNMStateStore() != null) {
                try {
                    FederationInterceptor.this.getNMStateStore().storeAMRMProxyAppContextEntry(FederationInterceptor.this.attemptId, FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + this.subClusterId.getId(), newToken.encodeToUrlString().getBytes(FederationInterceptor.STRING_TO_BYTE_FORMAT));
                }
                catch (IOException e) {
                    LOG.error("Error storing UAM token as AMRMProxy context entry in NMSS for " + FederationInterceptor.this.attemptId, (Throwable)e);
                }
            }
        }
    }
}

