#!/usr/bin/python2

#
# Copyright (C) 2004  Vrije Universiteit, Amsterdam, The Netherlands.
# For full copyright and restrictions on use, see the file COPYRIGHT
# in the Grun software distribution.
#
# Author: Kees Verstoep (versto@cs.vu.nl)
#

import getopt, sys, os, string, time, posix, signal
from threading import Condition, Lock, Thread, Event

# Avoid specific warnings on python2.2 installs; it seems to work fine..
# Suppress warnings whose message matches 'Python C API.*'.
import warnings
warnings.filterwarnings(action='ignore', message='Python C API.*')

# By default assume that pyGlobus is installed in the $GLOBUS_LOCATION/python
# tree; if not, it will have to be added to PYTHONPATH by the user.
if "GLOBUS_LOCATION" in os.environ:
    globus_loc = os.environ["GLOBUS_LOCATION"]
    # First three characters of the interpreter version, e.g. "2.2"
    version = sys.version[:3]
    sys.path.append("%s/python/lib/python%s/site-packages" %
                    (globus_loc, version))

from pyGlobus import gramClient
from pyGlobus.gramClient import *
from pyGlobus.gassServerEZ import *
from pyGlobus.logUtil import logging

from globus_support import gram_error

# Allow user to override griddefs: $HOME is put at the front of the module
# search path, so a griddefs module there is found before any other.
if "HOME" in os.environ:
    home_loc = os.environ["HOME"]
    sys.path.insert(0, home_loc)
import griddefs

def new_job_number(outdir = "grun.out"):
    """Return the number to use for the next grun job.

    Scans `outdir` (default "grun.out", relative to the current directory)
    for entries named "job-<N>" and returns one more than the largest N.
    Entries whose suffix after "job-" is not an integer are ignored.
    Returns 0 when `outdir` does not exist (and 1 when it exists but holds
    no job entries).
    """
    if not os.path.exists(outdir):
        return 0
    prefix = "job-"
    maxjob = 0
    for name in os.listdir(outdir):
        if not name.startswith(prefix):
            continue
        # Only strip the leading prefix; skip names like "job-x" instead
        # of crashing on int().
        try:
            num = int(name[len(prefix):])
        except ValueError:
            continue
        if num > maxjob:
            maxjob = num
    return maxjob + 1
    
# Globals/options put in a separate class for clarity
class Global:
    # Namespace of process-wide options and run state; every entry is a
    # class attribute accessed as Global.<name>.

    # Identity / installation
    app             = "grun"
    grun_home       = os.environ.get("GRUN_HOME", "/usr/local/package/grun")

    # Default per-subjob process counts (set by -N / -H)
    def_cpucount    = 0 # defaults to 1, unless hostcount is set
    def_hostcount   = 0 # defaults to 1, unless cpucount is set
    environment     = []  # "VAR=value" settings for every subjob (-E)

    # Job numbering; new_job_token() combines these into Global.jobtoken
    jobnumber       = new_job_number()  # was: os.getpid()
    jobtry          = 0
    maxretry        = 5   # retry limit for failed submissions (-X)

    out_on          = 1
    extern_out      = 0

    # Subjob (Globus_command) lists; presumably filled by option parsing
    command_list    = []
    precommand_list = []
    postcommand_list= []

    # Diagnostics
    logger          = None
    logger_on       = 0
    logger_level    = logging.WARNING
    preexec_command = None

    # Watchdog limits in minutes (-t, -T, -x); 0 disables a check
    startup_time    = 3
    wall_time       = 10
    exit_time       = 3
    watchdog_timer  = None  # Watchdog thread created by await_commands

    # Stage entries applied to all subjobs
    stage_in        = []
    stage_in_shared = []
    stage_out       = []

    # Run state shared between the main thread and helper threads
    terminated      = 0
    exitcode        = 0
    timed_out       = 0
    interrupted     = 0
    interrupt_ack   = 0

def new_job_token():
    """Bump the submission attempt counter and rebuild Global.jobtoken.

    The token has the form "<jobnumber>-<jobtry>".
    """
    attempt = Global.jobtry + 1
    Global.jobtry = attempt
    Global.jobtoken = "%d-%d" % (Global.jobnumber, attempt)

def _log_message(level, method_name, text):
    """Emit `text`, prefixed with the application name, at `level`.

    Without a configured Global.logger the message is printed to stdout if
    `level` is at least Global.logger_level.  Otherwise it is passed to the
    logger method named `method_name` ("info", "debug" or "error").
    """
    msg = "%s: %s" % (Global.app, text)
    if Global.logger is None:
        if level >= Global.logger_level:
            print(msg)
    else:
        getattr(Global.logger, method_name)(msg)

def loginfo(string):
    """Log `string` at INFO level."""
    _log_message(logging.INFO, "info", string)

def logdebug(string):
    """Log `string` at DEBUG level."""
    _log_message(logging.DEBUG, "debug", string)

def logerror(string):
    """Log `string` at ERROR level."""
    _log_message(logging.ERROR, "error", string)

def terminate(exitcode):
    """Record `exitcode` and raise the flag telling the main thread we're done."""
    Global.exitcode, Global.terminated = exitcode, 1

def timedout(exitcode):
    """Record `exitcode` and raise the flag telling the main thread we timed out."""
    Global.exitcode, Global.timed_out = exitcode, 1

def must_quit():
    """Return a true value once grun terminated, was interrupted, or timed out.

    Like `a or b or c`: the first true flag among Global.terminated,
    Global.interrupted and Global.timed_out, else the last one.
    """
    stop = Global.terminated or Global.interrupted
    return stop or Global.timed_out

class Task:
    """Per-host bookkeeping shared by all subjobs that run on one host."""
    numtasks = 0    # number of Task objects created so far
    alltasks = {}   # host -> Task, maintained by newTask()

    def __init__(self, host):
        self.host = host
        # GASS I/O server contact, output directory and status file; all
        # start unset (presumably assigned by setup code outside this view).
        self.iosvr = self.outdir = self.statusfile = None
        # Sequential id in creation order
        self.number = Task.numtasks
        Task.numtasks = self.number + 1

def newTask(host):
    """Return the Task registered for `host`, creating it on first use."""
    if host not in Task.alltasks:
        Task.alltasks[host] = Task(host)
    return Task.alltasks[host]

class Globus_command:
    """One grun subjob: an RSL request aimed at one resource manager.

    `rm` is "host" or "host/<jobmanager>".  The host part selects the
    shared Task (via newTask); the remainder is kept in `rsrc`, where ""
    means the default jobmanager.
    """
    def __init__(self, rm, rsl, cpucount, hostcount):
        self.rm        = rm
        self.rsl       = rsl
        self.fullrsl   = rsl    # completed later by run_commands
        self.cpucount  = cpucount
        self.hostcount = hostcount
        self.env       = []     # per-subjob "VAR=value" settings
        self.job       = None   # Globus_job once submitted
        self.site      = None
        self.stage_in        = []
        self.stage_in_shared = []
        self.stage_out       = []
        parts = rm.split("/", 1)
        if len(parts) == 2:
            host, self.rsrc = parts
        else:
            host, self.rsrc = rm, "" # default jobmanager
        self.task = newTask(host)

    def writeline(self, str):
        """Append `str` and a newline to the task's status file, if any."""
        statusfile = self.task.statusfile
        if statusfile is not None:
            statusfile.write(str + "\n")

    def debug(self, str):
        """Log `str` at debug level and copy it to the status file."""
        logdebug(str)
        self.writeline(str)

    def info(self, str):
        """Log `str` at info level and copy it to the status file."""
        loginfo(str)
        self.writeline(str)

    def error(self, str):
        """Log `str` at error level and copy it to the status file."""
        logerror(str)
        self.writeline(str)

    def printjobinfo(self):
        """Write a summary of this subjob to the status file.

        Zero cpu/host counts (meaning "unset") are omitted.
        """
        lines = ["host = %s" % self.task.host,
                 "rm = %s" % self.rm,
                 "rsl = %s" % self.rsl]
        if self.cpucount != 0:
            lines.append("cpucount = %d" % self.cpucount)
        if self.hostcount != 0:
            lines.append("hostcount = %d" % self.hostcount)
        for line in lines:
            self.writeline(line)
    
class Globus_job:
    """A submitted GRAM job plus its lock-protected state.

    `state` is guarded by the condition variable `cv`.  The GRAM upcalls
    (job_state_upcall, job_submitted_upcall) change it via update_state(),
    which wakes a thread waiting on `cv` (e.g. job_await).  Readers use
    get_state().
    """
    def __init__(self, cmd, gc, state, cv, callback, jobcontact):
        self.cmd = cmd                # the Globus_command being run
        self.gc = gc                  # GramClient used to submit/cancel
        self.state = state            # GRAM job state, guarded by cv
        self.cv = cv                  # condition variable protecting state
        self.callback = callback      # callback contact from set_callback
        self.jobcontact = jobcontact  # GRAM job contact once known

    def update_state(self, newstate):
        """Set the state under the lock and notify one waiter."""
        self.cv.acquire()
        try:
            self.state = newstate
            self.cv.notify()
        finally:
            # Always release, so a failure here cannot deadlock waiters.
            self.cv.release()

    def get_state(self):
        """Return the current state, read under the lock."""
        self.cv.acquire()
        try:
            return self.state
        finally:
            self.cv.release()


def job_submitted_upcall(job, jobcontact, state, newerr, newerr2 = None):
    # GRAM callback for the asynchronous submit request made in
    # job_start_async.  On success it stores the job contact string in
    # job.jobcontact.  Otherwise it reports each error it received and
    # marks the job FAILED.
    cmd = job.cmd
    cmd.debug("%s: job submitted; contact %s" %
              (cmd.task.host, jobcontact))

    # Hack around API changes: the error arrives either as a bare int code
    # or as a (code, message) pair, depending on the pyGlobus version.
    if type(newerr) != type(0):
        err, errstr = newerr
    else:
        err = newerr
        errstr = gram_error(err)
    if newerr2 != None:
        err2, err2str = newerr2
    else:
        err2 = 0
        err2str = "no error"

    have_contact = jobcontact != None and jobcontact != "None"
    if have_contact and state == 0 and err == 0 and err2 == 0:
        # Now the job contact string is known:
        job.jobcontact = jobcontact
        return

    reasons = ""
    if state != 0:
        reasons += " state %d" % state
    if err != 0:
        reasons += " %s (error %d)" % (errstr, err)
    if err2 != 0:
        reasons += " %s (error %d)" % (err2str, err2)
    cmd.error("%s: job submission failed: %s" % (cmd.rm, reasons))
    # Signal job failure
    job.update_state(JOB_STATE_FAILED)

def job_state_descr(state):
    """Return a short readable name for GRAM job state `state`."""
    # Checked in order; the first matching constant wins.
    known = ((JOB_STATE_PENDING,     "pending"),
             (JOB_STATE_ACTIVE,      "active"),
             (JOB_STATE_FAILED,      "failed"),
             (JOB_STATE_DONE,        "completed"),
             (JOB_STATE_SUSPENDED,   "suspended"),
             (JOB_STATE_UNSUBMITTED, "unsubmitted"),
             (JOB_STATE_STAGE_IN,    "stagein"),
             (JOB_STATE_STAGE_OUT,   "stageout"))
    for code, name in known:
        if state == code:
            return name
    return "unknown (state %d)" % state
    
def job_state_upcall(job, jobcontact, state, newerr):
    # GRAM callback invoked when the submitted job changes state: logs the
    # transition (with the error for FAILED) and records the new state.

    # API changed; HACK: the error is a bare int code or a (code, message)
    # pair depending on the pyGlobus version.
    if type(newerr) != type(0):
        err, errstr = newerr
    else:
        err = newerr
        errstr = gram_error(err)
    cmd = job.cmd
    if logging.DEBUG >= Global.logger_level:
        jobdescr = "%s: %s" % (cmd.task.host, cmd.rsl)
    else:
        jobdescr = cmd.rm
    statename = job_state_descr(state)
    if state == JOB_STATE_FAILED:
        cmd.info("%s: job %s: %s (error %d)" %
                 (jobdescr, statename, errstr, err))
    else:
        cmd.info("%s: job %s" % (jobdescr, statename))
    job.update_state(state)

def gram_exception(cmd, ex, comment):
    """Report GRAM exception `ex` through cmd.error().

    `comment` is inserted after "GRAM exception" (e.g. " while ...").  A
    module-initialization failure additionally hints at grid-proxy-init.
    """
    reason = ex.args[0]
    cmd.error("GRAM exception%s: %s" % (comment, reason))
    if reason == "Unable to initialize the module":
        logerror("May need to rerun grid-proxy-init")
    
def job_cancel(job):
    state = job.get_state()
    if job.gc != None and job.jobcontact != None and \
       state != JOB_STATE_FAILED and state != JOB_STATE_DONE:
        job.cmd.debug("Cancelling %s" % job.jobcontact)
        try:
            job.gc.cancel_job(job.jobcontact)
        except GramClientException, ex:
            gram_exception(job.cmd, ex, " while cancelling job on %s" %
                           job.cmd.rm)
            # update job state anyway, or main thread will keep waiting
            job_state_upcall(job, job.jobcontact, JOB_STATE_FAILED,
                             ERROR_JOB_CANCEL_FAILED)

def job_start_async(cmd):
    # Submit cmd.fullrsl to resource manager cmd.rm without waiting for the
    # job to run.  Returns the Globus_job tracking it.  If pyGlobus raises a
    # GramClientException, the error is reported, terminate(2) is called
    # and None is returned.
    rm = cmd.rm
    rsl = cmd.fullrsl

    # Condition variable guarding the job state (see Globus_job)
    condV = Condition(Lock())
    try:
        gC = GramClient()
        gcAttr = GramClientAttr()
        # State -1 is a placeholder until the first state upcall arrives
        job = Globus_job(cmd, gC, -1, condV, None, None)
        # State changes are reported to job_state_upcall, presumably
        # called as job_state_upcall(job, ...)
        callbackContact = gC.set_callback(job_state_upcall, job)
        job.callback = callbackContact
        cmd.debug("%s: submitting rsl %s; callback contact %s" %
                  (rm, rsl, callbackContact))
        # Completion of the submit request is reported to
        # job_submitted_upcall
        gC.register_submit_request(rm, rsl, gramClient.JOB_STATE_ALL,
                                   callbackContact, gcAttr,
                                   job_submitted_upcall, job)
        # The job contact becomes known asynchronously via the callback
        # to job_submitted_upcall, at which point job.jobcontact is set.
        return job
    except GramClientException, ex:
        # NOTE(review): if set_callback already succeeded, the callback is
        # not removed here; confirm whether that needs cleanup.
        gram_exception(cmd, ex, " while starting rsl %s on %s" % (rsl, rm))
        terminate(2)
        return None

def _count_rsl(cmd):
    """Return the RSL count/hostCount relations for one subjob."""
    rsl = ""
    if cmd.cpucount > 0:
        rsl += "(count=%d)" % cmd.cpucount
    if cmd.hostcount > 0:
        if cmd.cpucount <= 0:
            # Also use regular "count", or it may be assumed 1
            rsl += "(count=%d)" % cmd.hostcount
        # A few sanity checks before actually using RSL "hostCount"
        # since only few jobmanager seem to implement it correctly..
        if (cmd.site == None or cmd.site.usehostcount) and \
           cmd.rsrc != "jobmanager-fork":
            rsl += "(hostCount=%d)" % cmd.hostcount
    # if neither cpucount nor hostcount is set, 1 process is started
    return rsl

def _stage_in_rsl(relation, stagelist):
    """Return (rsl, names) for a file_stage_in[_shared] relation.

    Each stage entry is (remote, local); the local file is fetched through
    the @GRUN_GASS@ server.  `names` lists the remote names, each
    followed by ":".  Both strings are empty for an empty list.
    """
    if not stagelist:
        return "", ""
    entries = ""
    names = ""
    for stage in stagelist:
        entries += "(@GRUN_GASS@/./%s %s)" % (stage[1], stage[0])
        names += "%s:" % stage[0]
    return "(%s=%s)" % (relation, entries), names

def _stage_out_rsl(stagelist, outdir):
    """Return (rsl, names) for the file_stage_out relation.

    Each stage entry is (local, remote); the remote file is copied to
    `outdir`/local on the @GRUN_GASS@ server.
    """
    if not stagelist:
        return "", ""
    entries = ""
    names = ""
    for stage in stagelist:
        entries += "(%s @GRUN_GASS@/./%s/%s)" % (stage[1], outdir, stage[0])
        names += "%s:" % stage[0]
    return "(file_stage_out=%s)" % entries, names

def _environment_rsl(cmd, total, phase, sandbox, stagedin, stagedout):
    """Return the (environment=...) relation for one subjob."""
    rsl = "(environment="
    envlist = Global.environment + cmd.env   # a new list; globals untouched
    if cmd.site != None and cmd.site.env != None:
        envlist += cmd.site.env
    for e in envlist:
        varval = e.split("=", 1)
        if len(varval) == 2:
            # Inside an RSL quoted string a '"' is written as '""'.
            rsl += '(%s "%s")' % (varval[0], varval[1].replace('"', '""'))
    rsl += "(GRUN_NUMNODES %d)" % total
    rsl += "(GRUN_TOKEN %s)" % Global.jobtoken
    rsl += "(GRUN_PHASE %s)" % phase
    rsl += "(GRUN_SANDBOX %s)" % sandbox
    rsl += "(GRUN_CLUSTER %s)" % cmd.task.host
    if cmd.task.iosvr != None:
        rsl += "(GRUN_GASS %s)" % cmd.task.iosvr
        if cmd.task.outdir != None:
            rsl += "(GRUN_GASS_DIR %s)" % cmd.task.outdir
    if stagedin != "":
        rsl += "(GRUN_STAGEDINFILES %s)" % stagedin
    if stagedout != "":
        rsl += "(GRUN_STAGEDOUTFILES %s)" % stagedout
    return rsl + ")"

def run_commands(cmdlist, phase,
                 stage_in_all, stage_in_shared_all, stage_out_all):
    """Build the full RSL for every command in cmdlist and submit it.

    `phase` is exported to the jobs as GRUN_PHASE.  The *_all lists hold
    stage entries added to every command's own entries.  Submission is
    asynchronous (job_start_async).  If one submission fails, the remaining
    commands are not started.
    """
    # Total number of processes over all subjobs (GRUN_NUMNODES)
    total = 0
    for cmd in cmdlist:
        if cmd.cpucount != 0:
            total += cmd.cpucount
        elif cmd.hostcount != 0:
            total += cmd.hostcount
        else:
            total += 1

    sandbox = "grun.dir/job-%03d" % Global.jobnumber

    for cmd in cmdlist:
        # Host/cpu count
        fullrsl = cmd.rsl + _count_rsl(cmd)

        # Check for site-specific RSL extension (e.g., non-default queue):
        if cmd.site != None and cmd.rsrc in cmd.site.rslexts.keys():
            fullrsl += cmd.site.rslexts[cmd.rsrc]

        # Wall time
        if Global.wall_time != 0:
            fullrsl += "(maxWallTime=%s)" % Global.wall_time

        # File staging
        in_rsl, in_names = _stage_in_rsl("file_stage_in",
                                         cmd.stage_in + stage_in_all)
        shared_rsl, shared_names = _stage_in_rsl(
            "file_stage_in_shared", cmd.stage_in_shared + stage_in_shared_all)
        out_rsl, out_names = _stage_out_rsl(cmd.stage_out + stage_out_all,
                                            cmd.task.outdir)

        # Environment, then the staging relations
        fullrsl += _environment_rsl(cmd, total, phase, sandbox,
                                    in_names + shared_names, out_names)
        fullrsl += in_rsl + shared_rsl + out_rsl

        # I/O
        if cmd.task.iosvr != None:
            fullrsl += "(stdout=%s/dev/stdout)(stderr=%s/dev/stderr)" % \
                       (cmd.task.iosvr, cmd.task.iosvr)
            # expand GRUN_GASS (needed for file/executable staging)
            fullrsl = fullrsl.replace("@GRUN_GASS@", cmd.task.iosvr)
        # The sandbox path does not depend on GASS, so always expand it.
        fullrsl = fullrsl.replace("@GRUN_SANDBOX@", sandbox)

        cmd.fullrsl = fullrsl
        cmd.job = job_start_async(cmd)
        if cmd.job == None:
            return

def cancel_jobs(cmdlist):
    """Cancel every submitted job in cmdlist that has not finished yet."""
    for cmd in cmdlist:
        job = cmd.job
        if job is None:
            continue
        if job.get_state() in (JOB_STATE_DONE, JOB_STATE_FAILED):
            continue
        cmd.info("cancelling job on %s" % cmd.task.host)
        job_cancel(job)

def run_commands_sync(cmdlist, phase,
                      stage_in_all, stage_in_shared_all, stage_out_all):
    """Submit all commands, then wait for them to finish.

    If a quit condition is raised during submission, the jobs submitted so
    far are cancelled instead of awaited.
    """
    run_commands(cmdlist, phase,
                 stage_in_all, stage_in_shared_all, stage_out_all)
    if not must_quit():
        await_commands(cmdlist)
    else:
        cancel_jobs(cmdlist)

def job_list_status(cmdlist):
    """Tally the jobs of cmdlist by state.

    Returns (numjobs, pending, running, done, failed).  A command with no
    job (never submitted, or submission failed) counts as failed.  Any state
    other than ACTIVE, DONE or FAILED counts as pending.
    """
    numjobs = pending = running = done = failed = 0
    for cmd in cmdlist:
        numjobs += 1
        job = cmd.job
        if job is None:
            failed += 1
            continue
        state = job.get_state()
        if state == JOB_STATE_DONE:
            done += 1
        elif state == JOB_STATE_FAILED:
            failed += 1
        elif state == JOB_STATE_ACTIVE:
            running += 1
        else:
            pending += 1
    return (numjobs, pending, running, done, failed)

class Watchdog(Thread):
    """Fallback timer thread that cancels subjobs when they overrun.

    Three phases run in order; each is skipped when its timeout is 0:

    * startup: wait up to `startup` minutes for startup_finished().  If any
      subjob is still pending then, cancel all jobs and call timedout().
    * wall: wait up to `wall` + 1 minutes for first_finished().  On expiry,
      cancel all jobs and call terminate().  `wall` == 0 (-T 0) means no
      grun-side wall limit, since no maxWallTime is passed to the resource
      manager either.
    * exit: wait up to `exit` minutes for job_finished().  On expiry, cancel
      all jobs and call terminate().

    shutdown() sets all events so that the thread finishes promptly.
    """
    def __init__(self, startup, wall, exit, cmdlist):
        Thread.__init__(self)
        self.startup_timeout = 60 * startup
        if wall != 0:
            # Make the watchdog timeout somewhat more relaxed than the
            # time passed to the resource manager:
            self.wall_timeout = 60 * (wall + 1)
        else:
            # Previously this became 60 s and aborted every -T 0 run after
            # one minute; 0 disables the wall check instead.
            self.wall_timeout = 0
        self.exit_timeout = 60 * exit
        logdebug("Watchdog: startup time %d sec, wall time %d sec" %
                 (self.startup_timeout, self.wall_timeout))
        self.cmdlist = cmdlist
        self._startup_finished = Event()
        self._first_finished = Event()
        self._job_finished = Event()

    def startup_finished(self):
        """Signal that all subjobs are running."""
        self._startup_finished.set()

    def first_finished(self):
        """Signal that the first subjob has stopped running."""
        self._first_finished.set()

    def job_finished(self):
        """Signal that no subjob is pending or running any more."""
        self._job_finished.set()

    def shutdown(self):
        """Release every wait so that the thread returns."""
        self.job_finished()
        self.first_finished()
        self.startup_finished()

    def cancel(self):
        """Set exit code 3 and cancel all unfinished subjobs."""
        Global.exitcode = 3
        cancel_jobs(self.cmdlist)

    def run(self):
        # Startup timer:
        if self.startup_timeout != 0:
            self._startup_finished.wait(self.startup_timeout)
            if self._job_finished.isSet():
                # Job was shutdown
                return
            (numjobs, pending, running, done, failed) = \
                      job_list_status(self.cmdlist)
            if pending > 0:
                loginfo("%d of %d jobs pending after %d sec: ABORT" %
                        (pending, numjobs, self.startup_timeout))
                self.cancel()
                timedout(Global.exitcode)
                return
            logdebug("all %d jobs started within %d sec" %
                     (numjobs, self.startup_timeout))

        # Wall timer:
        if self.wall_timeout != 0:
            self._first_finished.wait(self.wall_timeout)
            if not self._first_finished.isSet():
                # Cancel all pending or running subjobs:
                loginfo("jobs hit wall timeout after %d sec: ABORT" %
                        self.wall_timeout)
                self.cancel()
                terminate(Global.exitcode)
                return
        elif self.exit_timeout != 0:
            # No wall limit: the exit timer must still only start once the
            # first subjob has finished.  Poll so a global quit request
            # also ends the wait.  shutdown() sets the event as well.
            while not self._first_finished.isSet():
                if must_quit():
                    return
                self._first_finished.wait(10)

        # Exit timer:
        if self.exit_timeout != 0:
            self._job_finished.wait(self.exit_timeout)
            if not self._job_finished.isSet():
                # Cancel all pending or running subjobs:
                loginfo("jobs hit exit timeout after %d sec: ABORT" %
                        self.exit_timeout)
                self.cancel()
                terminate(Global.exitcode)
                return
        

def job_await(job):
    try:
        job.cmd.debug("%s: awaiting job termination" % job.cmd.task.host)
        job.cv.acquire()
        while job.state != JOB_STATE_DONE and \
              job.state != JOB_STATE_FAILED:
            # Wait for job to succeed or fail with an error
            job.cv.wait(2)
            if must_quit():
                job.cv.release()
                return
        job.cv.release()

        # Disable GRAM callbacks
        job.gc.remove_callback(job.callback)
    except GramClientException, ex:
        gram_exception(job.cmd, ex, " while awaiting job on %s" %
                       job.cmd.task.host)
        terminate(2)

def await_commands(cmdlist):
    """Wait for all jobs in cmdlist to finish while feeding the watchdog.

    By default now assume the resource manager actually implements the
    walltime check itself.  Our own watchdog only runs as a fallback
    against co-scheduling problems (e.g., some sites start, but others
    don't).  Every 2 seconds the jobs are polled as a group.  The watchdog
    learns when all are running and when the first one stops.  The loop
    ends when none are pending or running, or when Global.interrupted is
    set (the jobs are then cancelled and the interrupt acknowledged).
    """
    Global.watchdog_timer = Watchdog(Global.startup_time, Global.wall_time,
                                     Global.exit_time, cmdlist)
    Global.watchdog_timer.start()

    all_started = 0
    first_exit_reported = 0
    while 1:
        numjobs, pending, running, done, failed = job_list_status(cmdlist)
        if not all_started and running == numjobs:
            # All running now, warn watchdog
            Global.watchdog_timer.startup_finished()
            all_started = 1
        if all_started and not first_exit_reported and running < numjobs:
            # Some exited now, warn watchdog
            Global.watchdog_timer.first_finished()
            first_exit_reported = 1
        if pending + running == 0:
            Global.watchdog_timer.job_finished()
            break

        if Global.interrupted:
            loginfo("await_commands: interrupted")
            cancel_jobs(cmdlist)
            loginfo("await_commands: jobs cancelled")
            Global.interrupt_ack = 1
            break

        time.sleep(2)

    # Just in case: release any watchdog wait that is still pending.
    Global.watchdog_timer.shutdown()

def usage():
    """Print the command-line help text to stdout and exit with status 1."""
    useStr = """Usage: grun [options]
Options:
    -c '<cmdline>'
        Specifies grid job command line to be used.
        Should precede subjob(s) specified with -m option.
        Use    -c '@GRUN_GASS@/./<script> <args>' to transfer file
        <script> from local directory to remote system and run it there.
    -C '<cmdline>'
        A convenience version of the -c option: the commandline is assumed to
        start with a shell script on the local system, which is transferred,
        made executable, and run from the default Grun file staging script
        during the run phase.
    -r <rsl>
        Same as '-c' option, only specifies grid job using Globus RSL syntax.
        Example:   -r '(executable=/bin/uname)(arguments=-a)'
    -m <resource manager contact> 
        Create subjob on the grid resource specified, e.g.,
            fs0.das2.cs.vu.nl/jobmanager-pbs
        Grid resource may also be an alias, e.g., 'fs0', as defined in
        file griddefs.py.
    -N <cpucount>
        Set default number of cpus used for all subsequent subjobs
    -n <cpucount>
        Set non-default number of cpus used for single subjob
    -H <hostcount>
        Set default number of hosts used for all subsequent subjobs
        (for fork jobmanagers, -H is interpreted as -N)
    -h <hostcount>
        Set non-default number of hosts used for single subjob
        (for fork jobmanagers, -h is interpreted as -n)
    -E <var>=<val>
        Add environment setting for each job
    -e <var>=<val>
        Add environment setting for single subjob
    -s <cmdline>
        Run local command prior to grid job, e.g., to start name server
        or startup barrier.
    -t <minutes>
        Sets the maximum collective job start-up time in minutes.
        The subjobs already started are terminated if some other site
        is not starting its job within the specified time.
        The default is 3 minutes.
        Use -t 0 to disable the collective startup time check.
    -T <minutes>
        Sets the maximum job 'wall' time in minutes.
        The default is 10 minutes.
        -T 0 causes the use of (site-dependent) standard maximum walltimes.
    -x <minutes>
        Sets the maximum job 'exit' time in minutes, i.e., the time
        other subjobs have to complete termination after the first subjob
        exits.  After that, remaining subjobs are cancelled explicitly.
        The default is 3 minutes.
    -X <maxretry>
        Set maximum retry count for failed job submissions due to timeout.
        The default is 5 retries.
    --pre '<cmdline>'
        Specifies grid job pre-execution command line to be used,
        which is used to stage files in, and possibly do additional
        processing.  Hosts for which this applies are specified
        with subsequent -m options.
    --pre-all '<cmdline>'
        Like --pre, but run the pre-execution cmdline on all resources
        used for regular execution.
    -p
        Like --pre-all, only the cmdline is assumed to be the same as for
        the -c option (environment variable GRUN_PHASE is set to 'stagein'
        rather than 'run', so the job can see the difference).
    --post '<cmdline>'
        Specifies grid job post-execution command line to be used,
        which is used to stage files out.  Hosts for which this
        applies are specified with subsequent -m options.
    --post-all '<cmdline>'
        Like --post, but run the post-execution cmdline on all resources
        used for regular execution.
    -P
        Like --post-all, only the cmdline is assumed to be the same as for
        the -c option (environment variable GRUN_PHASE is set to 'stageout'
        rather than 'run', so the job can see the difference).
    --output-enable
        Use the Globus GASS server module to redirect standard output and
        standard error to grun (this is the default)
    --output-disable
        Do NOT use the Globus GASS server module to redirect standard out and
        standard error to grun (default is enabled)
    -o
        Create files stdout/stderr/status in job-specific directories:
           grun.out/job-<jobnumber>/site-<sitenumber>/
    --stage-in <file>|<remotefile>=<localfile>
        Copy file <localfile> to <remotefile> in single subjob
        Single argument <file> is a shortcut for <file>=<file>
    [--stage-in-all | -I] <file>|<remotefile>=<localfile>
        Same as --stage-in, only for all subjobs
    --stage-in-shared[-all] <file>|<remotefile>=<localfile>
        Same as --stage-in[-all], only creates a cache with symlinks
    --stage-out <file>|<localfile>=<remotefile>
        Copy back file <remotefile> to <localfile> in single subjob.
        File <localfile> will be placed in the job-specific directories
        (see -o option).
    --stage-out-all <file>|<localfile>=<remotefile>
        Same as --stage-out, only for all subjobs
    -l <level>
        specifies the level of diagnostics printed:
        -l CRITICAL: only print critical errors
        -l ERROR:    print any errors
        -l WARN:     also print warnings
        -l INFO:     also print informational diagnostics (e.g. job scheduling)
        -l DEBUG:    be very verbose
        The default level is WARN.
    -v
        An alias for '-l INFO'
    """
    print(useStr)
    sys.exit(1)
    
def check_preceeding_cmd(cmd, option):
    """Show usage (and exit) if `option` appears before any command is set."""
    if cmd is not None:
        return
    logerror("option %s: no preceeding command" % option)
    usage()

def get_stage_arg(a, option, argdescr):
    """Split a file-staging argument into a 2-tuple.

    "<first>=<second>" yields (first, second), splitting at the first "=".
    A bare "<file>" is shorthand for "<basename>=<file>", i.e. (last path
    component of file, file).  Per usage(), stage-in options take
    <remotefile>=<localfile> and stage-out options <localfile>=<remotefile>.
    A malformed argument (empty side, empty name, or a bare name ending in
    "/") logs an error and calls usage(), which exits.
    """
    args = a.split("=", 1)
    if len(args) == 1:
        # "filename" is shortcut for "lastcomponent=filename"
        filename = args[0]
        # rfind returns -1 when there is no "/", so plain names are kept
        # whole.  A lone leading "/" ("/foo") also yields "foo".
        dest = filename[filename.rfind("/") + 1:]
        if dest == "":
            logerror("option %s requires %s arg" % (option, argdescr))
            usage()
        return (dest, filename)
    if args[0] == "" or args[1] == "":
        logerror("option %s requires %s arg" % (option, argdescr))
        usage()
    return (args[0], args[1])

def stage_arg(stagelist, a, option, argdescr):
    """Parse stage-in argument `a` and append it to stagelist.

    The local file (second tuple element) must be an existing regular
    file; otherwise an error is logged and usage() is called.
    """
    stage = get_stage_arg(a, option, argdescr)
    if stage == None:
        return
    local = stage[1]
    if not os.path.isfile(local):
        logerror("option %s: %s not a file" % (option, local))
        usage()
    stagelist.append(stage)

def stage_out_arg(stagelist, a, option, argdescr):
    """Parse stage-out argument `a` and append it to stagelist (unchecked)."""
    stage = get_stage_arg(a, option, argdescr)
    if stage == None:
        return
    stagelist.append(stage)

def int_arg(arg, option):
    """Return `arg` as a long, or log an error and call usage() if invalid."""
    try:
        value = long(arg)
    except ValueError:
        logerror("option %s requires integer argument" % option)
        usage()
    else:
        return value
        
# Characters treated as special in an unquoted RSL argument; a word that
# contains any of them is quoted as a whole ("[", "]" and "*" were quoted by
# the previous implementation as well and are kept for safety).
_RSL_SPECIAL_CHARS = "+&|()=<>!\"'^#$[]*"

def _rsl_quote_word(word):
    """Return `word` as a single RSL literal, double-quoting it if needed.

    Inside an RSL double-quoted string a '"' is written as '""'.
    """
    for ch in word:
        if ch in _RSL_SPECIAL_CHARS:
            return '"%s"' % word.replace('"', '""')
    return word

def cmd_to_rsl(cmdline):
    """Convert a command line into a GRAM RSL job request.

    The first whitespace-separated word becomes the executable (unquoted,
    as before); the remaining words become the arguments.  Each argument
    containing a special character is quoted as one literal.  The old
    per-character "#" concatenation produced invalid RSL for adjacent
    special characters or a special character right after whitespace.

    Example: '/bin/echo a (b)' -> '&(executable=/bin/echo)(arguments=a "(b)")'
    An empty command line logs an error and calls usage(), which exits.
    """
    words = cmdline.split()
    if not words:
        logerror("empty command line")
        usage()
    rsl = "&(executable=%s)" % words[0]
    if len(words) > 1:
        args = []
        for word in words[1:]:
            args.append(_rsl_quote_word(word))
        rsl += "(arguments=%s)" % " ".join(args)
    return rsl

def getArgs():
    """Parse the command line (sys.argv) into Global settings and commands.

    Options are processed left to right.
    - -c/-C/-r define a main command and -m a resource manager.  These may
      come in either order; a Globus_command is appended to the current
      command list as soon as both are known.
    - --pre/--post start a pre/post command whose resource manager must be
      supplied by a following -m.
    - Per-command options (-n, -h, -e, --stage-in, --stage-in-shared,
      --stage-out) apply to the most recently created command.

    Invalid or missing arguments are reported and usage() is called.
    """
    argLen = len(sys.argv)
    if argLen == 1:
        usage()
    try:
        opts, args = \
          getopt.getopt(
            sys.argv[1:],
            "c:C:e:E:g:h:H:I:n:N:l:Lm:opPr:s:t:T:vx:X:",
            ["help", "version", "pre=", "pre-all=", "post=", "post-all=",
             "stage-in=", "stage-in-all=", "stage-in-shared=",
             "stage-in-shared-all=", "stage-out=", "stage-out-all=",
             "output-enable", "output-disable"])
    except getopt.GetoptError:
        usage()

    cur_cmd = None        # most recently created command
    cur_cmd_list = None   # list the next command is appended to
    cur_rsl = None        # command RSL still waiting for a resource manager
    cur_rm = None         # resource manager still waiting for a command
    cur_site = None       # griddefs site resolved from the last -m, if any
    prerun_all = None
    postrun_all = None
    for o, a in opts:
        if o == "--help":
            usage()
        elif o == "--version":
            print("grun version %s" % __version__)
            sys.exit(0)
        elif o == "-n":
            check_preceeding_cmd(cur_cmd, "-n")
            cur_cmd.cpucount = int_arg(a, o)
        elif o == "-N":
            Global.def_cpucount = int_arg(a, o)
        elif o == "-h":
            check_preceeding_cmd(cur_cmd, "-h")
            cur_cmd.hostcount = int_arg(a, o)
        elif o == "-H":
            Global.def_hostcount = int_arg(a, o)
        elif o == "-e":
            check_preceeding_cmd(cur_cmd, "-e")
            cur_cmd.env.append(a)
        elif o == "-E":
            Global.environment.append(a)
        elif o == "-m":
            cur_rm = a
            rmlist = string.split(cur_rm, "/")
            if len(rmlist) == 1:
                # No explicit jobmanager: use default for site if available.
                # Main commands use the site's job manager; others (pre/post)
                # use its file-transfer manager.
                if griddefs.Grid_site.allsites.has_key(cur_rm):
                    site = griddefs.Grid_site.allsites[cur_rm]
                    if cur_cmd_list is Global.command_list:
                        use_rm = "%s/%s" % (site.host, site.jobmanager)
                    else:
                        use_rm = "%s/%s" % \
                                 (site.xfer_host, site.xfer_jobmanager)
                    logdebug("Using resource manager %s for %s" %
                             (use_rm, cur_rm))
                    cur_rm = use_rm
                    cur_site = site
            if cur_rsl is not None:
                cur_cmd = Globus_command(cur_rm, cur_rsl, Global.def_cpucount,
                                         Global.def_hostcount)
                cur_cmd_list.append(cur_cmd)
                cur_rm = None
                cur_cmd.site = cur_site
                cur_site = None
        elif o == "-c" or o == "-C" or o == "-r":
            if o == "-c":
                cur_rsl = cmd_to_rsl(a)
            elif o == "-C":
                # Local executable: stage it in and run the staged copy via
                # $GRUN_HOME/etc/stage (@GRUN_GASS@ is presumably substituted
                # with a GASS URL later -- not visible here).
                cmdlist = string.split(a)
                cmd = cmdlist[0]
                stage = get_stage_arg(cmd, o, "<command>")
                if not os.path.isfile(cmd):
                    logerror("option %s: %s not a file" % (o, cmd))
                    usage()
                Global.stage_in.append(stage)
                # cmd might be path, so replace it by staged version:
                newcmdline = "@GRUN_GASS@/./%s/etc/stage %s%s" % \
                    (Global.grun_home, stage[0], a[len(cmd):])
                cur_rsl = cmd_to_rsl(newcmdline)
            else:
                # -r: literal RSL supplied by the user.
                cur_rsl = a
            cur_cmd_list = Global.command_list
            if cur_rm is not None:
                cur_cmd = Globus_command(cur_rm, cur_rsl, Global.def_cpucount,
                                         Global.def_hostcount)
                cur_cmd_list.append(cur_cmd)
                cur_rsl = None
        elif o == "-s":
            Global.preexec_command = a
        elif o == "-t":
            Global.startup_time = int_arg(a, o)
        elif o == "-T":
            Global.wall_time = int_arg(a, o)
        elif o == "-x":
            Global.exit_time = int_arg(a, o)
        elif o == "-X":
            Global.maxretry = int_arg(a, o)
        elif o == "-p":
            prerun_all = "default"
        elif o == "-P":
            postrun_all = "default"
        elif o == "--pre":
            cur_cmd = None
            cur_rsl = cmd_to_rsl(a)
            cur_cmd_list = Global.precommand_list
        elif o == "--post":
            cur_cmd = None
            cur_rsl = cmd_to_rsl(a)
            cur_cmd_list = Global.postcommand_list
        elif o == "--pre-all":
            prerun_all = cmd_to_rsl(a)
        elif o == "--post-all":
            postrun_all = cmd_to_rsl(a)
        elif o == "-o":
            Global.extern_out = 1
        elif o == "--output-enable":
            Global.out_on = 1
        elif o == "--output-disable":
            Global.out_on = 0
        elif o == "--stage-in":
            check_preceeding_cmd(cur_cmd, o)
            stage_arg(cur_cmd.stage_in, a, o, "<remotefile>=<localfile>")
        elif o == "--stage-in-all" or o == "-I":
            stage_arg(Global.stage_in,   a, o, "<remotefile>=<localfile>")
        elif o == "--stage-in-shared":
            check_preceeding_cmd(cur_cmd, o)
            stage_arg(cur_cmd.stage_in_shared, a, o,
                      "<remotefile>=<localfile>")
        elif o == "--stage-in-shared-all":
            stage_arg(Global.stage_in_shared,  a, o,
                      "<remotefile>=<localfile>")
        elif o == "--stage-out":
            check_preceeding_cmd(cur_cmd, o)
            stage_out_arg(cur_cmd.stage_out, a, o, "<localfile>=<remotefile>")
        elif o == "--stage-out-all":
            stage_out_arg(Global.stage_out,   a, o, "<localfile>=<remotefile>")
        elif o == "-L":
            Global.logger_on = 1
        elif o == "-v":
            Global.logger_level = logging.INFO
        elif o == "-l":
            if a == "DEBUG":
                Global.logger_level = logging.DEBUG
            elif a == "INFO":
                Global.logger_level = logging.INFO
            elif a == "WARNING":
                Global.logger_level = logging.WARNING
            elif a == "ERROR":
                Global.logger_level = logging.ERROR
            elif a == "CRITICAL":
                Global.logger_level = logging.CRITICAL
            else:
                logerror("Unrecognized logging level '%s'" % a)
                usage()
        else:
            logerror("unimplemented option '%s'" % o)
            usage()

    if prerun_all is not None or postrun_all is not None:
        # One file-transfer resource manager per main command: the site's
        # configured xfer host/jobmanager if known, else the fork
        # jobmanager on the execution host.
        xfer_rm_list = []
        for cmd in Global.command_list:
            # "default" (-p/-P) means: reuse the first main command's RSL.
            if prerun_all == "default":
                prerun_all = cmd.rsl
            if postrun_all == "default":
                postrun_all = cmd.rsl
            if griddefs.Grid_site.allsites.has_key(cmd.task.host):
                site = griddefs.Grid_site.allsites[cmd.task.host]
                xfer_rm = "%s/%s" % (site.xfer_host, site.xfer_jobmanager)
            else:
                # By default use "fork" jobmanager on execution host
                xfer_rm = "%s/jobmanager-fork" % cmd.task.host
                loginfo("Using %s to xfer files to/from %s" %
                        (xfer_rm, cmd.task.host))
            xfer_rm_list.append(xfer_rm)

        if prerun_all is not None:
            for xfer in xfer_rm_list:
                Global.precommand_list.append(
                    Globus_command(xfer, prerun_all, 1, 0))
        if postrun_all is not None:
            for xfer in xfer_rm_list:
                Global.postcommand_list.append(
                    Globus_command(xfer, postrun_all, 1, 0))

    if len(Global.command_list) == 0:
        logerror("Resource manager/command option missing")
        usage()

def create_site_dirs(cmdlist):
    """Assign and create a per-site output directory for every command.

    Each task gets grun.out/job-NNN/site-M, where NNN is the job number
    (zero-padded to three digits) and M is the task number.  The directory
    is created with "mkdir -p"; a non-zero status is logged and
    terminate(2) is called.
    """
    for command in cmdlist:
        task = command.task
        task.outdir = "grun.out/job-%03d/site-%d" % \
                      (Global.jobnumber, task.number)
        mkdir_cmd = "mkdir -p %s" % task.outdir
        logdebug(mkdir_cmd)
        status = os.system(mkdir_cmd)
        if status:
            logerror("%s failed, error %d" % (mkdir_cmd, status))
            terminate(2)
    
def start_io_servers(cmdlist):
    """Start one external GASS server per subjob and record its URL.

    Phase 1, for each command:
    - open <outdir>/status (line-buffered) as the task's status file;
    - print the job info;
    - launch "globus-gass-server" in the background, with its stdout and
      stderr redirected to <outdir>/stdout and <outdir>/stderr.

    Phase 2 polls each <outdir>/stdout (up to 10 tries, one second apart)
    for the server's URL.  The URL must start with "https://" and is
    stored in cmd.task.iosvr.  Failures are reported and terminate(2) is
    called.

    All servers are started before any is waited on, so their start-up
    delays overlap instead of adding up.
    """
    for cmd in cmdlist:
        # Separate GASS server for each subjob; its output goes to files in
        # the subjob's own directory.
        try:
            cmd.task.statusfile = open("%s/status" % cmd.task.outdir, "w", 1)
        except IOError:
            logerror("error opening %s/status" % cmd.task.outdir)
            terminate(2)
            # terminate() is defined elsewhere; stop here in case it only
            # flags termination instead of exiting.
            return

        cmd.printjobinfo()

        stdout = "%s/stdout" % cmd.task.outdir
        stderr = "%s/stderr" % cmd.task.outdir
        cmdline = "globus-gass-server -w -r -o -e -c -l >%s 2>%s &" % \
                  (stdout, stderr)
        cmd.debug("%s: started %s" % (cmd.task.host, cmdline))
        os.system(cmdline)

    for cmd in cmdlist:
        stdout = "%s/stdout" % cmd.task.outdir
        # Reset per command, so a file that never becomes readable is
        # detected below instead of reusing the previous command's output.
        lines = []
        for i in range(0, 10):
            try:
                fd = open(stdout)
                try:
                    lines = fd.readlines()
                finally:
                    # Close even if readlines() fails.
                    fd.close()
            except IOError:
                cmd.debug("error opening GASS server output %s; retry" %
                          stdout)
            else:
                if len(lines) >= 1:
                    break
            time.sleep(1)
        if len(lines) < 1 or string.find(lines[0], "https://") != 0:
            cmd.error("No https:// in GASS server output %s" % stdout)
            terminate(2)
            # As above: do not go on with a missing or bogus URL.
            return
        cmd.task.iosvr = string.rstrip(lines[0])
        cmd.debug("%s: using GASS server %s" % (cmd.task.host, cmd.task.iosvr))

def stop_io_servers(cmdlist):
    """Shut down the per-subjob GASS servers (only when Global.extern_out).

    Each shutdown runs in the background because it has been seen to hang.
    """
    if Global.extern_out:
        for command in cmdlist:
            shutdown = "globus-gass-server-shutdown %s &" % command.task.iosvr
            command.debug("%s: %s" % (command.task.host, shutdown))
            os.system(shutdown)

def check_interrupt():
    """Return 1 if the run should stop (must_quit() is true), else 0.

    When stopping because of a user interrupt (Global.interrupted), also
    set Global.interrupt_ack.  main() waits for that flag after Ctrl-C.
    """
    if not must_quit():
        return 0
    if Global.interrupted:
        Global.interrupt_ack = 1
    return 1
        
def run_main_prepare():
    """One-time setup performed before the first run attempt.

    - If logging is enabled (Global.logger_on), attach a stderr handler
      that prints bare messages to the "grun" logger, at
      Global.logger_level.
    - Return early if check_interrupt() says to stop.
    - Log the job start.
    - Run the pre-execution command (-s) once.
    """
    if Global.logger_on:
        logger = logging.getLogger("grun")
        stream_handler = logging.StreamHandler(sys.stderr)
        #formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        stream_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(stream_handler)
        logger.setLevel(Global.logger_level)
        Global.logger = logger

    if check_interrupt():
        return

    loginfo("starting job %03d" % Global.jobnumber)

    # The pre-execution command runs only once.  When co-allocation is
    # retried a new GRUN_TOKEN is supplied, so any servers it started can
    # tell two sessions apart; where that is not possible, the startup
    # script should implement a startup barrier.
    if Global.preexec_command is not None:
        os.system(Global.preexec_command)
        Global.preexec_done = 1

def run_main():
    """Run one co-allocation attempt: pre-commands, main commands, post-commands.

    Global stage-in files go with the pre-commands if there are any, else
    with the main commands.  Likewise, global stage-out files go with the
    post-commands if there are any, else with the main commands.

    Returns early if check_interrupt() says to stop.  Otherwise it logs the
    timing and outcome, shuts logging down, and calls
    terminate(Global.exitcode).
    """
    started = time.time()
    new_job_token()

    in_files = Global.stage_in
    in_shared = Global.stage_in_shared
    if Global.precommand_list != []:
        run_commands_sync(Global.precommand_list, "stagein",
                          in_files, in_shared, [])
        if check_interrupt():
            return
        # The pre-commands already did the global stage-in.
        in_files = []
        in_shared = []

    if Global.postcommand_list == []:
        out_files = Global.stage_out
    else:
        out_files = []
    run_commands_sync(Global.command_list, "run",
                      in_files, in_shared, out_files)

    # Results of (partially) failed runs are still worth collecting, so
    # must_quit() is deliberately not checked before the stage-out phase.
    if Global.postcommand_list != []:
        run_commands_sync(Global.postcommand_list, "stageout",
                          [], [], Global.stage_out)
    if check_interrupt():
        return

    elapsed = time.time() - started
    loginfo("Total running time: %.1f sec" % elapsed)
    if Global.exitcode != 0:
        loginfo("job %03d try %d failed (exit code %d)" %
                (Global.jobnumber, Global.jobtry, Global.exitcode))
    else:
        loginfo("job %03d try %d done" % (Global.jobnumber, Global.jobtry))
    logging.shutdown()

    # Propagate the job's exit code.
    terminate(Global.exitcode)

class StarterThread(Thread):
    """Worker thread that prepares, runs and retries the grun job.

    main() starts this thread after parsing the options.  It then only
    waits for Global.terminated or Global.interrupted.  Job preparation,
    execution and retries all happen in run().
    """
    def __init__(self):
        Thread.__init__(self)
        # Set by shutdown().  NOTE(review): nothing visible here waits on
        # this event, so shutdown() does not appear to stop run().
        self._finished = Event()
        return

    def shutdown(self):
        # set the finished flag and wakeup sleeping thread
        self._finished.set()

    def run(self):
        """Prepare once, set up output handling, then run/retry the job."""
        run_main_prepare()

        if Global.out_on:
            # Note: also create job/site dirs when not starting separate
            # I/O servers, since the remote commands may still want
            # to use the single GASS server, and also the job id needs to
            # be reserved.
            create_site_dirs(Global.command_list)
            if not Global.extern_out:
                # Start a single GASS server that redirects stdout/err
                # of all subjobs.   HACK: to keep this server alive, it
                # appears this has to be done HERE, and not in subfunction?!
                # Plausible (unverified) reason: created in a helper, the
                # GassServerEZ object becomes unreferenced when the helper
                # returns and may be destroyed.  As a local of run() it
                # stays alive for the whole run.
                ioserver = GassServerEZ(STDOUT_ENABLE | STDERR_ENABLE |
                                        READ_ENABLE | WRITE_ENABLE)
                ioUrl = ioserver.getURL()
                for cmd in Global.command_list:
                    cmd.task.iosvr = ioUrl
            else:
                # One external GASS server per subjob (-o).
                start_io_servers(Global.command_list)

        while not (Global.terminated or Global.interrupted):
            run_main()
            # See if we may still retry.  After a timeout, clear the
            # timeout/exit state and loop again while Global.jobtry <=
            # Global.maxretry; otherwise terminate with the current exit
            # code.  (jobtry is presumably advanced by new_job_token() in
            # run_main() -- not visible here.)
            if Global.timed_out:
                if Global.jobtry <= Global.maxretry:
                    Global.timed_out = 0
                    Global.exitcode  = 0
                else:
                    terminate(Global.exitcode)

def main():
    """Program entry point: parse options, run the job, clean up.

    The job runs in a StarterThread.  This function handles Ctrl-C and the
    final cleanup, then exits with Global.exitcode.
    """
    # Python having problems writing to pipe?
    signal.signal(signal.SIGPIPE, signal.SIG_IGN)
    # Move ourselves to new process group so that we can easily terminate
    # all children, but not the shell invoking us.
    oldgroup = posix.getpgrp()
    group = posix.getpid()
    # print "Old group %d, new group %d" % (oldgroup, group)
    posix.setpgid(0, group)

    griddefs.sitedefs()
    getArgs()
    starter = StarterThread()
    starter.start()

    try:
        # After having started the subprocesses, move back to original group
        # to handle ctrl-C.  Children forked during the first 2 seconds
        # inherit `group`; NOTE(review): later children (e.g. from retries)
        # are forked in `oldgroup` instead.
        time.sleep(2)
        posix.setpgid(0, oldgroup)
        # Don't check Global.timed_out here, since jobs may be retried
        while not (Global.terminated or Global.interrupted):
            time.sleep(1)
    except KeyboardInterrupt:
        # Just in case we have not moved back to the original group yet:
        posix.setpgid(0, oldgroup)
        print "Interrupted: cancelling jobs; please wait.."
        Global.exitcode = 2
        Global.interrupted = 1
        # Wait until the starter thread acknowledges the interrupt
        # (check_interrupt() sets interrupt_ack).  NOTE(review): this spins
        # forever if the starter thread never calls check_interrupt() again.
        while Global.interrupt_ack == 0:
            time.sleep(1)
        print "Jobs cancelled."

    if Global.out_on:
        stop_io_servers(Global.command_list)
        time.sleep(2)

    # kill process group in case external servers don't go away by themselves;
    # but don't kill ourselves in the process:
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    try:
        # A negative pid sends the signal to every process in group `group`.
        posix.kill(-group, signal.SIGTERM)
    except OSError:
        # Maybe we didn't started any subprocesses, or they already exited
        # print "Kill %d failed" % group
        pass
    sys.exit(Global.exitcode)

# Run the program only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

