/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.iot.api.message.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.iot.api.Profile;
import com.aliyun.openservices.iot.api.exception.IotClientException;
import com.aliyun.openservices.iot.api.http2.IotHttp2Client;
import com.aliyun.openservices.iot.api.http2.callback.AbstractHttp2StreamDataReceiver;
import com.aliyun.openservices.iot.api.http2.callback.Http2StreamListener;
import com.aliyun.openservices.iot.api.http2.connection.Connection;
import com.aliyun.openservices.iot.api.http2.connection.ConnectionListener;
import com.aliyun.openservices.iot.api.http2.connection.ConnectionStatus;
import com.aliyun.openservices.iot.api.http2.entity.Http2Response;
import com.aliyun.openservices.iot.api.http2.entity.StreamData;
import com.aliyun.openservices.iot.api.message.api.MessageClient;
import com.aliyun.openservices.iot.api.message.callback.ConnectionCallback;
import com.aliyun.openservices.iot.api.message.callback.MessageCallback;
import com.aliyun.openservices.iot.api.message.entity.Message;
import com.aliyun.openservices.iot.api.message.entity.MessageToken;
import com.aliyun.openservices.iot.api.message.entity.SubscribeInfo;
import com.aliyun.openservices.iot.api.util.StringUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.handler.codec.Headers;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageClientImpl
extends AbstractHttp2StreamDataReceiver
implements ConnectionListener,
MessageClient {
    private static final Logger log = LoggerFactory.getLogger(MessageClientImpl.class);
    private static final String MESSAGE_ID = "x-message-id";
    private static final String PATH_MESSAGE_ACK = "/message/ack";
    private static final String PATH_MESSAGE_PUB = "/message/pub";
    private static final String PATH_MESSAGE_SUB = "/message/sub";
    private static final String PATH_MESSAGE_UNSUB = "/message/unsub";
    private static final String PATH_CONNECT = "/message/echo/success";
    private static final String SLASH = "/";
    private static final String PLUS_SIGN = "+";
    private static final String CROSSHATCH = "#";
    private static final String IOT_ID = "x-iot-id";
    private static final String TOPIC = "x-topic";
    private static final String QOS = "x-qos";
    private static final String GENERATE_TIME = "x-generate-time";
    private static final String AS_STATUS_PREFIX = "/as/mqtt/status/";
    private static String HEADER_X_SDK_VERSION = "x-sdk-version";
    private static String HEADER_X_SDK_VERSION_NAME = "x-sdk-version-name";
    private static String HEADER_X_SDK_PLATFORM = "x-sdk-platform";
    private static String X_SDK_VERSION = "1.1.4";
    private static String X_SDK_VERSION_NAME = "v1.1.4";
    private static String X_SDK_PLATFORM = "java";
    private final Profile profile;
    private MessageCallback messageCallback;
    private ExecutorService messageCallbackExecutorService;
    private ScheduledExecutorService publishRetryExecutorService;
    private IotHttp2Client client;
    private List<SubscribeInfo> subscriptionInfo;
    private AtomicBoolean started;
    private AtomicBoolean isConnected;
    private AtomicLong localMessageId;
    private ConnectionCallback connectionCallback;

    public MessageClientImpl(Profile profile) {
        this.profile = profile;
        this.subscriptionInfo = new CopyOnWriteArrayList<SubscribeInfo>();
        this.started = new AtomicBoolean(false);
        this.isConnected = new AtomicBoolean(false);
        this.localMessageId = new AtomicLong(0L);
    }

    private CompletableFuture<Http2Response> sendAuth(Connection connection) {
        Http2Headers headers = this.client.authHeader();
        headers.path((CharSequence)PATH_CONNECT);
        headers.add((Object)"x-clear-session", (Object)(this.profile.isCleanSession() ? "1" : "0"));
        headers.add((Headers)this.getDefaultHeaders());
        return this.client.sendRequest(connection, headers, null);
    }

    private void doConnect() {
        try {
            Connection connection = this.client.newConnection();
            connection.setDefaultStreamListener((Http2StreamListener)this);
            CompletableFuture<Http2Response> completableFuture = this.sendAuth(connection);
            Http2Response r = completableFuture.get(15L, TimeUnit.SECONDS);
            if (r.getStatus() != HttpResponseStatus.OK) {
                throw new IotClientException("connect to server failed", r);
            }
            connection.setStatus(ConnectionStatus.AUTHORIZED);
            this.client.addConnectionListener((ConnectionListener)this);
            this.isConnected.set(true);
            Optional.ofNullable(this.connectionCallback).ifPresent(callback -> this.messageCallbackExecutorService.execute(() -> callback.onConnected(false)));
        }
        catch (ExecutionException e) {
            throw new IotClientException(e.getCause());
        }
        catch (InterruptedException | TimeoutException e) {
            throw new IotClientException((Throwable)e);
        }
    }

    private Message convertStreamData2Message(Http2Response response) throws IOException {
        if (!HttpResponseStatus.OK.equals((Object)response.getStatus())) {
            throw new IOException("status is not success, code: " + response.getStatus() + ", content: " + new String(response.getContent()));
        }
        byte[] payload = response.getContent();
        String messageId = ((CharSequence)response.getHeaders().get((Object)MESSAGE_ID)).toString();
        String topic = null;
        if (response.getHeaders().contains((Object)TOPIC)) {
            topic = ((CharSequence)response.getHeaders().get((Object)TOPIC)).toString();
        }
        int qos = 0;
        if (response.getHeaders().contains((Object)QOS)) {
            qos = response.getHeaders().getInt((Object)QOS);
        }
        long generateTime = 0L;
        if (response.getHeaders().contains((Object)GENERATE_TIME)) {
            generateTime = response.getHeaders().getLong((Object)GENERATE_TIME);
        }
        if (this.isStatusCallback(topic)) {
            String[] splits = topic.split(SLASH);
            JSONObject jsonPayload = JSON.parseObject((String)new String(payload));
            jsonPayload.remove((Object)"meta");
            if (splits.length > 5) {
                jsonPayload.put("productKey", (Object)splits[4]);
                jsonPayload.put("deviceName", (Object)splits[5]);
            }
            payload = jsonPayload.toJSONString().getBytes();
        }
        return new Message(payload, topic, messageId, qos, generateTime);
    }

    private boolean isStatusCallback(String topic) {
        return StringUtil.isNotEmpty((CharSequence)topic) && topic.startsWith(AS_STATUS_PREFIX);
    }

    private boolean needAck(Message m) {
        return m.getQos() == 1 || m.getQos() == 2;
    }

    public void onStreamError(Connection connection, Http2Stream stream, IOException e) {
        log.error("message receive error, {}", (Object)e.getMessage());
    }

    public void onStreamDataReceived(Connection connection, Http2Stream stream, StreamData streamData) {
        try {
            Http2Response response = new Http2Response(streamData.getHeaders(), streamData.readAllData());
            Message m = this.convertStreamData2Message(response);
            MessageToken token = new MessageToken(m, connection, this.client);
            log.info("receive msg, messageId:{}, data size: {}", (Object)m.getMessageId(), (Object)m.getPayload().length);
            CompletableFuture<MessageCallback.Action> cf = CompletableFuture.supplyAsync(() -> {
                try {
                    Optional<MessageCallback> op = this.subscriptionInfo.stream().filter(info -> this.isMatch(m.getTopic(), info.getTopic())).findFirst().map(SubscribeInfo::getCallback);
                    MessageCallback callback = op.orElse(this.messageCallback);
                    if (callback != null) {
                        return callback.consume(token);
                    }
                    log.warn("no message callback for " + m.getTopic());
                    return MessageCallback.Action.CommitFailure;
                }
                catch (Throwable t) {
                    log.error("message consume error, messageId:{}, {}", (Object)m.getMessageId(), (Object)t);
                    return MessageCallback.Action.CommitFailure;
                }
            }, this.messageCallbackExecutorService);
            if (this.needAck(m)) {
                cf.whenComplete((action, throwable) -> {
                    if (throwable != null) {
                        log.error("consume message {}, occurs error: ", (Object)m, (Object)throwable.getMessage());
                    }
                    log.info("consume message: {} , result: {}", (Object)m, (Object)(action == null ? "null" : action.name()));
                    if (action == MessageCallback.Action.CommitSuccess) {
                        this.ack(token);
                    }
                });
            }
        }
        catch (IOException e) {
            log.error("message receive error,{}, {}", (Object)e.getMessage(), (Object)streamData);
        }
    }

    public void onSettingReceive(Connection connection, Http2Settings settings) {
    }

    public void onStatusChange(ConnectionStatus status, Connection connection) {
        if (status == ConnectionStatus.CREATED) {
            connection.setDefaultStreamListener((Http2StreamListener)this);
            this.sendAuth(connection).whenComplete((http2Response, t) -> {
                if (t != null) {
                    log.error("failed to auth connection {}, {}", (Object)connection, (Object)t.getMessage());
                    connection.close();
                    return;
                }
                if (http2Response.getStatus() != HttpResponseStatus.OK) {
                    log.error("failed to auth connection {}, code: {}, content: {}, request id: {}", new Object[]{connection, http2Response.getStatus().code(), new String(http2Response.getContent()), http2Response.getRequestId()});
                    connection.close();
                    return;
                }
                connection.setStatus(ConnectionStatus.AUTHORIZED);
            });
        }
        boolean hasAuthorizedConnection = this.client.allConnections().stream().anyMatch(Connection::isAuthorized);
        boolean isConnectedBefore = this.isConnected.get();
        if (!hasAuthorizedConnection) {
            this.isConnected.set(false);
        }
        Optional.ofNullable(this.connectionCallback).ifPresent(callback -> {
            if (status == ConnectionStatus.AUTHORIZED) {
                callback.onConnected(true);
                return;
            }
            if (!hasAuthorizedConnection && isConnectedBefore) {
                callback.onConnectionLost();
            }
        });
    }

    private boolean isMatch(String topicFullName, String topicFilter) {
        String[] filterItems;
        String[] nameItems = topicFullName.split(SLASH);
        if (nameItems.length < (filterItems = topicFilter.split(SLASH)).length) {
            return false;
        }
        if (nameItems.length != filterItems.length && !CROSSHATCH.equals(filterItems[filterItems.length - 1])) {
            return false;
        }
        for (int i = 0; i < filterItems.length; ++i) {
            if (CROSSHATCH.equals(filterItems[i]) || PLUS_SIGN.equals(filterItems[i]) || nameItems[i].equals(filterItems[i])) continue;
            return false;
        }
        return true;
    }

    @Override
    public void connect(MessageCallback messageCallback) {
        if (!this.started.compareAndSet(false, true)) {
            throw new IotClientException("client is already connected!");
        }
        this.setMessageCallback(messageCallback);
        this.messageCallbackExecutorService = new ThreadPoolExecutor(this.profile.getCallbackThreadCorePoolSize(), this.profile.getCallbackThreadMaximumPoolSize(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(this.profile.getCallbackThreadBlockingQueueSize()), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iot-message-client-receiver-%d").build());
        this.publishRetryExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("iot-message-client-schedule-thread").build());
        this.client = new IotHttp2Client(this.profile, this.profile.isMultiConnection() ? -1 : 1);
        this.doConnect();
    }

    @Override
    public void disconnect() {
        this.checkStarted();
        if (this.started.compareAndSet(true, false)) {
            this.client.removeConnectionListener((ConnectionListener)this);
            this.client.shutdown();
            this.messageCallbackExecutorService.shutdown();
            this.publishRetryExecutorService.shutdown();
            this.client = null;
            this.messageCallbackExecutorService = null;
            this.subscriptionInfo.clear();
            this.started.set(false);
            this.isConnected.set(false);
        }
    }

    @Override
    public void setMessageListener(MessageCallback messageCallback) {
        this.messageCallback = messageCallback;
    }

    @Override
    public void setMessageListener(String topic, MessageCallback messageCallback) {
        if (StringUtil.isEmpty((CharSequence)topic)) {
            throw new IllegalArgumentException("topic can't be null");
        }
        Optional<SubscribeInfo> optional = this.subscriptionInfo.stream().filter(info -> topic.equals(info.getTopic())).findAny();
        if (optional.isPresent()) {
            if (messageCallback == null) {
                this.subscriptionInfo.remove(optional.get());
            } else {
                optional.get().setCallback(messageCallback);
            }
        } else {
            this.subscriptionInfo.add(new SubscribeInfo(topic, messageCallback));
        }
    }

    @Override
    public CompletableFuture<Boolean> subscribe(String topic) {
        this.checkStarted();
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        headers.path((CharSequence)(PATH_MESSAGE_SUB + topic));
        headers.add((Headers)this.getDefaultHeaders());
        return this.sendWithResult((Http2Headers)headers, null, "failed to subscribe " + topic);
    }

    private CompletableFuture<Boolean> sendWithResult(Http2Headers headers, byte[] data, String errorMessage) {
        Connection connection = this.getConnection();
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        try {
            this.client.sendRequest(connection, headers, data).whenComplete((response, throwable) -> {
                if (throwable != null) {
                    log.error(errorMessage, throwable);
                    future.completeExceptionally((Throwable)throwable);
                    return;
                }
                if (!HttpResponseStatus.OK.equals((Object)response.getStatus())) {
                    IotClientException exception = new IotClientException(errorMessage, response);
                    log.error(exception.getMessage());
                    future.completeExceptionally((Throwable)exception);
                } else {
                    future.complete(true);
                }
            });
        }
        catch (Exception e) {
            future.completeExceptionally((Throwable)new IotClientException(errorMessage, (Throwable)e));
        }
        return future;
    }

    @Override
    public CompletableFuture<Boolean> subscribe(String topic, MessageCallback messageCallback) {
        this.checkStarted();
        this.setMessageListener(topic, messageCallback);
        return this.subscribe(topic);
    }

    @Override
    public CompletableFuture<Boolean> unsubscribe(String topic) {
        this.checkStarted();
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        headers.path((CharSequence)(PATH_MESSAGE_UNSUB + topic));
        headers.add((Headers)this.getDefaultHeaders());
        return this.sendWithResult((Http2Headers)headers, null, "failed to unsubscribe " + topic);
    }

    @Override
    public MessageToken publish(String topic, Message message) {
        this.checkStarted();
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        headers.path((CharSequence)(PATH_MESSAGE_PUB + topic));
        headers.add((Object)QOS, (Object)String.valueOf(message.getQos()));
        headers.add((Headers)this.getDefaultHeaders());
        MessageToken messageToken = new MessageToken(message, null, this.client);
        messageToken.setLocalMessageId(this.localMessageId.getAndIncrement());
        this.doPublish(messageToken, (Http2Headers)headers);
        return messageToken;
    }

    @Override
    public CompletableFuture<Boolean> ack(MessageToken messageToken) {
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        String messageId = messageToken.getMessage().getMessageId();
        headers.set((Object)MESSAGE_ID, (Object)messageId);
        headers.path((CharSequence)PATH_MESSAGE_ACK);
        headers.add((Headers)this.getDefaultHeaders());
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.client.sendRequest(messageToken.getConnection(), (Http2Headers)headers, null).whenComplete((r, t) -> {
            if (t != null) {
                log.error("ack failed, messageId {}, error {}", (Object)messageId, (Object)t.getMessage());
                result.completeExceptionally((Throwable)t);
                return;
            }
            if (HttpResponseStatus.OK.equals((Object)r.getStatus())) {
                log.debug("ack success, messageId {}", (Object)messageId);
                result.complete(true);
            } else {
                result.complete(false);
                log.error("ack failed, {}, {}", (Object)r.getHeaders(), (Object)r.getContent());
            }
        });
        return result;
    }

    @Override
    public boolean isConnected() {
        return this.isConnected.get();
    }

    @Override
    public void setConnectionCallback(ConnectionCallback connectionCallback) {
        this.connectionCallback = connectionCallback;
    }

    private void doPublish(MessageToken messageToken, Http2Headers headers) {
        log.info("publish message {}", (Object)messageToken.getLocalMessageId());
        try {
            this.client.sendRequest(this.getConnection(), headers, messageToken.getMessage().getPayload()).whenComplete((response, throwable) -> {
                IotClientException exception = null;
                if (throwable != null) {
                    exception = new IotClientException("failed to publish, message id: {}", throwable);
                } else if (response == null) {
                    exception = new IotClientException("failed to publish, response is null");
                } else if (!HttpResponseStatus.OK.equals((Object)response.getStatus())) {
                    exception = new IotClientException("failed to publish", response);
                }
                if (exception == null) {
                    try {
                        Message m = this.convertStreamData2Message((Http2Response)response);
                        messageToken.getPublishFuture().complete(m);
                        return;
                    }
                    catch (Exception e) {
                        log.error("failed to receive response, error: {}", (Object)e.getMessage(), (Object)e);
                    }
                }
                this.retryPublish(messageToken, headers, (Exception)exception, false);
            });
        }
        catch (Exception e) {
            this.retryPublish(messageToken, headers, (Exception)new IotClientException("failed to publish message, error msg:" + e.getMessage()), true);
        }
    }

    private void retryPublish(MessageToken messageToken, Http2Headers headers, Exception exception, boolean isLocalError) {
        log.error("failed to publish message {}, error: {}", (Object)messageToken.getLocalMessageId(), (Object)exception.getMessage());
        if (messageToken.shouldStop(isLocalError)) {
            log.info("give up publishing, message id: {}", (Object)messageToken.getLocalMessageId());
            messageToken.getPublishFuture().completeExceptionally(exception);
        } else {
            messageToken.increaseAttemptCount();
            this.publishRetryExecutorService.schedule(() -> this.doPublish(messageToken, headers), messageToken.computeSleepTime(), TimeUnit.MILLISECONDS);
            log.info("message {} will be delivery after {} ms", (Object)messageToken.getLocalMessageId(), (Object)messageToken.computeSleepTime());
        }
    }

    private Connection getConnection() {
        return (Connection)this.client.randomConnection(Connection::isAuthorized).orElseThrow(() -> new IotClientException("fail to publish, no connection exists"));
    }

    private void checkStarted() {
        if (!this.started.get()) {
            throw new IotClientException("client is not connected, please connect first");
        }
    }

    private DefaultHttp2Headers getDefaultHeaders() {
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        headers.add((Object)HEADER_X_SDK_VERSION, (Object)X_SDK_VERSION);
        headers.add((Object)HEADER_X_SDK_VERSION_NAME, (Object)X_SDK_VERSION_NAME);
        headers.add((Object)HEADER_X_SDK_PLATFORM, (Object)X_SDK_PLATFORM);
        return headers;
    }

    public MessageCallback getMessageCallback() {
        return this.messageCallback;
    }

    public void setMessageCallback(MessageCallback messageCallback) {
        this.messageCallback = messageCallback;
    }
}

