• Views
  • Iteration Report
  • My Iteration Report
  •  
OMERO.server
  • Login
  • Help/Guide
  • About Trac
  • Preferences
  • Wiki
  • Timeline
  • Roadmap
  • Browse Source
  • View Tickets
  • Search

Context Navigation

  • Last Change
  • Annotate
  • Revision Log

root/trunk/components/tools/OmeroPy/src/omero/processor.py

Revision 3014, 13.0 kB (checked in by jmoore, 2 months ago)

ticket:1106 - Final OmeroPy testing for "protected" rtypes and details

  • Included getattr for "val" in all rtypes
  • Included getattr, setattr for all fields in DetailsI
  • Fixed a missing "self._" in combined.vm
  • Searched and replaced all omero.R constructors (hopefully)
  • Now using etc/ice.config for testing
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2#
3# OMERO Grid Processor
4# Copyright 2008 Glencoe Software, Inc.  All Rights Reserved.
5# Use is subject to license terms supplied in LICENSE.txt
6#
7
8import omero, Ice
9import os, signal, subprocess, sys, threading, tempfile, time, traceback
10from omero_model_OriginalFileI import OriginalFileI
11from omero.rtypes import *
12
13CONFIG="""
14Ice.ACM.Client=0
15Ice.MonitorConnections=60
16Ice.RetryIntervals=-1
17Ice.Warn.Connections=1
18Ice.ImplicitContext=Shared
19Ice.GC.Interval=60
20"""
21
22class Resources:
23    """
24    Container class for storing resources which should be
25    cleaned up on close.
26    """
27    def __init__(self):
28        self.stuff = []
29    def add(self, object, cleanupMethod = "cleanup"):
30        lock = threading.RLock()
31        lock.acquire()
32        try:
33            self.stuff.append((object,cleanupMethod))
34        finally:
35            lock.release()
36    def cleanupNext(self):
37        lock = threading.RLock()
38        lock.acquire()
39        try:
40            try:
41                if len(self.stuff) > 0:
42                    m = self.stuff.pop(0)
43                    method = getattr(m[0],m[1])
44                    method()
45                    return len(self.stuff) > 0
46                else:
47                    return False
48            except:
49                print "Error cleaning resource:",m
50                traceback.print_exc()
51        finally:
52            lock.release()
53
54class Environment:
55    """
56    Simple class for creating an executable environment
57    """
58
59    def __init__(self,*args):
60        """
61        Takes an number of environment variable names which
62        should be copied to the target environment if present
63        in the current execution environment.
64        """
65        self.env = {}
66        for arg in args:
67            if os.environ.has_key(arg):
68                self.env[arg] = os.environ[arg]
69    def __call__(self):
70        """
71        Returns the environment map when called.
72        """
73        return self.env
74
75    def set(self, key, value):
76        """
77        Manually sets a value in the target environment.
78        """
79        self.env[key] = value
80
81    def append(self, key, addition):
82        """
83        Manually adds a value to the environment string
84        """
85        if self.env.has_key(key):
86            self.env[key] = os.pathsep.join([self.env[key], addition])
87        else:
88            self.set(key, addition)
89
90class ProcessI(omero.grid.Process):
91    """
92    Wrapper around a subprocess.Popen instance. Returned by ProcessorI
93    when a job is submitted. This implementation uses the given
94    interpreter to call a file that must be named "script" in the
95    generated temporary directory.
96
97    Call is equivalent to:
98
99    cd TMP_DIR
100    ICE_CONFIG=./config interpreter ./script >out 2>err &
101
102    The properties argument is used to generate the ./config file.
103   
104    The params argument may be null in which case this process
105    is being used solely to calculate the parameters for the script
106    ("omero.scripts.parse=true")
107
108    """
109
110    def __init__(self, interpreter, properties, params, log):
111        self.active = False
112        self.dead = False
113        self.interpreter = interpreter
114        self.properties = properties
115        self.params = params
116        self.log = log
117        # Non arguments
118        self.callbacks = []
119        self.lock = threading.Lock()
120        self.dir = tempfile.mkdtemp()
121        self.script_name = os.path.join(self.dir, "script")
122        self.config_name = os.path.join(self.dir, "config")
123        self.stdout_name = os.path.join(self.dir, "out")
124        self.stderr_name = os.path.join(self.dir, "err")
125        self.env = Environment("PATH","PYTHONPATH","DYLD_LIBRARY_PATH","LD_LIBRARY_PATH","MLABRAW_CMD_STR")
126        self.env.set("ICE_CONFIG", self.config_name)
127        # WORKAROUND
128        # Currently duplicating the logic here as in the PYTHONPATH
129        # setting of the grid application descriptor (see etc/grid/*.xml)
130        # This should actually be taken care of in the descriptor itself
131        # by having setting PYTHONPATH to an absolute value. This is
132        # not currently possible with IceGrid (without using icepatch --
133        # see 39.17.2 "node.datadir).
134        self.env.append("PYTHONPATH", os.path.join(os.getcwd(), "lib"))
135        self.make_config()
136
137    def activate(self):
138        """
139        Process creation has to wait until all external downloads, etc
140        are finished.
141        """
142        self.stdout = open(self.stdout_name, "w")
143        self.stderr = open(self.stderr_name, "w")
144        self.popen = subprocess.Popen([self.interpreter, "./script"], cwd=self.dir, env=self.env(), stdout=self.stdout, stderr=self.stderr)
145        self.active = True
146
147    def __del__(self):
148        """
149        Cleans up the temporary directory used by the process, and terminates
150        the Popen process if running.
151        """
152        try:
153            self.lock.acquire()
154            if not self.dead:
155                self.cleanup_popen()
156                self.cleanup_output()
157                self.upload_output()
158                self.cleanup_tmpdir()
159        finally:
160            self.dead = True
161            self.lock.release()
162
163    def cleanup_popen(self):
164        """
165        If self.popen is active, then first call cancel, wait a period of
166        time, and finally call kill.
167        """
168        if hasattr(self, "popen") and None == self.popen.poll():
169            self.cancel()
170
171            for i in range(5,0,-1):
172                time.sleep(6)
173                if None != self.popen.poll():
174                    self.log.warning("Process %s terminated cleanly." % str(self.popen.pid))
175                    break
176                else:
177                    self.log.warning("%s still active. Killing in %s seconds." % (str(self.popen.pid),6*(i-1)+1))
178            self.kill()
179
180    def cleanup_output(self):
181        """
182        Flush and close the stderr and stdout streams.
183        """
184        if hasattr(self, "stderr"):
185            self.stderr.flush()
186            self.stderr.close()
187        if hasattr(self, "stdout"):
188            self.stdout.flush()
189            self.stdout.close()
190
191    def upload_output(self):
192        """
193        If this is not a params calculation (i.e. parms != null) and the
194        stdout or stderr are non-null, they they will be uploaded and
195        attached to the job.
196        """
197        if self.params:
198            client = omero.client(["--Ice.Config=%s" % self.config_name])
199            client.createSession()
200            self._upload(client, self.stdout_name, "stdout", self.params.stdoutFormat)
201            self._upload(client, self.stderr_name, "stderr", self.params.stderrFormat)
202
203    def _upload(self, client, filename, name, format):
204        if format:
205            if os.path.getsize(filename):
206                ofile = client.upload(filename, name=name, type=format)
207                jobid = long(client.getProperty("omero.job"))
208                link = omero.model.JobOriginalFileLinkI()
209                link.parent = omero.model.ScriptJobI(rlong(jobid), False)
210                link.child = ofile
211                client.getSession().getUpdateService().saveObject(link)
212
213    def cleanup_tmpdir(self):
214        """
215        Remove all known files and finally the temporary directory.
216        If other files exist, an exception will be raised.
217        """
218        for path in [self.config_name, self.stdout_name, self.stderr_name, self.script_name]:
219            if os.path.exists(path):
220                os.remove(path)
221        os.removedirs(self.dir)
222
223    def make_config(self):
224        """
225        Creates the ICE_CONFIG file used by the client.
226        """
227        config_file = open(self.config_name, "w")
228        try:
229            config_file.write(CONFIG)
230            for key in self.properties.iterkeys():
231                config_file.write("%s=%s\n"%(key, self.properties[key]))
232        finally:
233            config_file.close()
234
235    def poll(self, current = None):
236        rv = self.popen.poll()
237        if None == rv:
238            return None
239        rv = rint(rv)
240        self.allcallbacks("processFinished", rv)
241        self.__del__()
242        return rv
243
244    def wait(self, current = None):
245        rv = self.popen.wait()
246        self.allcallbacks("processFinished",rv)
247        self.__del__()
248        return rv
249
250    def _send(self, sig):
251        if None == self.popen.poll():
252            try:
253                os.kill(self.popen.pid, sig)
254            except OSError, oserr:
255                # Already gone
256                pass
257        return self.popen.poll() != None
258
259    def cancel(self, current = None):
260        rv = self._send(signal.SIGTERM)
261        self.allcallbacks("processCancelled", rv)
262        if rv:
263            self.__del__()
264        return rv
265
266    def kill(self, current = None):
267        rv = self._send(signal.SIGKILL)
268        self.allcallbacks("processKilled", rv)
269        self.__del__()
270        return rv
271
272    def registerCallback(self, callback, current = None):
273        self.lock.acquire()
274        try:
275            self.callbacks.append(callback)
276        finally:
277            self.lock.release()
278
279    def unregisterCallback(self, callback, current = None):
280        self.lock.acquire()
281        try:
282            self.callbacks.remove(callback)
283        finally:
284            self.lock.release()
285
286    def allcallbacks(self, method, arg):
287        self.lock.acquire()
288        try:
289                for cb in self.callbacks:
290                    try:
291                        m = getattr(cb,method)
292                        m(arg)
293                    except:
294                        print "Error calling callback " + str(cb)
295        finally:
296            self.lock.release()
297
298class ProcessorI(omero.grid.Processor):
299
300    def __init__(self, log):
301        self.log = log
302        self.cfg = os.path.join(os.curdir, "etc", "ice.config")
303        self.cfg = os.path.abspath(self.cfg)
304        self.resources = Resources()
305
306    def parseJob(self, session, job, current = None):
307        properties = {}
308        properties["omero.scripts.parse"] = "true"
309        process = self.process(session, job, current, None, properties)
310        process.wait()
311        client = omero.client(["--Ice.Config=%s" % (self.cfg)])
312        client.joinSession(session)
313        rv = client.getOutput("omero.scripts.parse")
314        if rv != None:
315            return rv.val
316        return None
317
318    def processJob(self, session, job, current = None):
319        """
320        """
321        params = self.parseJob(session, job, current)
322        return self.process(session, job, current, params)
323
324    def process(self, session, job, current, params, properties = {}):
325        client = omero.client(["--Ice.Config=%s" % (self.cfg)])
326        sf = client.createSession(session, session)
327        ec = sf.getAdminService().getEventContext()
328
329        handle = sf.createJobHandle()
330        handle.attach(job.id.val)
331        if handle.jobFinished():
332            raise omero.ApiUsageException("Job already finished.")
333
334        file = sf.getQueryService().findByQuery(\
335            """select o from Job j
336             join j.originalFileLinks links
337             join links.child o
338             join o.format
339             where
340                 j.id = %d
341             and o.details.owner.id = 0
342             and o.format.value = 'text/x-python'
343             """ % job.id.val, None)
344
345
346        if not file:
347            raise omero.ApiUsageException(\
348                None, None, "Job should have one executable file attached.")
349
350        properties["omero.job"]  = str(job.id.val)
351        properties["omero.user"] = session
352        properties["omero.pass"] = session
353        properties["Ice.Default.Router"] = client.getProperty("Ice.Default.Router")
354
355        process = ProcessI("python", properties, params, self.log)
356        self.resources.add(process,"__del__")
357        client.download(file, process.script_name)
358        process.activate()
359
360        prx = current.adapter.addWithUUID(process)
361        return omero.grid.ProcessPrx.uncheckedCast(prx)
362
363    def cleanup(self):
364        """
365        Cleanups all resoures created by this Processor, namely
366        the Process instances.
367        """
368        while self.resources.cleanupNext():
369            pass
370
371class Server(Ice.Application):
372    """
373    Basic server implementation
374    """
375    def run(self,args):
376        self.shutdownOnInterrupt()
377        self.objectfactory = omero.ObjectFactory()
378        self.objectfactory.registerObjectFactory(self.communicator())
379        for of in ObjectFactories.values():
380            of.register(self.communicator())
381        self.adapter = self.communicator().createObjectAdapter("ProcessorAdapter")
382        self.p = ProcessorI(self.communicator().getLogger())
383        self.p.serverid = self.communicator().getProperties().getProperty("Ice.ServerId")
384
385        self.adapter.add(self.p, Ice.Identity("Processor",""))
386        self.adapter.activate()
387        self.communicator().waitForShutdown()
388        self.cleanup()
389
390    def cleanup(self):
391        """
392        Cleans up all resources that were created by this server.
393        Primarily the one ProcessorI instance.
394        """
395        if hasattr(self,"p"):
396            try:
397                self.p.cleanup()
398            finally:
399                self.p = None
400
401if __name__ == "__main__":
402    app=Server()
403    sys.exit(app.main(sys.argv))
404
Note: See TracBrowser for help on using the browser.

Download in other formats:

  • Plain Text
  • Original Format

Trac Powered

Powered by Trac 0.11
By Edgewall Software.

Visit the Trac open source project at
http://trac.edgewall.org/