home ホーム search 検索 -  login ログイン  | reload edit datainfo version cmd icon diff delete  | help ヘルプ

日記/2014/04/07/Javaのconcurrentパッケージ勉強中・・・ (v1)

日記/2014/04/07/Javaのconcurrentパッケージ勉強中・・・ (v1)

日記 / 2014 / 04 / 07 / Javaのconcurrentパッケージ勉強中・・・ (v1)
id: 1272 所有者: msakamoto-sf    作成日: 2014-04-07 00:05:28
カテゴリ: Java 

日付変わってしまったけど。勉強中の練習プログラム。TestNG使ったテストケースで。

一応、ドメイン限定で、非同期ジョブ制御用のAPIに挑戦しようと思ってるので、使ってないメソッドとかも想像上のものとして残してたり。

HogeTest.java:

package hoge;
 
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.testng.annotations.Test;
 
/**
 *
 * @author FengJing
 */
public class HogeTest {
 
    public HogeTest() {
    }
 
    class JobMailBox<T> extends LinkedBlockingQueue<T> {
 
        protected boolean isShutdown = false;
 
        public void shutdown() {
            this.isShutdown = true;
        }
 
        @Override
        public boolean offer(T e) {
            if (this.isShutdown) {
                return false;
            }
            return super.offer(e);
        }
 
        @Override
        public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
            if (this.isShutdown) {
                return false;
            }
            return super.offer(e, timeout, unit);
        }
 
        @Override
        public void put(T e) throws InterruptedException {
            if (this.isShutdown) {
                return;
            }
            super.put(e);
        }
 
        @Override
        public T poll(long timeout, TimeUnit unit) throws InterruptedException {
            return super.poll(timeout, unit);
        }
 
        @Override
        public T take() throws InterruptedException {
            return super.take();
        }
 
    }
 
    enum JobEvent {
 
        STARTED, RUNNING, CANCELLED, TERMINATED;
    }
 
    class JobSignal {
 
        public final JobEvent event;
        public final String key;
        public final String message;
 
        public JobSignal(JobEvent event, String key, String message) {
            this.event = event;
            this.key = key;
            this.message = message;
        }
    }
 
    interface JobHandle<R, M> {
 
        public R main(BlockingQueue<M> mailbox) throws Exception;
    }
 
    class JobButler {
 
        protected ExecutorService executorService;
        protected Map<String, Future> jobs = new ConcurrentHashMap<String, Future>();
        protected Map<String, BlockingQueue> mailboxes = new ConcurrentHashMap<String, BlockingQueue>();
        protected final JobMailBox<JobSignal> butlerMailBox = new JobMailBox<>();
 
        public JobButler(ExecutorService executorService) {
            this.executorService = executorService;
        }
 
        public JobMailBox<JobSignal> getButlersMailBox() {
            return this.butlerMailBox;
        }
 
        public <R, M> Future<R> submitNewJob(final String key, final JobHandle<R, M> job) throws InterruptedException {
            final BlockingQueue<M> mailbox = new ArrayBlockingQueue<M>(100);
            Callable<R> c = new Callable<R>() {
                public R call() throws Exception {
                    butlerMailBox.put(new JobSignal(JobEvent.RUNNING, key, "running"));
                    R result = job.main(mailbox);
                    butlerMailBox.put(new JobSignal(JobEvent.TERMINATED, key, "terminated"));
                    return result;
                }
            };
            butlerMailBox.put(new JobSignal(JobEvent.STARTED, key, "started"));
            Future<R> f = this.executorService.submit(c);
            this.jobs.put(key, f);
            this.mailboxes.put(key, mailbox);
            return f;
        }
 
        public Future getFuture(String key) {
            return this.jobs.get(key);
        }
 
        public Future terminateJob(String key) {
            Future f = this.jobs.get(key);
            if (f.isCancelled() || f.isDone()) {
                return f;
            }
            f.cancel(true);
            return f;
        }
 
        public void shutdown() {
            this.executorService.shutdown();
        }
    }
 
    class HogeJob implements JobHandle<String, String> {
 
        private final String name;
 
        public HogeJob(String name) {
            this.name = name;
        }
 
        public String main(BlockingQueue<String> mailbox) throws Exception {
            StringBuilder sb = new StringBuilder(1024);
            sb.append(this.name);
            sb.append(":");
            for (int i = 0; i < 10; i++) {
                sb.append(i);
            }
            return sb.toString();
        }
    }
 
    class CountJob implements JobHandle<Integer, String> {
 
        private int sum = 0;
 
        public Integer main(BlockingQueue<String> mailbox) throws Exception {
            for (int i = 0; i < 10; i++) {
                this.sum += i;
            }
            return new Integer(this.sum);
        }
 
    }
 
    @Test
    public void hoge2() throws InterruptedException, ExecutionException {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        Future<String> f1 = jb.submitNewJob("job1", new HogeJob("jon"));
        Future<String> f2 = jb.submitNewJob("job2", new HogeJob("bob"));
        Future<Integer> f3 = jb.submitNewJob("job3", new CountJob());
 
        assertEquals(f1.get(), "jon:0123456789");
        assertEquals(f2.get(), "bob:0123456789");
        assertEquals(f3.get(), new Integer(45));
 
        jb.shutdown();
    }
 
    class SlowCountJob implements JobHandle<Integer, String> {
 
        private int sum = 0;
        private final String name;
 
        public SlowCountJob(String name) {
            this.name = name;
        }
 
        @Override
        public Integer main(BlockingQueue<String> mailbox) throws Exception {
            int i = 0;
            while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) {
                this.sum += i;
                i++;
                System.out.println(this.name + ":" + this.sum);
            }
            return new Integer(this.sum);
        }
 
    }
 
    @Test(expectedExceptions = CancellationException.class)
    public void hoge3() throws InterruptedException, ExecutionException, TimeoutException {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        Future<Integer> f1 = jb.submitNewJob("job1", new SlowCountJob("scj-1"));
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignore) {
        }
        // cancel flag was set, but not interrupted ... orphan thread!!!
        f1.cancel(false);
        assertTrue(f1.isCancelled());
        assertTrue(f1.isDone());
        try {
            f1.get(1, TimeUnit.SECONDS);
        } finally {
            jb.shutdown();
        }
    }
 
    @Test(expectedExceptions = CancellationException.class)
    public void hoge4() throws InterruptedException, ExecutionException, TimeoutException {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        Future<Integer> f1 = jb.submitNewJob("job1", new SlowCountJob("scj-2"));
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignore) {
        }
        // cancel flag was set, and, inerrupted -> thread terminates.
        f1.cancel(true);
        assertTrue(f1.isCancelled());
        assertTrue(f1.isDone());
        try {
            f1.get(1, TimeUnit.SECONDS);
        } finally {
            jb.shutdown();
        }
    }
 
    class AbnormalEndCountJob implements JobHandle<Integer, String> {
 
        private int sum = 0;
        private final String name;
        private final int stopper;
 
        public AbnormalEndCountJob(String name, int stopper) {
            this.name = name;
            this.stopper = stopper;
        }
 
        @Override
        public Integer main(BlockingQueue<String> mailbox) throws Exception {
            int i = 0;
            while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) {
                this.sum += i;
                i++;
                System.out.println(this.name + ":" + this.sum);
                if (i > stopper) {
                    throw new IllegalStateException("stopper threashold over.");
                }
            }
            return new Integer(this.sum);
        }
 
    }
 
    @Test(expectedExceptions = IllegalStateException.class)
    public void hoge5() throws InterruptedException, ExecutionException, TimeoutException, Throwable {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        Future<Integer> f1 = jb.submitNewJob("job1", new AbnormalEndCountJob("abend-job-1", 3));
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignore) {
        }
        // job is already done (stopper = 3 means "After 3 seconds, throw IllegalStateException")
        // so, cancel() does not effect any more. isCancelled() returns false.
        f1.cancel(true);
        assertFalse(f1.isCancelled());
        assertTrue(f1.isDone());
        try {
            f1.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException expected) {
            Throwable t = expected.getCause();
            assertEquals(t.getMessage(), "stopper threashold over.");
            throw t;
        } finally {
            jb.shutdown();
        }
    }
 
    @Test
    public void hoge() throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();
        Callable<String> c = new Callable<String>() {
            public String call() throws Exception {
                StringBuilder sb = new StringBuilder(1024);
                for (int i = 0; i < 10; i++) {
//                    Thread.sleep(1000);
                    sb.append(i);
                }
                return sb.toString();
            }
        };
        Future<String> f = es.submit(c);
        assertEquals(f.get(), "0123456789");
    }
}

BoheTest.java:

package hoge;
 
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import org.testng.annotations.Test;
 
/**
 *
 * @author FengJing
 */
public class BoheTest {
 
    public BoheTest() {
    }
 
    class JobMailBox<T> extends LinkedBlockingQueue<T> {
 
        protected boolean isShutdown = false;
 
        public void shutdown() {
            this.isShutdown = true;
        }
 
        @Override
        public boolean offer(T e) {
            if (this.isShutdown) {
                return false;
            }
            return super.offer(e);
        }
 
        @Override
        public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
            if (this.isShutdown) {
                return false;
            }
            return super.offer(e, timeout, unit);
        }
 
        @Override
        public void put(T e) throws InterruptedException {
            if (this.isShutdown) {
                return;
            }
            super.put(e);
        }
 
        @Override
        public T poll(long timeout, TimeUnit unit) throws InterruptedException {
            return super.poll(timeout, unit);
        }
 
        @Override
        public T take() throws InterruptedException {
            return super.take();
        }
 
    }
 
    interface JobHandle<R, M> {
 
        public R main(BlockingQueue<M> mailbox) throws Exception;
    }
 
    class JobTask<V> extends FutureTask<V> {
 
        protected final String key;
 
        protected final JobButler butler;
 
        public JobTask(JobButler butler, String key, Callable<V> callable) {
            super(callable);
            this.key = key;
            this.butler = butler;
        }
 
        public JobTask(JobButler butler, String key, Runnable runnable, V result) {
            super(runnable, result);
            this.key = key;
            this.butler = butler;
        }
 
        @Override
        protected void done() {
            butler.notifyJobTermination(key);
        }
    }
 
    class JobGateway<R, M> {
 
        final Future<R> future;
        final JobMailBox<M> mailbox;
 
        public JobGateway(Future<R> f, JobMailBox<M> mb) {
            this.future = f;
            this.mailbox = mb;
        }
    }
 
    class JobButler {
 
        protected ExecutorService executorService;
        protected Map<String, JobGateway> jobs = new ConcurrentHashMap<>();
 
        public JobButler(ExecutorService executorService) {
            this.executorService = executorService;
        }
 
        public synchronized <R, M> JobGateway<R, M> submitNewJob(final String key, final JobHandle<R, M> job) throws InterruptedException {
            final JobMailBox<M> mailbox = new JobMailBox<>();
            JobTask<R> task = new JobTask<>(this, key, new Callable<R>() {
                @Override
                public R call() throws Exception {
                    R result = job.main(mailbox);
                    return result;
                }
            });
            this.executorService.execute(task);
            JobGateway<R, M> jobgw = new JobGateway(task, mailbox);
            this.jobs.put(key, jobgw);
            return jobgw;
        }
 
        public synchronized JobGateway getJobGateway(String key) {
            return this.jobs.get(key);
        }
 
        synchronized void notifyJobTermination(String key) {
            if (!this.jobs.containsKey(key)) {
                return;
            }
            JobGateway jobgw = this.jobs.get(key);
            jobgw.mailbox.shutdown();
            this.jobs.remove(key);
        }
 
        public void shutdown() {
            this.executorService.shutdown();
        }
    }
 
    class HogeJob implements JobHandle<String, String> {
 
        private final String name;
 
        public HogeJob(String name) {
            this.name = name;
        }
 
        @Override
        public String main(BlockingQueue<String> mailbox) throws Exception {
            StringBuilder sb = new StringBuilder(1024);
            sb.append(this.name);
            sb.append(":");
            for (int i = 0; i < 10; i++) {
                sb.append(i);
            }
            return sb.toString();
        }
    }
 
    class CountJob implements JobHandle<Integer, String> {
 
        private int sum = 0;
 
        @Override
        public Integer main(BlockingQueue<String> mailbox) throws Exception {
            for (int i = 0; i < 10; i++) {
                this.sum += i;
            }
            return new Integer(this.sum);
        }
 
    }
 
    @Test
    public void hoge2() throws InterruptedException, ExecutionException {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        JobGateway<String, String> jgw1 = jb.submitNewJob("job1", new HogeJob("jon"));
        JobGateway<String, String> jgw2 = jb.submitNewJob("job2", new HogeJob("bob"));
        JobGateway<Integer, String> jgw3 = jb.submitNewJob("job3", new CountJob());
 
        assertEquals(jgw1.future.get(), "jon:0123456789");
        assertEquals(jgw2.future.get(), "bob:0123456789");
        assertEquals(jgw3.future.get(), new Integer(45));
 
        assertTrue(jgw1.future.isDone());
        assertTrue(jgw2.future.isDone());
        assertTrue(jgw3.future.isDone());
 
        assertNull(jb.getJobGateway("job1"));
        assertNull(jb.getJobGateway("job2"));
        assertNull(jb.getJobGateway("job3"));
 
        jb.shutdown();
    }
 
    class SlowCountJob implements JobHandle<Integer, String> {
 
        private int sum = 0;
        private final String name;
 
        public SlowCountJob(String name) {
            this.name = name;
        }
 
        @Override
        public Integer main(BlockingQueue<String> mailbox) throws Exception {
            int i = 0;
            while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) {
                this.sum += i;
                i++;
                System.out.println(this.name + ":" + this.sum);
            }
            return new Integer(this.sum);
        }
 
    }
 
    @Test(expectedExceptions = CancellationException.class)
    public void hoge3() throws InterruptedException, ExecutionException, TimeoutException {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        JobGateway<Integer, String> jgw = jb.submitNewJob("job1", new SlowCountJob("scj-1"));
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignore) {
        }
        // cancel flag was set, but not interrupted ... orphan thread!!!
        jgw.future.cancel(false);
        assertTrue(jgw.future.isCancelled());
        assertTrue(jgw.future.isDone());
        try {
            jgw.future.get(1, TimeUnit.SECONDS);
        } finally {
            // cancel(false) internally called done().
            assertNull(jb.getJobGateway("job1"));
            jb.shutdown();
        }
    }
 
    @Test(expectedExceptions = CancellationException.class)
    public void hoge4() throws InterruptedException, ExecutionException, TimeoutException {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        JobGateway<Integer, String> jgw = jb.submitNewJob("job1", new SlowCountJob("scj-2"));
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignore) {
        }
        // cancel flag was set, and, inerrupted -> thread terminates.
        jgw.future.cancel(true);
        assertTrue(jgw.future.isCancelled());
        assertTrue(jgw.future.isDone());
        try {
            jgw.future.get(1, TimeUnit.SECONDS);
        } finally {
            assertNull(jb.getJobGateway("job1"));
            jb.shutdown();
        }
    }
 
    class AbnormalEndCountJob implements JobHandle<Integer, String> {
 
        private int sum = 0;
        private final String name;
        private final int stopper;
 
        public AbnormalEndCountJob(String name, int stopper) {
            this.name = name;
            this.stopper = stopper;
        }
 
        @Override
        public Integer main(BlockingQueue<String> mailbox) throws Exception {
            int i = 0;
            while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) {
                this.sum += i;
                i++;
                System.out.println(this.name + ":" + this.sum);
                if (i > stopper) {
                    throw new IllegalStateException("stopper threashold over.");
                }
            }
            return new Integer(this.sum);
        }
 
    }
 
    @Test(expectedExceptions = IllegalStateException.class)
    public void hoge5() throws InterruptedException, ExecutionException, TimeoutException, Throwable {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        JobGateway<Integer, String> jgw = jb.submitNewJob("job1", new AbnormalEndCountJob("abend-job-1", 3));
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignore) {
        }
        // job is already done (stopper = 3 means "After 3 seconds, throw IllegalStateException")
        // so, cancel() does not effect any more. isCancelled() returns false.
        jgw.future.cancel(true);
        assertFalse(jgw.future.isCancelled());
        assertTrue(jgw.future.isDone());
        try {
            jgw.future.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException expected) {
            Throwable t = expected.getCause();
            assertEquals(t.getMessage(), "stopper threashold over.");
            throw t;
        } finally {
            assertNull(jb.getJobGateway("job1"));
            jb.shutdown();
        }
    }
 
    class TypicalMessageLoopJob implements JobHandle<String, String> {
 
        @Override
        public String main(BlockingQueue<String> mailbox) throws Exception {
            StringBuilder sb = new StringBuilder(1024);
            while (true) {
                String message = mailbox.poll();
                if ("__END__".equals(message)) {
                    break;
                }
                sb.append(message);
            }
            return sb.toString();
        }
    }
 
    @Test
    public void hoge6() throws InterruptedException, ExecutionException, TimeoutException, Throwable {
        JobButler jb = new JobButler(Executors.newCachedThreadPool());
        JobGateway<String, String> jgw = jb.submitNewJob("job1", new TypicalMessageLoopJob());
 
        jgw.mailbox.put("hello, ");
        jgw.mailbox.put("bob. ");
        jgw.mailbox.put("Good ");
        jgw.mailbox.put("Morning!");
        jgw.mailbox.put("__END__");
        jgw.mailbox.put("(this message must be ignored.");
        assertEquals(jgw.future.get(), "hello, bob. Good Morning!");
        assertFalse(jgw.future.isCancelled());
        assertTrue(jgw.future.isDone());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {}
        assertNull(jb.getJobGateway("job1"));
        jb.shutdown();
    }
 
}

BoheTest.javaの方が一応、洗練されてるつもり。


プレーンテキスト形式でダウンロード
現在のバージョン : 1
更新者: msakamoto-sf
更新日: 2014-04-07 00:08:11
md5:0948c8a86b5a99e11fa87424efee12b7
sha1:04c1e2d23a48e8a6966c41a80f647a027e1acaf4
コメント
コメントを投稿するにはログインして下さい。