/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.operators.sink.StreamingCommitterState;
import org.apache.flink.util.Preconditions;

final class StreamingCommitterStateSerializer<CommT>
implements SimpleVersionedSerializer<StreamingCommitterState<CommT>> {
    private static final int MAGIC_NUMBER = -1189141204;
    private final SimpleVersionedSerializer<CommT> committableSerializer;

    StreamingCommitterStateSerializer(SimpleVersionedSerializer<CommT> committableSerializer) {
        this.committableSerializer = (SimpleVersionedSerializer)Preconditions.checkNotNull(committableSerializer);
    }

    public int getVersion() {
        return 1;
    }

    public byte[] serialize(StreamingCommitterState<CommT> state) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeInt(-1189141204);
        this.serializeV1(state, (DataOutputView)out);
        return out.getCopyOfBuffer();
    }

    public StreamingCommitterState<CommT> deserialize(int version, byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        if (version == 1) {
            StreamingCommitterStateSerializer.validateMagicNumber((DataInputView)in);
            return this.deserializeV1((DataInputView)in);
        }
        throw new IOException("Unrecognized version or corrupt state: " + version);
    }

    private StreamingCommitterState<CommT> deserializeV1(DataInputView in) throws IOException {
        ArrayList<Object> r = new ArrayList<Object>();
        int committableSerializerVersion = in.readInt();
        int numOfCommittable = in.readInt();
        for (int i = 0; i < numOfCommittable; ++i) {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            Object committable = this.committableSerializer.deserialize(committableSerializerVersion, bytes);
            r.add(committable);
        }
        return new StreamingCommitterState(r);
    }

    private void serializeV1(StreamingCommitterState<CommT> state, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(this.committableSerializer.getVersion());
        dataOutputView.writeInt(state.getCommittables().size());
        for (CommT committable : state.getCommittables()) {
            byte[] serialized = this.committableSerializer.serialize(committable);
            dataOutputView.writeInt(serialized.length);
            dataOutputView.write(serialized);
        }
    }

    private static void validateMagicNumber(DataInputView in) throws IOException {
        int magicNumber = in.readInt();
        if (magicNumber != -1189141204) {
            throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
        }
    }
}

