#!/bin/bash
"exec" "python" "-Wignore" "$0" "$@"

def _usage():
    print \
"""
NAME
        dq2_put - register datasets to DQ2

SYNOPSIS

        dq2_put [ -h | --help ]
                [ -v | --verbose ]
                [ -o | --official ]
                [ -g | --gsiftp host:port ]
                [ -d | --directory directory ]
                [ -p | --pool poolfilecatalog ]
                [ -e | --exist ]
                [ -f | --fast-registration ]
                datasetname

DESCRIPTION

        Registers files to LRC, creates a dataset which is composed of the files,
        and then registers the dataset to DQ2. If a PoolFileCatalog is given,
        a list of files is extracted from the PoolFileCatalog. Otherwise,
        GUIDs are generated for files under a directory using uuidgen.

OPTIONS

        -h | --help
            Print this message

        -v | --verbose
            Verbosity

        -o | --official
            Enable to use official datasetname. Normal users should follow
            the naming convention for user datasets, i.e., user.<nickname>.<whatever>

        -g | --gsiftp host:port
            Use the given gsiftp host for the PFNs (switches the local protocol to glite)

        -d | --directory directory
            Register the files found under the directory

        -p | --pool poolfilecatalog
            Take LFNs/PFNs/GUIDs from the given PoolFileCatalog

        -e | --exist
            Require that the files are already registered in LRC

        -f | --fast-registration
            Register only the files which are not yet in LRC
"""

import os
import re
import sys
import time
import types
import commands
import xml.dom.minidom

# ... (exit codes (EC_*), the DQ2/LRC configuration (DQ2LOCALSITEID, baseURLDQ2SSL,
#      toaHelper, dq2api, USEDQ2MOD, configLOCALPROTOCOL) and the _Curl/_x509
#      helpers are defined here; that part of the script is missing from this copy) ...


# merge properties of local files (the *1 lists) with entries already in the
# catalog (the *2 lists).  The original function header is cut off in this copy:
# the signature below follows the call in main(), the first lines of the loop are
# reconstructed, and the 'strict' default is an assumption.
def _mergeProperties(guids1,lfns1,pfns1,sizes1,md5sums1,guids2,lfns2,sizes2,md5sums2,
                     defaultStorage,protocol,tmpDir,newBaseURL,files_exist,strict=False):
    retList = []
    retGuids = []
    retToRegister = []
    # loop over all local files
    for i in range(len(lfns1)):
        lfn = lfns1[i]
        guid = guids1[i]
        index = -1
        if guid in guids2:
            # GUID already in the catalog : the LFN must match
            index = guids2.index(guid)
            if lfn != lfns2[index]:
                print "ERROR: inconsistent File Catalog"
                sys.exit(EC_Merge)
        elif lfn in lfns2:
            index = lfns2.index(lfn)
            if strict and guids2[index] and guid != guids2[index]:
                # if strict the GUID in guids1[i] was not random (a new one) and cannot be changed
                print "ERROR: inconsistent File Catalog"
                sys.exit(EC_Merge)
            # either is the same or it is the right one
            guid = guids2[index]
        # fill non Key attributes (pfn, size, md5sum)
        pfn = pfns1[i]
        if index == -1:
            if files_exist:
                print "ERROR : a file is not registered in LRC (%s,%s), run without option -e (--exist)" % (lfn,guid)
                sys.exit(EC_Main)
            # most likely sizes and md5sums are empty ('')
            size = sizes1[i]
            md5sum = md5sums1[i]
        else:
            # hopefully either '' or valid
            size = sizes2[index]
            md5sum = md5sums2[index]
        # TODO: better check for invalid size/md5sum values (e.g. 0, ...)
        if size=='' or md5sum=='':
            # get size. md5 only if required
            file = _prepareProperties(lfn,pfn,guid,defaultStorage,protocol,tmpDir)
            size = file['size']
            md5sum = file['md5sum']
            pfn = file['pfn']
        else:
            # pfn as evaluated in _prepareProperties()
            pfn = '%s%s' % (defaultStorage,pfn)
            if newBaseURL:
                pfn = os.path.join(newBaseURL, os.path.basename(pfn))
        # check
        long(size)
        # assembling return value
        retVal = {  # (SURL,LFN,GUID,size,md5sum)
            'lfn': lfn,
            'pfn': pfn,
            'guid': guid,
            'size': size,
            'md5sum': md5sum
            }
        retGuids.append(guid)
        retList.append(retVal)
        if index == -1:
            retToRegister.append(retVal)
    return retList,retGuids,retToRegister


# extract PFN and LFN from PoolFileCatalog
def _getFNsPFC_old(stringValue,fromFile=True):
    lfns = []
    pfns = []
    guids = []
    # instantiate parser
    try:
        if fromFile:
            root = xml.dom.minidom.parse(stringValue)
        else:
            root = xml.dom.minidom.parseString(stringValue)
        files = root.getElementsByTagName('File')
        for file in files:
            # GUID
            guid = str(file.getAttribute('ID'))
            # get PFN node
            physical = file.getElementsByTagName('physical')[0]
            pfnNode = physical.getElementsByTagName('pfn')[0]
            # convert UTF8 to Raw
            pfn = str(pfnNode.getAttribute('name'))
            # remove protocol://host
            pfn = re.sub('^[^:]+://[^/]+','',pfn)
            # remove protocol
            pfn = re.sub('^[^:]+:','',pfn)
            # get LFN node
            try:
                logical = file.getElementsByTagName('logical')[0]
                lfnNode = logical.getElementsByTagName('lfn')[0]
                # convert UTF8 to Raw
                lfn = str(lfnNode.getAttribute('name'))
            except:
                lfn = pfn.split('/')[-1]
            # append
            lfns.append(lfn)
            pfns.append(pfn)
            guids.append(guid)
    except:
        type, value, traceBack = sys.exc_info()
        print "ERROR : could not parse XML - %s %s" % (type, value)
        sys.exit(EC_XML)
    # return
    return (lfns,pfns,guids)
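
# For orientation: the parsers above and below walk a PoolFileCatalog of roughly
# the following shape (illustrative sketch only -- GUIDs, paths and metadata
# values are made-up examples, not taken from a real catalog):
#
#   <?xml version="1.0" ?>
#   <POOLFILECATALOG>
#     <File ID="A1B2C3D4-0000-1111-2222-333344445555">
#       <physical>
#         <pfn name="srm://some.host/some/path/file1.root"/>
#       </physical>
#       <logical>
#         <lfn name="file1.root"/>
#       </logical>
#       <metadata att_name="fsize"  att_value="1234567"/>
#       <metadata att_name="md5sum" att_value="d41d8cd98f00b204e9800998ecf8427e"/>
#     </File>
#   </POOLFILECATALOG>
#
# _getFNsPFC() strips the protocol/host prefix from the pfn, falls back to the
# pfn basename when no <logical> block is present, and (with pstrict=True)
# insists on the fsize/md5sum metadata that Athena-produced catalogs may lack.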

# extract PFN and LFN from PoolFileCatalog
def _getFNsPFC(stringValue,fromFile=True,pstrict=False):
    """Parses Athena or LRC produced PoolFileCatalog.xml files

    pstrict - if True size and md5sum metadata is required.
              Athena produced PFC may have no metadata
    """
    lfns = []
    pfns = []
    guids = []
    fsizes = []
    fmd5sums = []
    # instantiate parser
    try:
        if fromFile:
            root = xml.dom.minidom.parse(stringValue)
        else:
            root = xml.dom.minidom.parseString(stringValue)
        files = root.getElementsByTagName('File')
        for file in files:
            # GUID
            guid = str(file.getAttribute('ID'))
            # get PFN node
            physical = file.getElementsByTagName('physical')[0]
            pfnNode = physical.getElementsByTagName('pfn')[0]
            # convert UTF8 to Raw
            pfn = str(pfnNode.getAttribute('name'))
            # remove protocol://host
            pfn = re.sub('^[^:]+://[^/]+','',pfn)
            # remove protocol
            pfn = re.sub('^[^:]+:','',pfn)
            # get LFN node
            try:
                logical = file.getElementsByTagName('logical')[0]
                lfnNode = logical.getElementsByTagName('lfn')[0]
                # convert UTF8 to Raw
                lfn = str(lfnNode.getAttribute('name'))
            except:
                lfn = pfn.split('/')[-1]
            fsize = ''    #None
            fmd5sum = ''  #None
            meta_list = file.getElementsByTagName('metadata')
            for metaNode in meta_list:
                meta_name = str(metaNode.getAttribute('att_name'))
                if meta_name=='fsize':
                    fsize = str(metaNode.getAttribute('att_value'))
                elif meta_name=='md5sum':
                    fmd5sum = str(metaNode.getAttribute('att_value'))
            # Athena produced PFC may have no metadata
            if pstrict and (not fsize or not fmd5sum):
                print "ERROR : invalid XML, file %s/%s has no valid size or md5sum metadata" % (lfn, guid)
                sys.exit(EC_XML)
            # append
            lfns.append(lfn)
            pfns.append(pfn)
            guids.append(guid)
            fsizes.append(fsize)
            fmd5sums.append(fmd5sum)
    except:
        type, value, traceBack = sys.exc_info()
        print "ERROR : could not parse XML - %s %s" % (type, value)
        sys.exit(EC_XML)
    # return
    return (lfns,pfns,guids,fsizes,fmd5sums)


# get PFNs from LRC
def _getPFNsLRC_old(lfns,siteID=DQ2LOCALSITEID):
    retLFNs = []
    retGUIDs = []
    # instantiate curl
    curl = _Curl()
    # get PoolFileCatalog
    iLFN = 0
    outXML = ''
    strLFNs = ''
    # if no local site service
    if toaHelper.getLRC(siteID) == None:
        return retLFNs
    url = toaHelper.getLRC(siteID) + 'lrc/PoolFileCatalog'
    usePOST = True
    for lfn in lfns:
        iLFN += 1
        # make argument
        strLFNs += '%s ' % lfn
        if iLFN % 40 == 0 or iLFN == len(lfns):
            # get PoolFileCatalog
            strLFNs = strLFNs.rstrip()
            data = {'lfns':strLFNs}
            # avoid too long argument
            strLFNs = ''
            # execute
            if usePOST:
                status,out = curl.post(url,data)
                if status == 0 and re.search('Must GET',out) != None:
                    # use GET
                    usePOST = False
            if not usePOST:
                status,out = curl.get(url,data)
            if out.startswith('Error'):
                # LFN not found
                continue
            # '<?xml' check reconstructed -- the original text is cut off here
            if status != 0 or (not out.startswith('<?xml')):
                pass
    # ... (the rest of this function, together with _getPFNsLRC, _getDefaultStorage,
    #      _prepareProperties and _getFNsDir, is missing from this copy) ...


# convert dataset name + LFN to the native LFC path.  The original header is
# cut off in this copy: the signature follows the call in _registerFilesLFC,
# and the setup of the base path 'bpath' is missing.
def _toNativeLFN(dsn,lfn):
    dots = dsn.split('.')
    if len(dots) > 4:
        project = dots[0]
        type = dots[4]
        return '%s/%s/%s/%s/%s' % (bpath, project, type, dsn, lfn)
    # if no dots
    elif len(dots) == 1:
        return '%s/other/%s/%s' % (bpath, dsn, lfn)
    # some dots eg user.name.something datasets
    else:
        project = dots[0]
        return '%s/%s/%s/%s' % (bpath, project, dsn, lfn)
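
# Example of the mapping done by _toNativeLFN() (hypothetical names; <bpath> is
# whatever base path the lost setup code assigns):
#   dsn = 'user.JohnDoe.test.001', lfn = 'evts.root'
#     -> four dot-separated fields -> '<bpath>/user/user.JohnDoe.test.001/evts.root'
#   dsn = 'mydata', lfn = 'evts.root'
#     -> single field              -> '<bpath>/other/mydata/evts.root'
#   a name with five or more fields additionally inserts the fifth field
#   (e.g. the data type) between the project and the dataset name.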

# register files to LRC
def _registerFilesLRC(files, check_catalog=True):
    realGUID = []
    lfns = []
    if check_catalog:
        # get LFN list to check if files already registered
        tmp_lfns = []
        for file in files:
            tmp_lfns.append(file['lfn'])
        lfns,guids,sizes,md5sums = _getPFNsLRC(tmp_lfns)
    # make argument for registration
    strPFN = ''
    strLFN = ''
    strGUID = ''
    strSize = ''
    strMD5 = ''
    strARC = ''
    for file in files:
        if file['lfn'] in lfns:
            index = lfns.index(file['lfn'])
            realGUID.append(guids[index])
        else:
            realGUID.append(file['guid'])
            strPFN += '+%s' % file['pfn']
            strLFN += '+%s' % file['lfn']
            strGUID += '+%s' % file['guid']
            strSize += '+%s' % file['size']
            strMD5 += '+%s' % file['md5sum']
            strARC += '+P'
    # query
    if strPFN != '':
        # instantiate curl
        curl = _Curl()
        # register
        url = toaHelper.getLRC(DQ2LOCALSITEID) + 'lrc/files'
        data = {}
        data['pfns'] = strPFN[1:]
        data['lfns'] = strLFN[1:]
        data['guids'] = strGUID[1:]
        data['fsizes'] = strSize[1:]
        data['md5sums'] = strMD5[1:]
        data['archivals'] = strARC[1:]
        status,out = curl.post(url,data)
        if status != 0 or out != 1:
            print out
            print "ERROR : could not register files to LRC"
            sys.exit(EC_RegisterLRC)
    # return
    return realGUID


# register files to LFC
def _registerFilesLFC(files,datasetname):
    realGUID = []
    # set LFC HOST
    os.environ['LFC_HOST'] = toaHelper.getLFC(DQ2LOCALSITEID)
    # lfc_list structure
    stat = lfc.lfc_filestatg()
    # start LFC session
    try:
        lfc.lfc_startsess('','')
    except NameError:
        pass
    if globalVerbose:
        print "LFC_HOST=%s" % os.environ['LFC_HOST']
    # loop over all files
    for file in files:
        # get LFC directory
        lfcLFN = _toNativeLFN(datasetname,file['lfn'])
        lfcDir = os.path.dirname(lfcLFN)
        # check if the directory already exists
        com = 'lfc-ls -d %s' % lfcDir
        if globalVerbose:
            print com
        status,output = commands.getstatusoutput(com)
        if status != 0:
            # create directory
            com = 'lfc-mkdir -p %s' % lfcDir
            if globalVerbose:
                print com
            status,output = commands.getstatusoutput(com)
            if status != 0:
                print output
                print "ERROR : could not create LFC directory:%s" % lfcDir
                sys.exit(EC_RegisterLRC)
            if globalVerbose:
                print output
        else:
            if globalVerbose:
                print output
        # check if already registered
        com = 'lcg-lg --vo atlas %s' % file['pfn']
        if globalVerbose:
            print com
        status,output = commands.getstatusoutput(com)
        if globalVerbose:
            print output
        nTry = 5
        if status == 0:
            # correct GUID
            file['guid'] = re.sub('^guid:','',output)
        else:
            # register to LFC
            com = 'lcg-rf --vo atlas -t 120 -l %s -g %s %s' % \
                  (lfcLFN,file['guid'],file['pfn'])
            if globalVerbose:
                print com
            for iTry in range(nTry):
                status,output = commands.getstatusoutput(com)
                if status == 0:
                    break
                # failed
                if (iTry+1) < nTry:
                    time.sleep(30)
                    continue
                print output
                print "ERROR : could not register to LFC:%s" % lfcLFN
                sys.exit(EC_RegisterLRC)
            if globalVerbose:
                print output
        # register size and md5sum
        for iTry in range(nTry):
            if globalVerbose:
                print "lfc_setfsizeg(%s,%s,%s,%s)" % (file['guid'],long(file['size']),
                                                      "MD",file['md5sum'])
            ret = lfc.lfc_setfsizeg(file['guid'],long(file['size']),"MD",file['md5sum'])
            if ret == 0:
                if globalVerbose:
                    print output
                break
            # failed
            if (iTry+1) < nTry:
                time.sleep(30)
                continue
            print output
            print "ERROR : could not register md5sum/size to LFC:%s" % lfcLFN
            return False
        # set GUID
        realGUID.append(file['guid'])
    # end session
    try:
        lfc.lfc_endsess()
    except NameError:
        pass
    # return
    return realGUID
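
# All three registration helpers (_registerFilesLRC, _registerFilesLFC,
# _registerFilesLRC_SQL) consume the per-file dictionaries assembled by
# _prepareProperties()/_mergeProperties(), e.g. (values are illustrative):
#   {'lfn'   : 'evts.root',
#    'pfn'   : 'srm://some.host/some/path/evts.root',
#    'guid'  : 'A1B2C3D4-0000-1111-2222-333344445555',
#    'size'  : '1234567',
#    'md5sum': 'd41d8cd98f00b204e9800998ecf8427e'}
# and return the list of GUIDs that actually ended up in the catalog.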

# register files to LRC using MySQLdb
def _registerFilesLRC_SQL(files):
    # import mysql-python
    try:
        import MySQLdb
    except:
        print "ERROR : could not import MySQLdb"
        sys.exit(EC_RegisterLRC)
    # parse contact string
    urlEnvVar = 'DQ2_LRC_CONTACT_URL'
    if not urlEnvVar in os.environ:
        print "ERROR : %s is undefined" % urlEnvVar
        sys.exit(EC_RegisterLRC)
    match = re.search('^mysql://([^:]+):([^@]+)@([^:]+):(\d+)/(.+)$',
                      os.environ[urlEnvVar])
    if match == None:
        print "ERROR : Bad URL to connect to DB [%s]" % os.environ[urlEnvVar]
        sys.exit(EC_RegisterLRC)
    # connect
    try:
        if globalVerbose:
            print "connect to %s" % match.group(3)
        conn = MySQLdb.connect(host=match.group(3),user=match.group(1),
                               port=int(match.group(4)),passwd=match.group(2),
                               db=match.group(5))
        cur = conn.cursor()
        cur.execute("SET AUTOCOMMIT=1")
    except:
        type, value, traceBack = sys.exc_info()
        print type,value
        print "ERROR : could not connect to %s" % match.group(3)
        sys.exit(EC_RegisterLRC)
    # register files
    realGUID = []
    for file in files:
        try:
            # insert pfn
            sql = "INSERT INTO t_pfn (pfname,guid) VALUES ('%s','%s')" % \
                  (file['pfn'],file['guid'])
            if globalVerbose:
                print sql
            cur.execute(sql)
            # insert lfn
            sql = "INSERT INTO t_lfn (lfname,guid) VALUES ('%s','%s')" % \
                  (file['lfn'],file['guid'])
            if globalVerbose:
                print sql
            cur.execute(sql)
            # insert fsize, md5, last modified, archival bits
            sql  = "INSERT INTO t_meta (fsize,md5sum,lastmodified,archival,guid) VALUES"
            sql += " ('%s','%s','%s','%s','%s')" % \
                   (file['size'],file['md5sum'],str(time.time())[:10],'P',file['guid'])
            if globalVerbose:
                print sql
            cur.execute(sql)
        except:
            type, value, traceBack = sys.exc_info()
            print type,value
            print "ERROR : could not register %s" % file['lfn']
            sys.exit(EC_RegisterLRC)
        # append
        realGUID.append(file['guid'])
    # close connection
    conn.close()
    # return
    return realGUID


# register dataset
def _registerDataset(datasetname,filesList,guids,siteID):
    # generate DUID/VUID
    duid = commands.getoutput("uuidgen")
    vuid = commands.getoutput("uuidgen")
    # add dataset
    if USEDQ2MOD:
        # use dq2 mod
        try:
            # get repository client
            repo = dq2api.repositoryClient
            out = repo.addDataset(datasetname,duid,vuid)
            status = 0
        except:
            type, value, traceBack = sys.exc_info()
            print "ERROR : addDataset %s %s" % (type, value)
            sys.exit(EC_RegisterDQ2)
    else:
        # instantiate curl
        curl = _Curl()
        curl.sslCert = _x509()
        curl.sslKey = _x509()
        # add
        url = baseURLDQ2SSL + 'ws_repository/rpc'
        data = {'operation':'addDataset','dsn': datasetname,'duid': duid,'vuid':vuid,
                'API':'0_3_0','tuid':commands.getoutput('uuidgen'),'update':'yes'}
        status,out = curl.post(url,data)
        if status != 0 or (out != None and re.search('Exception',out) != None):
            print status,out
            print "ERROR : could not add dataset to repository"
            sys.exit(EC_RegisterDQ2)
    # add content
    files = []
    for i in range(len(filesList)):
        fItem = filesList[i]
        files.append({'guid':guids[i],'lfn':fItem['lfn'],'size':long(fItem['size']),
                      'checksum':'md5:'+fItem['md5sum']})
    if USEDQ2MOD:
        # use dq2 mod
        try:
            # get content client
            cont = dq2api.contentClient
            out = cont.addFilesToDataset(vuid,[],files)
            status = 0
        except:
            type, value, traceBack = sys.exc_info()
            print "ERROR : addFilesToDataset %s %s" % (type, value)
            sys.exit(EC_RegisterDQ2)
    else:
        # add
        url = baseURLDQ2SSL + 'ws_content/rpc'
        data = {'operation':'addFilesToDataset','vuid': vuid,'vuids':[],'files':files,
                'API':'0_3_0','tuid':commands.getoutput('uuidgen'),'update':'yes'}
        status,out = curl.post(url,data)
        if status != 0 or (out != None and out != [] and re.search('Exception',out) != None):
            print status,out
            print "ERROR : could not add content of dataset"
            sys.exit(EC_RegisterDQ2)
    # freeze dataset
    if USEDQ2MOD:
        # use dq2 mod
        try:
            # get repository client
            repo = dq2api.repositoryClient
            out = repo.setState(datasetname,2)
            status = 0
        except:
            type, value, traceBack = sys.exc_info()
            print "ERROR : setState %s %s" % (type, value)
            sys.exit(EC_RegisterDQ2)
    else:
        url = baseURLDQ2SSL + 'ws_repository/rpc'
        data = {'operation':'setState','dsn':datasetname,'state':2,
                'API':'0_3_0','tuid':commands.getoutput('uuidgen'),'update':'yes'}
        status,out = curl.post(url,data)
        if status != 0 or (out != None and re.search('Exception',out) != None):
            print status,out
            print "ERROR : could not freeze dataset"
            sys.exit(EC_RegisterDQ2)
    # register location
    if USEDQ2MOD:
        # use dq2 mod
        try:
            # get location client
            loc = dq2api.locationClient
            out = loc.addDatasetReplica(vuid,siteID,complete=1)
            status = 0
        except:
            type, value, traceBack = sys.exc_info()
            print "ERROR : addDatasetReplica %s %s" % (type, value)
            sys.exit(EC_Location)
    else:
        # get location
        url = baseURLDQ2SSL + 'ws_location/rpc'
        data = {'operation':'addDatasetReplica','vuid':vuid,'site':siteID,'dsn':datasetname,'complete':1,
                'API':'0_3_0','tuid':commands.getoutput('uuidgen'),'update':'yes'}
        status,out = curl.post(url,data)
        if status != 0 or (isinstance(out,types.StringType) and re.search('Exception',out) != None):
            print status,out
            print "ERROR : could not register location"
            sys.exit(EC_RegisterDQ2)
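
# _registerDataset() drives the DQ2 0.3 web-service (or dq2api) calls in this
# order: addDataset -> addFilesToDataset -> setState(2) to freeze the dataset
# -> addDatasetReplica at the local site.  A rough equivalent with the dq2api
# clients (assuming USEDQ2MOD is enabled and dq2api is configured) would be:
#   dq2api.repositoryClient.addDataset(dsn, duid, vuid)
#   dq2api.contentClient.addFilesToDataset(vuid, [], files)
#   dq2api.repositoryClient.setState(dsn, 2)
#   dq2api.locationClient.addDatasetReplica(vuid, siteID, complete=1)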

def _preparePropertiesForAll(lfns,pfns,guids,storageHost,configLOCALPROTOCOL,tmpDir,newBaseURL):
    # prepare properties
    filesList = []
    for i in range(len(lfns)):
        file = _prepareProperties(lfns[i],pfns[i],guids[i],storageHost,configLOCALPROTOCOL,tmpDir)
        filesList.append(file)
    # replace PFNs if different baseURL
    if newBaseURL:
        for file in filesList:
            file['pfn'] = os.path.join(newBaseURL, os.path.basename(file['pfn']))
    return filesList


def _guidMissing(guids,filesToRegisterList):
    tmp_guids = []
    for i in filesToRegisterList:
        if i['guid'] in guids:
            tmp_guids.append(i['guid'])
        else:
            return True
    try:
        for i in guids:
            tmp_guids.remove(i)
    except ValueError:
        return True
    if tmp_guids:
        return True
    return False
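
# Typical invocations (hypothetical dataset/directory names, shown only to
# illustrate the option handling in main() below):
#   dq2_put -d ./myoutput user.JohnDoe.test.001
#       register every file found under ./myoutput, generating GUIDs with uuidgen
#   dq2_put -p PoolFileCatalog.xml user.JohnDoe.test.001
#       take LFNs/PFNs/GUIDs (and fsize/md5sum metadata, if present) from the catalog
#   dq2_put -e -p PoolFileCatalog.xml user.JohnDoe.test.001
#       require that all files are already registered in the LRC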

####################################################################
# main
def main():
    import getopt
    # option class
    class _options:
        def __init__(self):
            pass
    options = _options()
    del _options
    # set default values
    options.verbose = False
    options.directory = ''
    options.newBaseURL = ''
    options.exist = False
    options.partialReg = False
    options.pool = ''
    options.official = False
    options.siteID = DQ2LOCALSITEID
    options.mysql = False
    options.tmpDir = '/tmp'
    options.gsiftp = ''
    # get command-line parameters
    try:
        opts, args = getopt.getopt(sys.argv[1:],"hvp:d:n:efos:mt:g:",
                                   ["help","verbose","pool=","directory=","newbase=",
                                    "exist","fast-registration","official",
                                    "site=","mysql","tmpdir=","gsiftp="])
    except:
        _usage()
        print "ERROR : Invalid options"
        sys.exit(EC_Main)
    # set options
    for o, a in opts:
        if o in ("-h","--help"):
            _usage()
            sys.exit()
        if o in ("-v","--verbose"):
            options.verbose = True
        if o in ("-d","--directory"):
            options.directory = a
        if o in ("-n","--newbase"):
            options.newBaseURL = a
        if o in ("-e","--exist"):
            options.exist = True
        if o in ("-f","--fast-registration"):
            options.partialReg = True
        if o in ("-p","--pool"):
            options.pool = a
        if o in ("-o","--official"):
            options.official = True
        if o in ("-s","--site"):
            options.siteID = a
        if o in ("-m","--mysql"):
            options.mysql = True
        if o in ("-t","--tmpdir"):
            options.tmpDir = a
        if o in ("-g","--gsiftp"):
            options.gsiftp = a
    # check flags
    if options.exist or options.partialReg:
        if options.mysql:
            print "ERROR: options -e and -f are incompatible with -m"
            sys.exit(EC_Main)
    #if options.pool == '':
    #    print "WARNING: if you use no PoolFileCatalog.xml be sure that your LFNs be unique."
    #    print "If LFNs are not unique you will endup registering the wrong file."
    # global flags
    global globalVerbose
    globalVerbose = options.verbose
    # use gsiftp
    if options.gsiftp != '':
        global configLOCALPROTOCOL
        configLOCALPROTOCOL = 'glite'
        global configGSIFTPHOST
        configGSIFTPHOST = 'gsiftp://%s' % options.gsiftp
    # check grid-proxy
    status,output = commands.getstatusoutput('grid-proxy-info -e')
    if status != 0:
        print "ERROR : No valid grid-proxy. Do 'grid-proxy-init'"
        sys.exit(EC_Main)
    # datasetname
    if len(args) == 0:
        print "ERROR : no datasetname"
        sys.exit(EC_Main)
    datasetname = args[0]
    # check if dataset follows the naming convention
    if not options.official:
        # get DN
        dn = ''
        output = commands.getoutput('grid-proxy-info -identity')
        for line in output.split('/'):
            if line.startswith('CN='):
                dn = re.sub('^CN=','',line)
                dn = re.sub('\d+$','',dn)
                dn = dn.replace(' ','')
                break
        if dn == '':
            print 'could not get DN from %s' % output
            sys.exit(EC_Main)
        # check
        if re.match('^user\.%s\.' % dn,datasetname) == None:
            print "ERROR : invalid datasetname : %s" % datasetname
            print "        datasetname must be 'user.<nickname>.<whatever>'"
            print "        <nickname> needs to be consistent with DN"
            print "        e.g., user.%s.csc11.002.Gee_500_pythia_photos_reson" % dn
            sys.exit(EC_Main)
    # get PFN/LFNs
    if options.pool != '':
        # extract list of files from PoolFileCatalog
        lfns,pfns,guids,sizes,md5sums = _getFNsPFC(options.pool)
    elif options.directory != '':
        # get list of files in directory
        lfns,pfns,guids = _getFNsDir(options.directory,configLOCALPROTOCOL)
        sizes = []
        md5sums = []
        for i in range(len(lfns)):
            sizes.append('')
            md5sums.append('')
    else:
        print "ERROR : -p or -d should be defined"
        sys.exit(EC_Main)
    # get storage host and ID
    storageHost,storageID = _getDefaultStorage(options.siteID)
    # register files to LRC
    filesList = []
    if options.mysql:
        # prepare properties
        filesList = _preparePropertiesForAll(lfns,pfns,guids,storageHost,configLOCALPROTOCOL,options.tmpDir,options.newBaseURL)
        # do register
        guids = _registerFilesLRC_SQL(filesList)
    elif toaHelper.getLFC(DQ2LOCALSITEID) != None:
        # prepare properties
        filesList = _preparePropertiesForAll(lfns,pfns,guids,storageHost,configLOCALPROTOCOL,options.tmpDir,options.newBaseURL)
        # do register
        guids = _registerFilesLFC(filesList,datasetname)
    else:
        if not options.exist and not options.partialReg:
            # prepare properties
            filesList = _preparePropertiesForAll(lfns,pfns,guids,storageHost,configLOCALPROTOCOL,options.tmpDir,options.newBaseURL)
            # do register
            guids = _registerFilesLRC(filesList)
        else:
            # get LFN list to check if files already registered (from LRC)
            lfns2,guids2,sizes2,md5sums2 = _getPFNsLRC(lfns)
            # _mergeProperties(5 lists from directory, 4 lists from catalog -no pfn-, options)
            filesList,guidsList,filesToRegisterList = _mergeProperties(guids,lfns,pfns,sizes,md5sums,guids2,lfns2,sizes2,md5sums2,
                                                                       storageHost,configLOCALPROTOCOL,options.tmpDir,options.newBaseURL,
                                                                       options.exist)
            if globalVerbose:
                print "%s files in dataset, %s not in LRC" % (len(filesList), len(filesToRegisterList))
            if options.exist and filesToRegisterList:
                print "ERROR : some files are not registered in LRC, run without option -e (--exist)"
                sys.exit(EC_Main)
            else:
                guids = _registerFilesLRC(filesToRegisterList,False)
                # check that all missing files registered
                if len(guids) != len(filesToRegisterList) or _guidMissing(guids,filesToRegisterList):
                    print "ERROR : Not all files registered in LRC (%s out of %s)" % (len(guids),len(filesToRegisterList))
                    sys.exit(EC_Main)
                # the newly registered guids are not different (catalog was checked before)
                guids = guidsList
    # register dataset
    if globalVerbose:
        print "Registering %s files as %s" % (len(filesList), datasetname)
    _registerDataset(datasetname,filesList,guids,storageID)
    # return
    return


if __name__ == "__main__":
    main()