root/trunk/components/blitz/src/omero/grid/InteractiveProcessorI.java
| Revision 3001, 8.0 kB (checked in by jmoore, 2 months ago) |
|---|
| Line | |
|---|---|
| 1 | /* |
| 2 | * $Id$ |
| 3 | * |
| 4 | * Copyright 2008 Glencoe Software, Inc. All rights reserved. |
| 5 | * Use is subject to license terms supplied in LICENSE.txt |
| 6 | */ |
| 7 | |
| 8 | package omero.grid; |
| 9 | |
| 10 | import java.util.HashMap; |
| 11 | import java.util.Map; |
| 12 | import java.util.concurrent.locks.ReadWriteLock; |
| 13 | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| 14 | |
| 15 | import static omero.rtypes.*; |
| 16 | import ome.model.core.OriginalFile; |
| 17 | import ome.model.meta.Session; |
| 18 | import ome.parameters.Parameters; |
| 19 | import ome.services.procs.Processor; |
| 20 | import ome.services.sessions.SessionManager; |
| 21 | import ome.services.util.Executor; |
| 22 | import ome.system.EventContext; |
| 23 | import ome.system.Principal; |
| 24 | import ome.system.ServiceFactory; |
| 25 | import omero.ApiUsageException; |
| 26 | import omero.RMap; |
| 27 | import omero.RObject; |
| 28 | import omero.RType; |
| 29 | import omero.ServerError; |
| 30 | import omero.model.Job; |
| 31 | import omero.model.OriginalFileI; |
| 32 | import omero.util.IceMapper; |
| 33 | |
| 34 | import org.apache.commons.logging.Log; |
| 35 | import org.apache.commons.logging.LogFactory; |
| 36 | import org.springframework.transaction.TransactionStatus; |
| 37 | |
| 38 | import Ice.Current; |
| 39 | |
| 40 | /** |
| 41 | * {@link Processor} implementation which delegates to an omero.grid.Processor |
| 42 | * servant. Functions as a state machine. A single {@link ProcessPrx} can be |
| 43 | * active at any given time. Once it's complete, then the {@link RMap results} |
| 44 | * can be obtained, then a new {@link Job} can be submitted, until {@link #stop} |
| 45 | * is set. Any other use throws an {@link ApiUsageException}. |
| 46 | * |
| 47 | * @author Josh Moore, josh at glencoesoftware.com |
| 48 | * @since 3.0-Beta3 |
| 49 | */ |
| 50 | public class InteractiveProcessorI extends _InteractiveProcessorDisp { |
| 51 | |
| 52 | private static Session UNINITIALIZED = new Session(); |
| 53 | |
| 54 | private static Log log = LogFactory.getLog(InteractiveProcessorI.class); |
| 55 | |
| 56 | private final SessionManager mgr; |
| 57 | |
| 58 | private final ProcessorPrx prx; |
| 59 | |
| 60 | private final Executor ex; |
| 61 | |
| 62 | private final Job job; |
| 63 | |
| 64 | private final long timeout; |
| 65 | |
| 66 | private final ReadWriteLock rwl = new ReentrantReadWriteLock(); |
| 67 | |
| 68 | private final Principal principal; |
| 69 | |
| 70 | private boolean obtainResults = false; |
| 71 | |
| 72 | private boolean stop = false; |
| 73 | |
| 74 | private ProcessPrx currentProcess = null; |
| 75 | |
| 76 | private Session session; |
| 77 | |
| 78 | /** |
| 79 | * |
| 80 | * @param p |
| 81 | * @param mgr |
| 82 | * @param prx |
| 83 | * @param job |
| 84 | * Unloaded {@link Job} instance, which will be used by |
| 85 | * {@link omero.grid.Processor} to reload the {@link Job} |
| 86 | * @param timeout |
| 87 | */ |
| 88 | public InteractiveProcessorI(Principal p, SessionManager mgr, Executor ex, |
| 89 | ProcessorPrx prx, Job job, long timeout) { |
| 90 | this.principal = p; |
| 91 | this.ex = ex; |
| 92 | this.mgr = mgr; |
| 93 | this.prx = prx; |
| 94 | this.job = job; |
| 95 | this.timeout = timeout; |
| 96 | this.session = UNINITIALIZED; |
| 97 | } |
| 98 | |
| 99 | public JobParams params(Current __current) throws ServerError { |
| 100 | |
| 101 | rwl.writeLock().lock(); |
| 102 | |
| 103 | try { |
| 104 | |
| 105 | if (stop) { |
| 106 | throw new ApiUsageException(null, null, |
| 107 | "This processor is stopped."); |
| 108 | } |
| 109 | |
| 110 | // Setup new user session |
| 111 | if (session == UNINITIALIZED) { |
| 112 | session = newSession(__current); |
| 113 | } |
| 114 | |
| 115 | try { |
| 116 | return prx.parseJob(session.getUuid(), job); |
| 117 | } catch (ServerError se) { |
| 118 | log.debug("Error while parsing job", se); |
| 119 | throw se; |
| 120 | } |
| 121 | |
| 122 | } finally { |
| 123 | rwl.writeLock().unlock(); |
| 124 | } |
| 125 | |
| 126 | } |
| 127 | |
| 128 | public ProcessPrx execute(RMap inputs, Current __current) |
| 129 | throws ServerError { |
| 130 | |
| 131 | rwl.writeLock().lock(); |
| 132 | |
| 133 | try { |
| 134 | |
| 135 | if (currentProcess != null) { |
| 136 | throw new ApiUsageException(null, null, |
| 137 | "Process currently running."); |
| 138 | } |
| 139 | |
| 140 | if (obtainResults) { |
| 141 | throw new ApiUsageException(null, null, |
| 142 | "Please retrieve results."); |
| 143 | } |
| 144 | |
| 145 | if (stop) { |
| 146 | throw new ApiUsageException(null, null, |
| 147 | "This processor is stopped."); |
| 148 | } |
| 149 | |
| 150 | // Setup new user session |
| 151 | if (session == UNINITIALIZED) { |
| 152 | session = newSession(__current); |
| 153 | } |
| 154 | |
| 155 | // Setup environment |
| 156 | if (inputs != null && inputs.getValue() != null) { |
| 157 | for (String key : inputs.getValue().keySet()) { |
| 158 | mgr.setInput(session.getUuid(), key, inputs.get(key)); |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | // Execute |
| 163 | try { |
| 164 | currentProcess = prx.processJob(session.getUuid(), job); |
| 165 | } catch (ServerError se) { |
| 166 | log.debug("Error while processing job", se); |
| 167 | throw se; |
| 168 | } |
| 169 | |
| 170 | if (currentProcess == null) { |
| 171 | return null; |
| 172 | } |
| 173 | |
| 174 | obtainResults = true; |
| 175 | return currentProcess; |
| 176 | |
| 177 | } finally { |
| 178 | rwl.writeLock().unlock(); |
| 179 | } |
| 180 | |
| 181 | } |
| 182 | |
| 183 | public RMap getResults(ProcessPrx proc, Current __current) |
| 184 | throws ServerError { |
| 185 | |
| 186 | rwl.writeLock().lock(); |
| 187 | try { |
| 188 | finishedOrThrow(); |
| 189 | |
| 190 | // Gather output |
| 191 | omero.RMap output = rmap( |
| 192 | new HashMap<String, omero.RType>()); |
| 193 | Map<String, Object> env = mgr.outputEnvironment(session.getUuid()); |
| 194 | IceMapper mapper = new IceMapper(); |
| 195 | for (String key : env.keySet()) { |
| 196 | RType rt = mapper.toRType(env.get(key)); |
| 197 | output.put(key, rt); |
| 198 | } |
| 199 | optionallyLoadFile(output.getValue(), "stdout"); |
| 200 | optionallyLoadFile(output.getValue(), "stderr"); |
| 201 | currentProcess = null; |
| 202 | obtainResults = false; |
| 203 | return output; |
| 204 | } finally { |
| 205 | rwl.writeLock().unlock(); |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | public long expires(Current __current) { |
| 210 | return timeout; |
| 211 | } |
| 212 | |
| 213 | public Job getJob(Current __current) { |
| 214 | return job; |
| 215 | } |
| 216 | |
| 217 | /** |
| 218 | * Cancels the current process, nulls the value, and returns immediately. |
| 219 | */ |
| 220 | public void stop() { |
| 221 | rwl.writeLock().lock(); |
| 222 | try { |
| 223 | if (currentProcess != null) { |
| 224 | currentProcess.cancel(); |
| 225 | currentProcess = null; |
| 226 | stop = true; |
| 227 | } |
| 228 | } finally { |
| 229 | rwl.writeLock().unlock(); |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | // Helpers |
| 234 | // ========================================================================= |
| 235 | |
| 236 | private void finishedOrThrow() throws ApiUsageException { |
| 237 | if (currentProcess == null) { |
| 238 | throw new ApiUsageException(null, null, "No current process."); |
| 239 | } else if (currentProcess.poll() == null) { |
| 240 | throw new ApiUsageException(null, null, "Process still running."); |
| 241 | } |
| 242 | } |
| 243 | |
| 244 | private final static String stdfile_query = "select file from Job job " |
| 245 | + "join job.originalFileLinks links " + "join links.child file " |
| 246 | + "where file.name = :name and job.id = :id"; |
| 247 | |
| 248 | private void optionallyLoadFile(final Map<String, RType> val, |
| 249 | final String name) { |
| 250 | this.ex.execute(this.principal, new Executor.Work() { |
| 251 | public Object doWork(TransactionStatus status, |
| 252 | org.hibernate.Session session, ServiceFactory sf) { |
| 253 | |
| 254 | OriginalFile file = sf.getQueryService().findByQuery( |
| 255 | stdfile_query, |
| 256 | new Parameters().addId(job.getId().getValue()).addString( |
| 257 | "name", name)); |
| 258 | if (file != null) { |
| 259 | val.put(name, robject(new OriginalFileI(file.getId(), |
| 260 | false))); |
| 261 | } |
| 262 | return null; |
| 263 | } |
| 264 | }); |
| 265 | } |
| 266 | |
| 267 | private Session newSession(Current __current) { |
| 268 | EventContext ec = mgr.getEventContext(principal); |
| 269 | Session newSession = mgr.create(new Principal(ec.getCurrentUserName(), |
| 270 | ec.getCurrentGroupName(), "Processing")); |
| 271 | newSession.setTimeToIdle(0L); |
| 272 | newSession.setTimeToLive(timeout); |
| 273 | newSession = mgr.update(newSession); |
| 274 | |
| 275 | return newSession; |
| 276 | } |
| 277 | } |
Note: See TracBrowser
for help on using the browser.
