#!/usr/bin/env python
#
# This is TNT.py (the ATLAS TagNavigatorTool)
# Copyright (c) 2006 C.Nicholson, M. Kenyon. All rights reserved.
# For licence conditions please read LICENCE file.
#
# 1) Queries POOL Collection Utilities to get required events and list of GUIDs
# 2) (Records query in metadata system (AMI?))
# 3) Extracts list of GUIDs, ordered by DQ2 dataset, with one POOL File Catalogue
#    per GUID plus one with all GUIDs.
# 4) splits job into one per GUID
# 5) sends job(s) to worker node(s)
# 6) collects output from WNs after job(s) completed
#
# 'Pythonised' by Mike Kenyon - 11/10/2006
#

import sys, os, signal
import getopt, string
import re, time, datetime
import shutil
import popen2

import generateLCGJob
import GuidExtractor
import notifyUser

# User options allowed
_useropts = "hvbac:"
_userlongopts = ["help", "verbose", "background", "archive", "conf="]


# Usage summary
def _usage(exit=None):
    """Print the command line summary.

    If `exit` is given, terminate the process with that exit status
    (previously the status was dropped and the process always exited 0).
    """
    print("""Command line options: [-h | --help] [-v | --verbose] [-b | --background] [-a | --archive] [-c | --conf] ].""")
    if exit is not None:
        sys.exit(exit)


# some global settings
myID = str(os.getpid())
logfile = "TNT.log-" + myID
whoami = os.environ.get("USER")

# Option handling
opts = []
verbose = 0
background = "false"
archive = "false"
conffile = "TNT.conf"

try:
    optlist, args = getopt.getopt(sys.argv[1:], _useropts, _userlongopts)
except getopt.error:
    print(sys.exc_info()[1])
    _usage(2)

if args:
    print("Unhandled arguments: " + str(args))
    _usage(2)

for opt, arg in optlist:
    if opt in ("-h", "--help"):
        _usage()
        sys.exit()
    elif opt in ("-v", "--verbose"):
        verbose = 1
    elif opt in ("-c", "--conf"):
        conffile = arg
    elif opt in ("-b", "--background"):
        background = "true"
    elif opt in ("-a", "--archive"):
        archive = "true"

# the archive log is opened once, however many times -a was given
if archive == "true":
    archiveLog = open(logfile, "w")

if optlist:
    del opt, arg
del args, optlist, opts
del _useropts, _userlongopts, getopt

# error codes
E_DQ2_INSTALL = 3
E_ATLAS_INSTALL = 4
E_DQ2_REG = 5
E_FILE_COPY = 6
E_LCG_CR = 7
E_LCG_CR_DEFAULT_SE = 8


def _timestamp():
    """Return the current date and time (to the second) as a '[...] ' prefix."""
    return "[" + re.split(r'\.', str(datetime.datetime.today()))[0] + "] "


# method to add date and time to every print statement, if writing
# to a log file
def tntPrint(textForPrinting):
    """Print a message, time-stamping whatever ends up in a log file.

    In background mode stdout is the log file, so the printed line is
    stamped.  In archive mode a stamped copy goes to the archive log and
    the plain text is printed.  Otherwise the text is printed unchanged.
    """
    if background == "true":
        print(_timestamp() + textForPrinting)
    elif archive == "true":
        archiveLog.write(_timestamp() + textForPrinting + "\n")
        print(textForPrinting)
    else:
        print(textForPrinting)


# method to disconnect from terminal when running in background mode
# From Graeme Stewart
def disconnect(signum, frame):
    '''Method to redirect output to a logfile and carry on'''
    try:
        # Append rather than truncate: this is also the SIGHUP handler, and
        # reopening with 'w' would wipe everything logged so far.
        out = open(logfile, 'a', 1)
    except IOError:
        out = open('/dev/null', 'w')
    sys.stderr.close()
    sys.stdout.close()
    sys.stdin.close()
    sys.stdout = sys.stderr = out


def _runCommand(command):
    """Run a shell command with stdout and stderr merged.

    Returns a tuple (list of output lines, exit status from wait()).
    """
    child = popen2.Popen4(command)
    child.tochild.close()
    output = child.fromchild.readlines()
    child.fromchild.close()
    return output, child.wait()


def _readConfig(path):
    """Read a 'NAME := value' configuration file into a dictionary.

    Blank lines, comment lines (starting with '#') and lines without the
    ':=' separator are ignored; names and values are stripped of
    surrounding whitespace.
    """
    f = open(path, "r")
    try:
        configLines = f.readlines()
    finally:
        f.close()

    settings = {}
    for line in configLines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        # := is the field separator in the config file
        separator = line.find(":=")
        if separator < 0:
            continue
        settings[line[:separator].strip()] = line[separator + 2:].strip()
    return settings


def _parseStatus(statusLines):
    """Turn the 'Name: value' lines printed by edg-job-status into a dictionary."""
    statusDict = {}
    for line in statusLines:
        if line[0] != "\n":
            separator = line.find(":")
            statusDict[line[:separator].strip()] = line[separator + 2:].strip()
    return statusDict


def queryAndSplit(src_coll, src_type, src_connect, query, minevents):
    """
    method to query the Tag DB using CollCopy, and split up the resultant
    collection with CollSplitByGUID

    Returns a dictionary mapping each sub-collection name reported by
    CollSplitByGUID (on its "Created ..." lines) to the list of GUIDs it
    contains.
    """
    collcommand = "CollCopy.exe -src " + src_coll + " " + \
                  src_type + " -srcconnect " + src_connect + \
                  " -dst myEvents RootCollection " + \
                  "-queryopt \'SELECT RunNumber, EventNumber\' -query \"" + query + "\""
    print(collcommand)

    # pass CollCopy command to the system
    output, status = _runCommand(collcommand)
    for line in output:
        tntPrint(line)

    ### split collection into sub-collections, on GUID boundaries but with
    ### at least minEvents events per collection
    # (can probably use this directly in place of CollCopy - put separately first for testing)
    subCollections = {}
    splitCommand = "./CollSplitByGUID.exe -src myEvents RootCollection -minevents " + str(minevents)
    print(splitCommand)

    output, status = _runCommand(splitCommand)
    for line in output:
        if line != "\n":
            tntPrint(line)
        if line.startswith("Created"):
            words = line.split()
            # collect mapping of sub-collections to the guids they contain:
            # word 3 is the collection name, words 6 onwards are its GUIDs
            subCollections[words[3]] = words[6:]

    return subCollections


def main():
    """Run the complete TNT workflow.

    Reads the configuration file, queries the tag database, generates and
    submits one grid job per sub-collection, polls until every job has
    finished (resubmitting on known transient failures), optionally closes
    the DQ2 dataset, tidies up and exits.
    """
    # if running in background mode, set up signal handler and disconnect
    if background == "true":
        signal.signal(signal.SIGHUP, disconnect)
        disconnect(None, None)

    ### --------------------- initialisation -------------------------------------------
    print("**** Welcome to TNT! ****\n\nUsing " + conffile + " as configuration file\n")
    print("Job name is " + whoami + "-" + myID + "\n")

    # if archiving, create archive directory
    if archive == "true":
        print("Archiving logging output to " + logfile)
        archiveRoot = "./archive"
        archiveDir = "./archive/TNT-" + myID
        if not os.path.exists(archiveRoot):
            os.mkdir(archiveRoot)
        os.mkdir(archiveDir)

    ### read arguments from configuration file
    # store config values in confDict dictionary
    if not os.path.exists(conffile):
        print("File " + conffile + " not found")
        sys.exit(1)
    confDict = _readConfig(conffile)

    # if archiving, copy conf file to archive directory (use the base name
    # so that a configuration file given with a path can still be archived)
    if archive == "true":
        shutil.copy(conffile, archiveDir + "/" + os.path.basename(conffile))

    ## check that grid type is valid
    # TODO: remove OSG, NG options - probably not relevant any more
    if confDict["GRID_TYPE"] not in ('LCG', 'OSG', 'NG'):
        print("GRID_TYPE " + confDict["GRID_TYPE"] +
              " is invalid:\nMust be one of 'LCG', 'OSG' or 'NG'")
        sys.exit(1)

    ### DQ2 variables
    # check that REGISTER_OUTPUT is set to a sensible value
    if confDict["REGISTER_OUTPUT"] not in ('YES', 'NO'):
        print("Error: REGISTER_OUTPUT = " + confDict["REGISTER_OUTPUT"] +
              " is not a valid choice.\nMust be either YES or NO")
        sys.exit(1)

    ### ----------------------------- querying and extracting of GUIDs ----------------------------
    ### execute query and write events to myEvents.root
    # remove existing files first
    if os.path.exists("myEvents.root"):
        os.remove("myEvents.root")

    tntPrint("Executing query '" + confDict["QUERY"] + "' on tag database...")

    # run query on the tag DB and split resulting collection according to guid / min events
    subCollections = queryAndSplit(confDict["SRC_COLLECTION_NAME"],
                                   confDict["SRC_COLLECTION_TYPE"],
                                   confDict["SRC_CONNECTION_STRING"],
                                   confDict["QUERY"],
                                   confDict["MIN_EVENTS"])

    # if archiving, copy the events to the archive directory
    if archive == "true":
        shutil.copy("myEvents.root", archiveDir + "/outputEvents.root")

    ### extract GUID list and generate POOL XML file catalogues
    tntPrint("Extracting GUID list from event collection...")
    # call external python script
    guidList = GuidExtractor.pythonCalled(['p'])
    print(guidList)
    tntPrint("GuidExtractor.py returned " + str(len(guidList)) + " GUIDs")

    ### ---------------------- generation and submission of grid jobs --------------------------------
    ### make tarball of relevant code and scripts in current working directory
    tarCommand = "tar -c GenerateCatalogs.py install_dq2_client.sh pycurl.so > dq2.tar"
    output, cmdReturn = _runCommand(tarCommand)
    ## did it work?
    if cmdReturn != 0:
        print("'" + tarCommand + "' failed")
        sys.exit(1)

    ### register new DQ2 dataset, if required
    if confDict["REGISTER_OUTPUT"] == 'YES':
        registerCommand = "./dq2 registerNewDataset " + confDict["OUTPUT_DATASET_NAME"]
        output, cmdReturn = _runCommand(registerCommand)
        ## did it work?  (test the exit status, not the process object)
        if cmdReturn != 0:
            print(registerCommand + " failed")
            sys.exit(1)

    numJobs = 0
    # all submitted job IDs are collected in this file; named up front so
    # that it is defined even when no job is submitted
    jobIDfile = "jobIDfile-" + myID

    ### remove any pre-existing jdl and executable grid job files
    tntPrint("Removing old .jdl and .sh grid job files")
    jobDir = "./jobs/"
    if os.path.exists(jobDir):
        for fileName in os.listdir(jobDir):
            if fileName.find("gridJob_") >= 0:
                os.remove(jobDir + fileName)
    else:
        os.mkdir(jobDir)

    ### generate grid jobs
    tntPrint("Generating grid job(s) for " + confDict["GRID_TYPE"])
    for collection in subCollections:
        # generate correct type of job for grid
        if confDict["GRID_TYPE"] == "LCG":
            # call external python function to create .sh files for the Grid
            genExecFileReturn = generateLCGJob.createExecFile(
                collection, subCollections[collection],
                confDict["ATHENA_COMMAND"], confDict["OUTPUT_FILES"],
                confDict["OUTPUT_DATASET_NAME"],
                confDict["OUTPUT_DATASET_LOCATION"], str(numJobs))
            # call external python function to create .jdl files for the Grid
            genJdlReturn = generateLCGJob.createJdlFile(
                collection, confDict["INPUT_SANDBOX"], confDict["OUTPUT_SANDBOX"])

            if genExecFileReturn != 0:
                print("generateLCGJob.createExecFile failed. Check input parameters.")
                sys.exit(1)
            elif genJdlReturn != 0:
                print("generateLCGJob.createJdlFile failed. Check input parameters.")
                sys.exit(1)

            # submit job to LCG
            submitCmd = "edg-job-submit --vo atlas -o " + jobIDfile + \
                        " jobs/gridJob_" + collection + ".jdl"
            if verbose == 1:
                tntPrint("Submitting job for collection = " + collection)
            else:
                submitCmd = submitCmd + " 1>/dev/null"
            handle = os.popen(submitCmd)
            handle.close()
        else:
            # OSG and NG pass the GRID_TYPE check but job generation and
            # submission were never implemented for them; stop cleanly here
            # instead of failing later on a missing job ID file.
            print("Submission to " + confDict["GRID_TYPE"] + " not yet implemented. Sorry.")
            sys.exit(1)

        numJobs += 1

    tntPrint(str(numJobs) + " jobs successfully submitted...")

    ### ----------------------------- wait for jobs to finish running -------------------------
    jobsDone = 0
    jobsFailed = 0
    closeDataset = "true"

    # create output directory if it doesn't already exist
    outputDir = "./grid-output"
    if not os.path.exists(outputDir):
        os.mkdir(outputDir)

    ### poll until all jobs finished, catching errors and resubmitting jobs if necessary
    while jobsDone + jobsFailed < numJobs:
        if verbose == 1:
            tntPrint("##############")
        tntPrint("Waiting for jobs to finish...(" + str(jobsDone + jobsFailed) +
                 " of " + str(numJobs) + " are complete).")
        time.sleep(600)

        handleID = open(jobIDfile, "r")
        jobIDs = handleID.readlines()
        handleID.close()

        ## find job status for each job
        ## make temporary job ID file for purpose of commenting out failed jobs
        tmpIDfile = open("TNT_tmpIDfile", "w")
        ## this line is required, otherwise edg-job-submit becomes interactive
        tmpIDfile.write("###Submitted Job Ids###\n")

        for jobID in jobIDs:
            ## only process non-blank, non-commented lines (comment = #)
            if jobID.startswith("#") or not jobID.strip():
                continue

            # find job status
            handleStatus = os.popen("edg-job-status " + jobID, "r")
            statusReturn = handleStatus.readlines()
            handleStatus.close()
            statusDict = _parseStatus(statusReturn)
            currentStatus = statusDict["Current Status"]

            if verbose == 1:
                tntPrint("jobID " + jobID + "has current status: " + str(currentStatus))

            if currentStatus == "Done (Success)":
                tntPrint("Job " + jobID + " is complete")
                output, edgReturn = _runCommand(
                    "edg-job-get-output --dir " + outputDir + " " + jobID)
                if verbose == 1:
                    for line in output:
                        print(line)

                if edgReturn != 0:
                    # The output could not be retrieved.  Count the job as
                    # failed and comment it out; otherwise it would vanish
                    # from the ID file uncounted and this loop would never end.
                    tntPrint("Retrieval of output for job " + jobID +
                             " failed; please check output manually")
                    jobsFailed += 1
                    tmpIDfile.write("#" + jobID)
                    continue

                tntPrint("Job ran successfully")
                jobsDone += 1
                if verbose == 1:
                    tntPrint("EXIT CODE = " + str(statusDict["Exit code"]))
                exitCode = int(statusDict["Exit code"])

                # pre-defined exit codes for failure of software installation and file copy
                # in these cases, resubmit and hope another worker node is better
                if exitCode in (E_DQ2_INSTALL, E_ATLAS_INSTALL, E_FILE_COPY):
                    shortJobID = jobID[jobID.rfind("/") + 1:].strip()
                    jobRoot = "ls " + outputDir + "/" + whoami + "_" + shortJobID + "/gridJob*.out"
                    handleJob = os.popen(jobRoot, "r")
                    jobOutput = handleJob.readline().strip()
                    handleJob.close()
                    # derive the .jdl name from the .out file, changing only
                    # the extension (not every "out" inside the name)
                    job = jobOutput[jobOutput.rfind("/") + 1:].strip()
                    if job.endswith(".out"):
                        job = job[:-len("out")] + "jdl"

                    if exitCode == E_FILE_COPY:
                        reason = " source file copy failure"
                    else:
                        reason = " installation failure"
                    tntPrint("Resubmitting job because of" + reason)

                    # comment out the old job, then let edg-job-submit append
                    # the new job ID to the temporary ID file
                    tmpIDfile.write("#" + jobID)
                    tmpIDfile.close()
                    edgCmd = os.popen("edg-job-submit --vo atlas -o TNT_tmpIDfile jobs/" + job)
                    output = edgCmd.readlines()
                    edgCmd.close()
                    if verbose == 1:
                        for line in output:
                            print(line)
                    tmpIDfile = open("TNT_tmpIDfile", "a")
                    # the job is running again, so it is not done after all
                    jobsDone -= 1

                # exit code for failure to register output file on LCG
                elif exitCode == E_LCG_CR:
                    closeDataset = "false"
                    tntPrint("Registration of file from job " + jobID + " in LCG failed")
                    tntPrint("DQ2 dataset will not be closed nor will location be registered.")

                # exit code for when a file went to the VO default SE rather than
                # the user-specified SE
                elif exitCode == E_LCG_CR_DEFAULT_SE:
                    closeDataset = "false"
                    tntPrint("Not all files were registered at site " +
                             confDict["OUTPUT_DATASET_LOCATION"] +
                             "(though all have been registered somewhere..)\n " +
                             "DQ2 dataset will not be closed nor will location be registered.")

                # exit code for when registration in DQ2 failed for some reason
                elif exitCode == E_DQ2_REG:
                    closeDataset = "false"
                    tntPrint("Registration of file from job " + jobID + " in DQ2 failed")

                # catch-all for any other non-zero exit codes
                elif exitCode != 0:
                    tntPrint("BUT Job " + jobID + "finished with non-zero exit code")
                    tntPrint("Please check output manually")
                    jobsDone -= 1
                    jobsFailed += 1

            # if marked as failed or aborted, comment out of jobID file
            elif currentStatus in ("Failed", "Aborted"):
                tntPrint("Job " + jobID + " was failed or aborted; please check output manually")
                jobsFailed += 1
                tmpIDfile.write("#" + jobID)

            # else put it back into the jobIDfile for another cycle
            else:
                tmpIDfile.write(jobID)

        tmpIDfile.close()
        # NOT debug - the following line is required
        os.rename("TNT_tmpIDfile", jobIDfile)

    status = "All jobs completed with " + str(jobsDone) + " successes and " + \
             str(numJobs - jobsDone) + " failures."
    tntPrint(status)

    # if everything was ok, remove the jobIDfile - we don't need it any more
    # (it does not exist at all when no job was submitted)
    if jobsDone == numJobs and os.path.exists(jobIDfile):
        os.remove(jobIDfile)

    # If option was specified in parameters file, main output
    # should have been registered on grid site and in DQ2 dataset.
    # Now complete dataset registration, close and freeze it.
    if confDict["REGISTER_OUTPUT"] == "YES":
        # only close dataset if all jobs completed properly
        if (jobsDone == numJobs) and (closeDataset == "true"):
            dq2Cmd = os.popen("./dq2 closeDataset " + confDict["OUTPUT_DATASET_NAME"])
            dq2Cmd.close()
            dq2Cmd = os.popen("./dq2 freezeDataset " + confDict["OUTPUT_DATASET_NAME"])
            dq2Cmd.close()
            # dataset name and location are separate arguments
            dq2Cmd = os.popen("./dq2 registerDatasetLocation -c " +
                              confDict["OUTPUT_DATASET_NAME"] + " " +
                              confDict["OUTPUT_DATASET_LOCATION"])
            dq2Cmd.close()
            tntPrint("Dataset " + confDict["OUTPUT_DATASET_NAME"] +
                     " now registered at site " + confDict["OUTPUT_DATASET_LOCATION"])
        else:
            tntPrint("Cannot close, freeze or register location of DQ2 dataset " +
                     confDict["OUTPUT_DATASET_NAME"] + "; there were some problems.")
            tntPrint("Please check job output files and register location manually.")

    # tidy up
    if os.path.exists("myEvents.root"):
        os.remove("myEvents.root")
    for fileName in os.listdir("./"):
        if fileName.find("PoolFileCatalog") >= 0:
            os.remove(fileName)

    # if running in background, send mail to user saying job is finished
    if background == "true":
        address = confDict["EMAIL_ADDRESS"]
        job = [whoami + "-" + myID]
        notifyUser.sendmail(address, job, status)

    if archive == "true":
        if background == "false":
            archiveLog.close()
        shutil.move(logfile, archiveDir + "/" + logfile)

    # success!
    sys.exit(0)

# end main


if __name__ == "__main__":
    # if running in the background, fork off daemon process
    if background == "true":
        workingDir = os.getcwd()
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                sys.exit(0)
        except OSError:
            e = sys.exc_info()[1]
            print("fork #1 failed: %d (%s)" % (e.errno, e.strerror))
            sys.exit(1)

        # decouple from parent environment
        os.chdir("/")  # don't prevent unmounting....
        os.setsid()
        os.umask(0)

        # do second fork
        try:
            pid = os.fork()
            if pid > 0:
                # exit from second parent, print eventual PID before
                print("TNT is running in the background; output is written to %s" % logfile)
                sys.exit(0)
        except OSError:
            e = sys.exc_info()[1]
            print("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
            sys.exit(1)

        os.chdir(workingDir)

    main()