/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.supersonic.headless.server.facade.service.impl;

import com.els.common.system.util.JwtUtil;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.tencent.supersonic.auth.api.authentication.config.AuthenticationConfig;
import com.tencent.supersonic.auth.api.authentication.pojo.User;
import com.tencent.supersonic.auth.api.authentication.service.UserService;
import com.tencent.supersonic.auth.api.authentication.utils.UserHolder;
import com.tencent.supersonic.common.pojo.QueryColumn;
import com.tencent.supersonic.headless.api.pojo.Param;
import com.tencent.supersonic.headless.api.pojo.request.QuerySqlReq;
import com.tencent.supersonic.headless.api.pojo.request.SemanticQueryReq;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.server.facade.service.FlightService;
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
import com.tencent.supersonic.headless.server.utils.FlightUtils;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sql.rowset.CachedRowSet;
import javax.sql.rowset.RowSetFactory;
import javax.sql.rowset.RowSetMetaDataImpl;
import javax.sql.rowset.RowSetProvider;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightConstants;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.ServerHeaderMiddleware;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.BasicFlightSqlProducer;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Arrow Flight SQL producer that runs SQL statements through the semantic layer.
 *
 * <p>A statement is registered under a random handle in {@link #preparedStatementCache}
 * (see {@link #addPrepared}); the handle is later resolved by {@link #executeQuery}, which
 * runs the query on {@link #executorService} and streams the result as Arrow record batches.
 * A handle is evicted after its first execution, when the client closes it, or when the
 * cache expires it.
 *
 * <p>{@link #setLocation} and {@link #setExecutorService} must be called before any request
 * is served: the host/port, executor and cache are not initialised by the constructor.
 */
@Service(value="FlightService")
public class FlightServiceImpl
extends BasicFlightSqlProducer
implements FlightService {
    private static final Logger log = LoggerFactory.getLogger(FlightServiceImpl.class);

    /** Call header carrying the id of the data set to query. */
    private static final String DATA_SET_ID_HEADER_KEY = "dataSetId";
    /** Call header carrying the user name. */
    private static final String NAME_HEADER_KEY = "name";
    /** Call header carrying the user password. */
    private static final String PASSWORD_HEADER_KEY = "password";

    private String host;
    private Integer port;
    private ExecutorService executorService;
    private Cache<ByteString, SemanticQueryReq> preparedStatementCache;
    private final Calendar defaultCalendar = JdbcToArrowUtils.getUtcCalendar();
    private final SemanticLayerService queryService;
    private final AuthenticationConfig authenticationConfig;
    private final UserService userService;

    public FlightServiceImpl(SemanticLayerService queryService, AuthenticationConfig authenticationConfig, UserService userService) {
        this.queryService = queryService;
        this.authenticationConfig = authenticationConfig;
        this.userService = userService;
    }

    /**
     * Records the host and port advertised to clients in every {@link FlightEndpoint}.
     */
    @Override
    public void setLocation(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Installs the executor used to run queries and (re)creates the prepared-statement cache.
     *
     * @param executorService executor on which queries are executed and streamed
     * @param queue maximum number of cached prepared statements
     * @param expireMinute minutes after creation at which a cached statement expires
     */
    @Override
    public void setExecutorService(ExecutorService executorService, Integer queue, Integer expireMinute) {
        this.executorService = executorService;
        this.preparedStatementCache = CacheBuilder.newBuilder()
                .maximumSize(queue.longValue())
                .expireAfterWrite(expireMinute.longValue(), TimeUnit.MINUTES)
                .build();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public FlightInfo getFlightInfo(FlightProducer.CallContext callContext, FlightDescriptor flightDescriptor) {
        return super.getFlightInfo(callContext, flightDescriptor);
    }

    /** Streams the result of the statement identified by the ticket's statement handle. */
    @Override
    public void getStreamStatement(FlightSql.TicketStatementQuery ticketStatementQuery, FlightProducer.CallContext context, FlightProducer.ServerStreamListener listener) {
        ByteString handle = ticketStatementQuery.getStatementHandle();
        log.info("getStreamStatement {} ", handle);
        this.executeQuery(handle, listener);
    }

    /**
     * Registers an ad-hoc statement and returns a {@link FlightInfo} whose ticket carries the
     * handle under which it was cached.
     *
     * @throws org.apache.arrow.flight.FlightRuntimeException (status INTERNAL) when the
     *         statement cannot be registered; previously {@code null} was returned, which
     *         hid the failure reason from the client
     */
    @Override
    public FlightInfo getFlightInfoStatement(FlightSql.CommandStatementQuery request, FlightProducer.CallContext context, FlightDescriptor descriptor) {
        try {
            ByteString preparedStatementHandle = this.addPrepared(context, request.getQuery());
            FlightSql.TicketStatementQuery ticket = FlightSql.TicketStatementQuery.newBuilder()
                    .setStatementHandle(preparedStatementHandle)
                    .build();
            return this.getFlightInfoForSchema(ticket, descriptor, null);
        }
        catch (Exception e) {
            log.error("getFlightInfoStatement error", e);
            throw CallStatus.INTERNAL
                    .withDescription(String.format("Failed to get flight info for statement: %s", e.getMessage()))
                    .withCause(e)
                    .toRuntimeException();
        }
    }

    /** Streams the result of the prepared statement identified by the command's handle. */
    @Override
    public void getStreamPreparedStatement(FlightSql.CommandPreparedStatementQuery command, FlightProducer.CallContext context, FlightProducer.ServerStreamListener listener) {
        log.info("getStreamPreparedStatement {}", command.getPreparedStatementHandle());
        this.executeQuery(command.getPreparedStatementHandle(), listener);
    }

    /**
     * Looks up the cached request for {@code handle} and schedules its execution.
     *
     * <p>An unknown (or already consumed/expired) handle is reported through
     * {@code listener.error}; {@code error} terminates the stream, so {@code completed} is
     * not called afterwards.
     */
    private void executeQuery(ByteString handle, FlightProducer.ServerStreamListener listener) {
        SemanticQueryReq semanticQueryReq = this.preparedStatementCache.getIfPresent(handle);
        if (Objects.isNull(semanticQueryReq)) {
            log.error("getStreamPreparedStatement error {}", handle);
            listener.error(CallStatus.INTERNAL.withDescription("Failed to get prepared statement: empty").toRuntimeException());
            return;
        }
        this.executorService.submit(() -> this.streamQueryResult(handle, semanticQueryReq, listener));
    }

    /**
     * Runs the query and streams its rows to {@code listener}; executed on the executor.
     *
     * <p>The stream is terminated exactly once: with {@code completed()} on success or with
     * {@code error()} on failure. The handle is always evicted from the cache afterwards.
     * When the request carries no auth parameter the stream is completed without data,
     * as before.
     */
    private void streamQueryResult(ByteString handle, SemanticQueryReq semanticQueryReq, FlightProducer.ServerStreamListener listener) {
        BufferAllocator allocator = new RootAllocator();
        try {
            String tokenKey = this.authenticationConfig.getTokenHttpHeaderKey();
            Optional<Param> authOpt = semanticQueryReq.getParams().stream()
                    .filter(p -> p.getName().equals(tokenKey))
                    .findFirst();
            if (authOpt.isPresent()) {
                User user = UserHolder.findUser(authOpt.get().getValue(), this.authenticationConfig.getTokenHttpHeaderAppKey());
                SemanticQueryResp resp = this.queryService.queryByReq(semanticQueryReq, user);
                ResultSet resultSet = this.semanticQueryRespToResultSet(resp, semanticQueryReq.getDataSetId());
                this.streamResultSet(resultSet, allocator, listener);
            }
            listener.completed();
        }
        catch (Exception e) {
            log.error("getStreamPreparedStatement error {}", handle, e);
            listener.error(CallStatus.INTERNAL
                    .withDescription(String.format("Failed to get exec statement %s", e.getMessage()))
                    .withCause(e)
                    .toRuntimeException());
        }
        finally {
            this.preparedStatementCache.invalidate(handle);
            allocator.close();
        }
    }

    /**
     * Converts {@code resultSet} to Arrow batches and pushes them through {@code listener}.
     *
     * <p>The iterator, every batch it hands out and every unloaded record batch hold Arrow
     * buffers and are therefore closed here; otherwise the buffers stay allocated and
     * closing the allocator fails.
     */
    private void streamResultSet(ResultSet resultSet, BufferAllocator allocator, FlightProducer.ServerStreamListener listener) throws Exception {
        Schema schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), this.defaultCalendar);
        try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, allocator)) {
            VectorLoader loader = new VectorLoader(vectorSchemaRoot);
            listener.start(vectorSchemaRoot);
            while (iterator.hasNext()) {
                try (VectorSchemaRoot batch = iterator.next()) {
                    if (batch.getRowCount() == 0) {
                        break;
                    }
                    VectorUnloader unloader = new VectorUnloader(batch);
                    try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
                        loader.load(recordBatch);
                    }
                    listener.putNext();
                    vectorSchemaRoot.clear();
                }
            }
            // Kept from the original flow: one final putNext on the cleared root.
            listener.putNext();
        }
    }

    /**
     * Acknowledges the close request and drops the statement from the cache so an
     * unexecuted statement does not linger until it expires.
     */
    @Override
    public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest request, FlightProducer.CallContext context, FlightProducer.StreamListener<Result> listener) {
        ByteString handle = request.getPreparedStatementHandle();
        log.info("closePreparedStatement {}", handle);
        this.preparedStatementCache.invalidate(handle);
        listener.onCompleted();
    }

    /** Returns a {@link FlightInfo} whose ticket wraps the prepared-statement command itself. */
    @Override
    public FlightInfo getFlightInfoPreparedStatement(FlightSql.CommandPreparedStatementQuery command, FlightProducer.CallContext context, FlightDescriptor descriptor) {
        return this.getFlightInfoForSchema(command, descriptor, null);
    }

    /** Creates a prepared statement for the request's query; see {@link #prepared}. */
    @Override
    public void createPreparedStatement(FlightSql.ActionCreatePreparedStatementRequest request, FlightProducer.CallContext context, FlightProducer.StreamListener<Result> listener) {
        this.prepared(request, context, listener);
    }

    /**
     * Validates the call headers and the query, builds a {@link QuerySqlReq} and caches it
     * under a freshly generated handle.
     *
     * @param context call context; its header middleware must carry the
     *        {@code dataSetId}, {@code name} and {@code password} headers
     * @param query SQL text to register; must not be blank
     * @return the UTF-8 bytes of a random UUID identifying the cached request
     * @throws Exception if the header middleware or a required header is missing, the data
     *         set id is not a number, the query is blank, or no auth token could be produced
     */
    private ByteString addPrepared(FlightProducer.CallContext context, String query) throws Exception {
        ServerHeaderMiddleware headerMiddleware = context.getMiddleware(FlightConstants.HEADER_KEY);
        if (headerMiddleware == null) {
            throw new IllegalStateException("Failed to create prepared statement: header middleware is not registered");
        }
        CallHeaders headers = headerMiddleware.headers();
        List<String> requiredHeaders = Arrays.asList(DATA_SET_ID_HEADER_KEY, NAME_HEADER_KEY, PASSWORD_HEADER_KEY);
        if (requiredHeaders.stream().anyMatch(h -> !headers.containsKey(h))) {
            throw new IllegalArgumentException(String.format("Failed to create prepared statement: HeaderCallOption miss %s %s %s", DATA_SET_ID_HEADER_KEY, NAME_HEADER_KEY, PASSWORD_HEADER_KEY));
        }
        Long dataSetId = Long.valueOf(headers.get(DATA_SET_ID_HEADER_KEY));
        if (StringUtils.isBlank(query)) {
            throw new IllegalArgumentException("Failed to create prepared statement: query is empty");
        }
        String auth = this.getUserAuth(headers);
        if (StringUtils.isBlank(auth)) {
            throw new IllegalArgumentException("auth empty");
        }
        ByteString preparedStatementHandle = ByteString.copyFrom(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        QuerySqlReq querySqlReq = new QuerySqlReq();
        querySqlReq.setDataSetId(dataSetId);
        querySqlReq.setSql(query);
        // The token travels with the request as a parameter and is read back when the query is executed.
        querySqlReq.setParams(Arrays.asList(new Param(this.authenticationConfig.getTokenHttpHeaderKey(), auth)));
        this.preparedStatementCache.put(preparedStatementHandle, querySqlReq);
        log.info("createPreparedStatement {} {} {} ", new Object[]{preparedStatementHandle, dataSetId, query});
        return preparedStatementHandle;
    }

    /**
     * Registers the statement and replies with its handle.
     *
     * <p>The stream is terminated exactly once: {@code onError} on failure, otherwise
     * {@code onNext} followed by {@code onCompleted}.
     */
    private void prepared(FlightSql.ActionCreatePreparedStatementRequest request, FlightProducer.CallContext context, FlightProducer.StreamListener<Result> listener) {
        try {
            ByteString preparedStatementHandle = this.addPrepared(context, request.getQuery());
            FlightSql.ActionCreatePreparedStatementResult result = FlightSql.ActionCreatePreparedStatementResult.newBuilder()
                    .setDatasetSchema(ByteString.EMPTY)
                    .setParameterSchema(ByteString.EMPTY)
                    .setPreparedStatementHandle(preparedStatementHandle)
                    .build();
            listener.onNext(new Result(Any.pack(result).toByteArray()));
        }
        catch (Exception e) {
            log.error("createPreparedStatement error", e);
            listener.onError(CallStatus.INTERNAL
                    .withDescription(String.format("Failed to create prepared statement: %s", e.getMessage()))
                    .withCause(e)
                    .toRuntimeException());
            return;
        }
        listener.onCompleted();
    }

    /** Not supported; always throws an UNIMPLEMENTED status. */
    @Override
    protected <T extends Message> List<FlightEndpoint> determineEndpoints(T t, FlightDescriptor flightDescriptor, Schema schema) {
        throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
    }

    /**
     * Builds a {@link FlightInfo} with a single endpoint at the configured host/port whose
     * ticket is the packed {@code request}. Record and byte counts are reported as unknown (-1).
     */
    private <T extends Message> FlightInfo getFlightInfoForSchema(T request, FlightDescriptor descriptor, Schema schema) {
        Ticket ticket = new Ticket(Any.pack(request).toByteArray());
        Location listenLocation = Location.forGrpcInsecure(this.host, this.port);
        List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, listenLocation));
        return new FlightInfo(schema, descriptor, endpoints, -1L, -1L);
    }

    /** Produces the auth token by passing the name and password headers to {@code JwtUtil.sign}. */
    private String getUserAuth(CallHeaders callHeaders) throws Exception {
        return JwtUtil.sign(callHeaders.get(NAME_HEADER_KEY), callHeaders.get(PASSWORD_HEADER_KEY));
    }

    /**
     * Copies a semantic query response into an in-memory {@link CachedRowSet}.
     *
     * <p>Column names come from {@code QueryColumn.getNameEn()}. A column's type is resolved
     * from the first row holding a non-null value for it; columns with no such row are
     * declared as nullable VARCHAR.
     *
     * @param resp query response providing columns and rows
     * @param dataSetId data set id, stored as the catalog name of every column
     * @return row set holding one row per entry of the response's result list
     * @throws SQLException if the row set cannot be created or populated
     */
    private ResultSet semanticQueryRespToResultSet(SemanticQueryResp resp, Long dataSetId) throws SQLException {
        RowSetFactory factory = RowSetProvider.newFactory();
        CachedRowSet rowset = factory.createCachedRowSet();
        RowSetMetaDataImpl rowSetMetaData = new RowSetMetaDataImpl();
        int columnNum = resp.getColumns().size();
        String[] columnNames = new String[columnNum];
        rowSetMetaData.setColumnCount(columnNum);
        for (int i = 1; i <= columnNum; ++i) {
            String columnName = resp.getColumns().get(i - 1).getNameEn();
            columnNames[i - 1] = columnName;
            rowSetMetaData.setColumnName(i, columnName);
            Optional<Map<String, Object>> valOpt = resp.getResultList().stream()
                    .filter(r -> Objects.nonNull(r.get(columnName)))
                    .findFirst();
            if (valOpt.isPresent()) {
                // NOTE(review): the whole row map is passed, not the column value
                // (valOpt.get().get(columnName)) — confirm against FlightUtils.resolveType.
                int type = FlightUtils.resolveType(valOpt.get());
                rowSetMetaData.setColumnType(i, type);
                rowSetMetaData.setNullable(i, FlightUtils.isNullable(type));
            } else {
                rowSetMetaData.setNullable(i, ResultSetMetaData.columnNullable);
                rowSetMetaData.setColumnType(i, java.sql.Types.VARCHAR);
            }
            rowSetMetaData.setCatalogName(i, String.valueOf(dataSetId));
            rowSetMetaData.setSchemaName(i, DATA_SET_ID_HEADER_KEY);
        }
        rowset.setMetaData(rowSetMetaData);
        for (Map<String, Object> row : resp.getResultList()) {
            rowset.moveToInsertRow();
            for (int i = 1; i <= columnNum; ++i) {
                // Map.get yields null for an absent key, which is what a missing column is stored as.
                rowset.updateObject(i, row.get(columnNames[i - 1]));
            }
            rowset.insertRow();
            rowset.moveToCurrentRow();
        }
        return rowset;
    }
}

