/*
 * Decompiled with CFR 0.152.
 */
package com.github.microwww.redis.protocal.operation;

import com.github.microwww.redis.ChannelContext;
import com.github.microwww.redis.RequestParams;
import com.github.microwww.redis.database.Bytes;
import com.github.microwww.redis.database.PubSub;
import com.github.microwww.redis.logger.LogFactory;
import com.github.microwww.redis.logger.Logger;
import com.github.microwww.redis.protocal.AbstractOperation;
import com.github.microwww.redis.protocal.RedisRequest;
import com.github.microwww.redis.util.Assert;
import com.github.microwww.redis.util.SafeEncoder;
import com.github.microwww.redis.util.StringUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class PubSubOperation
extends AbstractOperation {
    private static final Logger log = LogFactory.getLogger(PubSubOperation.class);

    public void psubscribe(RedisRequest request) throws IOException {
        request.expectArgumentsCountGE(1);
        RequestParams[] args = request.getParams();
        Object[] sub = new Object[3];
        sub[0] = "psubscribe".getBytes(StandardCharsets.UTF_8);
        PubSub pubSub = request.getPubSub();
        for (RequestParams arg : args) {
            Bytes patten = arg.toBytes();
            NewChannelListener notify = new NewChannelListener(request.getContext(), patten, pubSub);
            try {
                notify.subscribe();
            }
            catch (Exception e) {
                log.warn("subscribe [{}] error", notify.patten, e);
                notify.unsubscribe();
            }
            sub[1] = patten.getBytes();
            sub[2] = request.getContext().getPattenSubscribe().subscribeChannels().size();
            request.getOutputProtocol().writerComplex(sub);
        }
    }

    public void publish(RedisRequest request) throws IOException {
        PubSub pubSub = request.getPubSub();
        request.expectArgumentsCount(2);
        RequestParams[] args = request.getParams();
        Bytes channel = args[0].toBytes();
        Bytes message = args[1].toBytes();
        int count = pubSub.publish(channel, message);
        request.getOutputProtocol().writer(count);
    }

    public void pubsub(RedisRequest request) throws IOException {
        String subcommand;
        request.expectArgumentsCountGE(1);
        RequestParams[] args = request.getParams();
        switch (subcommand = args[0].getByteArray2string().toLowerCase()) {
            case "channels": {
                this.subChannels(request);
                break;
            }
            case "numsub": {
                this.subNumSub(request);
                break;
            }
            case "numpat": {
                this.subNumPat(request);
                break;
            }
            default: {
                throw new UnsupportedOperationException(subcommand);
            }
        }
    }

    private void subChannels(RedisRequest request) throws IOException {
        PubSub pubSub = request.getPubSub();
        Stream<PubSub.MessageChannel> stream = pubSub.getChannels().values().stream().filter(PubSub.MessageChannel::isActive);
        if (request.getParams().length > 1) {
            String patten = request.getParams()[1].getByteArray2string();
            stream = stream.filter(e -> StringUtil.antPatternMatches(patten, SafeEncoder.encode(e.getChannel().getBytes())));
        }
        Object[] objects = stream.map(PubSub.MessageChannel::getChannel).toArray(Object[]::new);
        request.getOutputProtocol().writerComplex(objects);
    }

    private void subNumSub(RedisRequest request) throws IOException {
        PubSub pubSub = request.getPubSub();
        int len = request.getParams().length;
        ArrayList<Comparable<Bytes>> list = new ArrayList<Comparable<Bytes>>();
        if (len > 1) {
            for (int i = 1; i < len; ++i) {
                Bytes bytes = request.getParams()[i].toBytes();
                list.add(bytes);
                PubSub.MessageChannel mc = pubSub.getChannels().get(bytes);
                if (mc == null) {
                    list.add(Integer.valueOf(0));
                    continue;
                }
                list.add(Integer.valueOf(mc.getNumsub()));
            }
        }
        request.getOutputProtocol().writerComplex(list.toArray());
    }

    private void subNumPat(RedisRequest request) throws IOException {
        PubSub pubSub = request.getPubSub();
        request.getOutputProtocol().writer(new HashSet<Bytes>(pubSub.newChannelNotify.getPattens()).size());
    }

    public void punsubscribe(RedisRequest request) throws IOException {
        RequestParams[] args = request.getParams();
        Object[] uns = new Object[3];
        uns[0] = SafeEncoder.encode("punsubscribe");
        if (args.length == 0) {
            Map mpa = request.getContext().getPattenSubscribe().subscribeChannels();
            for (Bytes next : new HashSet<Bytes>(mpa.keySet())) {
                uns[1] = next;
                NewChannelListener.find(request.getContext(), next).ifPresent(NewChannelListener::unsubscribe);
                uns[2] = request.getContext().getPattenSubscribe().subscribeChannels().size();
                request.getOutputProtocol().writerComplex(uns);
            }
        } else {
            for (RequestParams arg : args) {
                Bytes bytes = arg.toBytes();
                uns[1] = bytes;
                ChannelMessageListener.find(request.getContext(), bytes).ifPresent(ChannelMessageListener::unsubscribe);
                uns[2] = request.getContext().getSubscribe().subscribeChannels().size();
                request.getOutputProtocol().writerComplex(uns);
            }
        }
        request.getOutputProtocol().flush();
    }

    public void subscribe(RedisRequest request) throws IOException {
        PubSub pubSub = request.getPubSub();
        request.expectArgumentsCountGE(1);
        RequestParams[] args = request.getParams();
        Object[] sub = new Object[3];
        sub[0] = "subscribe".getBytes(StandardCharsets.UTF_8);
        for (RequestParams arg : args) {
            Bytes bytes = arg.toBytes();
            ChannelMessageListener channelMessageListener = new ChannelMessageListener(request.getContext(), bytes, pubSub);
            try {
                channelMessageListener.subscribe();
            }
            catch (Exception e) {
                log.warn("subscribe [{}] error", channelMessageListener.channel, e);
                channelMessageListener.unsubscribe();
            }
            sub[1] = bytes.getBytes();
            sub[2] = request.getContext().getSubscribe().subscribeChannels().size();
            request.getOutputProtocol().writerComplex(sub);
        }
    }

    public void unsubscribe(RedisRequest request) throws IOException {
        RequestParams[] args = request.getParams();
        Object[] uns = new Object[3];
        uns[0] = "unsubscribe".getBytes(StandardCharsets.UTF_8);
        if (args.length == 0) {
            Map mpa = request.getContext().getSubscribe().subscribeChannels();
            for (Bytes next : new HashSet<Bytes>(mpa.keySet())) {
                uns[1] = next;
                ChannelMessageListener.find(request.getContext(), next).ifPresent(ChannelMessageListener::unsubscribe);
                uns[2] = request.getContext().getSubscribe().subscribeChannels().size();
                request.getOutputProtocol().writerComplex(uns);
            }
        } else {
            for (RequestParams arg : args) {
                Bytes bytes = arg.toBytes();
                uns[1] = bytes;
                ChannelMessageListener.find(request.getContext(), bytes).ifPresent(ChannelMessageListener::unsubscribe);
                uns[2] = request.getContext().getSubscribe().subscribeChannels().size();
                request.getOutputProtocol().writerComplex(uns);
            }
        }
        request.getOutputProtocol().flush();
    }

    public static class ChannelMessageListener
    implements Observer {
        private final ChannelContext context;
        private Bytes patten;
        private final Bytes channel;
        private final PubSub pubSub;
        private final ChannelContext.CloseListener channelClose;

        public ChannelMessageListener(ChannelContext context, Bytes channel, PubSub pubSub) {
            this.context = context;
            this.channel = channel;
            this.pubSub = pubSub;
            this.channelClose = context.addCloseListener(this::close);
        }

        private void close(ChannelContext context) {
            try {
                this.unsubscribe();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        @Override
        public void update(Observable o, Object arg) {
            try {
                ArrayList<Object> msg = new ArrayList<Object>(4);
                if (this.getPatten().isPresent()) {
                    msg.add(SafeEncoder.encode("pmessage"));
                    msg.add(this.getPatten().get().getBytes());
                } else {
                    msg.add(SafeEncoder.encode("message"));
                }
                msg.add(this.channel);
                Assert.isTrue(arg instanceof Bytes, "Observable publish must be `Bytes`");
                msg.add(arg);
                log.debug("send subscribe TO {}", this.context.getRemoteHost());
                this.context.getProtocol().sendToSubscribe(msg.toArray());
                this.context.getProtocol().flush();
            }
            catch (Exception e) {
                log.warn("Notify subscriber error, ignore, {}", e);
            }
        }

        public void unsubscribe() {
            this.pubSub.unsubscribe(this.channel, this);
            this.context.getSubscribe().removeSubscribe(this.channel);
            this.context.removeCloseListener(this.channelClose);
        }

        public void subscribe() {
            ChannelMessageListener.find(this.context, this.channel).ifPresent(ChannelMessageListener::unsubscribe);
            this.context.getSubscribe().addSubscribe(this.channel, this);
            this.pubSub.subscribe(this.channel, this);
        }

        public static Optional<ChannelMessageListener> find(ChannelContext context, Bytes bytes) {
            return context.getSubscribe().getSubscribe(bytes);
        }

        public ChannelMessageListener setPatten(Bytes patten) {
            this.patten = patten;
            return this;
        }

        public Optional<Bytes> getPatten() {
            return Optional.ofNullable(this.patten);
        }
    }

    public static class NewChannelListener
    implements Observer {
        private final ChannelContext context;
        private final Bytes bytes;
        private final Pattern patten;
        private final PubSub pubSub;
        private final ChannelContext.CloseListener channelClose;
        private final Map<Bytes, ChannelMessageListener> notifies = new HashMap<Bytes, ChannelMessageListener>();

        public NewChannelListener(ChannelContext context, Bytes patten, PubSub pubSub) {
            this.context = context;
            this.bytes = patten;
            this.patten = StringUtil.antPattern(SafeEncoder.encode(patten.getBytes()));
            this.pubSub = pubSub;
            this.channelClose = context.addCloseListener(this::close);
        }

        private void close(ChannelContext context) {
            try {
                this.unsubscribe();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        @Override
        public void update(Observable o, Object arg) {
            Bytes channel = (Bytes)arg;
            String encode = SafeEncoder.encode(channel.getBytes());
            if (this.patten.matcher(encode).matches()) {
                this.notifies.computeIfAbsent(channel, k -> {
                    ChannelMessageListener channelMessageListener = new ChannelMessageListener(this.context, (Bytes)arg, this.pubSub);
                    channelMessageListener.setPatten(this.bytes);
                    this.pubSub.subscribe(channel, channelMessageListener);
                    return channelMessageListener;
                });
            }
        }

        public void unsubscribe() {
            this.pubSub.newChannelNotify.unsubscribe(this);
            this.notifies.values().forEach(e -> this.pubSub.unsubscribe(((ChannelMessageListener)e).channel, (Observer)e));
            this.context.getPattenSubscribe().removeSubscribe(this.bytes);
            this.context.removeCloseListener(this.channelClose);
        }

        public void subscribe() {
            NewChannelListener.find(this.context, this.bytes).ifPresent(NewChannelListener::unsubscribe);
            this.pubSub.newChannelNotify.subscribe(this);
            this.context.getPattenSubscribe().addSubscribe(this.bytes, this);
        }

        public static Optional<NewChannelListener> find(ChannelContext context, Bytes bytes) {
            return context.getPattenSubscribe().getSubscribe(bytes);
        }

        public Bytes getPatten() {
            return this.bytes;
        }
    }
}

