Python script – Jenkins job copy

This script uses the ElementTree XML parsing library to copy one Jenkins job to a new job, replacing the SVN location in the copy with the most recently created tag (or branch).

import urllib
import urllib2
from urllib2 import URLError
import logging
import re
import os
import base64
import sys
import time
import pysvn
import xml.etree.ElementTree
from xml.etree.ElementTree import ElementTree, Element, SubElement

# ElementTree XML tutorial -> http://www.bigfatalien.com/?p=223

"""
get_svn_login is required for pysvn to set credentials
"""
def get_svn_login(realm, username, may_save):
   """Login callback assigned to the pysvn client's callback_get_login.

   All three arguments are ignored; the same fixed build account is
   returned on every call.

   Returns the 4-tuple (True, user name, password, False).
   NOTE(review): in pysvn's callback convention the first item presumably
   means "credentials supplied" and the last "do not save them" -- confirm
   against the pysvn documentation.
   """
   svn_user = "svninfoforbuild"
   svn_password = "svninfoforbuild"
   credentials = (True, svn_user, svn_password, False)
   return credentials
   
"""
query SVN for a list of tags. Look through that list for tags matching regex. 
figure out which of these was created last. return this tag.
"""
def svn_get_latest_tag(device, client):
   """Find the most recently created release tag for a device.

   Lists <svn_url>/<svn_repo_list[device]>/tags through ``client`` and keeps
   only the entries whose path ends in "/<device>-N.N.N.N" (four
   dot-separated numbers).  Of those, the entry with the largest "time"
   value wins.

   device -- key into the module-level svn_repo_list; also the tag prefix.
   client -- object with a pysvn-style list(url) method.  Each item of the
             result is read as item[0]["path"] and item[0]["time"].

   Returns (time, path, tag name) of the newest matching tag, or
   (0, None, None) when no entry matches.
   """
   logging.debug("svn_get_latest_tag(" + device + ", Client)")
   reponame = svn_url + "/" + svn_repo_list[device] + "/tags"
   logging.debug("reponame = " + reponame)
   taglist = client.list(reponame)
   logging.debug("taglist = " + str(taglist))

   # \d and \. need their backslashes: without them the pattern looks for the
   # letter "d" and never matches a numeric tag.  The device name is escaped
   # so it is always taken literally.  Compiled once, outside the loop.
   tag_pattern = re.compile(r"/(" + re.escape(device) + r"-\d+\.\d+\.\d+\.\d+)$")

   maxtime = 0
   maxtag = None
   tagname = None
   for tag in taglist:
      info = tag[0]
      m = tag_pattern.search(info["path"])
      if m is not None and info["time"] > maxtime:
         tagname = m.group(1)
         maxtime = info["time"]
         maxtag = info["path"]
   logging.debug("maxtime = " + str(maxtime))
   logging.debug("maxtag = " + str(maxtag))
   logging.debug("tagname = " + str(tagname))
   return maxtime, maxtag, tagname

"""
query SVN for a list of branchs. Look through that list for branchs matching regex. 
figure out which of these was created last. return this branch.
"""
def svn_get_latest_branch(device, client):
   """Find the most recently created release branch for a device.

   Lists <svn_url>/<svn_repo_list[device]>/branches through ``client`` and
   keeps only the entries whose path ends in "/<device>-N.N.N.X", where the
   first three parts are numbers and X is one or more word characters.  Of
   those, the entry with the largest "time" value wins.

   device -- key into the module-level svn_repo_list; also the branch prefix.
   client -- object with a pysvn-style list(url) method.  Each item of the
             result is read as item[0]["path"] and item[0]["time"], and
             every key of item[0] is written to the debug log.

   Returns (time, path, branch name) of the newest matching branch, or
   (0, None, None) when no entry matches.
   """
   logging.debug("svn_get_latest_branch(" + device + ", Client)")
   reponame = svn_url + "/" + svn_repo_list[device] + "/branches"
   logging.debug("reponame = " + reponame)
   branchlist = client.list(reponame)

   # \d, \. and \w need their backslashes: without them the pattern looks for
   # the letters "d"/"w" and never matches a real branch name.  \w already
   # covers digits, so it stands in for the original [\d\w] class.
   branch_pattern = re.compile(r"/(" + re.escape(device) + r"-\d+\.\d+\.\d+\.\w+)$")

   maxtime = 0
   # Initialised up front so the "nothing matched" case returns None instead
   # of failing on an unbound local.
   maxbranch = None
   branchname = None
   for branch in branchlist:
      info = branch[0]
      logging.debug("------------------------")
      for x in info:
         logging.debug(str(x) + " = " + str(info[x]))
      logging.debug(str(info["time"]) + " > " + str(maxtime))
      m = branch_pattern.search(info["path"])
      if m is not None:
         logging.debug("m is not None")
         if info["time"] > maxtime:
            branchname = m.group(1)
            maxtime = info["time"]
            maxbranch = info["path"]
            logging.debug("********* branchname == " + branchname)
            logging.debug("------ maxtime == " + str(maxtime))
      logging.debug("------------------------")

   logging.debug("maxtime = " + str(maxtime))
   logging.debug("maxbranch = " + str(maxbranch))
   logging.debug("branchname = " + str(branchname))
   return maxtime, maxbranch, branchname

def jenkins_get_project_xml(device, timestr):
   """Download the template Jenkins job's config.xml to a local file.

   Fetches <jenkins_url>/job/<jenkins_project_list[device]>/config.xml using
   HTTP basic authentication built from the module-level jenkins_id and
   jenkins_pwd, and writes the response body to
   "./<device>_config_<timestr>.xml".

   device  -- key into the module-level jenkins_project_list.
   timestr -- string embedded in the output file name.

   Returns the path of the file written.  Errors raised by urllib2.urlopen
   or by the file write are not caught here.
   """
   jobxmlurl = jenkins_url + "/job/" + jenkins_project_list[device] + "/config.xml"
   req = urllib2.Request(jobxmlurl)
   # b64encode never inserts line breaks; encodestring wraps its output every
   # 76 characters, which would corrupt the header for long credentials.
   base64string = base64.b64encode("%s:%s" % (jenkins_id, jenkins_pwd))
   req.add_header("Authorization", "Basic %s" % base64string)

   response = urllib2.urlopen(req)
   try:
      jobxml = response.read()
   finally:
      response.close()

   xmlname = "./" + device + "_config_" + timestr + ".xml"
   # Binary mode: the body is stored exactly as received.
   with open(xmlname, "wb") as f:
      f.write(jobxml)
   return xmlname

def jenkins_push_project_xml(new_proj_name, source_proj, new_proj_xml):
   """Create a new Jenkins job from a local config XML file.

   POSTs the contents of ``new_proj_xml`` to
   <jenkins_url>/createItem?name=<new_proj_name> with a text/xml content
   type, using HTTP basic authentication built from the module-level
   jenkins_id and jenkins_pwd.

   new_proj_name -- name of the job to create; URL-quoted before use.
   source_proj   -- not used by this function; kept so existing callers
                    keep working.
   new_proj_xml  -- path of the XML file to upload.

   Returns None.  Errors raised by urllib2.urlopen are not caught here.
   """
   with open(new_proj_xml, "r") as xml_file:
      xml_contents = xml_file.read()

   # Quote the name so characters such as spaces or "&" cannot break the
   # query string; names made of letters, digits, ".", "-" and "_" are
   # unchanged.
   job_create_url = jenkins_url + "/createItem?name=" + urllib.quote(new_proj_name, safe="")
   logging.debug(job_create_url)
   req = urllib2.Request(job_create_url, data=xml_contents, headers={"Content-Type": "text/xml"})
   # b64encode never inserts line breaks; encodestring wraps its output every
   # 76 characters, which would corrupt the header for long credentials.
   base64string = base64.b64encode("%s:%s" % (jenkins_id, jenkins_pwd))
   req.add_header("Authorization", "Basic %s" % base64string)

   response = urllib2.urlopen(req)
   try:
      # Drain the reply so the request fully completes; the body is not used.
      response.read()
   finally:
      response.close()
   
def jenkins_project_exists(pname):
   """Report whether a Jenkins job page can be fetched for ``pname``.

   Requests <jenkins_url>/job/<pname> using HTTP basic authentication built
   from the module-level jenkins_id and jenkins_pwd.

   pname -- job name; URL-quoted before use.

   Returns True when the request succeeds and False when urllib2 raises
   URLError.  Because every URLError is treated the same way, an unreachable
   server is also reported as "does not exist"; the reason is written to the
   debug log.
   """
   logging.debug("jenkins_project_exists(" + pname + ")")
   jobxmlurl = jenkins_url + "/job/" + urllib.quote(pname, safe="")
   logging.debug("jobxmlurl = " + jobxmlurl)
   req = urllib2.Request(jobxmlurl)
   # b64encode never inserts line breaks; encodestring wraps its output every
   # 76 characters, which would corrupt the header for long credentials.
   base64string = base64.b64encode("%s:%s" % (jenkins_id, jenkins_pwd))
   req.add_header("Authorization", "Basic %s" % base64string)
   try:
      response = urllib2.urlopen(req)
   except URLError as e:
      logging.debug("jenkins_project_exists: request failed: " + str(e))
      return False
   response.close()
   return True
   
def svn_get_trunk_url(device):
   """Build the SVN trunk URL for a device.

   device -- key into the module-level svn_repo_list.

   Returns "<svn_url>/<svn_repo_list[device]>/trunk".
   """
   repo_dir = svn_repo_list[device]
   url_parts = (svn_url, repo_dir, "trunk")
   return "/".join(url_parts)

def process_xml(device, tagname, jobxml, svn_trunk_url, svn_maxtag_url, timestr):
   """Rewrite a Jenkins job config so it checks out a tag instead of trunk.

   Parses the XML file ``jobxml``, sets the top-level <description> text to
   "<device> TAG <tagname>", and replaces the <remote> text of every entry
   under <scm>/<locations> that is exactly ``svn_trunk_url`` (optionally
   followed by "/src") with ``svn_maxtag_url`` (keeping the "/src" suffix).
   The result is written to "./<device>_config_new_<timestr>.xml".

   device         -- device name; used in the description and file name.
   tagname        -- tag (or branch) name placed in the description.
   jobxml         -- path of the job config XML to read.
   svn_trunk_url  -- trunk URL to look for; matched literally.
   svn_maxtag_url -- URL that replaces the trunk URL.
   timestr        -- string embedded in the output file name.

   Returns the path of the file written.
   Raises ValueError when the config has no <scm>/<locations> element.
   """
   # An ElementTree wraps a tree of Element objects and can read/write it as
   # an XML file; parse() returns the root Element.
   et = ElementTree()
   project_elem = et.parse(jobxml)

   logging.debug("--------------------------------------------")
   # Layout this function relies on (element names taken from the lookups
   # below; the name of each entry under <locations> is not checked):
   #
   #  <root>
   #     ...
   #     <description>...</description>
   #     ...
   #     <scm>
   #        <locations>
   #           <entry>
   #              <remote>svn url</remote>
   #              <local>...</local>
   #           </entry>
   #           ...
   #        </locations>
   #        ...
   #     </scm>
   #     ...
   #  </root>
   de = project_elem.find("description")
   if de is None:
      # No description yet: add one rather than failing on None.
      de = SubElement(project_elem, "description")
   de.text = device + " TAG " + tagname

   scme = project_elem.find("scm")
   loce = scme.find("locations") if scme is not None else None
   if loce is None:
      raise ValueError("no <scm>/<locations> element found in " + str(jobxml))

   logging.debug("svn_trunk_url = " + svn_trunk_url)
   # Escape the URL so "." and other regex metacharacters in it are matched
   # literally; compiled once because it is the same for every location.
   trunk_pattern = re.compile(r"^" + re.escape(svn_trunk_url) + r"(/src)?$")
   replaced = 0
   for location in list(loce):
      remote = location.find("remote")
      if remote is None or remote.text is None:
         # Nothing to compare against; leave this entry untouched.
         continue
      m = trunk_pattern.search(remote.text)
      if m is None:
         continue
      newstr = svn_maxtag_url + (m.group(1) or "")
      logging.debug(newstr)
      remote.text = newstr
      replaced += 1
   if not replaced:
      logging.warning("no SVN location matched " + svn_trunk_url + "; config left pointing where it was")

   newxmlname = "./" + device + "_config_new_" + timestr + ".xml"
   et.write(newxmlname)
   return newxmlname
   
   
   
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Base URLs of the SVN server and the Jenkins server.
svn_url = "https://svn.company.com"
jenkins_url = "http://jenkins.corp.company.com:8080"
job_sub_http = "job"

# Device type -> repository directory directly under svn_url.
svn_repo_list = {
   "android": "android",
   "ios": "ios",
   "blackberry": "blackberry",
}

# Device type -> name of the Jenkins job whose configuration is copied.
jenkins_project_list = {
   "android": "android_trunk",
   "ios": "ios_trunk",
   "blackberry": "blackberry_trunk",
}

# Account used for HTTP basic authentication against Jenkins.
# NOTE(review): the password is stored in plain text in this file.
jenkins_id = "tag_creator"
jenkins_pwd = "1qazxsw2"

job_file = "config.xml"

# ---------------------------------------------------------------------------
# Main flow: find the newest tag/branch, copy the template Jenkins job and
# point the copy at that tag/branch.
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.DEBUG)

# Embedded in the scratch file names written by this run.
timestr = time.strftime("%Y-%m-%d_%H%M%S", time.localtime())

# Command line options, both in --name=value form:
#   --device_type=<android|ios|blackberry>   (required)
#   --svn_loc=<tags|branches>                (optional, default "tags")
device_type = None
svn_loc = "tags"
for arg in sys.argv[1:]:
   # \S needs its backslash; without it the pattern only accepts runs of the
   # letter "S" and no real option is ever recognised.
   args = re.search(r"^--(\S+)=(\S+)$", arg)
   if args is None:
      continue
   option, value = args.group(1), args.group(2)
   if option == "device_type":
      device_type = value.lower()
      logging.debug("device_type = " + device_type)
   elif option == "svn_loc":
      svn_loc = value
      logging.debug("svn_loc = " + svn_loc)


# A missing --device_type leaves device_type as None, which is also rejected
# by this lookup.
try:
   repo = svn_repo_list[device_type]
except KeyError:
   sys.exit("ERROR: device_type does not match the required list of options (android, ios, blackberry)")
logging.debug("repo = " + repo)

# Built through the helper so the "/" between server and repository is not
# lost (plain svn_url + repo yields "https://svn.company.comios/trunk").
svn_trunk_url = svn_get_trunk_url(device_type)
logging.debug("svn_trunk_url = " + svn_trunk_url)

client = pysvn.Client()
client.callback_get_login = get_svn_login

maxtime = None
svn_maxtag_url = None
reponame = None


if svn_loc == "tags":
   maxtime, svn_maxtag_url, reponame = svn_get_latest_tag(device_type, client)
elif svn_loc == "branches":
   maxtime, svn_maxtag_url, reponame = svn_get_latest_branch(device_type, client)
else:
   sys.exit("ERROR: You cannot use svn_loc = " + svn_loc)

logging.debug("maxtime = " + str(maxtime))
logging.debug("svn_maxtag_url = " + str(svn_maxtag_url))
logging.debug("reponame = " + str(reponame))

# Nothing matched in SVN: stop with a clear message instead of failing later
# when None is concatenated into a URL.
if svn_maxtag_url is None or reponame is None:
   sys.exit("ERROR: No matching entry found under " + svn_loc + " for " + device_type)


# The new job is named after the tag/branch; refuse to overwrite an existing one.
project_exists = jenkins_project_exists(reponame)
logging.debug("project_exists = " + str(project_exists))
if project_exists:
   sys.exit("ERROR: The Jenkins job " + reponame + " already exists.")

origxml = jenkins_get_project_xml(device_type, timestr)
newxml = None
try:
   newxml = process_xml(device_type, reponame, origxml, svn_trunk_url, svn_maxtag_url, timestr)
   jenkins_push_project_xml(reponame, jenkins_project_list[device_type], newxml)
finally:
   # Remove the scratch files even when rewriting or uploading fails.
   os.remove(origxml)
   if newxml is not None:
      os.remove(newxml)