package com.tencent.supersonic.headless.server.facade.service.impl;

import com.els.common.system.util.JwtUtil;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.tencent.supersonic.auth.api.authentication.config.AuthenticationConfig;
import com.tencent.supersonic.auth.api.authentication.service.UserService;
import com.tencent.supersonic.auth.api.authentication.utils.UserHolder;
import com.tencent.supersonic.common.pojo.QueryColumn;
import com.tencent.supersonic.headless.api.pojo.Param;
import com.tencent.supersonic.headless.api.pojo.request.QuerySqlReq;
import com.tencent.supersonic.headless.api.pojo.request.SemanticQueryReq;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.server.facade.service.FlightService;
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
import com.tencent.supersonic.headless.server.utils.FlightUtils;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sql.rowset.CachedRowSet;
import javax.sql.rowset.RowSetMetaDataImpl;
import javax.sql.rowset.RowSetProvider;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightConstants;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.BasicFlightSqlProducer;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service("FlightService")
/* loaded from: input_file:com/tencent/supersonic/headless/server/facade/service/impl/FlightServiceImpl.class */
public class FlightServiceImpl extends BasicFlightSqlProducer implements FlightService {
    private static final Logger log = LoggerFactory.getLogger(FlightServiceImpl.class);
    private String host;
    private Integer port;
    private ExecutorService executorService;
    private Cache<ByteString, SemanticQueryReq> preparedStatementCache;
    private final String dataSetIdHeaderKey = "dataSetId";
    private final String nameHeaderKey = "name";
    private final String passwordHeaderKey = "password";
    private final Calendar defaultCalendar = JdbcToArrowUtils.getUtcCalendar();
    private final SemanticLayerService queryService;
    private final AuthenticationConfig authenticationConfig;
    private final UserService userService;

    public FlightServiceImpl(SemanticLayerService semanticLayerService, AuthenticationConfig authenticationConfig, UserService userService) {
        this.queryService = semanticLayerService;
        this.authenticationConfig = authenticationConfig;
        this.userService = userService;
    }

    @Override // com.tencent.supersonic.headless.server.facade.service.FlightService
    public void setLocation(String str, Integer num) {
        this.host = str;
        this.port = num;
    }

    @Override // com.tencent.supersonic.headless.server.facade.service.FlightService
    public void setExecutorService(ExecutorService executorService, Integer num, Integer num2) {
        this.executorService = executorService;
        this.preparedStatementCache = CacheBuilder.newBuilder().maximumSize(num.intValue()).expireAfterWrite(num2.intValue(), TimeUnit.MINUTES).build();
    }

    public FlightInfo getFlightInfo(FlightProducer.CallContext callContext, FlightDescriptor flightDescriptor) {
        return super.getFlightInfo(callContext, flightDescriptor);
    }

    public void getStreamStatement(FlightSql.TicketStatementQuery ticketStatementQuery, FlightProducer.CallContext callContext, FlightProducer.ServerStreamListener serverStreamListener) {
        ByteString statementHandle = ticketStatementQuery.getStatementHandle();
        log.info("getStreamStatement {} ", statementHandle);
        executeQuery(statementHandle, serverStreamListener);
    }

    public FlightInfo getFlightInfoStatement(FlightSql.CommandStatementQuery commandStatementQuery, FlightProducer.CallContext callContext, FlightDescriptor flightDescriptor) {
        try {
            return getFlightInfoForSchema(FlightSql.TicketStatementQuery.newBuilder().setStatementHandle(addPrepared(callContext, commandStatementQuery.getQuery())).build(), flightDescriptor, null);
        } catch (Exception e) {
            log.error("getFlightInfoStatement error {}", e);
            return null;
        }
    }

    public void getStreamPreparedStatement(FlightSql.CommandPreparedStatementQuery commandPreparedStatementQuery, FlightProducer.CallContext callContext, FlightProducer.ServerStreamListener serverStreamListener) {
        log.info("getStreamPreparedStatement {}", commandPreparedStatementQuery.getPreparedStatementHandle());
        executeQuery(commandPreparedStatementQuery.getPreparedStatementHandle(), serverStreamListener);
    }

    private void executeQuery(ByteString byteString, FlightProducer.ServerStreamListener serverStreamListener) {
        SemanticQueryReq semanticQueryReq = (SemanticQueryReq) this.preparedStatementCache.getIfPresent(byteString);
        if (!Objects.isNull(semanticQueryReq)) {
            this.executorService.submit(() -> {
                RootAllocator rootAllocator = new RootAllocator();
                try {
                    try {
                        Optional findFirst = semanticQueryReq.getParams().stream().filter(param -> {
                            return param.getName().equals(this.authenticationConfig.getTokenHttpHeaderKey());
                        }).findFirst();
                        if (findFirst.isPresent()) {
                            ResultSet semanticQueryRespToResultSet = semanticQueryRespToResultSet(this.queryService.queryByReq(semanticQueryReq, UserHolder.findUser(((Param) findFirst.get()).getValue(), this.authenticationConfig.getTokenHttpHeaderAppKey())), semanticQueryReq.getDataSetId());
                            VectorSchemaRoot create = VectorSchemaRoot.create(JdbcToArrowUtils.jdbcToArrowSchema(semanticQueryRespToResultSet.getMetaData(), this.defaultCalendar), rootAllocator);
                            try {
                                VectorLoader vectorLoader = new VectorLoader(create);
                                serverStreamListener.start(create);
                                ArrowVectorIterator sqlToArrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(semanticQueryRespToResultSet, rootAllocator);
                                while (sqlToArrowVectorIterator.hasNext()) {
                                    VectorSchemaRoot next = sqlToArrowVectorIterator.next();
                                    if (next.getRowCount() == 0) {
                                        break;
                                    }
                                    vectorLoader.load(new VectorUnloader(next).getRecordBatch());
                                    serverStreamListener.putNext();
                                    create.clear();
                                }
                                serverStreamListener.putNext();
                                if (create != null) {
                                    create.close();
                                }
                            } catch (Throwable th) {
                                if (create != null) {
                                    try {
                                        create.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                                throw th;
                            }
                        }
                        this.preparedStatementCache.invalidate(byteString);
                        serverStreamListener.completed();
                        rootAllocator.close();
                    } catch (Exception e) {
                        serverStreamListener.error(CallStatus.INTERNAL.withDescription(String.format("Failed to get exec statement %s", e.getMessage())).toRuntimeException());
                        log.error("getStreamPreparedStatement error {}", byteString);
                        this.preparedStatementCache.invalidate(byteString);
                        serverStreamListener.completed();
                        rootAllocator.close();
                    }
                } catch (Throwable th3) {
                    this.preparedStatementCache.invalidate(byteString);
                    serverStreamListener.completed();
                    rootAllocator.close();
                    throw th3;
                }
            });
            return;
        }
        serverStreamListener.error(CallStatus.INTERNAL.withDescription("Failed to get prepared statement: empty").toRuntimeException());
        log.error("getStreamPreparedStatement error {}", byteString);
        serverStreamListener.completed();
    }

    public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest actionClosePreparedStatementRequest, FlightProducer.CallContext callContext, FlightProducer.StreamListener<Result> streamListener) {
        log.info("closePreparedStatement {}", actionClosePreparedStatementRequest.getPreparedStatementHandle());
        streamListener.onCompleted();
    }

    public FlightInfo getFlightInfoPreparedStatement(FlightSql.CommandPreparedStatementQuery commandPreparedStatementQuery, FlightProducer.CallContext callContext, FlightDescriptor flightDescriptor) {
        return getFlightInfoForSchema(commandPreparedStatementQuery, flightDescriptor, null);
    }

    public void createPreparedStatement(FlightSql.ActionCreatePreparedStatementRequest actionCreatePreparedStatementRequest, FlightProducer.CallContext callContext, FlightProducer.StreamListener<Result> streamListener) {
        prepared(actionCreatePreparedStatementRequest, callContext, streamListener);
    }

    private ByteString addPrepared(FlightProducer.CallContext callContext, String str) throws Exception {
        if (Arrays.asList("dataSetId", "name", "password").stream().anyMatch(str2 -> {
            return !callContext.getMiddleware(FlightConstants.HEADER_KEY).headers().containsKey(str2);
        })) {
            throw new Exception(String.format("Failed to create prepared statement: HeaderCallOption miss %s %s %s", "dataSetId", "name", "password"));
        }
        Long valueOf = Long.valueOf(callContext.getMiddleware(FlightConstants.HEADER_KEY).headers().get("dataSetId"));
        if (StringUtils.isBlank(str)) {
            throw new Exception("Failed to create prepared statement: query is empty");
        }
        try {
            String userAuth = getUserAuth(callContext.getMiddleware(FlightConstants.HEADER_KEY).headers());
            if (StringUtils.isBlank(userAuth)) {
                throw new Exception("auth empty");
            }
            ByteString copyFrom = ByteString.copyFrom(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
            QuerySqlReq querySqlReq = new QuerySqlReq();
            querySqlReq.setDataSetId(valueOf);
            querySqlReq.setSql(str);
            querySqlReq.setParams(Arrays.asList(new Param(this.authenticationConfig.getTokenHttpHeaderKey(), userAuth)));
            this.preparedStatementCache.put(copyFrom, querySqlReq);
            log.info("createPreparedStatement {} {} {} ", new Object[]{copyFrom, valueOf, str});
            return copyFrom;
        } catch (Exception e) {
            throw e;
        }
    }

    private void prepared(FlightSql.ActionCreatePreparedStatementRequest actionCreatePreparedStatementRequest, FlightProducer.CallContext callContext, FlightProducer.StreamListener<Result> streamListener) {
        try {
            try {
                streamListener.onNext(new Result(Any.pack(FlightSql.ActionCreatePreparedStatementResult.newBuilder().setDatasetSchema(ByteString.EMPTY).setParameterSchema(ByteString.empty()).setPreparedStatementHandle(addPrepared(callContext, actionCreatePreparedStatementRequest.getQuery())).build()).toByteArray()));
                streamListener.onCompleted();
            } catch (Exception e) {
                streamListener.onError(CallStatus.INTERNAL.withDescription(String.format("Failed to create prepared statement: %s", e.getMessage())).toRuntimeException());
                streamListener.onCompleted();
            }
        } catch (Throwable th) {
            streamListener.onCompleted();
            throw th;
        }
    }

    protected <T extends Message> List<FlightEndpoint> determineEndpoints(T t, FlightDescriptor flightDescriptor, Schema schema) {
        throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
    }

    private <T extends Message> FlightInfo getFlightInfoForSchema(T t, FlightDescriptor flightDescriptor, Schema schema) {
        return new FlightInfo(schema, flightDescriptor, Collections.singletonList(new FlightEndpoint(new Ticket(Any.pack(t).toByteArray()), new Location[]{Location.forGrpcInsecure(this.host, this.port.intValue())})), -1L, -1L);
    }

    private String getUserAuth(CallHeaders callHeaders) throws Exception {
        return JwtUtil.sign(callHeaders.get("name"), callHeaders.get("password"));
    }

    private ResultSet semanticQueryRespToResultSet(SemanticQueryResp semanticQueryResp, Long l) throws SQLException {
        CachedRowSet createCachedRowSet = RowSetProvider.newFactory().createCachedRowSet();
        RowSetMetaDataImpl rowSetMetaDataImpl = new RowSetMetaDataImpl();
        int size = semanticQueryResp.getColumns().size();
        rowSetMetaDataImpl.setColumnCount(size);
        for (int i = 1; i <= size; i++) {
            String nameEn = ((QueryColumn) semanticQueryResp.getColumns().get(i - 1)).getNameEn();
            rowSetMetaDataImpl.setColumnName(i, nameEn);
            Optional findFirst = semanticQueryResp.getResultList().stream().filter(map -> {
                return map.containsKey(nameEn) && Objects.nonNull(map.get(nameEn));
            }).findFirst();
            if (findFirst.isPresent()) {
                int resolveType = FlightUtils.resolveType(findFirst.get());
                rowSetMetaDataImpl.setColumnType(i, resolveType);
                rowSetMetaDataImpl.setNullable(i, FlightUtils.isNullable(resolveType));
            } else {
                rowSetMetaDataImpl.setNullable(i, 1);
                rowSetMetaDataImpl.setColumnType(i, 12);
            }
            rowSetMetaDataImpl.setCatalogName(i, String.valueOf(l));
            rowSetMetaDataImpl.setSchemaName(i, "dataSetId");
        }
        createCachedRowSet.setMetaData(rowSetMetaDataImpl);
        for (Map map2 : semanticQueryResp.getResultList()) {
            createCachedRowSet.moveToInsertRow();
            for (int i2 = 1; i2 <= size; i2++) {
                String nameEn2 = ((QueryColumn) semanticQueryResp.getColumns().get(i2 - 1)).getNameEn();
                if (map2.containsKey(nameEn2)) {
                    createCachedRowSet.updateObject(i2, map2.get(nameEn2));
                } else {
                    createCachedRowSet.updateObject(i2, (Object) null);
                }
            }
            createCachedRowSet.insertRow();
            createCachedRowSet.moveToCurrentRow();
        }
        return createCachedRowSet;
    }
}
