/*
 * Decompiled with CFR 0.152.
 */
package io.github.pigmesh.ai.deepseek.core;

import io.github.pigmesh.ai.deepseek.core.ErrorHandling;
import io.github.pigmesh.ai.deepseek.core.Json;
import io.github.pigmesh.ai.deepseek.core.ResponseHandle;
import io.github.pigmesh.ai.deepseek.core.ResponseLoggingInterceptor;
import io.github.pigmesh.ai.deepseek.core.StreamingCompletionHandling;
import io.github.pigmesh.ai.deepseek.core.StreamingResponseHandling;
import io.github.pigmesh.ai.deepseek.core.Utils;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamingRequestExecutor<Request, Response, ResponseContent> {
    private static final Logger log = LoggerFactory.getLogger(StreamingRequestExecutor.class);
    private final OkHttpClient okHttpClient;
    private final String endpointUrl;
    private final Supplier<Request> requestWithStreamSupplier;
    private final Class<Response> responseClass;
    private final Function<Response, ResponseContent> streamEventContentExtractor;
    private final boolean logStreamingResponses;
    private final ResponseLoggingInterceptor responseLogger = new ResponseLoggingInterceptor();

    StreamingRequestExecutor(OkHttpClient okHttpClient, String endpointUrl, Supplier<Request> requestWithStreamSupplier, Class<Response> responseClass, Function<Response, ResponseContent> streamEventContentExtractor, boolean logStreamingResponses) {
        this.okHttpClient = okHttpClient;
        this.endpointUrl = endpointUrl;
        this.requestWithStreamSupplier = requestWithStreamSupplier;
        this.responseClass = responseClass;
        this.streamEventContentExtractor = streamEventContentExtractor;
        this.logStreamingResponses = logStreamingResponses;
    }

    StreamingResponseHandling onPartialResponse(final Consumer<ResponseContent> partialResponseHandler) {
        return new StreamingResponseHandling(){

            @Override
            public StreamingCompletionHandling onComplete(final Runnable streamingCompletionCallback) {
                return new StreamingCompletionHandling(){

                    @Override
                    public ErrorHandling onError(final Consumer<Throwable> errorHandler) {
                        return new ErrorHandling(){

                            @Override
                            public ResponseHandle execute() {
                                return StreamingRequestExecutor.this.stream(partialResponseHandler, streamingCompletionCallback, errorHandler);
                            }
                        };
                    }

                    @Override
                    public ErrorHandling ignoreErrors() {
                        return new ErrorHandling(){

                            @Override
                            public ResponseHandle execute() {
                                return StreamingRequestExecutor.this.stream(partialResponseHandler, streamingCompletionCallback, e -> {});
                            }
                        };
                    }
                };
            }

            @Override
            public ErrorHandling onError(final Consumer<Throwable> errorHandler) {
                return new ErrorHandling(){

                    @Override
                    public ResponseHandle execute() {
                        return StreamingRequestExecutor.this.stream(partialResponseHandler, () -> {}, errorHandler);
                    }
                };
            }

            @Override
            public ErrorHandling ignoreErrors() {
                return new ErrorHandling(){

                    @Override
                    public ResponseHandle execute() {
                        return StreamingRequestExecutor.this.stream(partialResponseHandler, () -> {}, e -> {});
                    }
                };
            }
        };
    }

    private ResponseHandle stream(final Consumer<ResponseContent> partialResponseHandler, final Runnable streamingCompletionCallback, final Consumer<Throwable> errorHandler) {
        Request request = this.requestWithStreamSupplier.get();
        String requestJson = Json.toJson(request);
        Request okHttpRequest = new Request.Builder().url(this.endpointUrl).post(RequestBody.create((String)requestJson, (MediaType)MediaType.get((String)"application/json; charset=utf-8"))).build();
        final ResponseHandle responseHandle = new ResponseHandle();
        EventSourceListener eventSourceListener = new EventSourceListener(){

            public void onOpen(EventSource eventSource, Response response) {
                if (responseHandle.cancelled) {
                    eventSource.cancel();
                    return;
                }
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    StreamingRequestExecutor.this.responseLogger.log(response);
                }
            }

            public void onEvent(EventSource eventSource, String id, String type, String data) {
                if (responseHandle.cancelled) {
                    eventSource.cancel();
                    return;
                }
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    log.debug("onEvent() {}", (Object)data);
                }
                if ("[DONE]".equals(data)) {
                    return;
                }
                try {
                    Object response = Json.fromJson(data, StreamingRequestExecutor.this.responseClass);
                    Object responseContent = StreamingRequestExecutor.this.streamEventContentExtractor.apply(response);
                    if (responseContent != null) {
                        partialResponseHandler.accept(responseContent);
                    }
                }
                catch (Exception e) {
                    errorHandler.accept(e);
                }
            }

            public void onClosed(EventSource eventSource) {
                if (responseHandle.cancelled) {
                    eventSource.cancel();
                    return;
                }
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    log.debug("onClosed()");
                }
                streamingCompletionCallback.run();
            }

            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                if (responseHandle.cancelled) {
                    return;
                }
                if (t instanceof IllegalArgumentException && "byteCount < 0: -1".equals(t.getMessage())) {
                    streamingCompletionCallback.run();
                    return;
                }
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    log.debug("onFailure()", t);
                    StreamingRequestExecutor.this.responseLogger.log(response);
                }
                if (t != null) {
                    errorHandler.accept(t);
                } else {
                    try {
                        errorHandler.accept(Utils.toException(response));
                    }
                    catch (IOException e) {
                        errorHandler.accept(e);
                    }
                }
            }
        };
        EventSources.createFactory((OkHttpClient)this.okHttpClient).newEventSource(okHttpRequest, eventSourceListener);
        return responseHandle;
    }
}

