-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpatcher.py
executable file
·195 lines (162 loc) · 6.76 KB
/
patcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python3
# vim: ts=2 sts=2 et sw=2
import sys
import json
import shutil
import argparse
import tempfile
import subprocess
import collections
import dataclasses
from pathlib import Path
def log(*args, **kwargs):
print(*args, **kwargs, file=sys.stderr, flush=True)
patch2pr_path = shutil.which('patch2pr')
if not patch2pr_path:
gopath = subprocess.check_output(['go', 'env', 'GOPATH'], text=True).strip()
patch2pr_path = gopath + '/bin/patch2pr'
patch2pr = Path(patch2pr_path)
assert patch2pr.exists(), "patch2pr not found!"
EXPECTED_NIX_FLAKE_LOCK_VERSION = 7
@dataclasses.dataclass
class Repo:
basename: str = '' # flake input name
upstream: str = '' # owner/repo
revision: str = '' # revision hash of patch base
fork: str = '' # owner/repo
branch: str = '' # branch to use for patch
patches: list[tuple[int, str, str]] = dataclasses.field(default_factory=list)
final: str = ''
# apply the given patch to the given fork repo and
# return the new commit hash
def apply_patch(fork: str, base: str, branch: str, patch: str, dry: bool):
if dry:
log('... dry run, skipping patch apply')
split = base.split('_') + ['0']
commit, n = split[:2]
return commit + '_' + str(int(n)+1)
out = subprocess.check_output([
patch2pr,
'-repository', fork,
'-patch-base', base,
'-head-branch', branch,
'-no-pull-request',
'-force',
'-json', patch
])
out = json.loads(out)
return out['commit']
def main(argv):
argp = argparse.ArgumentParser(description='nix-patcher is a tool for patching Nix flake inputs, semi-automatically.')
argp.add_argument('--flake', '-f', default='.', type=str,
help='flake reference')
argp.add_argument('--upstream-suffix', default='-upstream', type=str,
help='suffix for upstream repositories (default: -upstream)')
argp.add_argument('--patched-suffix', default='', type=str,
help='suffix for patched forks (default: \'\')')
argp.add_argument('--patch-suffix', default='-patch-', type=str,
help='suffix for patch files (default: -patch-)')
argp.add_argument('--update', action='store_true',
help='if set, will call `nix flake update _` on the newly patched inputs')
argp.add_argument('--commit', action='store_true',
help='like --update but also adds --commit-lock-file')
argp.add_argument('--dry-run', action='store_true',
help='log steps but do not perform any changes')
argp.add_argument('--verbose', action='store_true',
help='print additional state information')
args = argp.parse_args(argv[1:])
args.update |= args.commit
if args.upstream_suffix == args.patched_suffix:
argp.error('upstream and patched suffixes must not be identical.')
if not args.patch_suffix:
argp.error('patch suffix must not be empty')
flakemeta = subprocess.check_output(
['nix', 'flake', 'metadata', '--json', args.flake],
text=True).strip()
flakemeta = json.loads(flakemeta)
locks = flakemeta['locks']
lockver = locks.get('version')
if lockver != EXPECTED_NIX_FLAKE_LOCK_VERSION:
log(f'WARNING: unsupported/untested flake.lock metadata version'
f' (found {lockver}, but {EXPECTED_NIX_FLAKE_LOCK_VERSION} expected)')
flakepath = flakemeta["resolvedUrl"]
log(f'{flakemeta["resolvedUrl"]=}')
repos = collections.defaultdict(Repo)
# restrict using locks.root.inputs to only
# handle inputs occuring in /this/ flake, and
# not transitive inputs
rootkey = locks['root']
for k in locks['nodes'][rootkey]['inputs'].keys():
v = locks['nodes'].get(k, {})
original = v.get('original')
locked = v.get('locked')
# log(locked)
if not locked: continue # "root" object
# if both suffixes match, select the longer one
is_patched = k.endswith(args.patched_suffix)
is_upstream = k.endswith(args.upstream_suffix)
is_patch = None
if is_patched and is_upstream:
is_patched = len(args.patched_suffix) > len(args.upstream_suffix)
is_upstream = len(args.upstream_suffix) > len(args.patched_suffix)
assert is_patched != is_upstream, "logic error, equal suffixes??"
if (split := k.rsplit(args.patch_suffix, 1)) and len(split) == 2:
name, num = split
try: num = int(num)
except ValueError: num = None
if num is not None:
is_patch = (name, num)
log(f'parsing {k=}', f'{is_upstream=}', f'{is_patched=}', f'{is_patch=}', sep=', ')
if is_patch:
name, num = is_patch
# via lv.cha on discord
path = subprocess.check_output(
['nix', 'eval', '--impure', '--expr',
f'(builtins.getFlake "{flakepath}").inputs.{k}.outPath'])
path = json.loads(path)
patch = (num, locked['url'], path)
repos[name].patches.append(patch)
elif is_patched:
name = k[:len(k)-len(args.patched_suffix)]
if not original.get('ref'):
log(name, ': ignoring input despite patched-suffix match, due to no branch specified')
continue
repos[name].fork = f"{original['owner']}/{original['repo']}"
repos[name].branch = original['ref']
elif is_upstream:
name = k[:len(k)-len(args.upstream_suffix)]
repos[name].basename = name
repos[name].upstream = f"{locked['owner']}/{locked['repo']}"
repos[name].revision = locked['rev']
if args.verbose:
asdict = {k: dataclasses.asdict(v) for k,v in repos.items()}
json.dump(asdict, sys.stderr, indent=2)
print(file=sys.stderr)
log(f'{patch2pr=}')
for repo in repos.values():
if not (repo.basename and repo.fork):
log(repo.basename, ': skipping input (missing upstream or fork):', repo)
continue
log(repo.basename, ': patching. fork is', f'{repo.fork=}', f'{repo.branch=}')
repo.patches.sort(key=lambda x: x[0])
rev = repo.revision
for i, url, p in repo.patches:
log(repo.basename, f': applying patch {i}: {p!s}')
rev = apply_patch(repo.fork, rev, repo.branch, p, args.dry_run)
log(repo.basename, ': ->', rev)
repo.final = rev
log(repo.basename, ': final commit:', rev)
log(repo.basename, ':', f'https://github.com/{repo.fork}/tree/{repo.branch}')
log(repo.basename, ':', f'https://github.com/{repo.fork}/commit/{rev}')
if args.update:
def patched(repo): return repo.basename + args.patched_suffix
update_cmd = [[patched(repo), '--override-input', patched(repo), f'github:{repo.fork}/{repo.final}'] for repo in repos.values() if repo.final]
update_cmd = [x for y in update_cmd for x in y]
if update_cmd:
if args.commit:
update_cmd = ['--commit-lock-file'] + update_cmd
log(f'{update_cmd=}')
if not args.dry_run:
subprocess.check_call(['nix', 'flake', 'update', '--flake', flakepath] + update_cmd)
if __name__ == '__main__':
sys.exit(main(sys.argv))