/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cairo;

import io.questdb.MessageBus;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.O3PurgeJob;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.TxnScoreboard;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.AbstractQueueConsumerJob;
import io.questdb.mp.RingQueue;
import io.questdb.mp.Sequence;
import io.questdb.std.Chars;
import io.questdb.std.FilesFacade;
import io.questdb.std.LongList;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.str.MutableCharSink;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.tasks.O3PurgeDiscoveryTask;
import io.questdb.tasks.O3PurgeTask;
import org.jetbrains.annotations.Nullable;

/**
 * Queue consumer that handles {@link O3PurgeDiscoveryTask} items.
 *
 * <p>For each task it lists the table directory, collects the transaction
 * suffixes of every directory entry whose name starts with the partition name
 * of the task's timestamp, and asks {@link O3PurgeJob#purgePartitionDir} to
 * remove every collected version except the last one in sorted order. Versions
 * that could not be purged are re-published as {@link O3PurgeTask} items when a
 * purge publisher sequence is available.
 *
 * <p>Scratch objects (sink, native string view, txn list) are allocated once per
 * worker and selected by the {@code workerId} passed to {@link #doRun}.
 */
public class O3PurgeDiscoveryJob
extends AbstractQueueConsumerJob<O3PurgeDiscoveryTask> {
    private static final Log LOG = LogFactory.getLog(O3PurgeDiscoveryJob.class);
    // Value of ff.findType() that processDir() accepts; every other entry type is ignored.
    // NOTE(review): presumably the same value as Files.DT_DIR / POSIX DT_DIR - confirm.
    private static final int DIR_ENTRY_TYPE = 4;
    private final CairoConfiguration configuration;
    // The three arrays below are indexed by worker id, one scratch object per worker.
    private final MutableCharSink[] sink;
    private final NativeLPSZ[] nativeLPSZ;
    private final LongList[] txnList;
    private final RingQueue<O3PurgeTask> purgeQueue;
    private final Sequence purgePubSeq;

    /**
     * Creates the job and pre-allocates one set of scratch objects per worker.
     *
     * @param messageBus  source of the discovery queue/sequence, the purge queue/sequence
     *                    and the configuration
     * @param workerCount number of scratch object sets to allocate; {@link #doRun} indexes
     *                    them with its {@code workerId} argument
     */
    public O3PurgeDiscoveryJob(MessageBus messageBus, int workerCount) {
        super(messageBus.getO3PurgeDiscoveryQueue(), messageBus.getO3PurgeDiscoverySubSeq());
        this.configuration = messageBus.getConfiguration();
        this.purgeQueue = messageBus.getO3PurgeQueue();
        this.purgePubSeq = messageBus.getO3PurgePubSeq();
        this.sink = new MutableCharSink[workerCount];
        this.nativeLPSZ = new NativeLPSZ[workerCount];
        this.txnList = new LongList[workerCount];
        for (int i = 0; i < workerCount; ++i) {
            this.sink[i] = new StringSink();
            this.nativeLPSZ[i] = new NativeLPSZ();
            this.txnList[i] = new LongList();
        }
    }

    /**
     * Finds all on-disk versions of one partition and attempts to purge all but the last.
     *
     * <p>The table directory {@code root/tableName} is listed; every matching entry (see
     * {@code processDir}) contributes one transaction number to {@code txnList}. When more
     * than one number was collected the list is sorted and each element {@code i} except
     * the last is passed to {@link O3PurgeJob#purgePartitionDir} together with element
     * {@code i + 1}. A non-zero result of that call is treated as a failure: the version is
     * queued as an {@link O3PurgeTask} if {@code purgePubSeq} is non-null and yields a
     * cursor, otherwise the failure is only logged.
     *
     * @param ff                 files facade used for directory listing and purging
     * @param sink               scratch sink; cleared and filled with the partition name
     * @param nativeLPSZ         scratch view over native directory entry names
     * @param txnList            scratch list; cleared and filled with discovered txn numbers
     * @param purgeQueue         queue that receives purge tasks for failed purges
     * @param purgePubSeq        publisher sequence of {@code purgeQueue}; when {@code null}
     *                           failed purges are logged and not queued
     * @param root               database root directory
     * @param tableName          table whose directory is scanned
     * @param partitionBy        partitioning scheme passed to {@link TableUtils#setSinkForPartition}
     * @param partitionTimestamp timestamp identifying the partition
     * @param txnScoreboard      scoreboard forwarded to the purge call and to queued tasks
     * @return {@code true} when more than one version was found (purges were attempted),
     *         {@code false} otherwise
     */
    public static boolean discoverPartitions(FilesFacade ff, MutableCharSink sink, NativeLPSZ nativeLPSZ, LongList txnList, RingQueue<O3PurgeTask> purgeQueue, @Nullable Sequence purgePubSeq, CharSequence root, CharSequence tableName, int partitionBy, long partitionTimestamp, TxnScoreboard txnScoreboard) {
        LOG.info().$("processing [table=").$(tableName).$(", ts=").$ts(partitionTimestamp).I$();
        Path path = Path.getThreadLocal(root);
        path.concat(tableName).slash$();
        // The sink now holds the partition name; processDir() uses it as a name prefix filter.
        sink.clear();
        TableUtils.setSinkForPartition(sink, partitionBy, partitionTimestamp, false);
        path.slash$();
        txnList.clear();
        long p = ff.findFirst(path);
        if (p > 0L) {
            try {
                do {
                    O3PurgeDiscoveryJob.processDir(sink, nativeLPSZ, tableName, txnList, ff.findName(p), ff.findType(p));
                } while (ff.findNext(p) > 0);
            }
            finally {
                // Always release the native find handle, even if processing an entry throws.
                ff.findClose(p);
            }
        }
        if (txnList.size() <= 1) {
            // Zero or one version on disk: nothing to purge.
            return false;
        }
        txnList.sort();
        // The last element of the sorted list is never a purge candidate.
        int n = txnList.size() - 1;
        for (int i = 0; i < n; ++i) {
            long nameTxnToRemove = txnList.getQuick(i);
            long minTxnToExpect = txnList.getQuick(i + 1);
            int errno = O3PurgeJob.purgePartitionDir(ff, path.of(root).concat(tableName), partitionBy, partitionTimestamp, txnScoreboard, nameTxnToRemove, minTxnToExpect);
            if (errno == 0) {
                continue;
            }
            if (purgePubSeq == null) {
                LOG.error().$("could not purge [table=").$(tableName).$(", ts=").$ts(partitionTimestamp).$(", txn=").$(nameTxnToRemove).$(", errno=").$(errno).$(']').$();
                continue;
            }
            long cursor = purgePubSeq.next();
            if (cursor > -1L) {
                LOG.error().$("queuing [table=").$(tableName).$(", ts=").$ts(partitionTimestamp).$(", txn=").$(nameTxnToRemove).$(", errno=").$(errno).$(']').$();
                O3PurgeTask task = purgeQueue.get(cursor);
                task.of(tableName, partitionBy, txnScoreboard, partitionTimestamp, nameTxnToRemove, minTxnToExpect);
                purgePubSeq.done(cursor);
            } else {
                // Any negative cursor is reported as a full queue; the version is not retried here.
                LOG.error().$("purge queue is full [table=").$(tableName).$(", ts=").$ts(partitionTimestamp).$(", txn=").$(nameTxnToRemove).$(", errno=").$(errno).$(']').$();
            }
        }
        return true;
    }

    /**
     * Inspects one directory listing entry and, if it belongs to the partition, records
     * its transaction number in {@code txnList}.
     *
     * <p>An entry is considered only when its type equals {@code DIR_ENTRY_TYPE}, its name
     * passes {@code Chars.notDots} and the name starts with the content of {@code sink}.
     * A name without a {@code '.'} is recorded as {@code -1}; otherwise the text after the
     * last {@code '.'} is parsed as a long. Names whose suffix is not numeric are logged
     * and skipped.
     *
     * @param sink       partition name used as the required name prefix
     * @param nativeLPSZ scratch view pointed at the native entry name
     * @param tableName  table name, used for logging only
     * @param txnList    list that receives the transaction number
     * @param name       native address of the entry name
     * @param type       entry type as reported by the files facade
     */
    private static void processDir(MutableCharSink sink, NativeLPSZ nativeLPSZ, CharSequence tableName, LongList txnList, long name, int type) {
        if (type != DIR_ENTRY_TYPE) {
            return;
        }
        nativeLPSZ.of(name);
        if (!Chars.notDots(nativeLPSZ) || !Chars.startsWith((CharSequence)nativeLPSZ, sink)) {
            return;
        }
        int index = Chars.lastIndexOf(nativeLPSZ, '.');
        if (index < 0) {
            // No txn suffix in the directory name.
            txnList.add(-1L);
            return;
        }
        try {
            txnList.add(Numbers.parseLong(nativeLPSZ, index + 1, nativeLPSZ.length()));
        }
        catch (NumericException e) {
            LOG.error().$("unknown directory [table=").utf8(tableName).$(", dir=").utf8(nativeLPSZ).$(']').$();
        }
    }

    /**
     * Processes the discovery task at {@code cursor} using the scratch objects of
     * {@code workerId}.
     *
     * <p>The consumer cursor is released in a {@code finally} block so that an exception
     * thrown while scanning or purging cannot leave the queue slot permanently claimed;
     * the exception itself still propagates to the caller.
     *
     * @param workerId index of the per-worker scratch objects to use
     * @param cursor   queue cursor of the task to process
     * @return the result of {@link #discoverPartitions}
     */
    @Override
    protected boolean doRun(int workerId, long cursor) {
        O3PurgeDiscoveryTask task = (O3PurgeDiscoveryTask)this.queue.get(cursor);
        try {
            return O3PurgeDiscoveryJob.discoverPartitions(this.configuration.getFilesFacade(), this.sink[workerId], this.nativeLPSZ[workerId], this.txnList[workerId], this.purgeQueue, this.purgePubSeq, this.configuration.getRoot(), task.getTableName(), task.getPartitionBy(), task.getTimestamp(), task.getTxnScoreboard());
        }
        finally {
            // Task fields are only read above, so the slot can be handed back now.
            this.subSeq.done(cursor);
        }
    }
}

