/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.dsl;

import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

class PublisherIntegrationFlow<T>
extends StandardIntegrationFlow
implements Publisher<Message<T>> {
    private static final Subscription NO_OP_SUBSCRIPTION = new Subscription(){

        public void request(long n) {
        }

        public void cancel() {
        }
    };
    private final Queue<Subscriber<? super Message<T>>> subscribers = new LinkedBlockingQueue<Subscriber<? super Message<T>>>();
    private final MessageChannel messageChannel;
    private final Executor executor;

    PublisherIntegrationFlow(Set<Object> integrationComponents, MessageChannel messageChannel, Executor executor) {
        super(integrationComponents);
        this.messageChannel = messageChannel;
        this.executor = executor;
        this.start();
    }

    public void subscribe(Subscriber<? super Message<T>> subscriber) {
        if (!this.isRunning()) {
            subscriber.onSubscribe(NO_OP_SUBSCRIPTION);
            subscriber.onError((Throwable)new IllegalStateException("The Publisher must be started ('Lifecycle.start()') before accepting subscription."));
            return;
        }
        this.subscribers.add(subscriber);
        if (this.messageChannel instanceof SubscribableChannel) {
            subscriber.onSubscribe((Subscription)new MessageHandlerSubscription(subscriber));
        } else if (this.messageChannel instanceof PollableChannel) {
            subscriber.onSubscribe((Subscription)new PollableSubscription(subscriber));
        } else {
            subscriber.onSubscribe(NO_OP_SUBSCRIPTION);
            subscriber.onError((Throwable)new IllegalStateException("Unsupported MessageChannel type [" + this.messageChannel + "]. Must be 'SubscribableChannel' or 'PollableChannel'."));
        }
    }

    @Override
    public void stop() {
        super.stop();
        this.shutdown();
    }

    public void shutdown() {
        Subscriber<? super Message<T>> subscriber;
        while ((subscriber = this.subscribers.poll()) != null) {
            subscriber.onComplete();
        }
    }

    private final class PollableSubscription
    extends SubscriberSubscription {
        private PollableSubscription(Subscriber<Message<?>> subscriber) {
            super(subscriber);
        }

        @Override
        public void onRequest(final long n) {
            PublisherIntegrationFlow.this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    if (n == Long.MAX_VALUE) {
                        while (!PollableSubscription.this.terminated && PublisherIntegrationFlow.this.isRunning()) {
                            Message receive = ((PollableChannel)PublisherIntegrationFlow.this.messageChannel).receive(50L);
                            if (receive == null) continue;
                            PollableSubscription.this.subscriber.onNext((Object)receive);
                        }
                    } else {
                        long i = 0L;
                        while (!PollableSubscription.this.terminated && PublisherIntegrationFlow.this.isRunning() && i < n) {
                            Message receive = ((PollableChannel)PublisherIntegrationFlow.this.messageChannel).receive(50L);
                            if (receive == null) continue;
                            PollableSubscription.this.subscriber.onNext((Object)receive);
                            ++i;
                        }
                    }
                }
            });
        }
    }

    private final class MessageHandlerSubscription
    extends SubscriberSubscription
    implements MessageHandler {
        private final Queue<Long> pendingRequests;
        private final AtomicReference<Long> currentRequest;
        private final AtomicLong count;
        private volatile boolean unbounded;

        private MessageHandlerSubscription(Subscriber<Message<?>> subscriber) {
            super(subscriber);
            this.pendingRequests = new LinkedBlockingQueue<Long>();
            this.currentRequest = new AtomicReference();
            this.count = new AtomicLong();
        }

        @Override
        public void onRequest(long n) {
            if (n == Long.MAX_VALUE) {
                this.unbounded = true;
                this.pendingRequests.clear();
                this.currentRequest.set(null);
                this.count.set(0L);
            } else if (!this.unbounded) {
                if (this.currentRequest.get() != null) {
                    this.pendingRequests.offer(n);
                } else {
                    this.currentRequest.set(n);
                    this.count.set(0L);
                }
            }
            ((SubscribableChannel)PublisherIntegrationFlow.this.messageChannel).subscribe((MessageHandler)this);
        }

        public void handleMessage(Message<?> message) throws MessagingException {
            if (this.terminated || !PublisherIntegrationFlow.this.isRunning()) {
                ((SubscribableChannel)PublisherIntegrationFlow.this.messageChannel).unsubscribe((MessageHandler)this);
                throw new MessageDeliveryException(message);
            }
            if (this.unbounded) {
                this.subscriber.onNext(message);
            } else {
                if (this.currentRequest.get() == null || this.count.getAndIncrement() == this.currentRequest.get().longValue()) {
                    this.currentRequest.set(this.pendingRequests.poll());
                    this.count.set(0L);
                    if (this.currentRequest.get() == null) {
                        ((SubscribableChannel)PublisherIntegrationFlow.this.messageChannel).unsubscribe((MessageHandler)this);
                        throw new MessageDeliveryException(message);
                    }
                }
                this.subscriber.onNext(message);
            }
        }

        @Override
        public void cancel() {
            ((SubscribableChannel)PublisherIntegrationFlow.this.messageChannel).unsubscribe((MessageHandler)this);
            super.cancel();
        }
    }

    private abstract class SubscriberSubscription
    implements Subscription {
        final Subscriber<Message<?>> subscriber;
        volatile boolean terminated;

        SubscriberSubscription(Subscriber<Message<?>> subscriber) {
            this.subscriber = subscriber;
        }

        public void request(long n) {
            if (n <= 0L) {
                this.subscriber.onError((Throwable)new IllegalArgumentException("Spec. Rule 3.9 - Cannot request a non strictly positive number: " + n));
            } else if (!this.terminated && PublisherIntegrationFlow.this.isRunning()) {
                this.onRequest(n);
            }
        }

        public void cancel() {
            PublisherIntegrationFlow.this.subscribers.remove(this.subscriber);
            this.terminated = true;
        }

        protected abstract void onRequest(long var1);
    }
}

