001 /**
002 * Copyright (C) 2012 FuseSource, Inc.
003 * http://fusesource.com
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.fusesource.hawtdispatch;
019
020 import java.util.LinkedList;
021 import java.util.concurrent.Executor;
022
023 import static org.fusesource.hawtdispatch.Dispatch.*;
024
025 /**
026 * Sends runnable tasks to a DispatchQueue via a an EventAggregator
027 * so that they first batch up on the sender side before being
028 * sent to the DispatchQueue which then executes that tasks.
029 *
030 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031 */
032 public class AggregatingExecutor implements Executor {
033
034 final DispatchQueue queue;
035 final CustomDispatchSource<Runnable, LinkedList<Runnable>> source;
036
037 public AggregatingExecutor(DispatchQueue queue) {
038 this.queue = queue;
039 this.source = createSource(EventAggregators.<Runnable>linkedList(), queue);
040 this.source.setEventHandler(new Task() {
041 public void run() {
042 for (Runnable runnable: source.getData() ) {
043 try {
044 runnable.run();
045 } catch (Exception e) {
046 Thread thread = Thread.currentThread();
047 thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
048 }
049 }
050 }
051 });
052 this.source.resume();
053 }
054
055
056 public void suspend() {
057 source.suspend();
058 }
059
060 public void resume() {
061 source.resume();
062 }
063
064 public void execute(Runnable task) {
065 if (getCurrentQueue() == null) {
066 queue.execute(new TaskWrapper(task));
067 } else {
068 source.merge(task);
069 }
070 }
071
072 }