/*
 * Decompiled with CFR 0.152.
 */
package com.jd.easyflow.flow.engine.impl;

import com.jd.easyflow.flow.engine.FlowContext;
import com.jd.easyflow.flow.engine.impl.BaseFlowRunner;
import com.jd.easyflow.flow.engine.impl.FlowContextImpl;
import com.jd.easyflow.flow.exception.FlowException;
import com.jd.easyflow.flow.model.NodeContext;
import com.jd.easyflow.flow.util.ExceptionUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flow runner that hands every runnable node to an {@link Executor} and blocks the
 * calling thread until all scheduled node tasks have finished, the flow has been
 * marked interrupted, or the configured timeout has elapsed.
 *
 * <p>Completion is tracked with an {@link AtomicInteger} holding the number of
 * outstanding node tasks and a {@link Lock} used purely as a completion signal:
 * workers call {@link Lock#unlock()} to release the waiting caller. The signal may be
 * released more than once (for example by several workers that each observe the
 * interrupted state), so a lock returned by {@link #createLock(FlowContextImpl)} must
 * tolerate repeated {@code unlock()} calls.
 */
public class MultipleThreadFlowRunner extends BaseFlowRunner {
    private static final Logger logger = LoggerFactory.getLogger(MultipleThreadFlowRunner.class);

    /** Context key under which the list of failed node contexts is stored. */
    private static final String EXCEPTION_NODES_KEY = "_flow.multi.exceptions";
    /** Node-context key under which the throwable of a failed node is stored. */
    private static final String NODE_EXCEPTION_KEY = "_flow.multi.exception";
    /** Context key under which the outcome of the timed wait is stored. */
    private static final String AWAIT_RESULT_KEY = "_flow.multi.await.result";

    /** Seed for run ids (used only in log and error messages); starts at class-load time. */
    protected static long startId = System.currentTimeMillis();
    protected Executor executor;
    /** Maximum wait in milliseconds; {@code 0} means wait without a time limit. */
    protected long timeout = 0L;
    /** Whether a timeout is reported to the caller as a {@link FlowException}. */
    protected boolean throwExceptionOnTimeout = false;

    public MultipleThreadFlowRunner() {
    }

    public MultipleThreadFlowRunner(Executor executor, long timeout) {
        this.executor = executor;
        this.timeout = timeout;
    }

    public MultipleThreadFlowRunner(Executor executor, long timeout, boolean throwExceptionOnTimeout) {
        this.executor = executor;
        this.timeout = timeout;
        this.throwExceptionOnTimeout = throwExceptionOnTimeout;
    }

    /**
     * Schedules the runnable nodes of {@code context} on the executor and waits for them.
     *
     * <p>After the wait, if any node task recorded an exception, the first recorded one
     * is rethrown through {@link ExceptionUtil#throwException}.
     *
     * @param context the flow context whose nodes are executed
     * @throws FlowException if the waiting thread is interrupted, or if the wait times
     *         out and {@code throwExceptionOnTimeout} is set
     */
    @Override
    public void runNodes(FlowContextImpl context) {
        String runId = nextRunId();
        if (context.isLogOn() && logger.isInfoEnabled()) {
            logger.info("Start running flow node, runId:" + runId);
        }
        Lock lock = this.createLock(context);
        AtomicInteger counter = new AtomicInteger();
        this.scheduleNodes(context, counter, lock, runId);
        this.awaitCompletion(context, lock, runId);

        boolean failed = false;
        Throwable firstFailure = null;
        // addException() appends under the same monitor. After a timeout workers may
        // still be running, so the list must not be read unsynchronized.
        synchronized (context) {
            List<?> exceptionNodes = (List<?>) context.get(EXCEPTION_NODES_KEY);
            if (exceptionNodes != null && !exceptionNodes.isEmpty()) {
                failed = true;
                firstFailure = (Throwable) ((NodeContext) exceptionNodes.get(0)).get(NODE_EXCEPTION_KEY);
            }
        }
        if (failed) {
            if (context.isLogOn()) {
                logger.error("Flow execute exception");
            }
            throw ExceptionUtil.throwException(firstFailure);
        }
    }

    /**
     * Returns the next run id. The increment is synchronized because {@code startId} is
     * shared by all runner instances and {@code startId++} on a {@code long} is not atomic.
     */
    private static synchronized String nextRunId() {
        return startId++ + "";
    }

    /**
     * Blocks until the completion signal is released. With a zero timeout the wait is
     * unbounded; otherwise the wait is limited to {@code timeout} milliseconds, its
     * outcome is stored in the context, and on expiry the flow is marked interrupted.
     */
    private void awaitCompletion(FlowContextImpl context, Lock lock, String runId) {
        try {
            if (this.timeout == 0L) {
                lock.lockInterruptibly();
                return;
            }
            boolean completed = lock.tryLock(this.timeout, TimeUnit.MILLISECONDS);
            context.put(AWAIT_RESULT_KEY, completed);
            if (!completed) {
                context.setInterrupted();
                if (this.throwExceptionOnTimeout) {
                    throw new FlowException("flow execution timeout, runId:" + runId + ", flowId:" + context.getFlowId());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new FlowException(e);
        }
    }

    /**
     * Creates the completion signal for one {@link #runNodes(FlowContextImpl)} call.
     *
     * @param context the flow context of the run
     * @return a lock whose {@code unlock()} releases the thread waiting in {@code runNodes}
     */
    protected Lock createLock(FlowContextImpl context) {
        return new CountDownLatchLock();
    }

    /**
     * Submits the initially runnable nodes of {@code context} to the executor.
     *
     * @param context the flow context supplying the nodes
     * @param counter number of node tasks that have been submitted but not yet finished
     * @param lock    completion signal released when the counter drops to zero
     * @param runId   run identifier used in log messages
     */
    protected void scheduleNodes(FlowContextImpl context, AtomicInteger counter, Lock lock, String runId) {
        this.addTaskIfExists(context, counter, lock, runId);
    }

    /**
     * Drains the nodes currently available from the context and submits one executor
     * task per node. Called both by the initial scheduling and by every finished node
     * task, so follow-up nodes are picked up as soon as they are added.
     */
    private void addTaskIfExists(FlowContextImpl context, AtomicInteger counter, Lock lock, String runId) {
        NodeContext node;
        while ((node = context.getNextNode()) != null) {
            NodeContext submittedNode = node;
            counter.incrementAndGet();
            try {
                this.executor.execute(() -> this.executeNodeTask(submittedNode, context, counter, lock, runId));
            } catch (RuntimeException | Error e) {
                // The task was never accepted (e.g. rejected by the executor), so nothing
                // will ever decrement the counter for it. Roll the increment back,
                // otherwise the counter can never reach zero and the waiter is never released.
                counter.decrementAndGet();
                throw e;
            }
        }
    }

    /**
     * Body of a single node task: runs the node, queues its successors, schedules any
     * newly available nodes and finally reports completion. Any throwable is recorded
     * on the context instead of escaping into the executor.
     */
    private void executeNodeTask(NodeContext node, FlowContextImpl context, AtomicInteger counter, Lock lock,
            String runId) {
        try {
            if (context.isLogOn() && logger.isInfoEnabled()) {
                logger.info("Start execute flow node:" + node.getNodeId() + ", runId:" + runId);
            }
            NodeContext[] nextNodes = this.runOneNode(node, context);
            if (nextNodes != null) {
                context.addNodes(nextNodes);
            }
            if (this.releaseIfInterrupted(context, lock)) {
                return;
            }
            // Schedule successors BEFORE decrementing, so the counter cannot hit zero
            // while follow-up work is still about to be submitted.
            this.addTaskIfExists(context, counter, lock, runId);
            this.completeTask(counter, lock);
        } catch (Throwable t) {
            this.addException(context, node, t);
            if (this.releaseIfInterrupted(context, lock)) {
                return;
            }
            this.completeTask(counter, lock);
        }
    }

    /**
     * Releases the waiting caller immediately when the flow has been marked interrupted.
     *
     * @return {@code true} if the flow is interrupted and the signal was released
     */
    private boolean releaseIfInterrupted(FlowContextImpl context, Lock lock) {
        if (!context.isInterrupted()) {
            return false;
        }
        if (context.isLogOn() && logger.isInfoEnabled()) {
            logger.info("Flow state is interrupted");
        }
        lock.unlock();
        return true;
    }

    /** Marks one task as finished and releases the waiting caller if it was the last one. */
    private void completeTask(AtomicInteger counter, Lock lock) {
        if (counter.decrementAndGet() == 0) {
            lock.unlock();
        }
    }

    /**
     * Records a node failure: stores {@code t} on the node context and appends the node
     * context to the failure list kept on the flow context, creating the list on first
     * use. The update is performed while holding the monitor of {@code context}.
     *
     * @param context     the flow context collecting failures
     * @param nodeContext the node whose execution failed
     * @param t           the throwable raised while running the node
     */
    protected void addException(FlowContext context, NodeContext nodeContext, Throwable t) {
        synchronized (context) {
            @SuppressWarnings("unchecked") // the list is only ever created and stored by this method
            List<NodeContext> exceptionNodes = (List<NodeContext>) context.get(EXCEPTION_NODES_KEY);
            if (exceptionNodes == null) {
                exceptionNodes = new ArrayList<NodeContext>();
                context.put(EXCEPTION_NODES_KEY, exceptionNodes);
            }
            nodeContext.put(NODE_EXCEPTION_KEY, t);
            exceptionNodes.add(nodeContext);
        }
    }

    /** Logs the current thread's stack trace at error level, one frame per line. */
    protected void printStackTrace() {
        StringBuilder builder = new StringBuilder();
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            builder.append(frame).append("\n");
        }
        logger.error("Flow execute exception,\n " + builder);
    }

    public Executor getExecutor() {
        return this.executor;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    public long getTimeout() {
        return this.timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public boolean isThrowExceptionOnTimeout() {
        return this.throwExceptionOnTimeout;
    }

    public void setThrowExceptionOnTimeout(boolean throwExceptionOnTimeout) {
        this.throwExceptionOnTimeout = throwExceptionOnTimeout;
    }

    /**
     * One-shot completion signal exposed through the {@link Lock} interface and backed
     * by a {@link CountDownLatch} with a count of one. "Acquiring" the lock waits for
     * the latch; {@link #unlock()} counts it down. Only the operations used by the
     * runner are supported; the remaining {@link Lock} methods throw
     * {@link UnsupportedOperationException}.
     */
    private static class CountDownLatchLock
    implements Lock {
        private final CountDownLatch latch = new CountDownLatch(1);

        private CountDownLatchLock() {
        }

        /** Waits until {@link #unlock()} has been called at least once. */
        @Override
        public void lockInterruptibly() throws InterruptedException {
            this.latch.await();
        }

        /**
         * Waits up to the given time for {@link #unlock()} to be called.
         *
         * @return {@code true} if the signal was released within the time limit
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return this.latch.await(time, unit);
        }

        /** Releases all waiters; further calls have no effect. */
        @Override
        public void unlock() {
            this.latch.countDown();
        }

        @Override
        public void lock() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean tryLock() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }
}

