package com.taobao.arthas.core.distribution.impl;

import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.command.model.InputStatusModel;
import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.distribution.DistributorOptions;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.util.Constants;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl.class */
public class SharingResultDistributorImpl implements SharingResultDistributor {
    private static final Logger logger = LoggerFactory.getLogger(SharingResultDistributorImpl.class);
    private final Session session;
    private List<ResultConsumer> consumers = new CopyOnWriteArrayList();
    private BlockingQueue<ResultModel> pendingResultQueue = new ArrayBlockingQueue(10);
    private AtomicInteger consumerNumGenerator = new AtomicInteger(0);
    private SharingResultConsumerImpl sharingResultConsumer = new SharingResultConsumerImpl();
    private volatile boolean running = true;
    private Thread distributorThread = new Thread(new DistributorTask(), "ResultDistributor");

    /* loaded from: input_file:com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl$DistributorTask.class */
    private class DistributorTask implements Runnable {
        private DistributorTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            SharingResultDistributorImpl.this.distribute();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl$SharingResultConsumerImpl.class */
    public static class SharingResultConsumerImpl implements ResultConsumer {
        private BlockingQueue<ResultModel> resultQueue;
        private ReentrantLock queueLock;
        private InputStatusModel lastInputStatus;

        private SharingResultConsumerImpl() {
            this.resultQueue = new ArrayBlockingQueue(DistributorOptions.resultQueueSize);
            this.queueLock = new ReentrantLock();
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public boolean appendResult(ResultModel resultModel) {
            this.queueLock.lock();
            try {
                if (resultModel instanceof InputStatusModel) {
                    this.lastInputStatus = (InputStatusModel) resultModel;
                    return true;
                }
                while (!this.resultQueue.offer(resultModel)) {
                    this.resultQueue.poll();
                }
                if (!this.queueLock.isHeldByCurrentThread()) {
                    return true;
                }
                this.queueLock.unlock();
                return true;
            } finally {
                if (this.queueLock.isHeldByCurrentThread()) {
                    this.queueLock.unlock();
                }
            }
        }

        public void copyTo(ResultConsumer resultConsumer) {
            this.queueLock.lock();
            try {
                Iterator it = this.resultQueue.iterator();
                while (it.hasNext()) {
                    resultConsumer.appendResult((ResultModel) it.next());
                }
                if (this.lastInputStatus != null) {
                    resultConsumer.appendResult(this.lastInputStatus);
                }
            } finally {
                if (this.queueLock.isHeldByCurrentThread()) {
                    this.queueLock.unlock();
                }
            }
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public List<ResultModel> pollResults() {
            return null;
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public long getLastAccessTime() {
            return 0L;
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public void close() {
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public boolean isClosed() {
            return false;
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public boolean isPolling() {
            return false;
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public String getConsumerId() {
            return "shared-consumer";
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public void setConsumerId(String str) {
        }

        @Override // com.taobao.arthas.core.distribution.ResultConsumer
        public boolean isHealthy() {
            return true;
        }
    }

    public SharingResultDistributorImpl(Session session) {
        this.session = session;
        this.distributorThread.start();
    }

    @Override // com.taobao.arthas.core.distribution.ResultDistributor
    public void appendResult(ResultModel resultModel) {
        try {
            if (!this.pendingResultQueue.offer(resultModel, 100L, TimeUnit.MILLISECONDS)) {
                this.pendingResultQueue.poll();
                interruptJob("result queue is full: " + this.pendingResultQueue.size());
            }
        } catch (InterruptedException e) {
        }
    }

    private void interruptJob(String str) {
        Job foregroundJob = this.session.getForegroundJob();
        if (foregroundJob != null) {
            logger.warn(str + ", current job was interrupted.", Integer.valueOf(foregroundJob.id()));
            foregroundJob.interrupt();
            this.pendingResultQueue.offer(new MessageModel(str + ", current job was interrupted."));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void distribute() {
        while (this.running) {
            try {
                ResultModel poll = this.pendingResultQueue.poll(100L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    this.sharingResultConsumer.appendResult(poll);
                    int i = 0;
                    for (int i2 = 0; i2 < this.consumers.size(); i2++) {
                        ResultConsumer resultConsumer = this.consumers.get(i2);
                        if (resultConsumer.isHealthy()) {
                            i++;
                        }
                        resultConsumer.appendResult(poll);
                    }
                    if (i == 0) {
                        interruptJob("all consumers are unhealthy");
                    }
                }
            } catch (Throwable th) {
                logger.warn("distribute result failed: " + th.getMessage(), th);
            }
        }
    }

    @Override // com.taobao.arthas.core.distribution.ResultDistributor
    public void close() {
        this.running = false;
    }

    @Override // com.taobao.arthas.core.distribution.SharingResultDistributor
    public void addConsumer(ResultConsumer resultConsumer) {
        resultConsumer.setConsumerId(UUID.randomUUID().toString().replaceAll("-", Constants.EMPTY_STRING) + "_" + this.consumerNumGenerator.incrementAndGet());
        this.sharingResultConsumer.copyTo(resultConsumer);
        this.consumers.add(resultConsumer);
    }

    @Override // com.taobao.arthas.core.distribution.SharingResultDistributor
    public void removeConsumer(ResultConsumer resultConsumer) {
        this.consumers.remove(resultConsumer);
        resultConsumer.close();
    }

    @Override // com.taobao.arthas.core.distribution.SharingResultDistributor
    public List<ResultConsumer> getConsumers() {
        return this.consumers;
    }

    @Override // com.taobao.arthas.core.distribution.SharingResultDistributor
    public ResultConsumer getConsumer(String str) {
        for (int i = 0; i < this.consumers.size(); i++) {
            ResultConsumer resultConsumer = this.consumers.get(i);
            if (resultConsumer.getConsumerId().equals(str)) {
                return resultConsumer;
            }
        }
        return null;
    }
}
