/*
 * Decompiled with CFR 0.152.
 */
package darabonba.core.sse;

import com.aliyun.core.http.HttpHeaders;
import com.aliyun.core.http.HttpResponseHandler;
import darabonba.core.ResponseBytes;
import darabonba.core.async.AsyncResponseHandler;
import darabonba.core.async.ByteArrayAsyncResponseHandler;
import darabonba.core.sse.Event;
import darabonba.core.sse.SSEResponseIterator;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class SSEHttpResponseHandler
implements HttpResponseHandler {
    private volatile AsyncResponseHandler<?, ?> handler = new ByteArrayAsyncResponseHandler();
    private static final byte[][] SEPARATORS = new byte[][]{"\r\n\r\n".getBytes(StandardCharsets.UTF_8), "\r\r".getBytes(StandardCharsets.UTF_8), "\n\n".getBytes(StandardCharsets.UTF_8)};
    private final SSEResponseIterator<?> iterator;
    private volatile AsyncResponseHandler<String, ResponseBytes<String>> errorAsyncResponseHandler;

    public SSEHttpResponseHandler(SSEResponseIterator<?> iterator) {
        this.iterator = iterator;
    }

    public void onStream(Publisher<ByteBuffer> publisher, int httpStatusCode, HttpHeaders headers) {
        this.iterator.setStatusCode(httpStatusCode);
        this.iterator.setHeaders(headers.toMap());
        if (httpStatusCode / 100 != 2) {
            this.errorAsyncResponseHandler = AsyncResponseHandler.toBytes();
            this.errorAsyncResponseHandler.onStream(publisher);
        } else {
            SSEProcessor proc = new SSEProcessor(this.iterator);
            this.handler.onStream((Publisher<ByteBuffer>)proc);
            publisher.subscribe((Subscriber)proc);
        }
    }

    public void onError(Throwable throwable) {
        this.iterator.endOfFailure(throwable);
        this.handler.onError(throwable);
    }

    public byte[] getErrorBodyByteArrayUnsafe() {
        if (this.errorAsyncResponseHandler == null) {
            return null;
        }
        ResponseBytes<String> result = this.errorAsyncResponseHandler.transform("");
        return result.asByteArrayUnsafe();
    }

    public byte[] getErrorBodyByteArray() {
        if (this.errorAsyncResponseHandler == null) {
            return null;
        }
        ResponseBytes<String> result = this.errorAsyncResponseHandler.transform("");
        return result.asByteArray();
    }

    static class SSEProcessor
    implements Processor<ByteBuffer, ByteBuffer> {
        protected volatile Subscriber<? super ByteBuffer> subscriber;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final SSEResponseIterator<?> iterator;
        private final Executor processingExecutor;

        SSEProcessor(SSEResponseIterator<?> iterator) {
            this.iterator = iterator;
            this.processingExecutor = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "SSE-Processor");
                t.setDaemon(true);
                return t;
            });
        }

        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriber.onSubscribe(subscription);
        }

        public void onNext(ByteBuffer byteBuffer) {
            this.processingExecutor.execute(() -> {
                try {
                    this.processData(byteBuffer);
                    this.subscriber.onNext((Object)byteBuffer);
                }
                catch (Exception e) {
                    this.iterator.endOfFailure(e);
                    this.subscriber.onError((Throwable)e);
                }
            });
        }

        private void processData(ByteBuffer byteBuffer) {
            if (!byteBuffer.hasRemaining()) {
                return;
            }
            byte[] bytes = new byte[byteBuffer.remaining()];
            byteBuffer.duplicate().get(bytes);
            this.buffer.write(bytes, 0, bytes.length);
            this.processBufferedData();
        }

        private void processBufferedData() {
            byte[] data = this.buffer.toByteArray();
            int lastProcessedIndex = 0;
            while (lastProcessedIndex < data.length) {
                int separatorIndex = this.findNextSeparator(data, lastProcessedIndex);
                if (separatorIndex == -1) {
                    if (lastProcessedIndex <= 0) break;
                    byte[] remaining = Arrays.copyOfRange(data, lastProcessedIndex, data.length);
                    this.buffer.reset();
                    this.buffer.write(remaining, 0, remaining.length);
                    break;
                }
                byte[] eventBytes = Arrays.copyOfRange(data, lastProcessedIndex, separatorIndex);
                String eventString = new String(eventBytes, StandardCharsets.UTF_8);
                if (!eventString.trim().isEmpty()) {
                    try {
                        Event event = Event.parse(eventString);
                        this.iterator.addEvent(event);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
                lastProcessedIndex = separatorIndex + this.getSeparatorLength(data, separatorIndex);
            }
            if (lastProcessedIndex >= data.length) {
                this.buffer.reset();
            }
        }

        private int findNextSeparator(byte[] data, int startIndex) {
            for (int i = startIndex; i <= data.length - this.getMinSeparatorLength(); ++i) {
                for (byte[] separator : SEPARATORS) {
                    if (!this.matchesAt(data, i, separator)) continue;
                    return i;
                }
            }
            return -1;
        }

        private boolean matchesAt(byte[] data, int index, byte[] pattern) {
            if (index + pattern.length > data.length) {
                return false;
            }
            for (int i = 0; i < pattern.length; ++i) {
                if (data[index + i] == pattern[i]) continue;
                return false;
            }
            return true;
        }

        private int getSeparatorLength(byte[] data, int index) {
            for (byte[] separator : SEPARATORS) {
                if (!this.matchesAt(data, index, separator)) continue;
                return separator.length;
            }
            return 0;
        }

        private int getMinSeparatorLength() {
            int min = Integer.MAX_VALUE;
            for (byte[] separator : SEPARATORS) {
                min = Math.min(min, separator.length);
            }
            return min;
        }

        public void onError(Throwable throwable) {
            this.iterator.endOfFailure(throwable);
            this.subscriber.onError(throwable);
        }

        public void onComplete() {
            this.processingExecutor.execute(() -> {
                try {
                    String eventString;
                    byte[] remaining = this.buffer.toByteArray();
                    if (remaining.length > 0 && !(eventString = new String(remaining, StandardCharsets.UTF_8)).trim().isEmpty()) {
                        try {
                            Event event = Event.parse(eventString);
                            this.iterator.addEvent(event);
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                    this.iterator.endOfEvent();
                    this.subscriber.onComplete();
                }
                catch (Exception e) {
                    this.iterator.endOfFailure(e);
                    this.subscriber.onError((Throwable)e);
                }
            });
        }
    }
}

