/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.table;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcLookupFunction
extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupFunction.class);
    private static final long serialVersionUID = 2L;
    private final String query;
    private final JdbcConnectionProvider connectionProvider;
    private final TypeInformation[] keyTypes;
    private final int[] keySqlTypes;
    private final String[] fieldNames;
    private final String[] keyNames;
    private final TypeInformation[] fieldTypes;
    private final int[] outputSqlTypes;
    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private transient PreparedStatement statement;
    private transient Cache<Row, List<Row>> cache;

    public JdbcLookupFunction(JdbcConnectorOptions options, JdbcLookupOptions lookupOptions, String[] fieldNames, TypeInformation[] fieldTypes, String[] keyNames) {
        this.connectionProvider = new SimpleJdbcConnectionProvider(options);
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.keyNames = keyNames;
        List<String> nameList = Arrays.asList(fieldNames);
        this.keyTypes = (TypeInformation[])Arrays.stream(keyNames).map(s -> {
            Preconditions.checkArgument((boolean)nameList.contains(s), (String)"keyName %s can't find in fieldNames %s.", (Object[])new Object[]{s, nameList});
            return fieldTypes[nameList.indexOf(s)];
        }).toArray(TypeInformation[]::new);
        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
        this.keySqlTypes = Arrays.stream(this.keyTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
        this.outputSqlTypes = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
        this.query = FieldNamedPreparedStatementImpl.parseNamedStatement(options.getDialect().getSelectFromStatement(options.getTableName(), fieldNames, keyNames), new HashMap<String, List<Integer>>());
    }

    public static Builder builder() {
        return new Builder();
    }

    public void open(FunctionContext context) throws Exception {
        try {
            this.establishConnectionAndStatement();
            this.cache = this.cacheMaxSize == -1L || this.cacheExpireMs == -1L ? null : CacheBuilder.newBuilder().expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(this.cacheMaxSize).build();
        }
        catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        }
        catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

    public void eval(Object ... keys) {
        List cachedRows;
        Row keyRow = Row.of((Object[])keys);
        if (this.cache != null && (cachedRows = (List)this.cache.getIfPresent((Object)keyRow)) != null) {
            for (Row cachedRow : cachedRows) {
                this.collect(cachedRow);
            }
            return;
        }
        for (int retry = 0; retry <= this.maxRetryTimes; ++retry) {
            try {
                this.statement.clearParameters();
                for (int i = 0; i < keys.length; ++i) {
                    JdbcUtils.setField(this.statement, this.keySqlTypes[i], keys[i], i);
                }
                try (ResultSet resultSet = this.statement.executeQuery();){
                    if (this.cache == null) {
                        while (resultSet.next()) {
                            this.collect(this.convertToRowFromResultSet(resultSet));
                        }
                    } else {
                        ArrayList<Row> rows = new ArrayList<Row>();
                        while (resultSet.next()) {
                            Row row = this.convertToRowFromResultSet(resultSet);
                            rows.add(row);
                            this.collect(row);
                        }
                        rows.trimToSize();
                        this.cache.put((Object)keyRow, rows);
                    }
                    break;
                }
            }
            catch (SQLException e) {
                LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), (Throwable)e);
                if (retry >= this.maxRetryTimes) {
                    throw new RuntimeException("Execution of JDBC statement failed.", e);
                }
                try {
                    if (!this.connectionProvider.isConnectionValid()) {
                        this.statement.close();
                        this.connectionProvider.closeConnection();
                        this.establishConnectionAndStatement();
                    }
                }
                catch (ClassNotFoundException | SQLException excpetion) {
                    LOG.error("JDBC connection is not valid, and reestablish connection failed", (Throwable)excpetion);
                    throw new RuntimeException("Reestablish JDBC connection failed", excpetion);
                }
                try {
                    Thread.sleep(1000 * retry);
                    continue;
                }
                catch (InterruptedException e1) {
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    private Row convertToRowFromResultSet(ResultSet resultSet) throws SQLException {
        Row row = new Row(this.outputSqlTypes.length);
        for (int i = 0; i < this.outputSqlTypes.length; ++i) {
            row.setField(i, JdbcUtils.getFieldFromResultSet(i, this.outputSqlTypes[i], resultSet));
        }
        return row;
    }

    private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
        Connection dbConn = this.connectionProvider.getOrEstablishConnection();
        this.statement = dbConn.prepareStatement(this.query);
    }

    public void close() throws IOException {
        if (this.cache != null) {
            this.cache.cleanUp();
            this.cache = null;
        }
        if (this.statement != null) {
            try {
                this.statement.close();
            }
            catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            }
            finally {
                this.statement = null;
            }
        }
        this.connectionProvider.closeConnection();
    }

    @VisibleForTesting
    public Connection getDbConnection() {
        return this.connectionProvider.getConnection();
    }

    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(this.fieldTypes, this.fieldNames);
    }

    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return this.keyTypes;
    }

    public static class Builder {
        private JdbcConnectorOptions options;
        private JdbcLookupOptions lookupOptions;
        protected String[] fieldNames;
        protected TypeInformation[] fieldTypes;
        protected String[] keyNames;

        public Builder setOptions(JdbcConnectorOptions options) {
            this.options = options;
            return this;
        }

        public Builder setLookupOptions(JdbcLookupOptions lookupOptions) {
            this.lookupOptions = lookupOptions;
            return this;
        }

        public Builder setFieldNames(String[] fieldNames) {
            this.fieldNames = fieldNames;
            return this;
        }

        public Builder setFieldTypes(TypeInformation[] fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        public Builder setKeyNames(String[] keyNames) {
            this.keyNames = keyNames;
            return this;
        }

        public JdbcLookupFunction build() {
            Preconditions.checkNotNull((Object)this.options, (String)"No JdbcOptions supplied.");
            if (this.lookupOptions == null) {
                this.lookupOptions = JdbcLookupOptions.builder().build();
            }
            Preconditions.checkNotNull((Object)this.fieldNames, (String)"No fieldNames supplied.");
            Preconditions.checkNotNull((Object)this.fieldTypes, (String)"No fieldTypes supplied.");
            Preconditions.checkNotNull((Object)this.keyNames, (String)"No keyNames supplied.");
            return new JdbcLookupFunction(this.options, this.lookupOptions, this.fieldNames, this.fieldTypes, this.keyNames);
        }
    }
}

