package com.zrzt.mqtt.util;

import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import com.zrzt.mqtt.ConnMQServer;
import com.zrzt.mqtt.consumer.AbstractConsumer;
import com.zrzt.mqtt.pojo.EXchangeBO;
import com.zrzt.mqtt.pojo.MessageBO;
import com.zrzt.mqtt.pojo.QueueBO;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zrzt/mqtt/util/MQServerUtil.class */
public class MQServerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(MQServerUtil.class);
    private AutorecoveringChannel channel;
    private String virtual;

    public MQServerUtil(String str) {
        this.virtual = str;
        this.channel = ConnMQServer.getChannel(str);
        this.channel.addRecoveryListener(new RecoveryListener() { // from class: com.zrzt.mqtt.util.MQServerUtil.1
            public void handleRecovery(Recoverable recoverable) {
                MQServerUtil.LOG.warn("恢复配置成功！");
            }

            public void handleRecoveryStarted(Recoverable recoverable) {
                MQServerUtil.LOG.warn("恢复与MQ服务器连接完成！");
            }
        });
    }

    public String getVirtual() {
        return this.virtual;
    }

    public void cleanQueue(String str) throws IOException {
        this.channel.queuePurge(str);
    }

    public void deleteEXchange(String str) throws IOException {
        this.channel.exchangeDelete(str);
    }

    public void deleteQueue(String str) throws IOException {
        this.channel.queueDelete(str);
    }

    public void ack(long j, boolean z) {
        try {
            this.channel.basicAck(j, z);
        } catch (IOException e) {
            LOG.error("确认消息>>>>连接异常", e);
        }
    }

    public void nack(long j, boolean z, boolean z2) {
        try {
            this.channel.basicNack(j, z, z2);
        } catch (IOException e) {
            LOG.error("确认消息>>>>连接异常", e);
        }
    }

    public void declareEXchange(EXchangeBO eXchangeBO) {
        try {
            this.channel.exchangeDeclare(eXchangeBO.getExchangeName(), eXchangeBO.getExchangeType().getType(), eXchangeBO.getDurable(), eXchangeBO.getAutoDelete(), eXchangeBO.getInternal(), eXchangeBO.getArguments());
        } catch (IOException e) {
            LOG.error("声明路由>>>>连接异常", e);
        }
    }

    public void declareQueue(QueueBO queueBO) {
        try {
            this.channel.queueDeclare(queueBO.getQueueName(), queueBO.getDurable(), queueBO.getExclusive(), queueBO.getAutoDelete(), queueBO.getArguments());
        } catch (IOException e) {
            LOG.error("声明队列>>>>连接异常", e);
        }
    }

    public void bindQueue(QueueBO queueBO, EXchangeBO eXchangeBO, String str) {
        try {
            this.channel.queueBind(queueBO.getQueueName(), eXchangeBO.getExchangeName(), str);
        } catch (IOException e) {
            LOG.error("绑定队列到路由>>>>连接异常", e);
        }
    }

    public void publishMsg(EXchangeBO eXchangeBO, MessageBO messageBO, String str, String... strArr) {
        declareEXchange(eXchangeBO);
        publishMsg(eXchangeBO.getExchangeName(), messageBO, str, strArr);
    }

    public void publishMsg(String str, MessageBO messageBO, String str2, String... strArr) {
        try {
            this.channel.basicPublish(str, str2, messageBO.getProps(), messageBO.getBody());
            for (String str3 : strArr) {
                this.channel.basicPublish(str, str3, messageBO.getProps(), messageBO.getBody());
            }
        } catch (IOException e) {
            LOG.error("发布消息出错>>>>连接异常", e);
        }
    }

    public void subscribeMsg(EXchangeBO eXchangeBO, QueueBO queueBO, AbstractConsumer abstractConsumer, String... strArr) {
        try {
            declareEXchange(eXchangeBO);
            declareQueue(queueBO);
            for (String str : strArr) {
                bindQueue(queueBO, eXchangeBO, str);
            }
            this.channel.basicConsume(queueBO.getQueueName(), abstractConsumer);
        } catch (IOException e) {
            LOG.error("订阅消息出错>>>>连接异常", e);
        }
    }

    public void basicQos(int i) {
        try {
            this.channel.basicQos(i);
        } catch (IOException e) {
            LOG.error("通道待确认消息数量设置>>>>连接异常", e);
        }
    }

    public void confirmSelect() {
        try {
            this.channel.confirmSelect();
        } catch (IOException e) {
            LOG.error("开启发送方消息确认模式>>>>连接异常", e);
        }
    }

    public void addConfirmListener(ConfirmListener confirmListener) {
        confirmSelect();
        this.channel.addConfirmListener(confirmListener);
    }

    public void addConfirm() {
        this.channel = ConnMQServer.getChannel(this.virtual);
    }

    public AutorecoveringChannel getChannel() {
        return this.channel;
    }

    public void closeChannel() {
        if (this.channel == null || !this.channel.isOpen()) {
            return;
        }
        try {
            this.channel.close();
            this.channel = null;
        } catch (Exception e) {
            LOG.error("关闭通道>>>>连接异常", e);
        }
    }
}
