root/trunk/components/tools/OmeroPy/src/omero/processor.py

Revision 7556, 30.5 KB (checked in by jmoore, 2 weeks ago)

s/write_text/write_bytes/ for Windows processor.py sha1 check.

original-svn-id:  file:///home/svn/omero/branches/Beta4.2@7552 05709c45-44f0-0310-885b-81a1db45b4a6

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2#
3# OMERO Grid Processor
4# Copyright 2008 Glencoe Software, Inc.  All Rights Reserved.
5# Use is subject to license terms supplied in LICENSE.txt
6#
7
8import os
9import sys
10import time
11import signal
12import logging
13import traceback
14import exceptions
15import killableprocess as subprocess
16
17from path import path
18
19import Ice
20import omero
21import omero.clients
22import omero.scripts
23import omero.util
24import omero.util.concurrency
25
26from omero.util.temp_files import create_path, remove_path
27from omero.util.decorators import remoted, perf, locked
28from omero.rtypes import *
29from omero.util.decorators import remoted, perf, wraps
30
def with_context(func, context):
    """
    Decorator for invoking Ice methods with a context.

    The returned wrapper appends context as the last positional
    argument of every call to func; keyword arguments pass through.
    """
    def handler(*args, **kwargs):
        extended = list(args)
        extended.append(context)
        return func(*extended, **kwargs)
    return wraps(func)(handler)
39
class WithGroup(object):
    """
    Wraps a ServiceInterfacePrx instance and applies
    a "omero.group" to the passed context on every
    invocation.

    For example, using a job handle as root requires logging
    manually into the group. (ticket:2044)
    """

    def __init__(self, service, group_id):
        self._service = service
        self._group_id = str(group_id)

    def _get_ctx(self, group = None):
        """
        Returns a copy of the communicator's implicit context with
        "omero.group" set to the given group. If no group is given,
        the group id this wrapper was created with is used.
        """
        if group is None:
            group = self._group_id
        ctx = self._service.ice_getCommunicator().getImplicitContext().getContext()
        ctx = dict(ctx)
        ctx["omero.group"] = group
        return ctx

    def __getattr__(self, name):
        """
        Only invoked when normal attribute lookup fails. Public names
        are resolved on the wrapped service and returned wrapped so that
        the group context is appended to each call.
        """
        if name.startswith("_"):
            # Never delegate private names: this keeps a missing
            # "_service" from recursing and lets hasattr() work.
            raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
        if hasattr(self._service, name):
            method = getattr(self._service, name)
            ctx = self._get_ctx(self._group_id)
            return with_context(method, ctx)
        raise AttributeError("'%s' object has no attribute '%s'" % (self._service, name))
68
69class ProcessI(omero.grid.Process, omero.util.SimpleServant):
70    """
71    Wrapper around a subprocess.Popen instance. Returned by ProcessorI
72    when a job is submitted. This implementation uses the given
73    interpreter to call a file that must be named "script" in the
74    generated temporary directory.
75
76    Call is equivalent to:
77
78    cd TMP_DIR
79    ICE_CONFIG=./config interpreter ./script >out 2>err &
80
81    The properties argument is used to generate the ./config file.
82
83    The params argument may be null in which case this process
84    is being used solely to calculate the parameters for the script
85    ("omero.scripts.parse=true")
86
87    If iskill is True, then on cleanup, this process will reap the
88    attached session completely.
89    """
90
91    def __init__(self, ctx, interpreter, properties, params, iskill = False,\
92        Popen = subprocess.Popen, callback_cast = omero.grid.ProcessCallbackPrx.uncheckedCast):
93        """
94        Popen and callback_Cast are primarily for testing.
95        """
96        omero.util.SimpleServant.__init__(self, ctx)
97        self.interpreter = interpreter        #: Executable which will be used on the script
98        self.properties = properties          #: Properties used to create an Ice.Config
99        self.params = params                  #: JobParams for this script. Possibly None if a ParseJob
100        self.iskill = iskill                  #: Whether or not, cleanup should kill the session
101        self.Popen = Popen                    #: Function which should be used for creating processes
102        self.callback_cast = callback_cast    #: Function used to cast all ProcessCallback proxies
103        # Non arguments (mutable state)
104        self.rcode = None                     #: return code from popen
105        self.callbacks = {}                   #: dictionary from id strings to callback proxies
106        self.popen = None                     #: process. if None, then this instance isn't alive.
107        self.pid = None                       #: pid of the process. Once set, isn't nulled.
108        self.started = None                   #: time the process started
109        self.stopped = None                   #: time of deactivation
110        self.final_status = None              #: status which will be sent on set_job_status
111        # Non arguments (immutable state)
112        self.uuid = properties["omero.user"]  #: session this instance is tied to
113
114        # More fields set by these methods
115        self.make_files()
116        self.make_env()
117        self.make_config()
118        self.logger.info("Created %s in %s" % (self.uuid, self.dir))
119
120    #
121    # Initialization methods
122    #
123
124    def make_env(self):
125        self.env = omero.util.Environment("PATH", "PYTHONPATH",\
126            "DYLD_LIBRARY_PATH", "LD_LIBRARY_PATH", "MLABRAW_CMD_STR", "HOME")
127        # WORKAROUND
128        # Currently duplicating the logic here as in the PYTHONPATH
129        # setting of the grid application descriptor (see etc/grid/*.xml)
130        # This should actually be taken care of in the descriptor itself
131        # by having setting PYTHONPATH to an absolute value. This is
132        # not currently possible with IceGrid (without using icepatch --
133        # see 39.17.2 "node.datadir).
134        self.env.append("PYTHONPATH", str(path.getcwd() / "lib" / "python"))
135        self.env.set("ICE_CONFIG", str(self.config_path))
136
137    def make_files(self):
138        self.dir = create_path("process", ".dir", folder = True)
139        self.script_path = self.dir / "script"
140        self.config_path = self.dir / "config"
141        self.stdout_path = self.dir / "out"
142        self.stderr_path = self.dir / "err"
143
144    def make_config(self):
145        """
146        Creates the ICE_CONFIG file used by the client.
147        """
148        config_file = open(str(self.config_path), "w")
149        try:
150            for key in self.properties.iterkeys():
151                config_file.write("%s=%s\n"%(key, self.properties[key]))
152        finally:
153            config_file.close()
154
155    def tmp_client(self):
156        """
157        Create a client for performing cleanup operations.
158        This client should be closed as soon as possible
159        by the process
160        """
161        try:
162            client = omero.client(["--Ice.Config=%s" % str(self.config_path)])
163            client.setAgent("OMERO.process")
164            client.createSession().detachOnDestroy()
165            self.logger.debug("client: %s" % client.sf)
166            return client
167        except:
168            self.logger.error("Failed to create client for %s" % self.uuid)
169            return None
170
171    #
172    # Activation / Deactivation
173    #
174
175    @locked
176    def activate(self):
177        """
178        Process creation has to wait until all external downloads, etc
179        are finished.
180        """
181
182        if self.isActive():
183            raise omero.ApiUsageException(None, None, "Already activated")
184
185        self.stdout = open(str(self.stdout_path), "w")
186        self.stderr = open(str(self.stderr_path), "w")
187        self.popen = self.Popen([self.interpreter, "./script"], cwd=str(self.dir), env=self.env(), stdout=self.stdout, stderr=self.stderr)
188        self.pid = self.popen.pid
189        self.started = time.time()
190        self.stopped = None
191        self.status("Activated")
192
193    @locked
194    def deactivate(self):
195        """
196        Cleans up the temporary directory used by the process, and terminates
197        the Popen process if running.
198        """
199
200        if not self.isActive():
201            raise omero.ApiUsageException(None, None, "Not active")
202
203        if self.stopped:
204            # Prevent recursion since we are reusing kill & cancel
205            return
206
207        self.stopped = time.time()
208        d_start = time.time()
209        self.status("Deactivating")
210
211        # None of these should throw, but just in case
212        try:
213
214            self.shutdown()     # Calls cancel & kill which recall this method!
215            self.popen = None   # Now we are finished
216
217            client = self.tmp_client()
218            try:
219                self.set_job_status(client)
220                self.cleanup_output()
221                self.upload_output(client) # Important!
222                self.cleanup_tmpdir()
223            finally:
224                if client:
225                    client.__del__() # Safe closeSession
226
227        except exceptions.Exception:
228            self.logger.error("FAILED TO CLEANUP pid=%s (%s)", self.pid, self.uuid, exc_info = True)
229
230        d_stop = time.time()
231        elapsed = int(self.stopped - self.started)
232        d_elapsed = int(d_stop - d_start)
233        self.status("Lived %ss. Deactivation took %ss." % (elapsed, d_elapsed))
234
235    @locked
236    def isActive(self):
237        """
238        Tests only if this instance has a non-None popen attribute. After activation
239        this method will return True until the popen itself returns a non-None
240        value (self.rcode) at which time it will be nulled and this method will again
241        return False
242        """
243        return self.popen is not None
244
245    @locked
246    def wasActivated(self):
247        """
248        Returns true only if this instance has either a non-null
249        popen or a non-null rcode field.
250        """
251        return self.popen is not None or self.rcode is not None
252
253    @locked
254    def isRunning(self):
255        return self.popen is not None and self.rcode is None
256
257    @locked
258    def isFinished(self):
259        return self.rcode is not None
260
261    @locked
262    def alreadyDone(self):
263        """
264        Allows short-cutting various checks if we already
265        have a rcode for this popen. A non-None return value
266        implies that a process was started and returned
267        the given non-None value itself.
268        """
269        if not self.wasActivated:
270            raise omero.InternalException(None, None, "Process never activated")
271        return self.isFinished()
272
273    #
274    # Cleanup methods
275    #
276
277    def __del__(self):
278        self.cleanup()
279
280    @perf
281    @locked
282    def check(self):
283        """
284        Called periodically to keep the session alive. Returns
285        False if this resource can be cleaned up. (Resources API)
286        """
287
288        if not self.wasActivated():
289            return True # This should only happen on startup, so ignore
290
291        try:
292            self.poll()
293            self.ctx.getSession().getSessionService().getSession(self.uuid)
294            return True
295        except:
296            self.status("Keep alive failed")
297            return False
298
299    @perf
300    @locked
301    def cleanup(self):
302        """
303        Deactivates the process (if active) and cleanups the server
304        connection. (Resources API)
305        """
306
307        if self.isRunning():
308            self.deactivate()
309
310        if not self.iskill:
311            return
312
313        try:
314            sf = self.ctx.getSession(recreate = False)
315        except:
316            self.logger.debug("Can't get session for cleanup")
317            return
318
319        self.status("Killing session")
320        svc = sf.getSessionService()
321        obj = omero.model.SessionI()
322        obj.uuid = omero.rtypes.rstring(self.uuid)
323        try:
324            while svc.closeSession(obj) > 0:
325                pass
326            # No action to be taken when iskill == False if
327            # we don't have an actual client to worry with.
328        except:
329            self.logger.error("Error on session cleanup, kill=%s" % self.iskill, exc_info = True)
330
331    def cleanup_output(self):
332        """
333        Flush and close the stderr and stdout streams.
334        """
335        try:
336            if hasattr(self, "stderr"):
337                self.stderr.flush()
338                self.stderr.close()
339        except:
340            self.logger.error("cleanup of sterr failed", exc_info = True)
341        try:
342            if hasattr(self, "stdout"):
343                self.stdout.flush()
344                self.stdout.close()
345        except:
346            self.logger.error("cleanup of sterr failed", exc_info = True)
347
348    def set_job_status(self, client):
349        """
350        Sets the job status
351        """
352        if not client:
353            self.logger.error("No client: Cannot set job status for pid=%s (%s)", self.pid, self.uuid)
354            return
355
356        gid = client.sf.getAdminService().getEventContext().groupId
357        handle = WithGroup(client.sf.createJobHandle(), gid)
358        try:
359            status = self.final_status
360            if status is None:
361                status = ( self.rcode == 0 and "Finished" or "Error" )
362            handle.attach(long(self.properties["omero.job"]))
363            oldStatus = handle.setStatus(status)
364            self.status("Changed job status from %s to %s" % (oldStatus, status))
365        finally:
366            handle.close()
367
368    def upload_output(self, client):
369        """
370        If this is not a params calculation (i.e. parms != null) and the
371        stdout or stderr are non-null, they they will be uploaded and
372        attached to the job.
373        """
374        if not client:
375            self.logger.error("No client: Cannot upload output for pid=%s (%s)", self.pid, self.uuid)
376            return
377
378        if self.params:
379            out_format = self.params.stdoutFormat
380            err_format = self.params.stderrFormat
381        else:
382            out_format = "text/plain"
383            err_format = out_format
384
385        self._upload(client, self.stdout_path, "stdout", out_format)
386        self._upload(client, self.stderr_path, "stderr", err_format)
387
388    def _upload(self, client, filename, name, format):
389
390        if not format:
391            return
392
393        filename = str(filename) # Might be path.path
394        sz = os.path.getsize(filename)
395        if not sz:
396            self.status("No %s" % name)
397            return
398
399        try:
400            ofile = client.upload(filename, name=name, type=format)
401            jobid = long(client.getProperty("omero.job"))
402            link = omero.model.JobOriginalFileLinkI()
403            if self.params is None:
404                link.parent = omero.model.ParseJobI(rlong(jobid), False)
405            else:
406                link.parent = omero.model.ScriptJobI(rlong(jobid), False)
407            link.child = ofile
408            client.getSession().getUpdateService().saveObject(link)
409            self.status("Uploaded %s bytes of %s to %s" % (sz, filename, ofile.id.val))
410        except:
411            self.logger.error("Error on upload of %s for pid=%s (%s)", filename, self.pid, self.uuid, exc_info = True)
412
413    def cleanup_tmpdir(self):
414        """
415        Remove all known files and finally the temporary directory.
416        If other files exist, an exception will be raised.
417        """
418        try:
419            remove_path(self.dir)
420        except:
421            self.logger.error("Failed to remove dir %s" % self.dir, exc_info = True)
422
423    #
424    # popen methods
425    #
426
427    def status(self, msg = ""):
428        if self.isRunning():
429            self.rcode = self.popen.poll()
430        self.logger.info("%s : %s", self, msg)
431
432    @perf
433    @remoted
434    def poll(self, current = None):
435        """
436        Checks popen.poll() (if active) and notifies all callbacks
437        if necessary. If this method returns a non-None value, then
438        the process will be marked inactive.
439        """
440
441        if self.alreadyDone():
442            return rint(self.rcode)
443
444        self.status("Polling")
445        if self.rcode is None:
446            # Haven't finished yet, so do nothing.
447            return None
448        else:
449            self.deactivate()
450            rv = rint(self.rcode)
451            self.allcallbacks("processFinished", self.rcode)
452            return rv
453
454    @perf
455    @remoted
456    def wait(self, current = None):
457        """
458        Waits on popen.wait() to return (if active) and notifies
459        all callbacks. Marks this process as inactive.
460        """
461
462        if self.alreadyDone():
463            return self.rcode
464
465        self.status("Waiting")
466        self.rcode = self.popen.wait()
467        self.deactivate()
468        self.allcallbacks("processFinished", self.rcode)
469        return self.rcode
470
471    def _term(self):
472        """
473        Attempts to cancel the process by sending SIGTERM
474        (or similar)
475        """
476        try:
477            self.status("os.kill(TERM)")
478            os.kill(self.popen.pid, signal.SIGTERM)
479        except AttributeError:
480            self.logger.debug("No os.kill(TERM). Skipping cancel")
481
482    def _send(self, iskill):
483        """
484        Helper method for sending signals. This method only
485        makes a call is the process is active.
486        """
487        if self.isRunning():
488            try:
489                if self.popen.poll() is None:
490                    if iskill:
491                        self.status("popen.kill(True)")
492                        self.popen.kill(True)
493                    else:
494                        self._term()
495
496                else:
497                    self.status("Skipped signal")
498            except OSError, oserr:
499                self.logger.debug("err on pid=%s iskill=%s : %s", self.popen.pid, iskill, oserr)
500
501    @perf
502    @remoted
503    def cancel(self, current = None):
504        """
505        Tries to cancel popen (if active) and notifies callbacks.
506        """
507
508        if self.alreadyDone():
509            return True
510
511        self.final_status = "Cancelled"
512        self._send(iskill=False)
513        finished = self.isFinished()
514        if finished:
515            self.deactivate()
516        self.allcallbacks("processCancelled", finished)
517        return finished
518
519    @perf
520    @remoted
521    def kill(self, current = None):
522
523        if self.alreadyDone():
524            return True
525
526        self.final_status = "Cancelled"
527        self._send(iskill=True)
528        finished = self.isFinished()
529        if finished:
530            self.deactivate()
531        self.allcallbacks("processKilled", finished)
532        return finished
533
534    @perf
535    @remoted
536    def shutdown(self, current = None):
537        """
538        If self.popen is active, then first call cancel, wait a period of
539        time, and finally call kill.
540        """
541
542        if self.alreadyDone():
543            return
544
545        self.status("Shutdown")
546        try:
547            for i in range(5, 0, -1):
548                if self.cancel():
549                    break
550                else:
551                    self.logger.warning("Shutdown: %s (%s). Killing in %s seconds.", self.pid, self.uuid, 6*(i-1)+1)
552                    self.stop_event.wait(6)
553            self.kill()
554        except:
555            self.logger.error("Shutdown failed: %s (%s)", self.pid, self.uuid, exc_info = True)
556
557    #
558    # Callbacks
559    #
560
561    @remoted
562    @locked
563    def registerCallback(self, callback, current = None):
564        try:
565            id = callback.ice_getIdentity()
566            key = "%s/%s" % (id.category, id.name)
567            callback = callback.ice_oneway()
568            callback = self.callback_cast(callback)
569            if not callback:
570                e = "Callback is invalid"
571            else:
572                self.callbacks[key] = callback
573                self.logger.debug("Added callback: %s", key)
574                return
575        except exceptions.Exception, ex:
576            e = ex
577        # Only reached on failure
578        msg = "Failed to add callback: %s. Reason: %s" % (callback, e)
579        self.logger.debug(msg)
580        raise omero.ApiUsageException(None, None, msg)
581
582    @remoted
583    @locked
584    def unregisterCallback(self, callback, current = None):
585        try:
586            id = callback.ice_getIdentity()
587            key = "%s/%s" % (id.category, id.name)
588            if not key in self.callback:
589                raise omero.ApiUsageException(None, None, "No callback registered with id: %s" % key)
590            del self.callbacks[key]
591            self.logger.debug("Removed callback: %s", key)
592        except exceptions.Exception, e:
593            msg = "Failed to remove callback: %s. Reason: %s" % (callback, e)
594            self.logger.debug(msg)
595            raise omero.ApiUsageException(None, None, msg)
596
597    @locked
598    def allcallbacks(self, method, arg):
599        self.status("Callback %s" % method)
600        for key, cb in self.callbacks.items():
601            try:
602                m = getattr(cb, method)
603                m(arg)
604            except Ice.LocalException, e:
605                self.logger.debug("LocalException calling callback %s on pid=%s (%s)" % (key, self.pid, self.uuid), exc_info = False)
606            except:
607                self.logger.error("Error calling callback %s on pid=%s (%s)" % (key, self.pid, self.uuid), exc_info = True)
608
609    def __str__(self):
610        return "<proc:%s,rc=%s,uuid=%s>" % (self.pid, (self.rcode is None and "-" or self.rcode), self.uuid)
611
class UseSessionHolder(object):
    """
    Resource wrapper which pings the given session factory on every
    check() and needs no cleanup.
    """

    def __init__(self, sf):
        self.sf = sf

    def check(self):
        """
        Returns True if keepAlive(None) succeeds on the held session
        factory, False on any failure.
        """
        try:
            self.sf.keepAlive(None)
        except:
            return False
        return True

    def cleanup(self):
        """
        Nothing to clean up.
        """
        pass
626
627class ProcessorI(omero.grid.Processor, omero.util.Servant):
628
629    def __init__(self, ctx, needs_session = True,
630                 use_session = None, accepts_list = [], cfg = None):
631
632        # Extensions for user-mode processors (ticket:1672)
633
634        self.use_session = use_session
635        """
636        If set, this session will be returned from internal_session and
637        the "needs_session" setting ignored.
638        """
639
640        if self.use_session:
641            needs_session = False
642
643        self.accepts_list = accepts_list
644        """
645        A list of contexts which will be accepted by this user-mode
646        processor.
647        """
648
649        omero.util.Servant.__init__(self, ctx, needs_session = needs_session)
650        if cfg is None:
651            self.cfg = os.path.join(os.curdir, "etc", "ice.config")
652            self.cfg = os.path.abspath(self.cfg)
653        else:
654            self.cfg = cfg
655
656        # Keep this session alive until the processor is finished
657        self.resources.add( UseSessionHolder(use_session) )
658
659    def setProxy(self, prx):
660        """
661        Overrides the default action in order to register this proxy
662        with the session's sharedResources to register for callbacks.
663        The on_newsession handler will also keep new sessions informed.
664
665        See ticket:2304
666        """
667        omero.util.Servant.setProxy(self, prx)
668        session = self.internal_session()
669        self.register_session(session)
670
671        # Keep other session informed
672        self.ctx.on_newsession = self.register_session
673
674    def user_client(self, agent):
675        """
676        Creates an omero.client instance for use by
677        users.
678        """
679        args = ["--Ice.Config=%s" % (self.cfg)]
680        rtr = self.internal_session().ice_getRouter()
681        if rtr:
682            args.insert(0, "--Ice.Default.Router=%s" % rtr) # FIXME : How do we find an internal router?
683        client = omero.client(args)
684        client.setAgent(agent)
685        return client
686
687    def internal_session(self):
688        """
689        Returns the session which should be used for lookups by this instance.
690        Some methods will create a session based on the session parameter.
691        In these cases, the session will belong to the user who is running a
692        script.
693        """
694        if self.use_session:
695            return self.use_session
696        else:
697            return self.ctx.getSession()
698
699    def register_session(self, session):
700        self.logger.info("Registering processor %s", self.prx)
701        prx = omero.grid.ProcessorPrx.uncheckedCast(self.prx)
702        session.sharedResources().addProcessor(prx)
703
704    def lookup(self, job):
705        sf = self.internal_session()
706        gid = job.details.group.id.val
707        handle = WithGroup(sf.createJobHandle(), gid)
708        try:
709            handle.attach(job.id.val)
710            if handle.jobFinished():
711                handle.close()
712                raise omero.ApiUsageException("Job already finished.")
713
714            prx = WithGroup(sf.getScriptService(), gid)
715            file = prx.validateScript(job, self.accepts_list)
716
717        except omero.SecurityViolation, sv:
718            self.logger.debug("SecurityViolation on validate job %s from group %s", job.id.val, gid)
719            file = None
720
721        return file, handle
722
723    @remoted
724    def willAccept(self, userContext, groupContext, scriptContext, cb, current = None):
725
726        userID = None
727        if userContext != None:
728            userID = userContext.id.val
729
730        groupID = None
731        if groupContext != None:
732            groupID = groupContext.id.val
733
734        scriptID = None
735        if scriptContext != None:
736            scriptID = scriptContext.id.val
737
738        if scriptID:
739            try:
740                file, handle = self.lookup(scriptContext)
741                handle.close()
742                valid = (file is not None)
743            except:
744                self.logger.error("File lookup failed: user=%s, group=%s, script=%s",\
745                    userID, groupID, scriptID, exc_info=1)
746                return # EARlY EXIT !
747        else:
748            valid = False
749            for x in self.accepts_list:
750                if isinstance(x, omero.model.Experimenter) and x.id.val == userID:
751                    valid = True
752                elif isinstance(x, omero.model.ExperimenterGroup) and x.id.val == groupID:
753                    valid = True
754
755        self.logger.debug("Accepts called on: user:%s group:%s scriptjob:%s - Valid: %s",
756            userID, groupID, scriptID, valid)
757
758        try:
759            id = self.internal_session().ice_getIdentity().name
760            cb = cb.ice_oneway()
761            cb = omero.grid.ProcessorCallbackPrx.uncheckedCast(cb)
762            cb.isAccepted(valid, id, str(self.prx))
763        except exceptions.Exception, e:
764            self.logger.warn("callback failed on willAccept: %s Exception:%s", cb, e)
765
766        return valid
767
768    @remoted
769    def requestRunning(self, cb, current = None):
770
771        try:
772            cb = cb.ice_oneway()
773            cb = omero.grid.ProcessorCallbackPrx.uncheckedCast(cb)
774            servants = list(self.ctx.servant_map.values())
775            rv = []
776            for x in servants:
777                if hasattr(x, "properties"):
778                    rv.append(long(x))
779            cb.responseRunning(rv)
780        except exceptions.Exception, e:
781            self.logger.warn("callback failed on requestRunning: %s Exception:%s", cb, e)
782
783
784    @remoted
785    def parseJob(self, session, job, current = None):
786        self.logger.info("parseJob: Session = %s, JobId = %s" % (session, job.id.val))
787        client = self.user_client("OMERO.parseJob")
788
789        try:
790            iskill = False
791            client.joinSession(session).detachOnDestroy()
792            properties = {}
793            properties["omero.scripts.parse"] = "true"
794            prx, process = self.process(client, session, job, current, None, properties, iskill)
795            process.wait()
796            rv = client.getOutput("omero.scripts.parse")
797            if rv != None:
798                return rv.val
799            else:
800                self.logger.warning("No output found for omero.scripts.parse. Keys: %s" % client.getOutputKeys())
801                return None
802        finally:
803            client.closeSession()
804            del client
805
806    @remoted
807    def processJob(self, session, params, job, current = None):
808        """
809        """
810        self.logger.info("processJob: Session = %s, JobId = %s" % (session, job.id.val))
811        client = self.user_client("OMERO.processJob")
812        try:
813            client.joinSession(session).detachOnDestroy()
814            prx, process = self.process(client, session, job, current, params, iskill = True)
815            return prx
816        finally:
817            client.closeSession()
818            del client
819
820
821    @perf
822    def process(self, client, session, job, current, params, properties = {}, iskill = True):
823        """
824        session: session uuid, used primarily if client is None
825        client: an omero.client object which should be attached to a session
826        """
827
828        if not session or not job or not job.id:
829            raise omero.ApiUsageException("No null arguments")
830
831        file, handle = self.lookup(job)
832
833        try:
834            if not file:
835                raise omero.ApiUsageException(\
836                    None, None, "Job should have one executable file attached.")
837
838            sf = self.internal_session()
839            if params:
840                self.logger.debug("Checking params for job %s" % job.id.val)
841                svc = sf.getSessionService()
842                inputs = svc.getInputs(session)
843                errors = omero.scripts.validate_inputs(params, inputs, svc, session)
844                if errors:
845                    errors = "Invalid parameters:\n%s" % errors
846                    raise omero.ValidationException(None, None, errors)
847
848            properties["omero.job"] = str(job.id.val)
849            properties["omero.user"] = session
850            properties["omero.pass"] = session
851            properties["Ice.Default.Router"] = client.getProperty("Ice.Default.Router")
852
853            process = ProcessI(self.ctx, "python", properties, params, iskill)
854            self.resources.add(process)
855
856            # client.download(file, str(process.script_path))
857            scriptText = sf.getScriptService().getScriptText(file.id.val)
858            process.script_path.write_bytes(scriptText)
859
860            self.logger.info("Downloaded file: %s" % file.id.val)
861            s = client.sha1(str(process.script_path))
862            if not s == file.sha1.val:
863                msg = "Sha1s don't match! expected %s, found %s" % (file.sha1.val, s)
864                self.logger.error(msg)
865                process.cleanup()
866                raise omero.InternalException(None, None, msg)
867            else:
868                process.activate()
869                handle.setStatus("Running")
870
871            prx = self.ctx.add_servant(current, process)
872            return omero.grid.ProcessPrx.uncheckedCast(prx), process
873
874        finally:
875            handle.close()
876
def usermode_processor(client, serverid = "UsermodeProcessor",\
                       cfg = None, accepts_list = None, stop_event = None):
    """
    Creates and activates a usermode processor for the given client.
    It is the responsibility of the client to call "cleanup()" on
    the ProcessorI implementation which is returned.

    cfg is the path to an --Ice.Config-valid file or files. If none
    is given, the value of ICE_CONFIG will be taken from the environment
    if available. Otherwise, all properties will be taken from the client
    instance.

    accepts_list is the list of IObject instances which will be passed to
    omero.api.IScripts.validateScript. If none is given, only the current
    Experimenter's own object will be passed.

    stop_event is a threading.Event. One will be acquired from
    omero.util.concurrency.get_event if none is provided.
    """

    if cfg is None:
        cfg = os.environ.get("ICE_CONFIG")

    if accepts_list is None:
        event_context = client.sf.getAdminService().getEventContext()
        uid = event_context.userId
        accepts_list = [omero.model.ExperimenterI(uid, False)]

    if stop_event is None:
        stop_event = omero.util.concurrency.get_event()

    server_ctx = omero.util.ServerContext(serverid, client.ic, stop_event)
    processor = omero.processor.ProcessorI(server_ctx,
        use_session=client.sf, accepts_list=accepts_list, cfg=cfg)
    server_ctx.add_servant(client.adapter, processor)
    return processor
Note: See TracBrowser for help on using the browser.

1.2.1-PRO © 2008-2009 agile42 all rights reserved (this page was served in: 0.215523 sec.)