日付変わってしまったけど。勉強中の練習プログラム。TestNG使ったテストケースで。 一応、ドメイン限定で、非同期ジョブ制御用のAPIに挑戦しようと思ってるので、使ってないメソッドとかも想像上のものとして残してたり。 HogeTest.java: #code|java|> package hoge; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; /** * * @author FengJing */ public class HogeTest { public HogeTest() { } class JobMailBox extends LinkedBlockingQueue { protected boolean isShutdown = false; public void shutdown() { this.isShutdown = true; } @Override public boolean offer(T e) { if (this.isShutdown) { return false; } return super.offer(e); } @Override public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException { if (this.isShutdown) { return false; } return super.offer(e, timeout, unit); } @Override public void put(T e) throws InterruptedException { if (this.isShutdown) { return; } super.put(e); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return super.poll(timeout, unit); } @Override public T take() throws InterruptedException { return super.take(); } } enum JobEvent { STARTED, RUNNING, CANCELLED, TERMINATED; } class JobSignal { public final JobEvent event; public final String key; public final String message; public JobSignal(JobEvent event, String key, String message) { this.event = event; this.key = key; this.message = message; } } interface JobHandle { public R main(BlockingQueue mailbox) throws Exception; } class JobButler { protected ExecutorService executorService; protected Map jobs = new ConcurrentHashMap(); protected Map mailboxes = new ConcurrentHashMap(); protected final JobMailBox butlerMailBox = new JobMailBox<>(); public JobButler(ExecutorService executorService) { this.executorService = executorService; } public JobMailBox getButlersMailBox() { return this.butlerMailBox; } public Future submitNewJob(final String key, final JobHandle job) throws InterruptedException { final BlockingQueue mailbox = new ArrayBlockingQueue(100); Callable c = new Callable() { public R call() throws Exception { butlerMailBox.put(new JobSignal(JobEvent.RUNNING, key, "running")); R result = job.main(mailbox); butlerMailBox.put(new JobSignal(JobEvent.TERMINATED, key, "terminated")); return result; } }; butlerMailBox.put(new JobSignal(JobEvent.STARTED, key, "started")); Future f = this.executorService.submit(c); this.jobs.put(key, f); this.mailboxes.put(key, mailbox); return f; } public Future getFuture(String key) { return this.jobs.get(key); } public Future terminateJob(String key) { Future f = this.jobs.get(key); if (f.isCancelled() || f.isDone()) { return f; } f.cancel(true); return f; } public void shutdown() { this.executorService.shutdown(); } } class HogeJob implements JobHandle { private final String name; public HogeJob(String name) { this.name = name; } public String main(BlockingQueue mailbox) throws Exception { StringBuilder sb = new StringBuilder(1024); sb.append(this.name); sb.append(":"); for (int i = 0; i < 10; i++) { sb.append(i); } return sb.toString(); } } class CountJob implements JobHandle { private int sum = 0; public Integer main(BlockingQueue mailbox) throws Exception { for (int i = 0; i < 10; i++) { this.sum += i; } return new Integer(this.sum); } } @Test public void hoge2() throws InterruptedException, ExecutionException { JobButler jb = new JobButler(Executors.newCachedThreadPool()); Future f1 = jb.submitNewJob("job1", new HogeJob("jon")); Future f2 = jb.submitNewJob("job2", new HogeJob("bob")); Future f3 = jb.submitNewJob("job3", new CountJob()); assertEquals(f1.get(), "jon:0123456789"); assertEquals(f2.get(), "bob:0123456789"); assertEquals(f3.get(), new Integer(45)); jb.shutdown(); } class SlowCountJob implements JobHandle { private int sum = 0; private final String name; public SlowCountJob(String name) { this.name = name; } @Override public Integer main(BlockingQueue mailbox) throws Exception { int i = 0; while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) { this.sum += i; i++; System.out.println(this.name + ":" + this.sum); } return new Integer(this.sum); } } @Test(expectedExceptions = CancellationException.class) public void hoge3() throws InterruptedException, ExecutionException, TimeoutException { JobButler jb = new JobButler(Executors.newCachedThreadPool()); Future f1 = jb.submitNewJob("job1", new SlowCountJob("scj-1")); try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // cancel flag was set, but not interrupted ... orphan thread!!! f1.cancel(false); assertTrue(f1.isCancelled()); assertTrue(f1.isDone()); try { f1.get(1, TimeUnit.SECONDS); } finally { jb.shutdown(); } } @Test(expectedExceptions = CancellationException.class) public void hoge4() throws InterruptedException, ExecutionException, TimeoutException { JobButler jb = new JobButler(Executors.newCachedThreadPool()); Future f1 = jb.submitNewJob("job1", new SlowCountJob("scj-2")); try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // cancel flag was set, and, inerrupted -> thread terminates. f1.cancel(true); assertTrue(f1.isCancelled()); assertTrue(f1.isDone()); try { f1.get(1, TimeUnit.SECONDS); } finally { jb.shutdown(); } } class AbnormalEndCountJob implements JobHandle { private int sum = 0; private final String name; private final int stopper; public AbnormalEndCountJob(String name, int stopper) { this.name = name; this.stopper = stopper; } @Override public Integer main(BlockingQueue mailbox) throws Exception { int i = 0; while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) { this.sum += i; i++; System.out.println(this.name + ":" + this.sum); if (i > stopper) { throw new IllegalStateException("stopper threashold over."); } } return new Integer(this.sum); } } @Test(expectedExceptions = IllegalStateException.class) public void hoge5() throws InterruptedException, ExecutionException, TimeoutException, Throwable { JobButler jb = new JobButler(Executors.newCachedThreadPool()); Future f1 = jb.submitNewJob("job1", new AbnormalEndCountJob("abend-job-1", 3)); try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // job is already done (stopper = 3 means "After 3 seconds, throw IllegalStateException") // so, cancel() does not effect any more. isCancelled() returns false. f1.cancel(true); assertFalse(f1.isCancelled()); assertTrue(f1.isDone()); try { f1.get(1, TimeUnit.SECONDS); } catch (ExecutionException expected) { Throwable t = expected.getCause(); assertEquals(t.getMessage(), "stopper threashold over."); throw t; } finally { jb.shutdown(); } } @Test public void hoge() throws InterruptedException, ExecutionException { ExecutorService es = Executors.newCachedThreadPool(); Callable c = new Callable() { public String call() throws Exception { StringBuilder sb = new StringBuilder(1024); for (int i = 0; i < 10; i++) { // Thread.sleep(1000); sb.append(i); } return sb.toString(); } }; Future f = es.submit(c); assertEquals(f.get(), "0123456789"); } } ||< BoheTest.java: #code|java|> package hoge; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; /** * * @author FengJing */ public class BoheTest { public BoheTest() { } class JobMailBox extends LinkedBlockingQueue { protected boolean isShutdown = false; public void shutdown() { this.isShutdown = true; } @Override public boolean offer(T e) { if (this.isShutdown) { return false; } return super.offer(e); } @Override public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException { if (this.isShutdown) { return false; } return super.offer(e, timeout, unit); } @Override public void put(T e) throws InterruptedException { if (this.isShutdown) { return; } super.put(e); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return super.poll(timeout, unit); } @Override public T take() throws InterruptedException { return super.take(); } } interface JobHandle { public R main(BlockingQueue mailbox) throws Exception; } class JobTask extends FutureTask { protected final String key; protected final JobButler butler; public JobTask(JobButler butler, String key, Callable callable) { super(callable); this.key = key; this.butler = butler; } public JobTask(JobButler butler, String key, Runnable runnable, V result) { super(runnable, result); this.key = key; this.butler = butler; } @Override protected void done() { butler.notifyJobTermination(key); } } class JobGateway { final Future future; final JobMailBox mailbox; public JobGateway(Future f, JobMailBox mb) { this.future = f; this.mailbox = mb; } } class JobButler { protected ExecutorService executorService; protected Map jobs = new ConcurrentHashMap<>(); public JobButler(ExecutorService executorService) { this.executorService = executorService; } public synchronized JobGateway submitNewJob(final String key, final JobHandle job) throws InterruptedException { final JobMailBox mailbox = new JobMailBox<>(); JobTask task = new JobTask<>(this, key, new Callable() { @Override public R call() throws Exception { R result = job.main(mailbox); return result; } }); this.executorService.execute(task); JobGateway jobgw = new JobGateway(task, mailbox); this.jobs.put(key, jobgw); return jobgw; } public synchronized JobGateway getJobGateway(String key) { return this.jobs.get(key); } synchronized void notifyJobTermination(String key) { if (!this.jobs.containsKey(key)) { return; } JobGateway jobgw = this.jobs.get(key); jobgw.mailbox.shutdown(); this.jobs.remove(key); } public void shutdown() { this.executorService.shutdown(); } } class HogeJob implements JobHandle { private final String name; public HogeJob(String name) { this.name = name; } @Override public String main(BlockingQueue mailbox) throws Exception { StringBuilder sb = new StringBuilder(1024); sb.append(this.name); sb.append(":"); for (int i = 0; i < 10; i++) { sb.append(i); } return sb.toString(); } } class CountJob implements JobHandle { private int sum = 0; @Override public Integer main(BlockingQueue mailbox) throws Exception { for (int i = 0; i < 10; i++) { this.sum += i; } return new Integer(this.sum); } } @Test public void hoge2() throws InterruptedException, ExecutionException { JobButler jb = new JobButler(Executors.newCachedThreadPool()); JobGateway jgw1 = jb.submitNewJob("job1", new HogeJob("jon")); JobGateway jgw2 = jb.submitNewJob("job2", new HogeJob("bob")); JobGateway jgw3 = jb.submitNewJob("job3", new CountJob()); assertEquals(jgw1.future.get(), "jon:0123456789"); assertEquals(jgw2.future.get(), "bob:0123456789"); assertEquals(jgw3.future.get(), new Integer(45)); assertTrue(jgw1.future.isDone()); assertTrue(jgw2.future.isDone()); assertTrue(jgw3.future.isDone()); assertNull(jb.getJobGateway("job1")); assertNull(jb.getJobGateway("job2")); assertNull(jb.getJobGateway("job3")); jb.shutdown(); } class SlowCountJob implements JobHandle { private int sum = 0; private final String name; public SlowCountJob(String name) { this.name = name; } @Override public Integer main(BlockingQueue mailbox) throws Exception { int i = 0; while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) { this.sum += i; i++; System.out.println(this.name + ":" + this.sum); } return new Integer(this.sum); } } @Test(expectedExceptions = CancellationException.class) public void hoge3() throws InterruptedException, ExecutionException, TimeoutException { JobButler jb = new JobButler(Executors.newCachedThreadPool()); JobGateway jgw = jb.submitNewJob("job1", new SlowCountJob("scj-1")); try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // cancel flag was set, but not interrupted ... orphan thread!!! jgw.future.cancel(false); assertTrue(jgw.future.isCancelled()); assertTrue(jgw.future.isDone()); try { jgw.future.get(1, TimeUnit.SECONDS); } finally { // cancel(false) internally called done(). assertNull(jb.getJobGateway("job1")); jb.shutdown(); } } @Test(expectedExceptions = CancellationException.class) public void hoge4() throws InterruptedException, ExecutionException, TimeoutException { JobButler jb = new JobButler(Executors.newCachedThreadPool()); JobGateway jgw = jb.submitNewJob("job1", new SlowCountJob("scj-2")); try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // cancel flag was set, and, inerrupted -> thread terminates. jgw.future.cancel(true); assertTrue(jgw.future.isCancelled()); assertTrue(jgw.future.isDone()); try { jgw.future.get(1, TimeUnit.SECONDS); } finally { assertNull(jb.getJobGateway("job1")); jb.shutdown(); } } class AbnormalEndCountJob implements JobHandle { private int sum = 0; private final String name; private final int stopper; public AbnormalEndCountJob(String name, int stopper) { this.name = name; this.stopper = stopper; } @Override public Integer main(BlockingQueue mailbox) throws Exception { int i = 0; while (!"terminate".equals(mailbox.poll(1000, TimeUnit.MILLISECONDS))) { this.sum += i; i++; System.out.println(this.name + ":" + this.sum); if (i > stopper) { throw new IllegalStateException("stopper threashold over."); } } return new Integer(this.sum); } } @Test(expectedExceptions = IllegalStateException.class) public void hoge5() throws InterruptedException, ExecutionException, TimeoutException, Throwable { JobButler jb = new JobButler(Executors.newCachedThreadPool()); JobGateway jgw = jb.submitNewJob("job1", new AbnormalEndCountJob("abend-job-1", 3)); try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // job is already done (stopper = 3 means "After 3 seconds, throw IllegalStateException") // so, cancel() does not effect any more. isCancelled() returns false. jgw.future.cancel(true); assertFalse(jgw.future.isCancelled()); assertTrue(jgw.future.isDone()); try { jgw.future.get(1, TimeUnit.SECONDS); } catch (ExecutionException expected) { Throwable t = expected.getCause(); assertEquals(t.getMessage(), "stopper threashold over."); throw t; } finally { assertNull(jb.getJobGateway("job1")); jb.shutdown(); } } class TypicalMessageLoopJob implements JobHandle { @Override public String main(BlockingQueue mailbox) throws Exception { StringBuilder sb = new StringBuilder(1024); while (true) { String message = mailbox.poll(); if ("__END__".equals(message)) { break; } sb.append(message); } return sb.toString(); } } @Test public void hoge6() throws InterruptedException, ExecutionException, TimeoutException, Throwable { JobButler jb = new JobButler(Executors.newCachedThreadPool()); JobGateway jgw = jb.submitNewJob("job1", new TypicalMessageLoopJob()); jgw.mailbox.put("hello, "); jgw.mailbox.put("bob. "); jgw.mailbox.put("Good "); jgw.mailbox.put("Morning!"); jgw.mailbox.put("__END__"); jgw.mailbox.put("(this message must be ignored."); assertEquals(jgw.future.get(), "hello, bob. Good Morning!"); assertFalse(jgw.future.isCancelled()); assertTrue(jgw.future.isDone()); try { Thread.sleep(1000); } catch (InterruptedException ignore) {} assertNull(jb.getJobGateway("job1")); jb.shutdown(); } } ||< BoheTest.javaの方が一応、洗練されてるつもり。