/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.misc.Loggers;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import istio.mcp.v1alpha1.ResourceOuterClass;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Service;

@Service
public class NacosMcpOverXdsService
extends AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase {
    private final AtomicInteger connectIdGenerator = new AtomicInteger(0);
    private final Map<Integer, StreamObserver<DiscoveryResponse>> connnections = new ConcurrentHashMap<Integer, StreamObserver<DiscoveryResponse>>(16);
    private final ConcurrentHashMap<Integer, Boolean> connectionInited = new ConcurrentHashMap();
    private static final String MCP_RESOURCES_URL = "type.googleapis.com/istio.mcp.v1alpha1.Resource";
    private static final String SERVICEENTY_TYPE = "networking.istio.io/v1alpha3/ServiceEntry";
    private Map<String, ResourceOuterClass.Resource> resourceMapCache;

    public void sendResources(Map<String, ResourceOuterClass.Resource> resourceMap) {
        this.resourceMapCache = resourceMap;
        Loggers.MAIN.info("send resources for mcpOverXds,count : {}", (Object)resourceMap.size());
        DiscoveryResponse discoveryResponse = this.generateResponse(resourceMap);
        if (Loggers.MAIN.isDebugEnabled()) {
            Loggers.MAIN.debug("discoveryResponse:{}", (Object)discoveryResponse.toString());
        }
        for (StreamObserver<DiscoveryResponse> observer : this.connnections.values()) {
            Loggers.MAIN.info("mcpOverXds send to:{}", (Object)observer.toString());
            observer.onNext((Object)discoveryResponse);
        }
    }

    private DiscoveryResponse generateResponse(Map<String, ResourceOuterClass.Resource> resourceMap) {
        ArrayList<Any> anies = new ArrayList<Any>();
        for (ResourceOuterClass.Resource resource : resourceMap.values()) {
            Any any = Any.newBuilder().setValue(resource.toByteString()).setTypeUrl(MCP_RESOURCES_URL).build();
            anies.add(any);
        }
        return DiscoveryResponse.newBuilder().addAllResources(anies).setNonce(String.valueOf(System.currentTimeMillis())).setTypeUrl(SERVICEENTY_TYPE).build();
    }

    public StreamObserver<DiscoveryRequest> streamAggregatedResources(final StreamObserver<DiscoveryResponse> responseObserver) {
        final int id = this.connectIdGenerator.incrementAndGet();
        this.connnections.put(id, responseObserver);
        return new StreamObserver<DiscoveryRequest>(){
            private final int connectionId;
            {
                this.connectionId = id;
            }

            public void onNext(DiscoveryRequest discoveryRequest) {
                Boolean inited;
                Loggers.MAIN.info("receiving request,  {}", (Object)discoveryRequest.toString());
                if (discoveryRequest.getErrorDetail() != null && discoveryRequest.getErrorDetail().getCode() != 0) {
                    Loggers.MAIN.error("NACK error code: {}, message: {}", (Object)discoveryRequest.getErrorDetail().getCode(), (Object)discoveryRequest.getErrorDetail().getMessage());
                    return;
                }
                if (NacosMcpOverXdsService.SERVICEENTY_TYPE.equals(discoveryRequest.getTypeUrl()) && ((inited = (Boolean)NacosMcpOverXdsService.this.connectionInited.get(id)) == null || !inited.booleanValue())) {
                    NacosMcpOverXdsService.this.connectionInited.put(id, true);
                    if (NacosMcpOverXdsService.this.resourceMapCache != null) {
                        DiscoveryResponse discoveryResponse = NacosMcpOverXdsService.this.generateResponse(NacosMcpOverXdsService.this.resourceMapCache);
                        Loggers.MAIN.info("ACK for serviceEntry discoveryRequest {}", (Object)discoveryRequest.toString());
                        responseObserver.onNext((Object)discoveryResponse);
                    }
                }
            }

            public void onError(Throwable throwable) {
                Loggers.MAIN.error("stream error.", throwable);
                NacosMcpOverXdsService.this.connnections.remove(this.connectionId);
            }

            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

