/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.events;

import io.a2a.server.events.EnhancedRunnable;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueClosedException;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.TaskStatusUpdateEvent;
import java.util.concurrent.Flow;
import mutiny.zero.BackpressureStrategy;
import mutiny.zero.TubeConfiguration;
import mutiny.zero.ZeroPublisher;

public class EventConsumer {
    private final EventQueue queue;
    private Throwable error;
    private static final String ERROR_MSG = "Agent did not return any response";
    private static final int NO_WAIT = -1;
    private static final int QUEUE_WAIT_MILLISECONDS = 500;

    public EventConsumer(EventQueue queue) {
        this.queue = queue;
    }

    public Event consumeOne() throws A2AServerException, EventQueueClosedException {
        Event event = this.queue.dequeueEvent(-1);
        if (event == null) {
            throw new A2AServerException(ERROR_MSG, (Throwable)new InternalError(ERROR_MSG));
        }
        return event;
    }

    public Flow.Publisher<Event> consumeAll() {
        TubeConfiguration conf = new TubeConfiguration().withBackpressureStrategy(BackpressureStrategy.BUFFER).withBufferSize(256);
        return ZeroPublisher.create((TubeConfiguration)conf, tube -> {
            boolean completed = false;
            try {
                while (true) {
                    TaskStatusUpdateEvent tue;
                    if (this.error != null) {
                        completed = true;
                        tube.fail(this.error);
                        return;
                    }
                    Event event = this.queue.dequeueEvent(500);
                    if (event == null) continue;
                    if (event instanceof Throwable) {
                        Throwable thr = (Throwable)event;
                        tube.fail(thr);
                        return;
                    }
                    try {
                        tube.send((Object)event);
                    }
                    catch (EventQueueClosedException e) {
                        completed = true;
                        tube.complete();
                        return;
                    }
                    catch (Throwable t) {
                        tube.fail(t);
                        return;
                    }
                    boolean isFinalEvent = false;
                    if (event instanceof TaskStatusUpdateEvent && (tue = (TaskStatusUpdateEvent)event).isFinal()) {
                        isFinalEvent = true;
                    } else if (event instanceof Message) {
                        isFinalEvent = true;
                    } else if (event instanceof Task) {
                        Task task = (Task)event;
                        switch (task.getStatus().state()) {
                            case COMPLETED: 
                            case CANCELED: 
                            case FAILED: 
                            case REJECTED: 
                            case UNKNOWN: {
                                isFinalEvent = true;
                            }
                        }
                    }
                    if (isFinalEvent) break;
                }
                this.queue.close();
            }
            finally {
                if (!completed) {
                    tube.complete();
                }
            }
        });
    }

    public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
        return agentRunnable -> {
            if (agentRunnable.getError() != null) {
                this.error = agentRunnable.getError();
            }
        };
    }
}

