root/trunk/components/tools/OmeroPy/src/omero/processor.py
| Revision 3014, 13.0 kB (checked in by jmoore, 2 months ago) | |
|---|---|
|
|
| Line | |
|---|---|
| 1 | #!/usr/bin/env python |
| 2 | # |
| 3 | # OMERO Grid Processor |
| 4 | # Copyright 2008 Glencoe Software, Inc. All Rights Reserved. |
| 5 | # Use is subject to license terms supplied in LICENSE.txt |
| 6 | # |
| 7 | |
| 8 | import omero, Ice |
| 9 | import os, signal, subprocess, sys, threading, tempfile, time, traceback |
| 10 | from omero_model_OriginalFileI import OriginalFileI |
| 11 | from omero.rtypes import * |
| 12 | |
| 13 | CONFIG=""" |
| 14 | Ice.ACM.Client=0 |
| 15 | Ice.MonitorConnections=60 |
| 16 | Ice.RetryIntervals=-1 |
| 17 | Ice.Warn.Connections=1 |
| 18 | Ice.ImplicitContext=Shared |
| 19 | Ice.GC.Interval=60 |
| 20 | """ |
| 21 | |
| 22 | class Resources: |
| 23 | """ |
| 24 | Container class for storing resources which should be |
| 25 | cleaned up on close. |
| 26 | """ |
| 27 | def __init__(self): |
| 28 | self.stuff = [] |
| 29 | def add(self, object, cleanupMethod = "cleanup"): |
| 30 | lock = threading.RLock() |
| 31 | lock.acquire() |
| 32 | try: |
| 33 | self.stuff.append((object,cleanupMethod)) |
| 34 | finally: |
| 35 | lock.release() |
| 36 | def cleanupNext(self): |
| 37 | lock = threading.RLock() |
| 38 | lock.acquire() |
| 39 | try: |
| 40 | try: |
| 41 | if len(self.stuff) > 0: |
| 42 | m = self.stuff.pop(0) |
| 43 | method = getattr(m[0],m[1]) |
| 44 | method() |
| 45 | return len(self.stuff) > 0 |
| 46 | else: |
| 47 | return False |
| 48 | except: |
| 49 | print "Error cleaning resource:",m |
| 50 | traceback.print_exc() |
| 51 | finally: |
| 52 | lock.release() |
| 53 | |
| 54 | class Environment: |
| 55 | """ |
| 56 | Simple class for creating an executable environment |
| 57 | """ |
| 58 | |
| 59 | def __init__(self,*args): |
| 60 | """ |
| 61 | Takes an number of environment variable names which |
| 62 | should be copied to the target environment if present |
| 63 | in the current execution environment. |
| 64 | """ |
| 65 | self.env = {} |
| 66 | for arg in args: |
| 67 | if os.environ.has_key(arg): |
| 68 | self.env[arg] = os.environ[arg] |
| 69 | def __call__(self): |
| 70 | """ |
| 71 | Returns the environment map when called. |
| 72 | """ |
| 73 | return self.env |
| 74 | |
| 75 | def set(self, key, value): |
| 76 | """ |
| 77 | Manually sets a value in the target environment. |
| 78 | """ |
| 79 | self.env[key] = value |
| 80 | |
| 81 | def append(self, key, addition): |
| 82 | """ |
| 83 | Manually adds a value to the environment string |
| 84 | """ |
| 85 | if self.env.has_key(key): |
| 86 | self.env[key] = os.pathsep.join([self.env[key], addition]) |
| 87 | else: |
| 88 | self.set(key, addition) |
| 89 | |
| 90 | class ProcessI(omero.grid.Process): |
| 91 | """ |
| 92 | Wrapper around a subprocess.Popen instance. Returned by ProcessorI |
| 93 | when a job is submitted. This implementation uses the given |
| 94 | interpreter to call a file that must be named "script" in the |
| 95 | generated temporary directory. |
| 96 | |
| 97 | Call is equivalent to: |
| 98 | |
| 99 | cd TMP_DIR |
| 100 | ICE_CONFIG=./config interpreter ./script >out 2>err & |
| 101 | |
| 102 | The properties argument is used to generate the ./config file. |
| 103 | |
| 104 | The params argument may be null in which case this process |
| 105 | is being used solely to calculate the parameters for the script |
| 106 | ("omero.scripts.parse=true") |
| 107 | |
| 108 | """ |
| 109 | |
| 110 | def __init__(self, interpreter, properties, params, log): |
| 111 | self.active = False |
| 112 | self.dead = False |
| 113 | self.interpreter = interpreter |
| 114 | self.properties = properties |
| 115 | self.params = params |
| 116 | self.log = log |
| 117 | # Non arguments |
| 118 | self.callbacks = [] |
| 119 | self.lock = threading.Lock() |
| 120 | self.dir = tempfile.mkdtemp() |
| 121 | self.script_name = os.path.join(self.dir, "script") |
| 122 | self.config_name = os.path.join(self.dir, "config") |
| 123 | self.stdout_name = os.path.join(self.dir, "out") |
| 124 | self.stderr_name = os.path.join(self.dir, "err") |
| 125 | self.env = Environment("PATH","PYTHONPATH","DYLD_LIBRARY_PATH","LD_LIBRARY_PATH","MLABRAW_CMD_STR") |
| 126 | self.env.set("ICE_CONFIG", self.config_name) |
| 127 | # WORKAROUND |
| 128 | # Currently duplicating the logic here as in the PYTHONPATH |
| 129 | # setting of the grid application descriptor (see etc/grid/*.xml) |
| 130 | # This should actually be taken care of in the descriptor itself |
| 131 | # by having setting PYTHONPATH to an absolute value. This is |
| 132 | # not currently possible with IceGrid (without using icepatch -- |
| 133 | # see 39.17.2 "node.datadir). |
| 134 | self.env.append("PYTHONPATH", os.path.join(os.getcwd(), "lib")) |
| 135 | self.make_config() |
| 136 | |
| 137 | def activate(self): |
| 138 | """ |
| 139 | Process creation has to wait until all external downloads, etc |
| 140 | are finished. |
| 141 | """ |
| 142 | self.stdout = open(self.stdout_name, "w") |
| 143 | self.stderr = open(self.stderr_name, "w") |
| 144 | self.popen = subprocess.Popen([self.interpreter, "./script"], cwd=self.dir, env=self.env(), stdout=self.stdout, stderr=self.stderr) |
| 145 | self.active = True |
| 146 | |
| 147 | def __del__(self): |
| 148 | """ |
| 149 | Cleans up the temporary directory used by the process, and terminates |
| 150 | the Popen process if running. |
| 151 | """ |
| 152 | try: |
| 153 | self.lock.acquire() |
| 154 | if not self.dead: |
| 155 | self.cleanup_popen() |
| 156 | self.cleanup_output() |
| 157 | self.upload_output() |
| 158 | self.cleanup_tmpdir() |
| 159 | finally: |
| 160 | self.dead = True |
| 161 | self.lock.release() |
| 162 | |
| 163 | def cleanup_popen(self): |
| 164 | """ |
| 165 | If self.popen is active, then first call cancel, wait a period of |
| 166 | time, and finally call kill. |
| 167 | """ |
| 168 | if hasattr(self, "popen") and None == self.popen.poll(): |
| 169 | self.cancel() |
| 170 | |
| 171 | for i in range(5,0,-1): |
| 172 | time.sleep(6) |
| 173 | if None != self.popen.poll(): |
| 174 | self.log.warning("Process %s terminated cleanly." % str(self.popen.pid)) |
| 175 | break |
| 176 | else: |
| 177 | self.log.warning("%s still active. Killing in %s seconds." % (str(self.popen.pid),6*(i-1)+1)) |
| 178 | self.kill() |
| 179 | |
| 180 | def cleanup_output(self): |
| 181 | """ |
| 182 | Flush and close the stderr and stdout streams. |
| 183 | """ |
| 184 | if hasattr(self, "stderr"): |
| 185 | self.stderr.flush() |
| 186 | self.stderr.close() |
| 187 | if hasattr(self, "stdout"): |
| 188 | self.stdout.flush() |
| 189 | self.stdout.close() |
| 190 | |
| 191 | def upload_output(self): |
| 192 | """ |
| 193 | If this is not a params calculation (i.e. parms != null) and the |
| 194 | stdout or stderr are non-null, they they will be uploaded and |
| 195 | attached to the job. |
| 196 | """ |
| 197 | if self.params: |
| 198 | client = omero.client(["--Ice.Config=%s" % self.config_name]) |
| 199 | client.createSession() |
| 200 | self._upload(client, self.stdout_name, "stdout", self.params.stdoutFormat) |
| 201 | self._upload(client, self.stderr_name, "stderr", self.params.stderrFormat) |
| 202 | |
| 203 | def _upload(self, client, filename, name, format): |
| 204 | if format: |
| 205 | if os.path.getsize(filename): |
| 206 | ofile = client.upload(filename, name=name, type=format) |
| 207 | jobid = long(client.getProperty("omero.job")) |
| 208 | link = omero.model.JobOriginalFileLinkI() |
| 209 | link.parent = omero.model.ScriptJobI(rlong(jobid), False) |
| 210 | link.child = ofile |
| 211 | client.getSession().getUpdateService().saveObject(link) |
| 212 | |
| 213 | def cleanup_tmpdir(self): |
| 214 | """ |
| 215 | Remove all known files and finally the temporary directory. |
| 216 | If other files exist, an exception will be raised. |
| 217 | """ |
| 218 | for path in [self.config_name, self.stdout_name, self.stderr_name, self.script_name]: |
| 219 | if os.path.exists(path): |
| 220 | os.remove(path) |
| 221 | os.removedirs(self.dir) |
| 222 | |
| 223 | def make_config(self): |
| 224 | """ |
| 225 | Creates the ICE_CONFIG file used by the client. |
| 226 | """ |
| 227 | config_file = open(self.config_name, "w") |
| 228 | try: |
| 229 | config_file.write(CONFIG) |
| 230 | for key in self.properties.iterkeys(): |
| 231 | config_file.write("%s=%s\n"%(key, self.properties[key])) |
| 232 | finally: |
| 233 | config_file.close() |
| 234 | |
| 235 | def poll(self, current = None): |
| 236 | rv = self.popen.poll() |
| 237 | if None == rv: |
| 238 | return None |
| 239 | rv = rint(rv) |
| 240 | self.allcallbacks("processFinished", rv) |
| 241 | self.__del__() |
| 242 | return rv |
| 243 | |
| 244 | def wait(self, current = None): |
| 245 | rv = self.popen.wait() |
| 246 | self.allcallbacks("processFinished",rv) |
| 247 | self.__del__() |
| 248 | return rv |
| 249 | |
| 250 | def _send(self, sig): |
| 251 | if None == self.popen.poll(): |
| 252 | try: |
| 253 | os.kill(self.popen.pid, sig) |
| 254 | except OSError, oserr: |
| 255 | # Already gone |
| 256 | pass |
| 257 | return self.popen.poll() != None |
| 258 | |
| 259 | def cancel(self, current = None): |
| 260 | rv = self._send(signal.SIGTERM) |
| 261 | self.allcallbacks("processCancelled", rv) |
| 262 | if rv: |
| 263 | self.__del__() |
| 264 | return rv |
| 265 | |
| 266 | def kill(self, current = None): |
| 267 | rv = self._send(signal.SIGKILL) |
| 268 | self.allcallbacks("processKilled", rv) |
| 269 | self.__del__() |
| 270 | return rv |
| 271 | |
| 272 | def registerCallback(self, callback, current = None): |
| 273 | self.lock.acquire() |
| 274 | try: |
| 275 | self.callbacks.append(callback) |
| 276 | finally: |
| 277 | self.lock.release() |
| 278 | |
| 279 | def unregisterCallback(self, callback, current = None): |
| 280 | self.lock.acquire() |
| 281 | try: |
| 282 | self.callbacks.remove(callback) |
| 283 | finally: |
| 284 | self.lock.release() |
| 285 | |
| 286 | def allcallbacks(self, method, arg): |
| 287 | self.lock.acquire() |
| 288 | try: |
| 289 | for cb in self.callbacks: |
| 290 | try: |
| 291 | m = getattr(cb,method) |
| 292 | m(arg) |
| 293 | except: |
| 294 | print "Error calling callback " + str(cb) |
| 295 | finally: |
| 296 | self.lock.release() |
| 297 | |
| 298 | class ProcessorI(omero.grid.Processor): |
| 299 | |
| 300 | def __init__(self, log): |
| 301 | self.log = log |
| 302 | self.cfg = os.path.join(os.curdir, "etc", "ice.config") |
| 303 | self.cfg = os.path.abspath(self.cfg) |
| 304 | self.resources = Resources() |
| 305 | |
| 306 | def parseJob(self, session, job, current = None): |
| 307 | properties = {} |
| 308 | properties["omero.scripts.parse"] = "true" |
| 309 | process = self.process(session, job, current, None, properties) |
| 310 | process.wait() |
| 311 | client = omero.client(["--Ice.Config=%s" % (self.cfg)]) |
| 312 | client.joinSession(session) |
| 313 | rv = client.getOutput("omero.scripts.parse") |
| 314 | if rv != None: |
| 315 | return rv.val |
| 316 | return None |
| 317 | |
| 318 | def processJob(self, session, job, current = None): |
| 319 | """ |
| 320 | """ |
| 321 | params = self.parseJob(session, job, current) |
| 322 | return self.process(session, job, current, params) |
| 323 | |
| 324 | def process(self, session, job, current, params, properties = {}): |
| 325 | client = omero.client(["--Ice.Config=%s" % (self.cfg)]) |
| 326 | sf = client.createSession(session, session) |
| 327 | ec = sf.getAdminService().getEventContext() |
| 328 | |
| 329 | handle = sf.createJobHandle() |
| 330 | handle.attach(job.id.val) |
| 331 | if handle.jobFinished(): |
| 332 | raise omero.ApiUsageException("Job already finished.") |
| 333 | |
| 334 | file = sf.getQueryService().findByQuery(\ |
| 335 | """select o from Job j |
| 336 | join j.originalFileLinks links |
| 337 | join links.child o |
| 338 | join o.format |
| 339 | where |
| 340 | j.id = %d |
| 341 | and o.details.owner.id = 0 |
| 342 | and o.format.value = 'text/x-python' |
| 343 | """ % job.id.val, None) |
| 344 | |
| 345 | |
| 346 | if not file: |
| 347 | raise omero.ApiUsageException(\ |
| 348 | None, None, "Job should have one executable file attached.") |
| 349 | |
| 350 | properties["omero.job"] = str(job.id.val) |
| 351 | properties["omero.user"] = session |
| 352 | properties["omero.pass"] = session |
| 353 | properties["Ice.Default.Router"] = client.getProperty("Ice.Default.Router") |
| 354 | |
| 355 | process = ProcessI("python", properties, params, self.log) |
| 356 | self.resources.add(process,"__del__") |
| 357 | client.download(file, process.script_name) |
| 358 | process.activate() |
| 359 | |
| 360 | prx = current.adapter.addWithUUID(process) |
| 361 | return omero.grid.ProcessPrx.uncheckedCast(prx) |
| 362 | |
| 363 | def cleanup(self): |
| 364 | """ |
| 365 | Cleanups all resoures created by this Processor, namely |
| 366 | the Process instances. |
| 367 | """ |
| 368 | while self.resources.cleanupNext(): |
| 369 | pass |
| 370 | |
| 371 | class Server(Ice.Application): |
| 372 | """ |
| 373 | Basic server implementation |
| 374 | """ |
| 375 | def run(self,args): |
| 376 | self.shutdownOnInterrupt() |
| 377 | self.objectfactory = omero.ObjectFactory() |
| 378 | self.objectfactory.registerObjectFactory(self.communicator()) |
| 379 | for of in ObjectFactories.values(): |
| 380 | of.register(self.communicator()) |
| 381 | self.adapter = self.communicator().createObjectAdapter("ProcessorAdapter") |
| 382 | self.p = ProcessorI(self.communicator().getLogger()) |
| 383 | self.p.serverid = self.communicator().getProperties().getProperty("Ice.ServerId") |
| 384 | |
| 385 | self.adapter.add(self.p, Ice.Identity("Processor","")) |
| 386 | self.adapter.activate() |
| 387 | self.communicator().waitForShutdown() |
| 388 | self.cleanup() |
| 389 | |
| 390 | def cleanup(self): |
| 391 | """ |
| 392 | Cleans up all resources that were created by this server. |
| 393 | Primarily the one ProcessorI instance. |
| 394 | """ |
| 395 | if hasattr(self,"p"): |
| 396 | try: |
| 397 | self.p.cleanup() |
| 398 | finally: |
| 399 | self.p = None |
| 400 | |
| 401 | if __name__ == "__main__": |
| 402 | app=Server() |
| 403 | sys.exit(app.main(sys.argv)) |
| 404 |
Note: See TracBrowser
for help on using the browser.
