Jenkins job…. put it running every 5 minutes…
from objects import Jenkins import logging import sys import re import pysvn import os jenkins_uid="brand_admin" jenkins_pwd="1qazxsw2" jenkins_url="http://jenkins.company.com" jenkins_job_file="config.xml" device_list=("android", "ios", "blackberry") svn_url="https://test.freerange360.com/svn/" jenkins_project_list={} jenkins_project_list["android"] = "android_trunk" jenkins_project_list["ios"] = "ios_trunk" jenkins_project_list["blackberry"] = "blackberry_trunk" """ get_svn_login is required for pysvn to set credentials """ def get_svn_login(realm, username, may_save): return True, "svninfoforbuild", "g3tV3rs10n", False """ query SVN for a list of tags. Look through that list for tags matching regex. figure out which of these was created last. return this tag. """ def svn_get_latest_tag_list(device, client, quantity): print("svn_get_latest_tag_list(" + device + ", " + str(client) + ", " + quantity + ")") reponame = svn_url + device + "/tags" # Get list of svn tags in the repo. svn_tag_list = client.list(reponame) svn_tag_coll = {} to_return = [] # Create a collection of tags & sort the keys (timestamp) for tag in svn_tag_list: m = re.search(r"" + reponame + "/(" + device + "-\d+\.\d+\.\d+\.\d+)$", tag[0]["path"]) if m is not None: svn_tag_coll[tag[0]["time"]] = m.group(1) sorted_keys = sorted(svn_tag_coll) key_count = len(sorted_keys) # Pull only the quantity specified as an arg. startnum = key_count - int(quantity) endnum = key_count for l in reversed(range(startnum, endnum)): to_return.append(svn_tag_coll[sorted_keys[l]]) # Reverse the order. return to_return # Define initial variables. device_type = None quantity = 10 # Get args provided by user. for arg in sys.argv: args = re.search(r"^--(\S+)=(\S+)$", arg) if args is not None: if re.search(r"^device_type$", args.group(1)): device_type = args.group(2).lower() if re.search(r"^quantity$", args.group(1)): quantity = args.group(2).lower() if device_type in device_list: print(device_type + " is valid device") else: sys.exit("ERROR: device_type does not match the required list of options (android, ios, blackberry)") svn_trunk_url=svn_url + device_type + "/trunk" jenkins_job_name = device_type + "_batch" # accept arg: blackberry, android, ios # use this arg. look in SVN//tags & get list of tags # populate _batch job_to_execute with this list. logging.basicConfig(level=logging.DEBUG) jenkins = Jenkins.Jenkins(jenkins_url, jenkins_uid, jenkins_pwd, logging) client = pysvn.Client() client.callback_get_login = get_svn_login tag_list = svn_get_latest_tag_list(device_type, client, quantity) #tag_list.insert(0, device_type + "_trunk") tag_list.append(device_type + "_trunk") xml_file_orig = jenkins.get_project_xml(jenkins_job_name) xml_file_new = jenkins.batch_project_replace_job_list(xml_file_orig, tag_list, jenkins_job_name) if(xml_file_new): print(jenkins.push_xml_to_project(jenkins_job_name, xml_file_new)) os.remove(xml_file_new) os.remove(xml_file_orig)
objects/Jenkins.py
import re import time import xml.etree.ElementTree as ET #from xml.etree.ElementTree import ElementTree, Element, SubElement import logging import base64 import urllib import urllib2 import JenkinsJob import Brand from urllib2 import URLError, HTTPError class Jenkins: url = None username = None password = None logging = None base64string = None job_list_all = None logging = None et = None def __init__(self, url, username, password, logging): self = self self.url = url self.username = username self.password = password self.logging = logging self.base64string = base64.encodestring("%s:%s" % (username, password))[:-1] self.job_list_all = {} self.et = ET.ElementTree() def fetch_url(self, url): req = urllib2.Request(url) req.add_header("Authorization", "Basic %s" % self.base64string) results = None try: response = urllib2.urlopen(req) results = response.read() except URLError, e: #self.logging.debug("ERROR in Jenkins.fetchurl(" + url + ") => " + str(e)) return False except HTTPError, e: #self.logging.debug("ERROR in Jenkins.fetchurl(" + url + ") => " + str(e)) return False return results def get_project_xml(self, job_name): time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime()) jobxmlurl = self.url + "/job/" + job_name + "/config.xml" self.logging.debug("jobxmlurl = " + jobxmlurl) jobxml = self.fetch_url(jobxmlurl) if(jobxml): #self.logging.debug("jobxml = " + str(jobxml)) filename = "./" + job_name + "_" + time_str + ".xml" f = open(filename, "w") f.write(jobxml) f.close() return filename else: return False #print jobxmlurl ''' project_xml_replace_brand_list takes 2 args: xml_file_orig => the path to your xml file you would like modified. blist => a list of Brand objects which you would like as options in the dropdown. The XML will be modified to have all brands as options. XML STRUCTURE:... ''' def project_xml_replace_brand_list(self, xml_file_orig, blist, job_name): time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime()) project_elem = self.et.parse(xml_file_orig) if project_elem.find("properties") is not None: if project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty") is not None: if project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty").find("parameterDefinitions") is not None: try: parameterDef = project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty").find("parameterDefinitions") parameters = list(parameterDef) for param in parameters: if param.find("name").text == "brand": # THIS IS THE brand VAR try: # LOAD CORRECT PORTION OF XML paramchoicesa = param.find("choices").find("a") allchoices = list(paramchoicesa) self.logging.debug("ORIGINAL CHOICE COUNT = " + str(len(allchoices))) # DELETE ALL PARAM CHOICES paramchoicesa.clear() # DEFINE ALL PARAM CHOICES paramchoicesa.set("class", "string-array") #elem = ElementTree.Element("string") #elem.text = "foo" #paramchoicesa.append(elem) for b in blist: elem = ET.Element("string") elem.text = b paramchoicesa.append(elem) #for b in blist.keys(): # #print blist[b] # elem = Element("string") # elem.text = blist[b].id # paramchoicesa.append(elem) allchoices = list(paramchoicesa) print len(allchoices) self.logging.debug("FINAL CHOICE COUNT = " + str(len(allchoices))) except Exception, e: self.logging.exception("EXCEPTION IN project_xml_replace_brand_list: " + str(e)) # MODIFICATION IS DONE. WRITE TO FILE. newxmlname = "./" + job_name + "_new_" + time_str + ".xml" self.et.write(newxmlname) return newxmlname except Exception, e: logging.exception(job_name + " does not follow appropriate xml format") return False def batch_project_replace_job_list(self, job_xml, job_list, job_name): time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime()) # 1) fetch job xml #jobxml = self.get_project_xml(job_name) project_elem = self.et.parse(job_xml) # 2) replace job_to_execute options with job_list if project_elem.find("properties") is not None: if project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty") is not None: if project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty").find("parameterDefinitions") is not None: try: parameterDef = project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty").find("parameterDefinitions") parameters = list(parameterDef) for param in parameters: if param.find("name").text == "job_to_execute": try: # LOAD CORRECT PORTION OF XML paramchoicesa = param.find("choices").find("a") allchoices = list(paramchoicesa) self.logging.debug("ORIGINAL CHOICE COUNT = " + str(len(allchoices))) # DELETE ALL PARAM CHOICES paramchoicesa.clear() # DEFINE ALL PARAM CHOICES paramchoicesa.set("class", "string-array") for b in job_list: elem = ET.Element("string") elem.text = b paramchoicesa.append(elem) allchoices = list(paramchoicesa) self.logging.debug("FINAL CHOICE COUNT = " + str(len(allchoices))) print "job_to_execute" except Exception, e: self.logging.exception("EXCEPTION IN batch_project_replace_job_list: " + str(e)) # MODIFICATION IS DONE. WRITE TO FILE. newxmlname = "./" + job_name + "_new_" + time_str + ".xml" self.et.write(newxmlname) return newxmlname except Exception, e: logging.exception(job_name + " does not follow appropriate xml format") ''' XML STRUCTURE:... ... brand ...choice1 choice2 ... REPLACE THE StringParameterDefinition WITH:... ... brand http://test.freerange360.com/freenews/brandadmin?user=brandadmin&pw=s247-brand&type=brand ''' def project_xml_replace_brand_with_choice(self, xml_file_orig, blist, job_name): #self.logging.debug("project_xml_replace_brand_with_choice(self, " + xml_file_orig + ", " + job_name + ")") # 1) Create the element which will be inserted. choicedef = ET.Element("hudson.model.ChoiceParameterDefinition") namedef = ET.Element("name") namedef.text = "brand" descdef = ET.Element("description") descdef.text = "The brand you would like to build" choicesdef = ET.Element("choices") choicesdef.set("class", "java.util.Arrays$ArrayList") adef = ET.Element("a") adef.set("class", "string-array") #choice1 = ET.Element("string") #choice1.text = "foo" choicedef.append(namedef) choicedef.append(descdef) for b in blist: choice = ET.Element("string") choice.text = b adef.append(choice) #adef.append(choice1) choicesdef.append(adef) choicedef.append(choicesdef) # AT THIS POINT, choicedef IS THE XML WHICH MUST BE INSERTED. # NOW IT'S TIME TO DELETE THE APPROPRIATE PARAM FROM THE XML project_elem = self.et.parse(xml_file_orig) if project_elem.find("properties") is not None: if project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty") is not None: if project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty").find("parameterDefinitions") is not None: try: parameterDef = project_elem.find("properties").find("hudson.model.ParametersDefinitionProperty").find("parameterDefinitions") parameters = list(parameterDef) for param in parameters: if param.tag == "hudson.model.StringParameterDefinition" and param.find("name").text == "brand" : self.logging.debug("---------------------------------------------------------") print job_name print str(param.tag) # DELETE THIS PARAM parameterDef.remove(param) # ADD OUR NEW PARAM parameterDef.insert(0, choicedef) #if param.find("name").text == "brand": # parameters.remove(param) self.logging.debug("---------------------------------------------------------") time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime()) newxmlname = "./" + job_name + "_new_" + time_str + ".xml" self.et.write(newxmlname) return newxmlname except Exception, e: logging.exception(job_name + " does not follow appropriate xml format") return False def copy_project_xml(self, new_proj_name, source_proj, new_proj_xml): # READ XML INTO MEMORY xml_contents = "" file = open(new_proj_xml, "r") for line in file: xml_contents += line file.close() # 1) CREATE A NEW JOB IN JENKINS job_create_url = self.url + "/createItem?name=" + new_proj_name self.logging.debug(job_create_url) req = urllib2.Request(job_create_url, data=xml_contents, headers={"Content-Type":"text/xml"}) #base64string = base64.encodestring("%s:%s" % (jenkins_id, jenkins_pwd))[:-1] req.add_header("Authorization", "Basic %s" % self.base64string) response = urllib2.urlopen(req) create_results = response.read() def push_xml_to_project(self, proj_name, proj_xml): self.logging.debug("push_xml_to_project(self, " + proj_name + ", " + proj_xml + ")") jobxmlurl = self.url + "/job/" + proj_name + "/config.xml" #jobxmlurl = self.url + "/createItem?name=" + proj_name #jobxmlurl = self.url + "/job/" + proj_name + "/api/?" #jobxmlurl = "http://jenkins.freerange360.lan:8080/job/android_build_james/config.xml" xml_contents = "" file = open(proj_xml, "r") for line in file: xml_contents += line file.close() try: req = urllib2.Request(jobxmlurl, data=xml_contents, headers={"Content-Type":"text/xml"}) #req = urllib2.Request(jobxmlurl, data=proj_xml) #req.get_method = lambda: 'POST' base64string = base64.encodestring("%s:%s" % (self.username, self.password))[:-1] req.add_header("Authorization", "Basic %s" % base64string) response = urllib2.urlopen(req) create_results = response.read() return create_results except Exception, e: logging.exception(str(e)) return False def project_exists(self, pname): jobxmlurl = self.url + "/job/" + pname results = self.fetch_url(jobxmlurl) if results: return True else: return False def load_job_list_all(self): jl = {} myurl = self.url + "/api/xml" results = self.fetch_url(myurl) hudson_elem = ET.fromstring(results) jobs = hudson_elem.findall("job") for j in jobs: #self.logging.debug("j = " + str(j)) self.logging.debug("job = JenkinsJob.JenkinsJob(" + j.find("name").text + ", " + j.find("url").text + ", " + self.username + ", " + self.password + ", self.logging.getLogger())") job = JenkinsJob.JenkinsJob(j.find("name").text, j.find("url").text, self.username, self.password, self.logging.getLogger()) self.job_list_all[j.find("name").text] = job #self.logging.debug("name = " + j.find("name").text) #self.logging.debug("url = " + j.find("url").text) return len(self.job_list_all) def get_job_list_xml_view(self, view): myurl = self.url + "/view/" + view + "/api/xml" results = self.fetch_url(myurl) #print results name The brand you would like to build. JayHiggs
objects/JenkinsJob.py
import logging import os import re import base64 import urllib import urllib2 import xml.etree.ElementTree as ET from objects import Helper from objects import JenkinsBuild class JenkinsJob: name = None url = None builds = {} logdir = None base64string = None username = None password = None def __init__(self, name, url, username, password, logger): self = self self.name = name self.url = url self.logger = logger self.username = username self.password = password self.base64string = base64.encodestring("%s:%s" % (username, password))[:-1] self.helper = Helper.Helper(self.logger) def fetch_url(self, url): req = urllib2.Request(url) req.add_header("Authorization", "Basic %s" % self.base64string) results = None try: response = urllib2.urlopen(req) results = response.read() except URLError, e: #self.logging.debug("ERROR in Jenkins.fetchurl(" + url + ") => " + str(e)) return False except HTTPError, e: #self.logging.debug("ERROR in Jenkins.fetchurl(" + url + ") => " + str(e)) return False return results ''' Load a list of all executions of this job ''' def load_build_list_all(self): self.logger.debug("START JenkinsJob.load_build_list_all()") xmlurl = self.url + "/api/xml" results = self.fetch_url(xmlurl) jenkins_elem = ET.fromstring(results) builds = jenkins_elem.findall("build") temp_build_list = {} for b in builds: num = b.find("number").text url = b.find("url").text jb = JenkinsBuild.JenkinsBuild(int(num), url, self.username, self.password, self.logger) self.logger.debug("jb.number = '" + str(jb.number) + "'") #self.builds[k] = temp_build_list[k] #temp_build_list[jb.number] = jb self.builds[jb.number] = jb # This list is out of order. Order list based on jb.number #sorted_keys = sorted(temp_build_list) #for k in sorted_keys: # self.logger.debug(k) # self.builds[k] = temp_build_list[k] self.logger.debug("END JenkinsJob.load_build_list_all() => " + str(len(self.builds))) ''' Iterate through self.builds & load detail xml of each. ''' def load_build_list_details(self): for bk in self.builds.keys(): b = self.builds[bk] b.fetch_details_via_xml()
objects/JenkinsBuild.py
import base64 import urllib import urllib2 import re from objects import Helper import xml.etree.ElementTree as ET #import JenkinsJob class JenkinsBuild: def __init__(self, number, url, username, password, logger): self = self self.number = number self.started_by = None self.timestamp = None self.fullDisplayName = None self.result = None self.building = None self.builtOn=None self.failure_reason = None self.url = url self.logger = logger self.username = username self.password = password self.parameters = {} self.base64string = base64.encodestring("%s:%s" % (username, password))[:-1] self.helper = Helper.Helper(self.logger) def fetch_url(self, url): req = urllib2.Request(url) req.add_header("Authorization", "Basic %s" % self.base64string) results = None try: response = urllib2.urlopen(req) results = response.read() except URLError, e: #self.logging.debug("ERROR in Jenkins.fetchurl(" + url + ") => " + str(e)) return False except HTTPError, e: #self.logging.debug("ERROR in Jenkins.fetchurl(" + url + ") => " + str(e)) return False return results def fetch_details_via_xml(self): self.logger.debug("START JenkinsBuild.fetch_details_via_xml") tempurl = self.url + "/api/xml" #http://jenkins.freerange360.lan:8080/job/android-4.0.0.16/47/api/xml self.logger.debug("fetch_url(" + tempurl + ")") xml = self.fetch_url(tempurl) fsb = ET.fromstring(xml) try: actions = fsb.findall("action") for a in actions: if a.findall("parameter"): parameters = a.findall("parameter") for p in parameters: self.parameters[p.find("name").text] = p.find("value").text elif a.findall("cause"): cause = a.find("cause") self.started_by = cause.find("userName").text #parameters = fsb.find("action").findall("parameter") #for p in parameters: # self.parameters[p.find("name").text = p.find("value").text #print p.find("name").text + " = " + p.find("value").text #self.started_by = fsb.find("action").find("cause").find("userName") self.building = fsb.find("building").text self.duration = fsb.find("duration").text self.timestamp = fsb.find("id").text self.result = fsb.find("result").text self.builtOn = fsb.find("builtOn").text if self.result == "FAILURE": self.fetch_why_project_failed() except AttributeError, e: self.logger.debug("ERROR -> " + str(e)) return False #self.logger.debug("END JenkinsBuild.fetch_details_via_xml") def fetch_why_project_failed(self): self.logger.debug("START JenkinsBuild.fetch_why_project_failed") tempurl = self.url + "/consoleText" self.logger.debug(tempurl) log = self.fetch_url(tempurl) self.log = log m = re.search("(No resource found that matches the given name)", log) if re.search("SVNException.*request\sfailed", log): self.failure_reason = "SVNException" elif re.search("UnknownHostException:\sfreerangeinc3\.virtual\.vps-host\.net", log): self.failure_reason = "Unable to FTP build" elif re.search("\[BEROR\]Code\sSign\serror", log): self.failure_reason = "Signing error" elif re.search("No\sresource\sidentifier\sfound\sfor\sattribute", log): self.failure_reason = "No resource identifier found error" elif re.search("(\[ERROR\]\sclient\.config\skey.*)", log): n = re.search("(\[ERROR\]\sclient\.config\skey.*)", log) self.failure_reason = n.group(1) elif re.search("error:\sError:\sNo\sresource\sfound\sthat\smatches\sthe\sgiven\sname", log): n = re.search("(error:\sError:\sNo\sresource\sfound\sthat\smatches\sthe\sgiven\sname.*)", log) self.failure_reason = n.group(1) elif re.search("ABORTED", log): self.failure_reason = "Aborted" elif m is not None: self.failure_reason = m.group(1) self.logger.debug("END JenkinsBuild.fetch_why_project_failed => " + self.failure_reason) elif re.search("Failure", log): self.failure_reason = "Unknown - look @ job"
objects/Helper.py
import os import re import time import logging class Helper: def __init__(self, logger): self = self self.logger = logger #def get_subdir_list(self, branddir): # #print "get_subdir_list(self, " + branddir + ")" # subdirs = [] # for adir in os.listdir(branddir): # if re.search(r"^\.", adir): # pass # else: # if os.path.isdir(os.path.join(branddir,adir)): # subdirs.append(adir) # subdirs.append(".") # return subdirs def get_timestamp(self): return time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime()) def dir_get_file_list(self, parentdir): self.logger.debug("START Helper.dir_get_file_list(self, " + parentdir + ")") files = [] for f in os.listdir(parentdir): self.logger.debug("f = " + str(f)) if os.path.isfile(os.path.join(parentdir, f)): self.logger.debug("files.append(" + str(f) + ")") files.append(f) self.logger.debug("END Helper.dir_get_file_list(self, " + parentdir + ")") return files def dir_get_subdir_list(self, branddir): self.logger.debug("START get_subdir_list(self, " + branddir + ")") subdirs = [] for adir in os.listdir(branddir): self.logger.debug("adir = " + str(adir)) if re.search(r"^\.", adir): pass else: if os.path.isdir(os.path.join(branddir,adir)): self.logger.debug("subdirs.append(" + str(adir) + ")") subdirs.append(adir) else: self.logger.debug(str(adir) + " is not a dir") self.logger.debug("END get_subdir_list(self, " + branddir + ")") return subdirs def milliseconds_to_human_readable(self, millis): millis = int(millis) seconds_total = millis / 1000 seconds = seconds_total % 60 minutes_total = seconds_total / 60 minutes = minutes_total % 60 hours = minutes_total / 60 return (hours, minutes, seconds)
objects/Brand.py
class Brand: id = None name = None def __init__(self, id, name): self = self self.id = id self.name = name
objects/Subversion.py
import pysvn import re import sys import time import datetime import logging class Subversion: user = "" pwd = "" url = "" checkout_path = None url = None client = None # status_list will be a collection of status_list[${file_path}] = ${is_versioned} status_list = {} logging = None def __init__(self, logging): self.user = "brandcreation" self.pwd = "newbrand" self.checkout_path = "" self.url = "" self.client = pysvn.Client() self.logging = logging try: self.client.callback_get_login = self.get_svn_login except Exception, e: sys.exit("Error: " + str(e)) ''' get_svn_login is required for pysvn to set credentials ''' def get_svn_login(self, realm, username, may_save): return True, "brandcreation", "newbrand", False #return True, "cmuser@freerangeinc.com", "D3pl0y", False ''' get_log_message is required for pysvn to for certain activities (such as copy) ''' def get_log_message(self): #return True, "brandadmin created activity " + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime()) return True, "FB-9390 - brandadmin created activity " + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime()) ''' Get the svn status of self.checkout_path Used to determine a list of modified / new files which need to be committed. Return size of list loaded. ''' def load_status(self): self.status_list = self.client.status(self.checkout_path) def test_credentials(self): return 0 ''' Get a list of only modified files. REQUIRE: load_status must be called first. ''' def get_changedfile_list(self): if len(self.status_list) is 0: return 0 else: toreturn = {} for stat in self.status_list: if(str(stat.text_status) == "modified"): toreturn.append(stat) return toreturn ''' Get a list of only new files. REQUIRE: load_status must be called first. ''' #def get_newfile_list(self): # if len(self.status_list) is 0: # return 0 # else: # toreturn = {} # for change in self.status_list.keys(): # if self.status_list[change] is "unversioned": # toreturn[change] = self.status_list[change] # return toreturn ''' Add new files to SVN REQUIRE: load_status must be called first. ''' #def commit_new_and_modded_files(self, message): # if len(self.status_list) is 0: # return 0 # else: # for change in self.status_list.keys(): # if self.status_list[change] is 0: # # SVN ADD # self.client.add(change) # result = self.client.checkin(self.checkout_path, message) # return(result) def get_subdir_list(self): subdirlist = self.client.list(self.url) toreturn = [] for d in subdirlist: bn = re.search(r".*/([\w\-]*)$", d[0]["path"]) if bn is not None: toreturn.append(bn.group(1)) return toreturn ''' Add new files to svn REQUIRE: load_status must be called first. ''' def add_new_files(self): self.logging.debug("Subversion.add_new_files(self)") self.logging.debug("status_list size = " + str(len(self.status_list))) count = 0 if len(self.status_list) is 0: return 0 else: for stat in self.status_list: if(str(stat.text_status) == "unversioned"): self.client.add(stat.path) count = count + 1 self.logging.debug("END Subversion.add_new_files(self) => " + str(count)) return count def commit(self, message): log_message = message self.callback_get_log_message = self.get_log_message self.client.checkin(self.checkout_path, message) def checkout(self, url, path): self.logging.info("checkout(self, " + url + ", " + path + ")") try: self.callback_get_login = self.get_svn_login results = self.client.checkout(url, path) return results except pysvn.ClientError, e: self.logging.exception("ERROR CHECKING OUT " + url + " -> " + str(e)) sys.exit("ERROR CHECKING OUT " + url + " -> " + str(e)) def copy(self, source, dest, message): try: log_message = "SVN copy " + source + " to " + dest self.client.callback_get_log_message = self.get_log_message self.client.copy(source, dest) except Exception, e: sys.exit("ERROR in Subversion.copy: " + str(e)) def remove(self, filepath): self.logging.debug("Subversion.remove(self, " + filepath + ")") try: self.client.callback_get_log_message = self.get_log_message self.client.remove(filepath) except Exception, e: self.logging.exception("ERROR in Subversion.remove of " + filepath + " => " + str(e)) sys.exit("ERROR in Subversion.remove of " + filepath + " => " + str(e)) def get_change_log(self, from_date): self.logging.debug("START get_change_log(" + str(from_date) + ")") rev = pysvn.Revision(pysvn.opt_revision_kind.date, from_date) change_list = {} try: #change_list = self.client.log(self.url, revision_start=rev) change_list = self.client.log(self.url, revision_end=rev, discover_changed_paths=True) except Exception, e: sys.exit("ERROR in Subversion.log: " + str(e)) self.logging.debug("END get_change_log(" + str(from_date) + ") => " + str(len(change_list))) return change_list