Jenkins job: schedule it to run every 5 minutes.
from objects import Jenkins
import logging
import sys
import re
import pysvn
import os
# Jenkins connection settings (passed to Jenkins.Jenkins by this script).
jenkins_uid = "brand_admin"
jenkins_pwd = "1qazxsw2"
jenkins_url = "http://jenkins.company.com"
jenkins_job_file = "config.xml"

# Device types accepted for --device_type.
device_list = ("android", "ios", "blackberry")

# Root of the Subversion repository; "<device>/tags" or "<device>/trunk"
# is appended to it.
svn_url = "https://test.freerange360.com/svn/"

# Maps each device type to its "<device>_trunk" name.
jenkins_project_list = {
    "android": "android_trunk",
    "ios": "ios_trunk",
    "blackberry": "blackberry_trunk",
}
"""
get_svn_login is required for pysvn to set credentials
"""
def get_svn_login(realm, username, may_save):
    """Answer pysvn's login callback with a fixed account.

    All three arguments are ignored; the same values are returned on
    every call.

    Returns:
        The 4-tuple ``(True, "svninfoforbuild", "g3tV3rs10n", False)``.
        NOTE(review): presumably (ok, username, password, save) as pysvn's
        ``callback_get_login`` expects -- confirm against the pysvn docs.
    """
    accepted = True
    remember = False
    return accepted, "svninfoforbuild", "g3tV3rs10n", remember
"""
query SVN for a list of tags. Look through that list for tags matching regex.
figure out which of these was created last. return this tag.
"""
def svn_get_latest_tag_list(device, client, quantity):
    """Return the most recently created release tags for ``device``.

    Lists ``<svn_url><device>/tags`` through ``client.list`` and keeps the
    entries whose path ends in ``<device>-N.N.N.N``. Tags are ordered by
    the ``time`` field of each listing entry, newest first.

    Args:
        device: Device name used both in the repository path and as the
            tag prefix.
        client: Object with a ``list(url)`` method returning a sequence
            whose items carry a mapping with ``path`` and ``time`` at
            index 0 (the shape ``pysvn.Client.list`` is used with here).
        quantity: Maximum number of tags to return; an int or a numeric
            string. Zero or a negative number yields an empty list.

    Returns:
        A list of at most ``quantity`` tag names, newest first. When fewer
        matching tags exist, all of them are returned.

    Raises:
        ValueError: if ``quantity`` cannot be converted with ``int()``.
    """
    # str() is required: quantity may arrive as an int, and concatenating
    # it directly to a string raises TypeError.
    print("svn_get_latest_tag_list(" + device + ", " + str(client) + ", " + str(quantity) + ")")
    reponame = svn_url + device + "/tags"

    # The repository URL contains regex metacharacters ("." at least), so
    # it is escaped before being spliced into the pattern.
    tag_pattern = re.compile(
        re.escape(reponame) + "/(" + re.escape(device) + r"-\d+\.\d+\.\d+\.\d+)$"
    )

    # Key the tags by timestamp; entries sharing a timestamp overwrite
    # each other (last one listed wins).
    tags_by_time = {}
    for tag in client.list(reponame):
        entry = tag[0]
        match = tag_pattern.search(entry["path"])
        if match is not None:
            tags_by_time[entry["time"]] = match.group(1)

    # Slicing cannot run off the front of the list the way a manual index
    # range does when fewer tags exist than were asked for.
    wanted = max(int(quantity), 0)
    newest_first = sorted(tags_by_time, reverse=True)
    return [tags_by_time[stamp] for stamp in newest_first[:wanted]]
# Define initial variables.
device_type = None
quantity = 10

# Parse --device_type=<name> and --quantity=<n> from the command line.
# Arguments in any other form are ignored.
for arg in sys.argv[1:]:
    args = re.search(r"^--(\S+)=(\S+)$", arg)
    if args is None:
        continue
    if args.group(1) == "device_type":
        device_type = args.group(2).lower()
    elif args.group(1) == "quantity":
        quantity = args.group(2)

if device_type in device_list:
    print(device_type + " is valid device")
else:
    sys.exit("ERROR: device_type does not match the required list of options (android, ios, blackberry)")

# Reject a non-numeric --quantity before any network call is made. The
# value is handed on as text because svn_get_latest_tag_list concatenates
# it into a log line and converts it with int() itself.
try:
    quantity = str(int(quantity))
except ValueError:
    sys.exit("ERROR: quantity must be an integer, got '" + str(quantity) + "'")

svn_trunk_url = svn_url + device_type + "/trunk"
jenkins_job_name = device_type + "_batch"

# Flow: take the device type, read its tags from SVN <device>/tags, and
# publish that list as the job_to_execute choices of the <device>_batch job.
logging.basicConfig(level=logging.DEBUG)
jenkins = Jenkins.Jenkins(jenkins_url, jenkins_uid, jenkins_pwd, logging)
client = pysvn.Client()
client.callback_get_login = get_svn_login

tag_list = svn_get_latest_tag_list(device_type, client, quantity)
# The trunk job is always offered, after the tagged builds.
tag_list.append(device_type + "_trunk")

xml_file_orig = jenkins.get_project_xml(jenkins_job_name)
if not xml_file_orig:
    # get_project_xml returns False when the download fails; there is
    # nothing to rewrite or clean up in that case.
    sys.exit("ERROR: unable to download config.xml for " + jenkins_job_name)

xml_file_new = None
try:
    xml_file_new = jenkins.batch_project_replace_job_list(xml_file_orig, tag_list, jenkins_job_name)
    if xml_file_new:
        print(jenkins.push_xml_to_project(jenkins_job_name, xml_file_new))
finally:
    # Remove the temporary XML files even when the rewrite or upload
    # fails; only delete the rewritten file if one was actually produced.
    if xml_file_new:
        os.remove(xml_file_new)
    os.remove(xml_file_orig)
objects/Jenkins.py
import re
import time
import xml.etree.ElementTree as ET
#from xml.etree.ElementTree import ElementTree, Element, SubElement
import logging
import base64
import urllib
import urllib2
import JenkinsJob
import Brand
from urllib2 import URLError, HTTPError
class Jenkins:
    """Small client for a Jenkins server's HTTP endpoints.

    Downloads, rewrites and uploads job ``config.xml`` files and lists the
    server's jobs through ``/api/xml``. Every request carries an HTTP Basic
    ``Authorization`` header built from the username and password.

    The ``logging`` object handed to the constructor must provide
    ``debug``, ``error`` and ``exception``; ``load_job_list_all`` also
    calls ``getLogger()`` on it, so the standard ``logging`` module is the
    expected value.
    """

    url = None
    username = None
    password = None
    logging = None
    base64string = None
    job_list_all = None
    et = None

    def __init__(self, url, username, password, logging):
        """Store the connection settings and precompute the auth token.

        Args:
            url: Base URL of the Jenkins server, without a trailing slash.
            username: Account used for HTTP Basic authentication.
            password: Password of that account.
            logging: Logging facade, see the class docstring.
        """
        self.url = url
        self.username = username
        self.password = password
        self.logging = logging
        # b64encode never inserts line breaks, so long credentials cannot
        # split the Authorization header the way encodestring output can.
        self.base64string = base64.b64encode("%s:%s" % (username, password))
        self.job_list_all = {}
        self.et = ET.ElementTree()

    def fetch_url(self, url):
        """GET ``url`` with basic auth.

        Returns:
            The response body, or False when the request fails with
            ``URLError`` (``HTTPError`` is a subclass, so it is covered by
            the same handler).
        """
        req = urllib2.Request(url)
        req.add_header("Authorization", "Basic %s" % self.base64string)
        try:
            response = urllib2.urlopen(req)
            try:
                return response.read()
            finally:
                response.close()
        except URLError:
            # Failures are deliberately quiet: project_exists() uses a
            # failed fetch as its normal "no such job" answer.
            return False

    def get_project_xml(self, job_name):
        """Download a job's config.xml into a timestamped local file.

        Returns:
            The path ``./<job_name>_<timestamp>.xml``, or False when the
            download fails.
        """
        time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime())
        jobxmlurl = self.url + "/job/" + job_name + "/config.xml"
        self.logging.debug("jobxmlurl = " + jobxmlurl)
        jobxml = self.fetch_url(jobxmlurl)
        if not jobxml:
            return False
        filename = "./" + job_name + "_" + time_str + ".xml"
        with open(filename, "w") as xml_file:
            xml_file.write(jobxml)
        return filename

    def _find_parameter_definitions(self, project_elem):
        """Return properties/ParametersDefinitionProperty/parameterDefinitions, or None."""
        node = project_elem
        for tag in ("properties",
                    "hudson.model.ParametersDefinitionProperty",
                    "parameterDefinitions"):
            node = node.find(tag)
            if node is None:
                return None
        return node

    def _set_choices(self, param, choices):
        """Replace the <string> children of param/choices/a; return the new count."""
        choice_array = param.find("choices").find("a")
        self.logging.debug("ORIGINAL CHOICE COUNT = " + str(len(list(choice_array))))
        # clear() also drops the element's attributes, so the class
        # attribute is set again afterwards.
        choice_array.clear()
        choice_array.set("class", "string-array")
        for value in choices:
            elem = ET.Element("string")
            elem.text = value
            choice_array.append(elem)
        final_count = len(list(choice_array))
        self.logging.debug("FINAL CHOICE COUNT = " + str(final_count))
        return final_count

    def _write_new_xml(self, job_name, time_str):
        """Write the currently parsed tree to ./<job_name>_new_<time_str>.xml; return the path."""
        newxmlname = "./" + job_name + "_new_" + time_str + ".xml"
        self.et.write(newxmlname)
        return newxmlname

    def project_xml_replace_brand_list(self, xml_file_orig, blist, job_name):
        """Rewrite the choices of the ``brand`` parameter in a job XML file.

        Expected structure below the root element: ``properties`` /
        ``hudson.model.ParametersDefinitionProperty`` /
        ``parameterDefinitions`` / one element per parameter, each with a
        ``name`` child and, for choice parameters, ``choices`` / ``a`` /
        ``string`` entries.

        Args:
            xml_file_orig: Path of the XML file to modify.
            blist: Iterable of strings that become the new choices.
            job_name: Used to name the output file.

        Returns:
            Path of the newly written XML file (written even when no
            ``brand`` parameter is present), or False when the expected
            structure is missing or an error occurs.
        """
        time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime())
        project_elem = self.et.parse(xml_file_orig)
        parameter_defs = self._find_parameter_definitions(project_elem)
        if parameter_defs is None:
            return False
        try:
            for param in list(parameter_defs):
                if param.find("name").text != "brand":
                    continue
                try:
                    print(self._set_choices(param, blist))
                except Exception as e:
                    # A malformed brand parameter is logged; the file is
                    # still written with whatever could be changed.
                    self.logging.exception("EXCEPTION IN project_xml_replace_brand_list: " + str(e))
            return self._write_new_xml(job_name, time_str)
        except Exception:
            self.logging.exception(job_name + " does not follow appropriate xml format")
        return False

    def batch_project_replace_job_list(self, job_xml, job_list, job_name):
        """Rewrite the choices of the ``job_to_execute`` parameter.

        Works on the same XML structure as
        ``project_xml_replace_brand_list``.

        Args:
            job_xml: Path of the XML file to modify.
            job_list: Iterable of strings that become the new choices.
            job_name: Used to name the output file.

        Returns:
            Path of the newly written XML file, or False when the expected
            structure is missing or an error occurs.
        """
        time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime())
        project_elem = self.et.parse(job_xml)
        parameter_defs = self._find_parameter_definitions(project_elem)
        if parameter_defs is None:
            return False
        try:
            for param in list(parameter_defs):
                if param.find("name").text != "job_to_execute":
                    continue
                try:
                    self._set_choices(param, job_list)
                    print("job_to_execute")
                except Exception as e:
                    self.logging.exception("EXCEPTION IN batch_project_replace_job_list: " + str(e))
            return self._write_new_xml(job_name, time_str)
        except Exception:
            self.logging.exception(job_name + " does not follow appropriate xml format")
        return False

    def project_xml_replace_brand_with_choice(self, xml_file_orig, blist, job_name):
        """Swap a free-text ``brand`` parameter for a choice parameter.

        Every ``hudson.model.StringParameterDefinition`` named ``brand``
        is removed, and a ``hudson.model.ChoiceParameterDefinition`` named
        ``brand`` (description "The brand you would like to build",
        choices taken from ``blist``) is inserted as the first parameter.

        Returns:
            Path of the newly written XML file, or False when the expected
            structure is missing or an error occurs.
        """
        # 1) Build the element which will be inserted.
        choicedef = ET.Element("hudson.model.ChoiceParameterDefinition")
        namedef = ET.Element("name")
        namedef.text = "brand"
        descdef = ET.Element("description")
        descdef.text = "The brand you would like to build"
        choicedef.append(namedef)
        choicedef.append(descdef)

        adef = ET.Element("a")
        adef.set("class", "string-array")
        for b in blist:
            choice = ET.Element("string")
            choice.text = b
            adef.append(choice)

        choicesdef = ET.Element("choices")
        choicesdef.set("class", "java.util.Arrays$ArrayList")
        choicesdef.append(adef)
        choicedef.append(choicesdef)

        # 2) Replace the matching parameter in the parsed file.
        project_elem = self.et.parse(xml_file_orig)
        parameter_defs = self._find_parameter_definitions(project_elem)
        if parameter_defs is None:
            return False
        try:
            # Iterate over a copy: the loop removes from and inserts into
            # parameter_defs.
            for param in list(parameter_defs):
                if param.tag == "hudson.model.StringParameterDefinition" and param.find("name").text == "brand":
                    self.logging.debug("---------------------------------------------------------")
                    print(job_name)
                    print(str(param.tag))
                    parameter_defs.remove(param)
                    parameter_defs.insert(0, choicedef)
                    self.logging.debug("---------------------------------------------------------")
            time_str = time.strftime("%Y-%m-%d_%H%M%S", time.localtime())
            return self._write_new_xml(job_name, time_str)
        except Exception:
            self.logging.exception(job_name + " does not follow appropriate xml format")
        return False

    def _read_file(self, path):
        """Return the full text of the file at ``path``."""
        with open(path, "r") as xml_file:
            return xml_file.read()

    def _post_xml(self, target_url, xml_contents):
        """POST ``xml_contents`` as text/xml with basic auth; return the response body."""
        req = urllib2.Request(target_url, data=xml_contents, headers={"Content-Type": "text/xml"})
        req.add_header("Authorization", "Basic %s" % self.base64string)
        response = urllib2.urlopen(req)
        try:
            return response.read()
        finally:
            response.close()

    def copy_project_xml(self, new_proj_name, source_proj, new_proj_xml):
        """Create a new Jenkins job from a local config XML file.

        Args:
            new_proj_name: Name of the job to create; appended to the
                ``createItem`` URL as given, without URL-quoting.
            source_proj: Not used by this method.
            new_proj_xml: Path of the XML file to upload.

        Returns:
            The response body returned by Jenkins.

        Raises:
            Whatever ``open`` or ``urllib2.urlopen`` raise; nothing is
            caught here.
        """
        xml_contents = self._read_file(new_proj_xml)
        job_create_url = self.url + "/createItem?name=" + new_proj_name
        self.logging.debug(job_create_url)
        return self._post_xml(job_create_url, xml_contents)

    def push_xml_to_project(self, proj_name, proj_xml):
        """Upload a local XML file as the config.xml of an existing job.

        Returns:
            The response body, or False when the upload raises (the error
            is logged with its traceback).
        """
        self.logging.debug("push_xml_to_project(self, " + proj_name + ", " + proj_xml + ")")
        jobxmlurl = self.url + "/job/" + proj_name + "/config.xml"
        xml_contents = self._read_file(proj_xml)
        try:
            return self._post_xml(jobxmlurl, xml_contents)
        except Exception as e:
            self.logging.exception(str(e))
            return False

    def project_exists(self, pname):
        """Return True when ``<url>/job/<pname>`` can be fetched with a non-empty body."""
        return bool(self.fetch_url(self.url + "/job/" + pname))

    def load_job_list_all(self):
        """Load every job listed by ``/api/xml`` into ``self.job_list_all``.

        Returns:
            The number of entries in ``self.job_list_all`` afterwards.
            When the listing cannot be fetched the error is logged and the
            current (unchanged) count is returned.
        """
        myurl = self.url + "/api/xml"
        results = self.fetch_url(myurl)
        if not results:
            self.logging.error("Unable to fetch job list from " + myurl)
            return len(self.job_list_all)
        hudson_elem = ET.fromstring(results)
        for j in hudson_elem.findall("job"):
            job_name = j.find("name").text
            job_url = j.find("url").text
            # The password is intentionally kept out of the log output.
            self.logging.debug("job = JenkinsJob.JenkinsJob(" + job_name + ", " + job_url + ", " + self.username + ", <password hidden>, self.logging.getLogger())")
            self.job_list_all[job_name] = JenkinsJob.JenkinsJob(job_name, job_url, self.username, self.password, self.logging.getLogger())
        return len(self.job_list_all)

    def get_job_list_xml_view(self, view):
        """Fetch ``/view/<view>/api/xml``.

        Returns:
            The response body, or False when the request fails.
        """
        myurl = self.url + "/view/" + view + "/api/xml"
        return self.fetch_url(myurl)
objects/JenkinsJob.py
import logging
import os
import re
import base64
import urllib
import urllib2
import xml.etree.ElementTree as ET
from objects import Helper
from objects import JenkinsBuild
class JenkinsJob:
    """One Jenkins job and the builds (executions) loaded for it.

    ``builds`` maps a build number (int) to the ``JenkinsBuild`` object
    created for it by ``load_build_list_all``.
    """

    name = None
    url = None
    # Populated per instance in __init__; a dict created here would be
    # shared by every JenkinsJob.
    builds = None
    logdir = None
    base64string = None
    username = None
    password = None

    def __init__(self, name, url, username, password, logger):
        """Store the job identity, credentials and logger.

        Args:
            name: Job name.
            url: Job URL; ``/api/xml`` is appended to it when loading builds.
            username: Account used for HTTP Basic authentication.
            password: Password of that account.
            logger: Object providing ``debug`` and ``error``.
        """
        self.name = name
        self.url = url
        self.logger = logger
        self.username = username
        self.password = password
        self.builds = {}
        # b64encode never inserts line breaks, so long credentials cannot
        # split the Authorization header the way encodestring output can.
        self.base64string = base64.b64encode("%s:%s" % (username, password))
        self.helper = Helper.Helper(self.logger)

    def fetch_url(self, url):
        """GET ``url`` with basic auth.

        Returns:
            The response body, or False when the request fails with
            ``urllib2.URLError`` (which also covers ``HTTPError``, its
            subclass).
        """
        req = urllib2.Request(url)
        req.add_header("Authorization", "Basic %s" % self.base64string)
        try:
            response = urllib2.urlopen(req)
            try:
                return response.read()
            finally:
                response.close()
        except urllib2.URLError:
            # Referenced through the urllib2 module because this file does
            # not import URLError/HTTPError by name.
            return False

    def load_build_list_all(self):
        """Load the list of all executions of this job into ``self.builds``.

        Fetches ``<job url>/api/xml`` and creates one ``JenkinsBuild`` per
        ``build`` element, keyed by build number. When the listing cannot
        be fetched the error is logged and ``self.builds`` is left as is.
        """
        self.logger.debug("START JenkinsJob.load_build_list_all()")
        xmlurl = self.url + "/api/xml"
        results = self.fetch_url(xmlurl)
        if not results:
            self.logger.error("Unable to fetch build list from " + xmlurl)
            return
        jenkins_elem = ET.fromstring(results)
        for b in jenkins_elem.findall("build"):
            num = b.find("number").text
            build_url = b.find("url").text
            jb = JenkinsBuild.JenkinsBuild(int(num), build_url, self.username, self.password, self.logger)
            self.logger.debug("jb.number = '" + str(jb.number) + "'")
            self.builds[jb.number] = jb
        self.logger.debug("END JenkinsJob.load_build_list_all() => " + str(len(self.builds)))

    def load_build_list_details(self):
        """Call ``fetch_details_via_xml()`` on every build in ``self.builds``."""
        for build in list(self.builds.values()):
            build.fetch_details_via_xml()
objects/JenkinsBuild.py
import base64
import urllib
import urllib2
import re
from objects import Helper
import xml.etree.ElementTree as ET
#import JenkinsJob
class JenkinsBuild:
    """A single execution of a Jenkins job and the details loaded for it."""

    # Ordered (pattern, reason) pairs checked against the console log; the
    # first match wins. A reason of None means "use capture group 1".
    _FAILURE_PATTERNS = (
        (r"SVNException.*request\sfailed", "SVNException"),
        (r"UnknownHostException:\sfreerangeinc3\.virtual\.vps-host\.net", "Unable to FTP build"),
        (r"\[BEROR\]Code\sSign\serror", "Signing error"),
        (r"No\sresource\sidentifier\sfound\sfor\sattribute", "No resource identifier found error"),
        (r"(\[ERROR\]\sclient\.config\skey.*)", None),
        (r"(error:\sError:\sNo\sresource\sfound\sthat\smatches\sthe\sgiven\sname.*)", None),
        (r"ABORTED", "Aborted"),
        (r"(No resource found that matches the given name)", None),
        (r"Failure", "Unknown - look @ job"),
    )

    def __init__(self, number, url, username, password, logger):
        """Store the build identity, credentials and logger.

        Args:
            number: Build number.
            url: Build URL; ``/api/xml`` and ``/consoleText`` are appended
                to it by the fetch methods.
            username: Account used for HTTP Basic authentication.
            password: Password of that account.
            logger: Object providing ``debug``.
        """
        self.number = number
        self.started_by = None
        self.timestamp = None
        self.fullDisplayName = None
        self.result = None
        self.building = None
        self.builtOn = None
        self.duration = None
        self.failure_reason = None
        self.log = None
        self.url = url
        self.logger = logger
        self.username = username
        self.password = password
        self.parameters = {}
        # b64encode never inserts line breaks, so long credentials cannot
        # split the Authorization header the way encodestring output can.
        self.base64string = base64.b64encode("%s:%s" % (username, password))
        self.helper = Helper.Helper(self.logger)

    def fetch_url(self, url):
        """GET ``url`` with basic auth.

        Returns:
            The response body, or False when the request fails with
            ``urllib2.URLError`` (which also covers ``HTTPError``, its
            subclass).
        """
        req = urllib2.Request(url)
        req.add_header("Authorization", "Basic %s" % self.base64string)
        try:
            response = urllib2.urlopen(req)
            try:
                return response.read()
            finally:
                response.close()
        except urllib2.URLError:
            # Referenced through the urllib2 module because this file does
            # not import URLError/HTTPError by name.
            return False

    def _child_text(self, parent, tag):
        """Return the text of ``parent``'s child ``tag``, or None when it is absent."""
        child = parent.find(tag)
        return None if child is None else child.text

    def fetch_details_via_xml(self):
        """Load parameters, trigger user and status fields from ``<url>/api/xml``.

        Fills ``parameters``, ``started_by``, ``building``, ``duration``,
        ``timestamp`` (from the ``id`` element), ``result`` and
        ``builtOn``. Elements missing from the response leave the
        corresponding attribute as None instead of aborting the load. When
        ``result`` is "FAILURE" the console log is inspected as well.

        Returns:
            False when the XML cannot be fetched or an AttributeError is
            raised while reading it; None otherwise.
        """
        self.logger.debug("START JenkinsBuild.fetch_details_via_xml")
        tempurl = self.url + "/api/xml"
        self.logger.debug("fetch_url(" + tempurl + ")")
        xml = self.fetch_url(tempurl)
        if not xml:
            self.logger.debug("ERROR -> unable to fetch " + tempurl)
            return False
        fsb = ET.fromstring(xml)
        try:
            for action in fsb.findall("action"):
                parameters = action.findall("parameter")
                if parameters:
                    for p in parameters:
                        self.parameters[p.find("name").text] = self._child_text(p, "value")
                elif action.findall("cause"):
                    # A <cause> without <userName> must not abort loading
                    # of the remaining fields.
                    user = action.find("cause").find("userName")
                    if user is not None:
                        self.started_by = user.text
            self.building = self._child_text(fsb, "building")
            self.duration = self._child_text(fsb, "duration")
            self.timestamp = self._child_text(fsb, "id")
            self.result = self._child_text(fsb, "result")
            self.builtOn = self._child_text(fsb, "builtOn")
            if self.result == "FAILURE":
                self.fetch_why_project_failed()
        except AttributeError as e:
            self.logger.debug("ERROR -> " + str(e))
            return False

    def fetch_why_project_failed(self):
        """Derive ``failure_reason`` from the build's console log.

        Downloads ``<url>/consoleText``, stores it in ``self.log`` and
        sets ``failure_reason`` from the first matching entry of
        ``_FAILURE_PATTERNS``. ``failure_reason`` is left unchanged when
        nothing matches or the log cannot be fetched.
        """
        self.logger.debug("START JenkinsBuild.fetch_why_project_failed")
        tempurl = self.url + "/consoleText"
        self.logger.debug(tempurl)
        log = self.fetch_url(tempurl)
        self.log = log
        if not log:
            # fetch_url returns False on failure; there is nothing to scan.
            self.logger.debug("END JenkinsBuild.fetch_why_project_failed => no console log")
            return
        for pattern, reason in self._FAILURE_PATTERNS:
            match = re.search(pattern, log)
            if match is not None:
                self.failure_reason = match.group(1) if reason is None else reason
                break
        self.logger.debug("END JenkinsBuild.fetch_why_project_failed => " + str(self.failure_reason))
objects/Helper.py
import os
import re
import time
import logging
class Helper:
    """Small filesystem and time utilities that log through a shared logger."""

    def __init__(self, logger):
        """Keep ``logger``; it must provide ``debug``."""
        self.logger = logger

    def get_timestamp(self):
        """Return the local time formatted as ``YYYY-MM-DD_HH:MM:SS``."""
        return time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())

    def dir_get_file_list(self, parentdir):
        """Return the names of the regular files directly inside ``parentdir``.

        Names are returned in ``os.listdir`` order, without the directory
        part. Raises whatever ``os.listdir`` raises for a bad path.
        """
        self.logger.debug("START Helper.dir_get_file_list(self, " + parentdir + ")")
        files = []
        for f in os.listdir(parentdir):
            self.logger.debug("f = " + str(f))
            if os.path.isfile(os.path.join(parentdir, f)):
                self.logger.debug("files.append(" + str(f) + ")")
                files.append(f)
        self.logger.debug("END Helper.dir_get_file_list(self, " + parentdir + ")")
        return files

    def dir_get_subdir_list(self, branddir):
        """Return the names of the sub-directories directly inside ``branddir``.

        Entries whose name starts with "." are skipped. Names are returned
        in ``os.listdir`` order, without the directory part.
        """
        self.logger.debug("START get_subdir_list(self, " + branddir + ")")
        subdirs = []
        for adir in os.listdir(branddir):
            self.logger.debug("adir = " + str(adir))
            if adir.startswith("."):
                continue
            if os.path.isdir(os.path.join(branddir, adir)):
                self.logger.debug("subdirs.append(" + str(adir) + ")")
                subdirs.append(adir)
            else:
                self.logger.debug(str(adir) + " is not a dir")
        self.logger.debug("END get_subdir_list(self, " + branddir + ")")
        return subdirs

    def milliseconds_to_human_readable(self, millis):
        """Split a millisecond duration into whole hours, minutes and seconds.

        Args:
            millis: Duration in milliseconds; anything ``int()`` accepts.

        Returns:
            A tuple ``(hours, minutes, seconds)`` of ints. Fractions of a
            second are dropped.
        """
        # Floor division is spelled out so the result stays integral even
        # where "/" means true division.
        seconds_total = int(millis) // 1000
        minutes_total, seconds = divmod(seconds_total, 60)
        hours, minutes = divmod(minutes_total, 60)
        return (hours, minutes, seconds)
objects/Brand.py
class Brand:
    """Plain record pairing a brand's id with its name."""

    # Class-level defaults; each instance overrides both in __init__.
    id = None
    name = None

    def __init__(self, id, name):
        """Store ``id`` and ``name`` on the instance unchanged."""
        self.id, self.name = id, name
objects/Subversion.py
import pysvn
import re
import sys
import time
import datetime
import logging
class Subversion:
    """Convenience wrapper around a ``pysvn.Client``.

    ``status_list`` holds the result of the last ``load_status()`` call;
    the filtering methods read ``text_status`` and ``path`` from each of
    its entries.

    ``logging`` must provide ``debug``, ``info`` and ``exception``.
    """

    user = ""
    pwd = ""
    url = None
    checkout_path = None
    client = None
    # Replaced per instance in __init__ and by load_status().
    status_list = None
    logging = None

    def __init__(self, logging):
        """Create the pysvn client and register the login callback.

        Exits the process with an error message if the callback cannot be
        registered.
        """
        self.user = "brandcreation"
        self.pwd = "newbrand"
        self.checkout_path = ""
        self.url = ""
        self.status_list = []
        self.client = pysvn.Client()
        self.logging = logging
        try:
            self.client.callback_get_login = self.get_svn_login
        except Exception as e:
            sys.exit("Error: " + str(e))

    def get_svn_login(self, realm, username, may_save):
        """Login callback for pysvn.

        The arguments are ignored. Returns ``(True, self.user, self.pwd,
        False)``.
        NOTE(review): presumably (ok, username, password, save) as pysvn's
        ``callback_get_login`` expects -- confirm against the pysvn docs.
        """
        return True, self.user, self.pwd, False

    def get_log_message(self):
        """Log-message callback for pysvn (needed for e.g. copy and remove).

        Returns ``(True, message)`` where the message is the fixed
        "FB-9390 - brandadmin created activity" prefix followed by the
        current local timestamp.
        """
        return True, "FB-9390 - brandadmin created activity " + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())

    def load_status(self):
        """Load the svn status of ``self.checkout_path`` into ``self.status_list``.

        Used to determine which modified / new files need to be committed.

        Returns:
            The number of status entries loaded.
        """
        self.status_list = self.client.status(self.checkout_path)
        return len(self.status_list)

    def test_credentials(self):
        """Placeholder; always returns 0."""
        return 0

    def get_changedfile_list(self):
        """Return the status entries whose text status is "modified".

        REQUIRE: load_status must be called first.

        Returns:
            0 when ``status_list`` is empty, otherwise a list of the
            matching status entries (possibly empty).
        """
        if not self.status_list:
            return 0
        return [stat for stat in self.status_list
                if str(stat.text_status) == "modified"]

    def get_subdir_list(self):
        """Return the last path component of every entry listed under ``self.url``.

        Only components made of word characters and "-" are matched in
        full; see the regular expression for the exact rule.
        """
        toreturn = []
        for d in self.client.list(self.url):
            bn = re.search(r".*/([\w\-]*)$", d[0]["path"])
            if bn is not None:
                toreturn.append(bn.group(1))
        return toreturn

    def add_new_files(self):
        """Schedule every "unversioned" entry of ``status_list`` for addition.

        REQUIRE: load_status must be called first.

        Returns:
            The number of paths added.
        """
        self.logging.debug("Subversion.add_new_files(self)")
        self.logging.debug("status_list size = " + str(len(self.status_list)))
        count = 0
        for stat in self.status_list:
            if str(stat.text_status) == "unversioned":
                self.client.add(stat.path)
                count += 1
        self.logging.debug("END Subversion.add_new_files(self) => " + str(count))
        return count

    def commit(self, message):
        """Check in ``self.checkout_path`` with ``message`` as the log message."""
        # The callback belongs on the pysvn client; setting it on this
        # wrapper object has no effect.
        self.client.callback_get_log_message = self.get_log_message
        self.client.checkin(self.checkout_path, message)

    def checkout(self, url, path):
        """Check out ``url`` into ``path`` and return pysvn's result.

        Logs the error and exits the process on ``pysvn.ClientError``.
        """
        self.logging.info("checkout(self, " + url + ", " + path + ")")
        try:
            self.client.callback_get_login = self.get_svn_login
            return self.client.checkout(url, path)
        except pysvn.ClientError as e:
            self.logging.exception("ERROR CHECKING OUT " + url + " -> " + str(e))
            sys.exit("ERROR CHECKING OUT " + url + " -> " + str(e))

    def copy(self, source, dest, message):
        """Copy ``source`` to ``dest`` through the pysvn client.

        ``message`` is currently not used: the log message comes from
        ``get_log_message``. Exits the process on any error.
        """
        try:
            self.client.callback_get_log_message = self.get_log_message
            self.client.copy(source, dest)
        except Exception as e:
            sys.exit("ERROR in Subversion.copy: " + str(e))

    def remove(self, filepath):
        """Remove ``filepath`` through the pysvn client.

        Logs the error and exits the process on any failure.
        """
        self.logging.debug("Subversion.remove(self, " + filepath + ")")
        try:
            self.client.callback_get_log_message = self.get_log_message
            self.client.remove(filepath)
        except Exception as e:
            self.logging.exception("ERROR in Subversion.remove of " + filepath + " => " + str(e))
            sys.exit("ERROR in Subversion.remove of " + filepath + " => " + str(e))

    def get_change_log(self, from_date):
        """Return the log of ``self.url`` bounded by ``from_date``.

        ``from_date`` is wrapped in a date-kind ``pysvn.Revision`` and
        passed as ``revision_end``; changed paths are included. Exits the
        process on any error.
        """
        self.logging.debug("START get_change_log(" + str(from_date) + ")")
        rev = pysvn.Revision(pysvn.opt_revision_kind.date, from_date)
        try:
            change_list = self.client.log(self.url, revision_end=rev, discover_changed_paths=True)
        except Exception as e:
            sys.exit("ERROR in Subversion.log: " + str(e))
        self.logging.debug("END get_change_log(" + str(from_date) + ") => " + str(len(change_list)))
        return change_list