Add API and clean code
This commit is contained in:
		
							parent
							
								
									b58f1dd273
								
							
						
					
					
						commit
						33628a56d0
					
				
					 4 changed files with 178 additions and 66 deletions
				
			
		
							
								
								
									
										12
									
								
								config.yml
									
										
									
									
									
								
							
							
						
						
									
										12
									
								
								config.yml
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -4,13 +4,14 @@ admin_username: 'admin'
 | 
			
		|||
 | 
			
		||||
# Plaintext password to make admin API requests
 | 
			
		||||
# Safe to remove if admin_hashed_password is set
 | 
			
		||||
# Default: commented out, 'password'
 | 
			
		||||
#admin_password: 'password'
 | 
			
		||||
# Default: unset
 | 
			
		||||
admin_password: 'password'
 | 
			
		||||
 | 
			
		||||
# Hashed password (bcrypt) to make admin API requests - Preferred over plaintext, use securepass.sh to generate
 | 
			
		||||
# Please note that authentication takes noticeably longer than using plaintext password
 | 
			
		||||
# Don't include the <username>: segment, just the hash
 | 
			
		||||
# Default: '$2y$15$Dhll3IY42R.JNOYazarlG.8IndwMjxmHLpFsebJzcGTJd.gbsAwna' (hash for 'password')
 | 
			
		||||
admin_hashed_password: '$2y$15$Dhll3IY42R.JNOYazarlG.8IndwMjxmHLpFsebJzcGTJd.gbsAwna'
 | 
			
		||||
#admin_hashed_password: '$2y$15$Dhll3IY42R.JNOYazarlG.8IndwMjxmHLpFsebJzcGTJd.gbsAwna'
 | 
			
		||||
 | 
			
		||||
# Filename of the URL database
 | 
			
		||||
# Default: 'urls'
 | 
			
		||||
| 
						 | 
				
			
			@ -31,3 +32,8 @@ random_gen_timeout: 5
 | 
			
		|||
# Name shown on tab while on site and on page header
 | 
			
		||||
# Default: 'liteshort'
 | 
			
		||||
site_name: 'liteshort'
 | 
			
		||||
 | 
			
		||||
# URL shown when finished generating shortlinks. Include the / at the end.
 | 
			
		||||
# If not set, it is automatically taken from the URL the shorten request is sent to.
 | 
			
		||||
# Default: unset
 | 
			
		||||
site_url:
 | 
			
		||||
							
								
								
									
										216
									
								
								liteshort.py
									
										
									
									
									
								
							
							
						
						
									
										216
									
								
								liteshort.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -1,10 +1,13 @@
 | 
			
		|||
from flask import Flask, request, current_app, g, render_template
 | 
			
		||||
from flask import Flask, request, current_app, g, render_template, jsonify
 | 
			
		||||
import bcrypt
 | 
			
		||||
import random
 | 
			
		||||
import sqlite3
 | 
			
		||||
import time
 | 
			
		||||
import urllib
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
app = Flask(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_config():
 | 
			
		||||
    new_config = yaml.load(open('config.yml'))
 | 
			
		||||
| 
						 | 
				
			
			@ -12,11 +15,12 @@ def load_config():
 | 
			
		|||
 | 
			
		||||
    req_options = {'admin_username': 'admin', 'database_name': "urls", 'random_length': 4,
 | 
			
		||||
                   'allowed_chars': 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_',
 | 
			
		||||
                   'random_gen_timeout': 5, 'site_name': 'liteshort'
 | 
			
		||||
                   'random_gen_timeout': 5, 'site_name': 'liteshort', 'site_url': None
 | 
			
		||||
                   }
 | 
			
		||||
 | 
			
		||||
    config_types = {'admin_username': str, 'database_name': str, 'random_length': int,
 | 
			
		||||
                    'allowed_chars': str, 'random_gen_timeout': int, 'site_name': str}
 | 
			
		||||
                    'allowed_chars': str, 'random_gen_timeout': int, 'site_name': str,
 | 
			
		||||
                    'site_url': (str, type(None))}
 | 
			
		||||
 | 
			
		||||
    for option in req_options.keys():
 | 
			
		||||
        if option not in new_config.keys():  # Make sure everything in req_options is set in config
 | 
			
		||||
| 
						 | 
				
			
			@ -24,8 +28,14 @@ def load_config():
 | 
			
		|||
 | 
			
		||||
    for option in new_config.keys():
 | 
			
		||||
        if option in config_types:
 | 
			
		||||
            if not type(new_config[option]) is config_types[option]:
 | 
			
		||||
                raise TypeError(option + " must be type " + config_types[option].__name__)
 | 
			
		||||
            matches = False
 | 
			
		||||
            if type(config_types[option]) is not tuple:
 | 
			
		||||
                config_types[option] = (config_types[option],)  # Automatically creates tuple for non-tuple types
 | 
			
		||||
            for req_type in config_types[option]:  # Iterates through tuple to allow multiple types for config options
 | 
			
		||||
                if type(new_config[option]) is req_type:
 | 
			
		||||
                    matches = True
 | 
			
		||||
            if not matches:
 | 
			
		||||
                raise TypeError(option + " is incorrect type")
 | 
			
		||||
 | 
			
		||||
    if 'admin_hashed_password' in new_config.keys():  # Sets config value to see if bcrypt is required to check password
 | 
			
		||||
        new_config['password_hashed'] = True
 | 
			
		||||
| 
						 | 
				
			
			@ -36,6 +46,28 @@ def load_config():
 | 
			
		|||
    return new_config
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def authenticate(username, password):
 | 
			
		||||
    return username == current_app.config['admin_username'] and check_password(password, current_app.config)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_long_exist(long):
 | 
			
		||||
    query = query_db('SELECT short FROM urls WHERE long = ?', (long,))
 | 
			
		||||
    for i in query:
 | 
			
		||||
        if i and (len(i['short']) <= current_app.config["random_length"]):  # Checks if query if pre-existing URL is same as random length URL
 | 
			
		||||
            return i['short']
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_short_exist(short, long=None):  # Allow to also check against a long link
 | 
			
		||||
    query = query_db('SELECT * FROM urls WHERE short = ?', (short,))
 | 
			
		||||
    for i in query:
 | 
			
		||||
        if i and i['short'] == short and i['long'] == long:
 | 
			
		||||
            return short
 | 
			
		||||
    if query:
 | 
			
		||||
        return True
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_password(password, pass_config):
 | 
			
		||||
    if pass_config['password_hashed']:
 | 
			
		||||
        return bcrypt.checkpw(password.encode('utf-8'), pass_config['admin_hashed_password'].encode('utf-8'))
 | 
			
		||||
| 
						 | 
				
			
			@ -45,24 +77,77 @@ def check_password(password, pass_config):
 | 
			
		|||
        raise RuntimeError('This should never occur! Bailing...')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_short_exist(short):
 | 
			
		||||
    query = query_db('SELECT long FROM urls WHERE short = ?', (short,))
 | 
			
		||||
    if query:
 | 
			
		||||
        return True
 | 
			
		||||
    return False
 | 
			
		||||
def delete_url(deletion):
 | 
			
		||||
    result = query_db('SELECT * FROM urls WHERE short = ?', (deletion,), False, None)  # Return as tuple instead of row
 | 
			
		||||
    get_db().cursor().execute('DELETE FROM urls WHERE short = ?', (deletion,))
 | 
			
		||||
    get_db().commit()
 | 
			
		||||
    return len(result)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_long_exist(long):
 | 
			
		||||
    query = query_db('SELECT short FROM urls WHERE long = ?', (long,))
 | 
			
		||||
    for i in query:
 | 
			
		||||
        if i and (len(i['short']) <= current_app.config["random_length"]):  # Checks if query if pre-existing URL is same as random length URL
 | 
			
		||||
            return i['short']
 | 
			
		||||
    return False
 | 
			
		||||
def dict_factory(cursor, row):
 | 
			
		||||
    d = {}
 | 
			
		||||
    for idx, col in enumerate(cursor.description):
 | 
			
		||||
        d[col[0]] = row[idx]
 | 
			
		||||
    return d
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def generate_short():
 | 
			
		||||
    return ''.join(random.choice(current_app.config['allowed_chars'])
 | 
			
		||||
                   for i in range(current_app.config['random_length']))
 | 
			
		||||
def generate_short(rq):
 | 
			
		||||
    timeout = time.time() + current_app.config['random_gen_timeout']
 | 
			
		||||
    while True:
 | 
			
		||||
        if time.time() >= timeout:
 | 
			
		||||
            return response(rq, None, 'Timeout while generating random short URL')
 | 
			
		||||
        short = ''.join(random.choice(current_app.config['allowed_chars'])
 | 
			
		||||
                        for i in range(current_app.config['random_length']))
 | 
			
		||||
        if not check_short_exist(short):
 | 
			
		||||
            return short
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def list_shortlinks():
 | 
			
		||||
    result = query_db('SELECT * FROM urls', (), False, None)
 | 
			
		||||
    result = nested_list_to_dict(result)
 | 
			
		||||
    return result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def nested_list_to_dict(l):
 | 
			
		||||
    d = {}
 | 
			
		||||
    for nl in l:
 | 
			
		||||
            d[nl[0]] = nl[1]
 | 
			
		||||
    return d
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def response(rq, result, error_msg="Error: Unknown error"):
 | 
			
		||||
    if 'api' in rq.form and 'format' not in rq.form:
 | 
			
		||||
        return "Format type HTML (default) not support for API"  # Future-proof for non-json return types
 | 
			
		||||
    if 'format' in rq.form and rq.form['format'] == 'json':
 | 
			
		||||
        # If not result provided OR result doesn't exist, send error
 | 
			
		||||
        # Allows for setting an error message with explicitly checking in regular code
 | 
			
		||||
        if result:
 | 
			
		||||
            if result is True:  # Allows sending with no result (ie. during deletion)
 | 
			
		||||
                return jsonify(success=True)
 | 
			
		||||
            else:
 | 
			
		||||
                return jsonify(success=True, result=result)
 | 
			
		||||
        else:
 | 
			
		||||
            return jsonify(success=False, error=error_msg)
 | 
			
		||||
    else:
 | 
			
		||||
        if result:
 | 
			
		||||
            return render_template("main.html", result=(True, result))
 | 
			
		||||
        else:
 | 
			
		||||
            return render_template("main.html", result=(False, error_msg))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def validate_short(short):
 | 
			
		||||
    for char in short:
 | 
			
		||||
        if char not in current_app.config['allowed_chars']:
 | 
			
		||||
            return response(request, None,
 | 
			
		||||
                            'Character ' + char + ' not allowed in short URL')
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def validate_long(long):  # https://stackoverflow.com/a/36283503
 | 
			
		||||
    token = urllib.parse.urlparse(long)
 | 
			
		||||
    return all([token.scheme, token.netloc])
 | 
			
		||||
 | 
			
		||||
# Database connection functions
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_db():
 | 
			
		||||
| 
						 | 
				
			
			@ -71,32 +156,25 @@ def get_db():
 | 
			
		|||
            ''.join((current_app.config['database_name'], '.db')),
 | 
			
		||||
            detect_types=sqlite3.PARSE_DECLTYPES
 | 
			
		||||
        )
 | 
			
		||||
        g.db.row_factory = sqlite3.Row
 | 
			
		||||
        g.db.cursor().execute('CREATE TABLE IF NOT EXISTS urls (long,short)')
 | 
			
		||||
    return g.db
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def query_db(query, args=(), one=False):
 | 
			
		||||
def query_db(query, args=(), one=False, row_factory=sqlite3.Row):
 | 
			
		||||
    get_db().row_factory = row_factory
 | 
			
		||||
    cur = get_db().execute(query, args)
 | 
			
		||||
    rv = cur.fetchall()
 | 
			
		||||
    cur.close()
 | 
			
		||||
    return (rv[0] if rv else None) if one else rv
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def response(rq, short, error_msg=None):
 | 
			
		||||
    if 'json' in rq.form and rq.form['json']:
 | 
			
		||||
        pass
 | 
			
		||||
    else:
 | 
			
		||||
        if short:
 | 
			
		||||
            return render_template("main.html", result=(True, rq.base_url + short))
 | 
			
		||||
        else:
 | 
			
		||||
            return render_template("main.html", result=(False, error_msg))
 | 
			
		||||
@app.teardown_appcontext
 | 
			
		||||
def close_db(error):
 | 
			
		||||
    if hasattr(g, 'sqlite_db'):
 | 
			
		||||
        g.sqlite_db.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
config = load_config()
 | 
			
		||||
 | 
			
		||||
app = Flask(__name__)
 | 
			
		||||
app.config.update(config)  # Add loaded YAML config to Flask config
 | 
			
		||||
app.config.update(load_config())  # Add YAML config to Flask config
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.route('/')
 | 
			
		||||
| 
						 | 
				
			
			@ -106,33 +184,61 @@ def main():
 | 
			
		|||
 | 
			
		||||
@app.route('/', methods=['POST'])
 | 
			
		||||
def main_post():
 | 
			
		||||
    # Check if long in form (ie. provided by curl) and not blank (browsers always send blank forms as empty quote)
 | 
			
		||||
    if 'long' in request.form and request.form['long']:
 | 
			
		||||
        if 'short' in request.form and request.form['short']:
 | 
			
		||||
            for char in request.form['short']:
 | 
			
		||||
                if char not in current_app.config['allowed_chars']:
 | 
			
		||||
                    return response(request, None, 'Character ' + char + ' not allowed in short URL.')
 | 
			
		||||
            short = request.form['short']
 | 
			
		||||
            # Validate long as URL and short custom text against allowed characters
 | 
			
		||||
            if not validate_long(request.form['long']):
 | 
			
		||||
                return response(request, None, "Long URL is not valid")
 | 
			
		||||
            result = validate_short(request.form['short'])
 | 
			
		||||
            if validate_short(request.form['short']) is True:
 | 
			
		||||
                short = request.form['short']
 | 
			
		||||
            else:
 | 
			
		||||
                return result
 | 
			
		||||
            if check_short_exist(short, request.form['long']) is short:
 | 
			
		||||
                return response(request, (current_app.config['site_url'] or request.base_url) + short,
 | 
			
		||||
                                'Error: Failed to return pre-existing non-random shortlink')
 | 
			
		||||
        else:
 | 
			
		||||
            timeout = time.time() + current_app.config['random_gen_timeout']
 | 
			
		||||
            while True:
 | 
			
		||||
                if time.time() >= timeout:
 | 
			
		||||
                    return response(request, None, 'Timeout while generating random short URL.')
 | 
			
		||||
                short = generate_short()
 | 
			
		||||
                if not check_short_exist(short):
 | 
			
		||||
                    break
 | 
			
		||||
        short_exists = check_short_exist(short)
 | 
			
		||||
            short = generate_short(request)
 | 
			
		||||
        if check_short_exist(short) is True:
 | 
			
		||||
            return response(request, None,
 | 
			
		||||
                            'Short URL already exists')
 | 
			
		||||
        long_exists = check_long_exist(request.form['long'])
 | 
			
		||||
        if long_exists and not ('short' in request.form and request.form['short']):
 | 
			
		||||
            return response(request, long_exists)
 | 
			
		||||
        if short_exists:
 | 
			
		||||
            return response(request, None, "Short URL already exists.")
 | 
			
		||||
        database = get_db()
 | 
			
		||||
        database.cursor().execute("INSERT INTO urls (long,short) VALUES (?,?)", (request.form['long'], short))
 | 
			
		||||
        database.commit()
 | 
			
		||||
        database.close()
 | 
			
		||||
        return response(request, short)
 | 
			
		||||
        if long_exists:
 | 
			
		||||
            return response(request, (current_app.config['site_url'] or request.base_url) + long_exists,
 | 
			
		||||
                            'Error: Failed to return pre-existing random shortlink')
 | 
			
		||||
        get_db().cursor().execute('INSERT INTO urls (long,short) VALUES (?,?)', (request.form['long'], short))
 | 
			
		||||
        get_db().commit()
 | 
			
		||||
        return response(request, (current_app.config['site_url'] or request.base_url) + short,
 | 
			
		||||
                        'Error: Failed to generate')
 | 
			
		||||
    elif 'api' in request.form:
 | 
			
		||||
        # All API calls require authentication
 | 
			
		||||
        if not request.authorization \
 | 
			
		||||
                or not authenticate(request.authorization['username'], request.authorization['password']):
 | 
			
		||||
            return response(request, None, "BaiscAuth failed")
 | 
			
		||||
        command = request.form['api']
 | 
			
		||||
        if command == 'list' or command == 'listshort':
 | 
			
		||||
            return response(request, list_shortlinks(), "Failed to list items")
 | 
			
		||||
        elif command == 'listlong':
 | 
			
		||||
            shortlinks = list_shortlinks()
 | 
			
		||||
            shortlinks = {v: k for k, v in shortlinks.items()}
 | 
			
		||||
            return response(request, shortlinks, "Failed to list items")
 | 
			
		||||
        elif command == 'delete':
 | 
			
		||||
            deleted = 0
 | 
			
		||||
            if 'long' not in request.form and 'short' not in request.form:
 | 
			
		||||
                return response(request, None, "Provide short or long in POST data")
 | 
			
		||||
            if 'short' in request.form:
 | 
			
		||||
                deleted = delete_url(request.form['short']) + deleted
 | 
			
		||||
            if 'long' in request.form:
 | 
			
		||||
                deleted = delete_url(request.form['long']) + deleted
 | 
			
		||||
            if deleted > 0:
 | 
			
		||||
                return response(request, "Deleted " + str(deleted) + " URLs")
 | 
			
		||||
            else:
 | 
			
		||||
                return response(request, None, "Failed to delete URL")
 | 
			
		||||
        else:
 | 
			
		||||
            return response(request, None, 'Command ' + command + ' not found')
 | 
			
		||||
    else:
 | 
			
		||||
        return "Long URL required!"
 | 
			
		||||
        return response(request, None, 'Long URL required')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,10 +3,6 @@ div.form {
 | 
			
		|||
  text-align: center;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
input {
 | 
			
		||||
  margin: auto;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
div.success {
 | 
			
		||||
  display: inline-block;
 | 
			
		||||
  font-family: Open Sans;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,11 +12,15 @@
 | 
			
		|||
    <div class="form">
 | 
			
		||||
        <h2>{{ config.site_name }}</h2>
 | 
			
		||||
        <form class="pure-form">
 | 
			
		||||
            <input name="long" type="url" placeholder="Long URL">
 | 
			
		||||
            <p>
 | 
			
		||||
            <input name="short" type="text" placeholder="Custom link (optional)">
 | 
			
		||||
                <input name="long" type="url" placeholder="Long URL">
 | 
			
		||||
            </p>
 | 
			
		||||
            <p>
 | 
			
		||||
            <button type="submit" class="pure-button pure-button-primary" formmethod="post">Shorten</button>
 | 
			
		||||
                <input name="short" type="text" placeholder="Custom link (optional)">
 | 
			
		||||
            </p>
 | 
			
		||||
            <p>
 | 
			
		||||
                <button type="submit" class="pure-button pure-button-primary" formmethod="post">Shorten</button>
 | 
			
		||||
            </p>
 | 
			
		||||
        </form>
 | 
			
		||||
    </div>
 | 
			
		||||
    {% if result is defined and result[0] %}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Reference in a new issue