001package io.jboot.support.seata.tcc;
002
003/**
004 * @author zhangxn
005 * @date 2022/5/30  21:32
006 */
007
008import com.jfinal.plugin.activerecord.Db;
009import com.jfinal.plugin.activerecord.DbPro;
010import com.jfinal.plugin.activerecord.IAtom;
011import io.jboot.db.datasource.DataSourceBuilder;
012import io.jboot.db.datasource.DataSourceConfig;
013import io.jboot.db.datasource.DataSourceConfigManager;
014import io.jboot.utils.StrUtil;
015import io.seata.common.exception.FrameworkErrorCode;
016import io.seata.common.thread.NamedThreadFactory;
017import io.seata.rm.tcc.TwoPhaseResult;
018import io.seata.rm.tcc.constant.TCCFenceConstant;
019import io.seata.rm.tcc.exception.TCCFenceException;
020import io.seata.rm.tcc.store.TCCFenceDO;
021import io.seata.rm.tcc.store.TCCFenceStore;
022import io.seata.rm.tcc.store.db.TCCFenceStoreDataBaseDAO;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026import javax.sql.DataSource;
027import java.lang.reflect.Method;
028import java.sql.Connection;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033
034/**
035 * TCC Fence Handler(idempotent, non_rollback, suspend)
036 *
037 * @author kaka2code
038 */
039public class TCCFenceHandler {
040
041    private TCCFenceHandler() {
042        throw new IllegalStateException("Utility class");
043    }
044
045    private static final Logger LOGGER = LoggerFactory.getLogger(io.seata.rm.tcc.TCCFenceHandler.class);
046
047    private static final TCCFenceStore TCC_FENCE_DAO = TCCFenceStoreDataBaseDAO.getInstance();
048
049    private static DataSource dataSource;
050
051
052    private static final int MAX_THREAD_CLEAN = 1;
053
054    private static final int MAX_QUEUE_SIZE = 500;
055
056    private static final LinkedBlockingQueue<FenceLogIdentity> LOG_QUEUE = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
057
058    private static FenceLogCleanRunnable fenceLogCleanRunnable;
059
060    private static ExecutorService logCleanExecutor;
061
062    static {
063        try {
064            initLogCleanExecutor();
065        } catch (Exception e) {
066            LOGGER.error("init fence log clean executor error", e);
067        }
068    }
069
070    /**
071     * tcc prepare method enhanced
072     *
073     * @param xid            the global transaction id
074     * @param branchId       the branch transaction id
075     * @param actionName     the action name
076     * @return the boolean
077     */
078    public static Object prepareFence(String xid, Long branchId, String actionName) {
079        DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig();
080        DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build();
081        IAtom runnable = () -> {
082            Connection connection = dataSource.getConnection();
083            boolean result = insertTCCFenceLog(connection, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
084            LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
085            if (!result) {
086                throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
087                        FrameworkErrorCode.InsertRecordError);
088            }
089            return result;
090        };
091        DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName());
092        return dbPro.tx(runnable);
093    }
094
095    /**
096     * tcc commit method enhanced
097     *
098     * @param commitMethod          commit method
099     * @param targetTCCBean         target tcc bean
100     * @param xid                   the global transaction id
101     * @param branchId              the branch transaction id
102     * @param args                  commit method's parameters
103     * @return the boolean
104     */
105    public static boolean commitFence(Method commitMethod, Object targetTCCBean,
106                                      String xid, Long branchId, Object[] args) {
107
108        DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig();
109        DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build();
110        IAtom runnable = () -> {
111            Connection connection = dataSource.getConnection();
112            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(connection, xid, branchId);
113            if (tccFenceDO == null){
114                throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId),
115                        FrameworkErrorCode.InsertRecordError);
116            }
117            if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
118                LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
119                return true;
120            }
121            if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
122                if (LOGGER.isWarnEnabled()) {
123                    LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
124                }
125                return false;
126            }
127            try {
128                return updateStatusAndInvokeTargetMethod(connection, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, args);
129            } catch (Exception ex) {
130                throw new TCCFenceException(ex.getCause());
131            }
132        };
133        DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName());
134        return dbPro.tx(runnable);
135    }
136
137    /**
138     * tcc rollback method enhanced
139     *
140     * @param rollbackMethod        rollback method
141     * @param targetTCCBean         target tcc bean
142     * @param xid                   the global transaction id
143     * @param branchId              the branch transaction id
144     * @param args                  rollback method's parameters
145     * @param actionName            the action name
146     * @return the boolean
147     */
148    public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
149                                        String xid, Long branchId, Object[] args, String actionName) {
150        DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig();
151        DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build();
152        IAtom runnable = () -> {
153            try {
154                Connection connection = dataSource.getConnection();
155                TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(connection, xid, branchId);
156                if (tccFenceDO == null){
157                    boolean result = insertTCCFenceLog(connection, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED);
158                    LOGGER.info("Insert tcc fence record result: {}. xid: {}, branchId: {}", result, xid, branchId);
159                    if (!result) {
160                        throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId),
161                                FrameworkErrorCode.InsertRecordError);
162                    }
163                    return true;
164                } else {
165                    if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
166                        LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
167                        return true;
168                    }
169                    if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
170                        if (LOGGER.isWarnEnabled()) {
171                            LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
172                        }
173                        return false;
174                    }
175                }
176                return updateStatusAndInvokeTargetMethod(connection, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED,  args);
177            } catch (Throwable ex) {
178                throw new TCCFenceException(ex.getCause());
179            }
180        };
181        DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName());
182        return dbPro.tx(Connection.TRANSACTION_READ_UNCOMMITTED, runnable);
183    }
184
185    /**
186     * Insert TCC fence log
187     *
188     * @param conn     the db connection
189     * @param xid      the xid
190     * @param branchId the branchId
191     * @param status   the status
192     * @return the boolean
193     */
194    private static boolean insertTCCFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) {
195        TCCFenceDO tccFenceDO = new TCCFenceDO();
196        tccFenceDO.setXid(xid);
197        tccFenceDO.setBranchId(branchId);
198        tccFenceDO.setActionName(actionName);
199        tccFenceDO.setStatus(status);
200        return TCC_FENCE_DAO.insertTCCFenceDO(conn, tccFenceDO);
201    }
202
203    /**
204     * Update TCC Fence status and invoke target method
205     *
206     * @param method                target method
207     * @param targetTCCBean         target bean
208     * @param xid                   the global transaction id
209     * @param branchId              the branch transaction id
210     * @param status                the tcc fence status
211     * @return the boolean
212     */
213    private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean,
214                                                             String xid, Long branchId, int status, Object[] args) throws Exception {
215        boolean result = TCC_FENCE_DAO.updateTCCFenceDO(conn, xid, branchId, status, TCCFenceConstant.STATUS_TRIED);
216        if (result) {
217            // invoke two phase method
218            Object ret = method.invoke(targetTCCBean, args);
219            if (null != ret) {
220                if (ret instanceof TwoPhaseResult) {
221                    result = ((TwoPhaseResult) ret).isSuccess();
222                } else {
223                    result = (boolean) ret;
224                }
225            }
226        }
227        return result;
228    }
229
230    private static void initLogCleanExecutor() {
231        logCleanExecutor = new ThreadPoolExecutor(MAX_THREAD_CLEAN, MAX_THREAD_CLEAN, Integer.MAX_VALUE,
232                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
233                new NamedThreadFactory("fenceLogCleanThread", MAX_THREAD_CLEAN, true)
234        );
235        fenceLogCleanRunnable = new FenceLogCleanRunnable();
236        logCleanExecutor.submit(fenceLogCleanRunnable);
237    }
238
239    /**
240     * Delete TCC Fence
241     *
242     * @param xid      the global transaction id
243     * @param branchId the branch transaction id
244     * @return the boolean
245     */
246    public static boolean deleteFence(String xid, Long branchId) {
247
248        DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig();
249        DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build();
250        IAtom runnable = () -> {
251            try {
252                Connection connection = dataSource.getConnection();
253                boolean ret = TCC_FENCE_DAO.deleteTCCFenceDO(connection, xid, branchId);
254                return ret;
255            } catch (Throwable ex) {
256                return false;
257            }
258        };
259        DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName());
260        return dbPro.tx(Connection.TRANSACTION_READ_UNCOMMITTED, runnable);
261    }
262
263    private static void addToLogCleanQueue(final String xid, final long branchId) {
264        FenceLogIdentity logIdentity = new FenceLogIdentity();
265        logIdentity.setXid(xid);
266        logIdentity.setBranchId(branchId);
267        try {
268            LOG_QUEUE.add(logIdentity);
269        } catch (Exception e) {
270            LOGGER.warn("Insert tcc fence record into queue for async delete error,xid:{},branchId:{}", xid, branchId, e);
271        }
272    }
273
274    /**
275     * clean fence log that has the final status runnable.
276     *
277     * @see TCCFenceConstant
278     */
279    private static class FenceLogCleanRunnable implements Runnable {
280        @Override
281        public void run() {
282            while (true) {
283                try {
284                   FenceLogIdentity logIdentity = LOG_QUEUE.take();
285                    boolean ret = deleteFence(logIdentity.getXid(), logIdentity.getBranchId());
286                    if (!ret) {
287                        LOGGER.error("delete fence log failed, xid: {}, branchId: {}", logIdentity.getXid(), logIdentity.getBranchId());
288                    }
289                } catch (InterruptedException e) {
290                    LOGGER.error("take fence log from queue for clean be interrupted", e);
291                } catch (Exception e) {
292                    LOGGER.error("exception occur when clean fence log", e);
293                }
294            }
295        }
296    }
297
298    private static class FenceLogIdentity {
299        /**
300         * the global transaction id
301         */
302        private String xid;
303
304        /**
305         * the branch transaction id
306         */
307        private Long branchId;
308
309        public String getXid() {
310            return xid;
311        }
312
313        public Long getBranchId() {
314            return branchId;
315        }
316
317        public void setXid(String xid) {
318            this.xid = xid;
319        }
320
321        public void setBranchId(Long branchId) {
322            this.branchId = branchId;
323        }
324    }
325}