-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmyblt.py
275 lines (204 loc) · 7.29 KB
/
myblt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import os
import hashlib
import binascii
import string
import random
import uuid
import sys
from flask import Flask, request, send_from_directory, jsonify, redirect
from database import db_session, init_db, init_engine
from wand.image import Image
from wand.display import display
from models import Upload, User
# Module-level Flask application; the route and teardown decorators in this
# file register against it, and its config is loaded in the __main__ section.
app = Flask(__name__)
def hash_exists(hash):
    """Tell whether at least one stored upload has the given content hash.

    ``hash`` is compared for equality against ``Upload.hash``.
    """
    matching = Upload.query.filter(Upload.hash == hash)
    return matching.count() != 0
def short_url_exists(url):
    """Tell whether ``url`` is unavailable as a short url.

    A falsy ``url`` (``None``, empty string) is reported as existing so that
    callers looping "until a free url is found" never accept it. Otherwise
    the answer is whether any ``Upload`` row already uses that short url.
    """
    if url:
        taken = Upload.query.filter(Upload.short_url == url).count()
        return taken != 0
    return True
def get_random_short_url(length=7):
    """Return a random string of ASCII letters and digits.

    The alphabet has 62 characters, so the default length of 7 gives
    62**7 (on the order of 10^12) distinct strings.

    :param length: number of characters to generate (default 7).
    :returns: a ``str`` of exactly ``length`` characters.

    The ``random`` module is not cryptographically secure; the result is an
    identifier, not a secret.
    """
    pool = string.ascii_letters + string.digits
    return ''.join(random.choice(pool) for _ in range(length))
def get_new_short_url():
    """Draw random short urls until one is not already taken, and return it."""
    candidate = get_random_short_url()
    while short_url_exists(candidate):
        candidate = get_random_short_url()
    return candidate
def new_user(username, password):
    """Create, persist and return a new ``User``.

    A fresh random salt is generated for every user, so identical passwords
    no longer produce identical stored hashes. The salt is stored on the user
    row; ``login`` reads it back from there, so users created earlier with
    the old constant salt keep working.

    :param username: login name for the new user.
    :param password: clear-text password (``str``); only its salted hash is
        stored.
    :returns: the committed ``User`` instance.
    """
    # 16 random bytes, hex-encoded so the salt stays a str that get_hash()
    # can ``.encode('utf8')``.
    # NOTE(review): the salt is now 32 characters long - confirm the salt
    # column on User is wide enough.
    salt = binascii.hexlify(os.urandom(16)).decode('ascii')
    hashpass = get_hash(password, salt)
    user = User(username, hashpass, salt)
    db_session.add(user)
    db_session.commit()
    return user
def get_extension(filename):
    """Extract the extension of ``filename`` without the leading dot.

    Returns ``None`` when there is no dot or the only dot is the first
    character (e.g. ``.bashrc``). When a second, non-adjacent dot precedes
    the last one and the part between them is listed in
    ``app.config['DOUBLE_EXTS']``, the combined form (``tar.gz`` style) is
    returned; otherwise only the part after the last dot.
    """
    dot = filename.rfind('.')
    if dot < 1:
        return None

    suffix = filename[dot + 1:]
    # The search window stops one character before the last dot, so a
    # directly adjacent dot ("a..b") is not treated as a double extension.
    previous_dot = filename.rfind('.', 0, dot - 1)
    if previous_dot != -1:
        inner = filename[previous_dot + 1:dot]
        if inner in app.config['DOUBLE_EXTS']:
            return inner + '.' + suffix
    return suffix
def create_thumbnail(abs_orig, abs_thumbnail):
    """Write a thumbnail of the image at ``abs_orig`` to ``abs_thumbnail``.

    Images taller than 200 pixels are scaled down to a height of 200 with
    the width scaled by the same factor; smaller images are saved unchanged.
    The thumbnail's parent directory is created when missing.

    :param abs_orig: path of the source image.
    :param abs_thumbnail: path the thumbnail is written to.
    :returns: ``None``.
    """
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(os.path.dirname(abs_thumbnail), exist_ok=True)

    max_height = 200
    with Image(filename=abs_orig) as img:
        with img.clone() as thumb:
            if img.height > max_height:
                # Integer arithmetic avoids float rounding that could
                # truncate the width by one pixel; clamp to 1 because a
                # zero width is not a valid image size.
                width = max(1, img.width * max_height // img.height)
                thumb.resize(width, max_height)
            thumb.save(filename=abs_thumbnail)
def new_upload(file, file_hash_bin, public):
    """Store an uploaded file on disk, create its thumbnail and record it.

    The file is saved under ``UPLOAD_FOLDER`` using the hex form of its hash
    as file name, a thumbnail is written under ``UPLOAD_FOLDER/thumbnail``,
    and an ``Upload`` row with a fresh short url (plus the original file's
    extension, when it has one) is committed.

    :param file: uploaded file object; its ``stream``, ``save``, ``filename``
        and ``mimetype`` are used.
    :param file_hash_bin: binary digest of the file content.
    :param public: visibility flag forwarded to ``Upload``.
    :returns: the committed ``Upload`` instance.
    """
    file_hash_str = binascii.hexlify(file_hash_bin).decode('utf8')
    upload_dir = app.config['UPLOAD_FOLDER']

    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(upload_dir, exist_ok=True)

    abs_orig = os.path.join(upload_dir, file_hash_str)
    # The caller has already consumed the stream to compute the hash.
    file.stream.seek(0)
    file.save(abs_orig)

    abs_thumbnail = os.path.join(upload_dir, 'thumbnail', file_hash_str)
    # NOTE(review): create_thumbnail() has no return statement, so this is
    # None and no thumbnail url is stored - confirm what should be recorded.
    thumbnail_url = create_thumbnail(abs_orig, abs_thumbnail)

    # Generate a short id and append extension
    short_id = get_new_short_url()
    extension = get_extension(file.filename)
    full_id = short_id + '.' + extension if extension else short_id

    # Add upload in DB. The caller's `public` flag is forwarded instead of a
    # hard-coded True (presumably the last Upload argument is the public
    # flag - verify against the model).
    upload = Upload(file_hash_bin, full_id, thumbnail_url, file.mimetype,
                    public)
    db_session.add(upload)
    db_session.commit()
    return upload
def get_hash(password, salt):
    """Return the raw SHA-512 digest of ``salt`` followed by ``password``.

    Both arguments are ``str`` and are UTF-8 encoded before hashing; the
    result is the 64-byte binary digest.
    """
    salted = salt.encode('utf8') + password.encode('utf8')
    return hashlib.sha512(salted).digest()
def get_auth_error(semi=False):
    """Return an error response when the request is not authenticated.

    The request's ``token`` cookie must match the token of an existing user.
    With ``semi=True`` the check is skipped entirely when the application is
    not configured as private (``IS_PRIVATE`` is falsy).

    :returns: ``None`` when access is allowed, otherwise a
        ``(json response, 403)`` tuple.
    """
    # If app is not configured for private usage, ignore check
    if semi and not app.config['IS_PRIVATE']:
        return None

    token = request.cookies.get('token')
    if token and User.query.filter(User.token == token).first():
        return None
    return jsonify({'error': 'Unauthorized'}), 403
@app.route('/private', methods=['GET'])
def is_app_private():
    """Report the ``IS_PRIVATE`` configuration value as JSON."""
    private = app.config['IS_PRIVATE']
    return jsonify({'private': private})
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded file and answer with its short url.

    The multipart field ``file`` is required. When the application is
    private the request must be authenticated. Files are de-duplicated by
    the SHA-1 of their content: uploading known content returns the short
    url of the existing upload instead of storing it again.

    :returns: JSON ``{"short_url": ...}`` on success, or a JSON error with
        status 400 (no file), 403 (not authorized) or 500.
    """
    # .get() instead of [...] so a missing field reaches the JSON 400 below
    # rather than raising before it.
    file = request.files.get('file')
    if not file:
        return jsonify({'error': 'Bad request'}), 400

    err = get_auth_error(True)
    if err:
        return err

    # Get sha1 of uploaded file, reading in chunks so large uploads are not
    # held in memory as one bytes object.
    m = hashlib.sha1()
    for chunk in iter(lambda: file.read(65536), b''):
        m.update(chunk)
    file_hash_bin = m.digest()

    # One query both tests for and fetches the old (identical) file.
    upload = Upload.query.filter(Upload.hash == file_hash_bin).first()
    if upload is None:
        upload = new_upload(file, file_hash_bin, True)

    if not upload:
        return jsonify({'error': 'An unknown error occurred'}), 500

    return jsonify(short_url=app.config['API_URL'] + upload.short_url)
@app.route('/<hash>/thumbnail', methods=['GET'])
def get_thumbnail_from_hash(hash):
    """Serve the thumbnail of the upload whose content hash is ``hash``.

    :param hash: hexadecimal content hash taken from the url.
    :returns: the thumbnail file from ``UPLOAD_FOLDER/thumbnail``, or a JSON
        error with status 404 when ``hash`` is not valid hex or no upload has
        that hash.
    """
    try:
        hash_bin = binascii.unhexlify(hash)
    except (binascii.Error, ValueError, TypeError):
        return jsonify({'error': 'Not found'}), 404

    upload = Upload.query.filter(Upload.hash == hash_bin).first()
    if not upload:
        return jsonify({'error': 'Not found'}), 404

    # Re-derive the file name from the binary hash, the same way the rest of
    # this module builds it, so the case of the url does not matter.
    hash_str = binascii.hexlify(hash_bin).decode('utf8')
    thumbnail_dir = os.path.join(app.config['UPLOAD_FOLDER'], 'thumbnail')
    # NOTE(review): the upload's mime type is reused for the thumbnail on the
    # assumption that the thumbnail keeps the original's format - confirm.
    return send_from_directory(
        thumbnail_dir, hash_str,
        mimetype=upload.mime_type, as_attachment=False)
@app.route('/<short_url>', methods=['GET'])
def get_upload(short_url):
    """Serve the uploaded file registered under ``short_url``.

    :param short_url: short url (including extension, if any) from the path.
    :returns: the stored file with its recorded mime type; a redirect to
        ``/#/blocked`` when the upload is blocked; or the text
        ``404 not found`` with status 404 when no such upload exists.
    """
    upload = Upload.query.filter(Upload.short_url == short_url).first()
    if not upload:
        # Send a real 404 status along with the message, not a 200.
        return '404 not found', 404

    if upload.blocked:
        # Blocking can be toggled, so the redirect must be temporary;
        # a 301 would be cached by browsers after the upload is unblocked.
        return redirect("/#/blocked", code=302)

    hash_str = binascii.hexlify(upload.hash).decode('utf8')
    return send_from_directory(
        app.config['UPLOAD_FOLDER'],
        hash_str, mimetype=upload.mime_type, as_attachment=False)
@app.route('/uploads', methods=['GET'])
def get_uploads():
    """List every upload as JSON; requires an authenticated request.

    :returns: JSON ``{"uploads": [...]}`` where each entry carries
        ``short_url`` and ``blocked``, plus ``thumbnail_url`` when the upload
        has one; or the authentication error response.
    """
    err = get_auth_error()
    if err:
        return err

    objects = []
    for upload in Upload.query.all():
        o = {
            "short_url": upload.short_url,
            "blocked": upload.blocked
        }
        # Read the attribute from the model; the dict `o` has no attributes.
        if upload.thumbnail_url:
            o['thumbnail_url'] = upload.thumbnail_url
        objects.append(o)

    return jsonify(uploads=objects)
@app.route('/uploads/public', methods=['GET'])
def get_public_uploads():
    """List the uploads selected by the ``Upload.public`` filter as JSON.

    No authentication is performed. Each entry carries ``short_url`` and
    ``blocked``.
    """
    public_uploads = Upload.query.filter(Upload.public)
    listing = [
        {"short_url": item.short_url, "blocked": item.blocked}
        for item in public_uploads
    ]
    return jsonify(uploads=listing)
@app.route('/block/<short_url>', methods=['GET'])
def block_upload(short_url):
    """Toggle the ``blocked`` flag of an upload; requires authentication.

    :param short_url: short url of the upload to block or unblock.
    :returns: a redirect to ``#/admin`` after the change is committed; the
        authentication error response; or a JSON error with status 404 when
        no upload has that short url.
    """
    err = get_auth_error()
    if err:
        return err

    upload = Upload.query.filter(Upload.short_url == short_url).first()
    if not upload:
        # Without this guard an unknown short url dereferences None.
        return jsonify({'error': 'Not found'}), 404

    upload.blocked = not upload.blocked
    db_session.commit()

    return redirect("#/admin", code=302)
@app.route('/login', methods=['POST'])
def login():
    """Authenticate a user and hand out a session token cookie.

    Expects a JSON object with string fields ``username`` and ``password``.
    On success a new random token is stored on the user and returned in the
    ``token`` cookie.

    :returns: JSON ``{"success": true}`` with the cookie set; a JSON error
        with status 400 for a malformed request; or status 401 for wrong
        credentials.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so every malformed request gets the same JSON 400 below.
    req = request.get_json(silent=True)
    if not isinstance(req, dict) \
            or 'username' not in req or 'password' not in req:
        return jsonify({'error': 'Bad request'}), 400

    username = req['username']
    password = req['password']
    # get_hash() calls .encode() on the password, so it must be a string.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Bad request'}), 400

    user = User.query.filter(User.username == username).first()
    if user and get_hash(password, user.salt) == user.password:
        token = uuid.uuid4().hex
        user.token = token
        db_session.query(User).filter_by(id=user.id) \
            .update({"token": user.token})
        db_session.commit()
        resp = jsonify({'success': True})
        resp.set_cookie('token', token)
        return resp

    return jsonify({'error': 'Bad login'}), 401
@app.teardown_appcontext
def shutdown_session(exception=None):
    """Call ``db_session.remove()`` when the application context is torn down.

    ``exception`` is accepted because the teardown hook is invoked with it;
    it is not used.
    """
    db_session.remove()
if __name__ == "__main__":
    # Script entry point: load the default config, optionally overlay
    # config/<name>_config.py when exactly one command-line argument is
    # given, then initialise the database and start the server.
    app.config.from_pyfile('config/default_config.py')

    if len(sys.argv) == 2:
        conf = sys.argv[1]
        # Interpolate the name; passing it as a second print() argument left
        # the literal "%s" in the output.
        print('Loading additional config %s...' % conf)
        app.config.from_pyfile('config/' + conf + '_config.py')

    init_engine(app.config['DATABASE_URI'])
    init_db()

    app.run()