Skip to content

Commit

Permalink
Add support for creating and removing files
Browse files Browse the repository at this point in the history
  • Loading branch information
mingwandroid committed Jun 7, 2016
1 parent 240a39d commit d5b90be
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 42 deletions.
122 changes: 81 additions & 41 deletions patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import copy
import logging
import re
import tempfile

# cStringIO doesn't support unicode in 2.5
try:
Expand Down Expand Up @@ -477,11 +478,26 @@ def lineno(self):
# XXX header += srcname
# double source filename line is encountered
# attempt to restart from this second line
re_filename = b"^--- ([^\t]+)"
match = re.match(re_filename, line)

# Files dated at Unix epoch don't exist, e.g.:
# '1970-01-01 01:00:00.000000000 +0100'
# They include timezone offsets.
# .. which can be parsed (if we remove the nanoseconds)
# .. by strptime() with:
# '%Y-%m-%d %H:%M:%S %z'
# .. but unfortunately this relies on the OSes libc
# strptime function and %z support is patchy, so we drop
# everything from the . onwards and group the year and time
# separately.
re_filename_date_time = b"^--- ([^\t]+)(?:\s([0-9-]+)\s([0-9:]+)|.*)"
match = re.match(re_filename_date_time, line)
# todo: support spaces in filenames
if match:
srcname = match.group(1).strip()
date = match.group(2)
time = match.group(3)
if (date == b'1970-01-01' or date == b'1969-12-31') and time.split(b':',1)[1] == b'00:00':
srcname = b'/dev/null'
else:
warning("skipping invalid filename at line %d" % (lineno+1))
self.errors += 1
Expand Down Expand Up @@ -516,8 +532,8 @@ def lineno(self):
filenames = False
headscan = True
else:
re_filename = b"^\+\+\+ ([^\t]+)"
match = re.match(re_filename, line)
re_filename_date_time = b"^\+\+\+ ([^\t]+)(?:\s([0-9-]+)\s([0-9:]+)|.*)"
match = re.match(re_filename_date_time, line)
if not match:
warning("skipping invalid patch - no target filename at line %d" % (lineno+1))
self.errors += 1
Expand All @@ -526,12 +542,18 @@ def lineno(self):
filenames = False
headscan = True
else:
tgtname = match.group(1).strip()
date = match.group(2)
time = match.group(3)
if (date == b'1970-01-01' or date == b'1969-12-31') and time.split(b':',1)[1] == b'00:00':
tgtname = b'/dev/null'
if p: # for the first run p is None
self.items.append(p)
p = Patch()
p.source = srcname
srcname = None
p.target = match.group(1).strip()
p.target = tgtname
tgtname = None
p.header = header
header = []
# switch to hunkhead state
Expand Down Expand Up @@ -654,7 +676,7 @@ def _detect_type(self, p):
break
if p.header[idx].startswith(b'diff --git a/'):
if (idx+1 < len(p.header)
and re.match(b'index \\w{7}..\\w{7} \\d{6}', p.header[idx+1])):
and re.match(b'(?:index \\w{7}..\\w{7} \\d{6}|new file mode \\d*)', p.header[idx+1])):
if DVCS:
return GIT

Expand Down Expand Up @@ -729,16 +751,17 @@ def _normalize_filenames(self):
while p.target.startswith(b".." + sep):
p.target = p.target.partition(sep)[2]
# absolute paths are not allowed
if xisabs(p.source) or xisabs(p.target):
if (xisabs(p.source) and p.source != b'/dev/null') or \
(xisabs(p.target) and p.target != b'/dev/null'):
warning("error: absolute paths are not allowed - file no.%d" % (i+1))
self.warnings += 1
if xisabs(p.source):
if xisabs(p.source) and p.source != b'/dev/null':
warning("stripping absolute path from source name '%s'" % p.source)
p.source = xstrip(p.source)
if xisabs(p.target):
if xisabs(p.target) and p.target != b'/dev/null':
warning("stripping absolute path from target name '%s'" % p.target)
p.target = xstrip(p.target)

self.items[i].source = p.source
self.items[i].target = p.target

Expand Down Expand Up @@ -800,12 +823,24 @@ def diffstat(self):
return output


def findfile(self, old, new):
""" return name of file to be patched or None """
if exists(old):
return old
def findfiles(self, old, new):
""" return tuple of source file, target file """
if old == b'/dev/null':
handle, abspath = tempfile.mkstemp(suffix='pypatch')
abspath = abspath.encode()
# The source file must contain a line for the hunk matching to succeed.
os.write(handle, b' ')
os.close(handle)
if not exists(new):
handle = open(new, 'wb')
handle.close()
return abspath, new
elif exists(old):
return old, old
elif exists(new):
return new
return new, new
elif new == b'/dev/null':
return None, None
else:
# [w] Google Code generates broken patches with its online editor
debug("broken patch from Google Code, stripping prefixes..")
Expand All @@ -814,10 +849,10 @@ def findfile(self, old, new):
debug(" %s" % old)
debug(" %s" % new)
if exists(old):
return old
return old, old
elif exists(new):
return new
return None
return new, new
return None, None


def apply(self, strip=0, root=None):
Expand Down Expand Up @@ -848,27 +883,27 @@ def apply(self, strip=0, root=None):
debug("stripping %s leading component(s) from:" % strip)
debug(" %s" % p.source)
debug(" %s" % p.target)
old = pathstrip(p.source, strip)
new = pathstrip(p.target, strip)
old = p.source if p.source == b'/dev/null' else pathstrip(p.source, strip)
new = p.target if p.target == b'/dev/null' else pathstrip(p.target, strip)
else:
old, new = p.source, p.target

filename = self.findfile(old, new)
filenameo, filenamen = self.findfiles(old, new)

if not filename:
if not filenameo or not filenamen:
warning("source/target file does not exist:\n --- %s\n +++ %s" % (old, new))
errors += 1
continue
if not isfile(filename):
warning("not a file - %s" % filename)
if not isfile(filenameo):
warning("not a file - %s" % filenameo)
errors += 1
continue

# [ ] check absolute paths security here
debug("processing %d/%d:\t %s" % (i+1, total, filename))
debug("processing %d/%d:\t %s" % (i+1, total, filenamen))

# validate before patching
f2fp = open(filename, 'rb')
f2fp = open(filenameo, 'rb')
hunkno = 0
hunk = p.hunks[hunkno]
hunkfind = []
Expand All @@ -891,7 +926,7 @@ def apply(self, strip=0, root=None):
if line.rstrip(b"\r\n") == hunkfind[hunklineno]:
hunklineno+=1
else:
info("file %d/%d:\t %s" % (i+1, total, filename))
info("file %d/%d:\t %s" % (i+1, total, filenamen))
info(" hunk no.%d doesn't match source file at line %d" % (hunkno+1, lineno+1))
info(" expected: %s" % hunkfind[hunklineno])
info(" actual : %s" % line.rstrip(b"\r\n"))
Expand All @@ -911,8 +946,8 @@ def apply(self, strip=0, root=None):
break

# check if processed line is the last line
if lineno+1 == hunk.startsrc+len(hunkfind)-1:
debug(" hunk no.%d for file %s -- is ready to be patched" % (hunkno+1, filename))
if len(hunkfind) == 0 or lineno+1 == hunk.startsrc+len(hunkfind)-1:
debug(" hunk no.%d for file %s -- is ready to be patched" % (hunkno+1, filenamen))
hunkno+=1
validhunks+=1
if hunkno < len(p.hunks):
Expand All @@ -924,34 +959,39 @@ def apply(self, strip=0, root=None):
break
else:
if hunkno < len(p.hunks):
warning("premature end of source file %s at hunk %d" % (filename, hunkno+1))
warning("premature end of source file %s at hunk %d" % (filenameo, hunkno+1))
errors += 1

f2fp.close()

if validhunks < len(p.hunks):
if self._match_file_hunks(filename, p.hunks):
warning("already patched %s" % filename)
if self._match_file_hunks(filenameo, p.hunks):
warning("already patched %s" % filenameo)
else:
warning("source file is different - %s" % filename)
warning("source file is different - %s" % filenameo)
errors += 1
if canpatch:
backupname = filename+b".orig"
backupname = filenamen+b".orig"
if exists(backupname):
warning("can't backup original file to %s - aborting" % backupname)
else:
import shutil
shutil.move(filename, backupname)
if self.write_hunks(backupname, filename, p.hunks):
info("successfully patched %d/%d:\t %s" % (i+1, total, filename))
shutil.move(filenamen, backupname)
if self.write_hunks(backupname if filenameo == filenamen else filenameo, filenamen, p.hunks):
info("successfully patched %d/%d:\t %s" % (i+1, total, filenamen))
os.unlink(backupname)
if new == b'/dev/null':
# check that filename is of size 0 and delete it.
if os.path.getsize(filenamen) > 0:
warning("expected patched file to be empty as it's marked as deletion:\t %s" % filenamen)
os.unlink(filenamen)
else:
errors += 1
warning("error patching file %s" % filename)
shutil.copy(filename, filename+".invalid")
warning("invalid version is saved to %s" % filename+".invalid")
warning("error patching file %s" % filenamen)
shutil.copy(filenamen, filenamen+".invalid")
warning("invalid version is saved to %s" % filenamen+".invalid")
# todo: proper rejects
shutil.move(backupname, filename)
shutil.move(backupname, filenamen)

if root:
os.chdir(prevdir)
Expand Down
5 changes: 5 additions & 0 deletions tests/08create/08create.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
diff -urN from/created to/created
--- created 1970-01-01 01:00:00.000000000 +0100
+++ created 2016-06-07 00:43:08.701304500 +0100
@@ -0,0 +1 @@
+Created by patch
1 change: 1 addition & 0 deletions tests/08create/[result]/created
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Created by patch
5 changes: 5 additions & 0 deletions tests/09delete/09delete.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
diff -urN from/deleted to/deleted
--- deleted 2016-06-07 00:44:19.093323300 +0100
+++ deleted 1970-01-01 01:00:00.000000000 +0100
@@ -1 +0,0 @@
-Deleted by patch
1 change: 1 addition & 0 deletions tests/09delete/[result]/.gitkeep
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions tests/09delete/deleted
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Deleted by patch
16 changes: 15 additions & 1 deletion tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def _run_test(self, testname):
# recursive comparison
self._assert_dirs_equal(join(basepath, "[result]"),
tmpdir,
ignore=["%s.patch" % testname, ".svn", "[result]"])
ignore=["%s.patch" % testname, ".svn", ".gitkeep", "[result]"])


shutil.rmtree(tmpdir)
Expand Down Expand Up @@ -410,6 +410,20 @@ def test_apply_strip(self):
p.target = b'nasty/prefix/' + p.target
self.assertTrue(pto.apply(strip=2, root=treeroot))

def test_create_file(self):
treeroot = join(self.tmpdir, 'rootparent')
os.makedirs(treeroot)
pto = patch.fromfile(join(TESTS, '08create/08create.patch'))
pto.apply(strip=0, root=treeroot)
self.assertTrue(os.path.exists(os.path.join(treeroot, 'created')))

def test_delete_file(self):
treeroot = join(self.tmpdir, 'rootparent')
shutil.copytree(join(TESTS, '09delete'), treeroot)
pto = patch.fromfile(join(TESTS, '09delete/09delete.patch'))
pto.apply(strip=0, root=treeroot)
self.assertFalse(os.path.exists(os.path.join(treeroot, 'deleted')))


class TestHelpers(unittest.TestCase):
# unittest setting
Expand Down

0 comments on commit d5b90be

Please sign in to comment.