• Views
  • Iteration Report
  • My Iteration Report
  •  
OMERO.server
  • Login
  • Help/Guide
  • About Trac
  • Preferences
  • Wiki
  • Timeline
  • Roadmap
  • Browse Source
  • View Tickets
  • Search

Context Navigation

  • Last Change
  • Annotate
  • Revision Log

root/trunk/components/blitz/src/omero/grid/InteractiveProcessorI.java

Revision 3001, 8.0 kB (checked in by jmoore, 2 months ago)

ticket:1106 - protecting Rtype.val

  • Added protected? metadata on all rtypes
  • Added blitz/src/omero/rtypes.java with the complete implementation
  • Added getValue() to all rtypes
  • Added methods to collections et al. to simplify use.
  • Added omero.rtypes.Conversion interface to simplify IceMapper
  • Missing object factories
  • Removed all omero.J* subclasses (unused)
Line 
1/*
2 *   $Id$
3 *
4 *   Copyright 2008 Glencoe Software, Inc. All rights reserved.
5 *   Use is subject to license terms supplied in LICENSE.txt
6 */
7
8package omero.grid;
9
10import java.util.HashMap;
11import java.util.Map;
12import java.util.concurrent.locks.ReadWriteLock;
13import java.util.concurrent.locks.ReentrantReadWriteLock;
14
15import static omero.rtypes.*;
16import ome.model.core.OriginalFile;
17import ome.model.meta.Session;
18import ome.parameters.Parameters;
19import ome.services.procs.Processor;
20import ome.services.sessions.SessionManager;
21import ome.services.util.Executor;
22import ome.system.EventContext;
23import ome.system.Principal;
24import ome.system.ServiceFactory;
25import omero.ApiUsageException;
26import omero.RMap;
27import omero.RObject;
28import omero.RType;
29import omero.ServerError;
30import omero.model.Job;
31import omero.model.OriginalFileI;
32import omero.util.IceMapper;
33
34import org.apache.commons.logging.Log;
35import org.apache.commons.logging.LogFactory;
36import org.springframework.transaction.TransactionStatus;
37
38import Ice.Current;
39
40/**
41 * {@link Processor} implementation which delegates to an omero.grid.Processor
42 * servant. Functions as a state machine. A single {@link ProcessPrx} can be
43 * active at any given time. Once it's complete, then the {@link RMap results}
44 * can be obtained, then a new {@link Job} can be submitted, until {@link #stop}
45 * is set. Any other use throws an {@link ApiUsageException}.
46 *
47 * @author Josh Moore, josh at glencoesoftware.com
48 * @since 3.0-Beta3
49 */
50public class InteractiveProcessorI extends _InteractiveProcessorDisp {
51
52    private static Session UNINITIALIZED = new Session();
53
54    private static Log log = LogFactory.getLog(InteractiveProcessorI.class);
55
56    private final SessionManager mgr;
57
58    private final ProcessorPrx prx;
59
60    private final Executor ex;
61
62    private final Job job;
63
64    private final long timeout;
65
66    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
67
68    private final Principal principal;
69
70    private boolean obtainResults = false;
71
72    private boolean stop = false;
73
74    private ProcessPrx currentProcess = null;
75
76    private Session session;
77
78    /**
79     *
80     * @param p
81     * @param mgr
82     * @param prx
83     * @param job
84     *            Unloaded {@link Job} instance, which will be used by
85     *            {@link omero.grid.Processor} to reload the {@link Job}
86     * @param timeout
87     */
88    public InteractiveProcessorI(Principal p, SessionManager mgr, Executor ex,
89            ProcessorPrx prx, Job job, long timeout) {
90        this.principal = p;
91        this.ex = ex;
92        this.mgr = mgr;
93        this.prx = prx;
94        this.job = job;
95        this.timeout = timeout;
96        this.session = UNINITIALIZED;
97    }
98
99    public JobParams params(Current __current) throws ServerError {
100
101        rwl.writeLock().lock();
102
103        try {
104
105            if (stop) {
106                throw new ApiUsageException(null, null,
107                        "This processor is stopped.");
108            }
109
110            // Setup new user session
111            if (session == UNINITIALIZED) {
112                session = newSession(__current);
113            }
114
115            try {
116                return prx.parseJob(session.getUuid(), job);
117            } catch (ServerError se) {
118                log.debug("Error while parsing job", se);
119                throw se;
120            }
121
122        } finally {
123            rwl.writeLock().unlock();
124        }
125
126    }
127
128    public ProcessPrx execute(RMap inputs, Current __current)
129            throws ServerError {
130
131        rwl.writeLock().lock();
132
133        try {
134
135            if (currentProcess != null) {
136                throw new ApiUsageException(null, null,
137                        "Process currently running.");
138            }
139
140            if (obtainResults) {
141                throw new ApiUsageException(null, null,
142                        "Please retrieve results.");
143            }
144
145            if (stop) {
146                throw new ApiUsageException(null, null,
147                        "This processor is stopped.");
148            }
149
150            // Setup new user session
151            if (session == UNINITIALIZED) {
152                session = newSession(__current);
153            }
154
155            // Setup environment
156            if (inputs != null && inputs.getValue() != null) {
157                for (String key : inputs.getValue().keySet()) {
158                    mgr.setInput(session.getUuid(), key, inputs.get(key));
159                }
160            }
161
162            // Execute
163            try {
164                currentProcess = prx.processJob(session.getUuid(), job);
165            } catch (ServerError se) {
166                log.debug("Error while processing job", se);
167                throw se;
168            }
169
170            if (currentProcess == null) {
171                return null;
172            }
173
174            obtainResults = true;
175            return currentProcess;
176
177        } finally {
178            rwl.writeLock().unlock();
179        }
180
181    }
182
183    public RMap getResults(ProcessPrx proc, Current __current)
184            throws ServerError {
185
186        rwl.writeLock().lock();
187        try {
188            finishedOrThrow();
189
190            // Gather output
191            omero.RMap output = rmap(
192                    new HashMap<String, omero.RType>());
193            Map<String, Object> env = mgr.outputEnvironment(session.getUuid());
194            IceMapper mapper = new IceMapper();
195            for (String key : env.keySet()) {
196                RType rt = mapper.toRType(env.get(key));
197                output.put(key, rt);
198            }
199            optionallyLoadFile(output.getValue(), "stdout");
200            optionallyLoadFile(output.getValue(), "stderr");
201            currentProcess = null;
202            obtainResults = false;
203            return output;
204        } finally {
205            rwl.writeLock().unlock();
206        }
207    }
208
209    public long expires(Current __current) {
210        return timeout;
211    }
212
213    public Job getJob(Current __current) {
214        return job;
215    }
216
217    /**
218     * Cancels the current process, nulls the value, and returns immediately.
219     */
220    public void stop() {
221        rwl.writeLock().lock();
222        try {
223            if (currentProcess != null) {
224                currentProcess.cancel();
225                currentProcess = null;
226                stop = true;
227            }
228        } finally {
229            rwl.writeLock().unlock();
230        }
231    }
232
233    // Helpers
234    // =========================================================================
235
236    private void finishedOrThrow() throws ApiUsageException {
237        if (currentProcess == null) {
238            throw new ApiUsageException(null, null, "No current process.");
239        } else if (currentProcess.poll() == null) {
240            throw new ApiUsageException(null, null, "Process still running.");
241        }
242    }
243
244    private final static String stdfile_query = "select file from Job job "
245            + "join job.originalFileLinks links " + "join links.child file "
246            + "where file.name = :name and job.id = :id";
247
248    private void optionallyLoadFile(final Map<String, RType> val,
249            final String name) {
250        this.ex.execute(this.principal, new Executor.Work() {
251            public Object doWork(TransactionStatus status,
252                    org.hibernate.Session session, ServiceFactory sf) {
253
254                OriginalFile file = sf.getQueryService().findByQuery(
255                        stdfile_query,
256                        new Parameters().addId(job.getId().getValue()).addString(
257                                "name", name));
258                if (file != null) {
259                    val.put(name, robject(new OriginalFileI(file.getId(),
260                            false)));
261                }
262                return null;
263            }
264        });
265    }
266
267    private Session newSession(Current __current) {
268        EventContext ec = mgr.getEventContext(principal);
269        Session newSession = mgr.create(new Principal(ec.getCurrentUserName(),
270                ec.getCurrentGroupName(), "Processing"));
271        newSession.setTimeToIdle(0L);
272        newSession.setTimeToLive(timeout);
273        newSession = mgr.update(newSession);
274
275        return newSession;
276    }
277}
Note: See TracBrowser for help on using the browser.

Download in other formats:

  • Plain Text
  • Original Format

Trac Powered

Powered by Trac 0.11
By Edgewall Software.

Visit the Trac open source project at
http://trac.edgewall.org/