Skip to content
Snippets Groups Projects
Unverified Commit 7d7ddd65 authored by Piero Toffanin's avatar Piero Toffanin Committed by GitHub
Browse files

Merge pull request #90 from yogeshwaran01/main

Linted with Black and flake8
parents 167f551a 95487f84
No related branches found
No related tags found
No related merge requests found
import os
from appdirs import user_data_dir
# override polyglot path
import polyglot
polyglot.polyglot_path = os.path.join(user_data_dir(appname="LibreTranslate", appauthor="uav4geo"), "polyglot_data")
from appdirs import user_data_dir
polyglot.polyglot_path = os.path.join(
user_data_dir(appname="LibreTranslate", appauthor="uav4geo"), "polyglot_data"
)
from .main import main
......
import sqlite3
import uuid
from expiringdict import ExpiringDict
DEFAULT_DB_PATH = "api_keys.db"
class Database:
def __init__(self, db_path = DEFAULT_DB_PATH, max_cache_len=1000, max_cache_age=30):
def __init__(self, db_path=DEFAULT_DB_PATH, max_cache_len=1000, max_cache_age=30):
self.db_path = db_path
self.cache = ExpiringDict(max_len=max_cache_len, max_age_seconds=max_cache_age)
# Make sure to do data synchronization on writes!
self.c = sqlite3.connect(db_path, check_same_thread=False)
self.c.execute('''CREATE TABLE IF NOT EXISTS api_keys (
self.c.execute(
"""CREATE TABLE IF NOT EXISTS api_keys (
"api_key" TEXT NOT NULL,
"req_limit" INTEGER NOT NULL,
PRIMARY KEY("api_key")
);''')
);"""
)
def lookup(self, api_key):
req_limit = self.cache.get(api_key)
if req_limit is None:
# DB Lookup
stmt = self.c.execute('SELECT req_limit FROM api_keys WHERE api_key = ?', (api_key, ))
stmt = self.c.execute(
"SELECT req_limit FROM api_keys WHERE api_key = ?", (api_key,)
)
row = stmt.fetchone()
if row is not None:
self.cache[api_key] = row[0]
......@@ -29,26 +35,29 @@ class Database:
else:
self.cache[api_key] = False
req_limit = False
if isinstance(req_limit, bool):
req_limit = None
return req_limit
def add(self, req_limit, api_key = "auto"):
def add(self, req_limit, api_key="auto"):
if api_key == "auto":
api_key = str(uuid.uuid4())
self.remove(api_key)
self.c.execute("INSERT INTO api_keys (api_key, req_limit) VALUES (?, ?)", (api_key, req_limit))
self.c.execute(
"INSERT INTO api_keys (api_key, req_limit) VALUES (?, ?)",
(api_key, req_limit),
)
self.c.commit()
return (api_key, req_limit)
def remove(self, api_key):
self.c.execute('DELETE FROM api_keys WHERE api_key = ?', (api_key, ))
self.c.execute("DELETE FROM api_keys WHERE api_key = ?", (api_key,))
self.c.commit()
return api_key
def all(self):
row = self.c.execute("SELECT api_key, req_limit FROM api_keys")
return row.fetchall()
\ No newline at end of file
return row.fetchall()
import os
from flask import Flask, render_template, jsonify, request, abort, send_from_directory
from functools import wraps
from flask import Flask, abort, jsonify, render_template, request
from flask_swagger import swagger
from flask_swagger_ui import get_swaggerui_blueprint
from pkg_resources import resource_filename
from .api_keys import Database
from app.language import detect_languages, transliterate
from app import flood
from functools import wraps
from app.language import detect_languages, transliterate
from .api_keys import Database
def get_json_dict(request):
d = request.get_json()
......@@ -14,14 +17,16 @@ def get_json_dict(request):
abort(400, description="Invalid JSON format")
return d
def get_remote_address():
if request.headers.getlist("X-Forwarded-For"):
ip = request.headers.getlist("X-Forwarded-For")[0]
else:
ip = request.remote_addr or '127.0.0.1'
ip = request.remote_addr or "127.0.0.1"
return ip
def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
if default_req_limit == -1:
# TODO: better way?
......@@ -33,7 +38,7 @@ def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
if api_keys_db:
if request.is_json:
json = get_json_dict(request)
api_key = json.get('api_key')
api_key = json.get("api_key")
else:
api_key = request.values.get("api_key")
......@@ -47,53 +52,67 @@ def get_routes_limits(default_req_limit, daily_req_limit, api_keys_db):
res = [limits]
if daily_req_limit > 0:
res.append("%s per day" % daily_req_limit)
res.append("%s per day" % daily_req_limit)
return res
def create_app(args):
from app.init import boot
boot(args.load_only)
from app.language import languages
app = Flask(__name__)
if args.debug:
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["TEMPLATES_AUTO_RELOAD"] = True
# Map userdefined frontend languages to argos language object.
if args.frontend_language_source == "auto":
frontend_argos_language_source = type('obj', (object,), {
'code': 'auto',
'name': 'Auto Detect'
})
frontend_argos_language_source = type(
"obj", (object,), {"code": "auto", "name": "Auto Detect"}
)
else:
frontend_argos_language_source = next(iter([l for l in languages if l.code == args.frontend_language_source]), None)
frontend_argos_language_source = next(
iter([l for l in languages if l.code == args.frontend_language_source]),
None,
)
frontend_argos_language_target = next(iter([l for l in languages if l.code == args.frontend_language_target]), None)
frontend_argos_language_target = next(
iter([l for l in languages if l.code == args.frontend_language_target]), None
)
# Raise AttributeError to prevent app startup if user input is not valid.
if frontend_argos_language_source is None:
raise AttributeError(f"{args.frontend_language_source} as frontend source language is not supported.")
raise AttributeError(
f"{args.frontend_language_source} as frontend source language is not supported."
)
if frontend_argos_language_target is None:
raise AttributeError(f"{args.frontend_language_target} as frontend target language is not supported.")
raise AttributeError(
f"{args.frontend_language_target} as frontend target language is not supported."
)
api_keys_db = None
if args.req_limit > 0 or args.api_keys or args.daily_req_limit > 0:
api_keys_db = Database() if args.api_keys else None
from flask_limiter import Limiter
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=get_routes_limits(args.req_limit, args.daily_req_limit, api_keys_db)
default_limits=get_routes_limits(
args.req_limit, args.daily_req_limit, api_keys_db
),
)
else:
from .no_limiter import Limiter
limiter = Limiter()
from .no_limiter import Limiter
limiter = Limiter()
if args.req_flood_threshold > 0:
flood.setup(args.req_flood_threshold)
......@@ -102,7 +121,7 @@ def create_app(args):
def func(*a, **kw):
if flood.is_banned(get_remote_address()):
abort(403, description="Too many request limits violations")
if args.api_keys and args.require_api_key_origin:
if request.is_json:
json = get_json_dict(request)
......@@ -110,10 +129,16 @@ def create_app(args):
else:
ak = request.values.get("api_key")
if api_keys_db.lookup(ak) is None and request.headers.get("Origin") != args.require_api_key_origin:
abort(403, description="Please contact the server operator to obtain an API key")
if (
api_keys_db.lookup(ak) is None and request.headers.get("Origin") != args.require_api_key_origin
):
abort(
403,
description="Please contact the server operator to obtain an API key",
)
return f(*a, **kw)
return func
@app.errorhandler(400)
......@@ -133,18 +158,23 @@ def create_app(args):
def denied(e):
return jsonify({"error": str(e.description)}), 403
@app.route("/")
@limiter.exempt
def index():
return render_template('index.html', gaId=args.ga_id, frontendTimeout=args.frontend_timeout, api_keys=args.api_keys, web_version=os.environ.get('LT_WEB') is not None)
return render_template(
"index.html",
gaId=args.ga_id,
frontendTimeout=args.frontend_timeout,
api_keys=args.api_keys,
web_version=os.environ.get("LT_WEB") is not None,
)
@app.route("/javascript-licenses", methods=['GET'])
@app.route("/javascript-licenses", methods=["GET"])
@limiter.exempt
def javascript_licenses():
return render_template('javascript-licenses.html')
return render_template("javascript-licenses.html")
@app.route("/languages", methods=['GET', 'POST'])
@app.route("/languages", methods=["GET", "POST"])
@limiter.exempt
def langs():
"""
......@@ -177,21 +207,22 @@ def create_app(args):
type: string
description: Reason for slow down
"""
return jsonify([{'code': l.code, 'name': l.name} for l in languages])
return jsonify([{"code": l.code, "name": l.name} for l in languages])
# Add cors
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin','*')
response.headers.add('Access-Control-Allow-Headers', "Authorization, Content-Type")
response.headers.add('Access-Control-Expose-Headers', "Authorization")
response.headers.add('Access-Control-Allow-Methods', "GET, POST")
response.headers.add('Access-Control-Allow-Credentials', "true")
response.headers.add('Access-Control-Max-Age', 60 * 60 * 24 * 20)
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add(
"Access-Control-Allow-Headers", "Authorization, Content-Type"
)
response.headers.add("Access-Control-Expose-Headers", "Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET, POST")
response.headers.add("Access-Control-Allow-Credentials", "true")
response.headers.add("Access-Control-Max-Age", 60 * 60 * 24 * 20)
return response
@app.route("/translate", methods=['POST'])
@app.route("/translate", methods=["POST"])
@access_check
def translate():
"""
......@@ -282,9 +313,9 @@ def create_app(args):
"""
if request.is_json:
json = get_json_dict(request)
q = json.get('q')
source_lang = json.get('source')
target_lang = json.get('target')
q = json.get("q")
source_lang = json.get("source")
target_lang = json.get("target")
else:
q = request.values.get("q")
source_lang = request.values.get("source")
......@@ -300,20 +331,28 @@ def create_app(args):
batch = isinstance(q, list)
if batch and args.batch_limit != -1:
batch_size = len(q)
if args.batch_limit < batch_size:
abort(400, description="Invalid request: Request (%d) exceeds text limit (%d)" % (batch_size, args.batch_limit))
batch_size = len(q)
if args.batch_limit < batch_size:
abort(
400,
description="Invalid request: Request (%d) exceeds text limit (%d)"
% (batch_size, args.batch_limit),
)
if args.char_limit != -1:
if batch:
chars = sum([len(text) for text in q])
chars = sum([len(text) for text in q])
else:
chars = len(q)
chars = len(q)
if args.char_limit < chars:
abort(400, description="Invalid request: Request (%d) exceeds character limit (%d)" % (chars, args.char_limit))
abort(
400,
description="Invalid request: Request (%d) exceeds character limit (%d)"
% (chars, args.char_limit),
)
if source_lang == 'auto':
if source_lang == "auto":
candidate_langs = detect_languages(q)
if args.debug:
......@@ -335,14 +374,29 @@ def create_app(args):
translator = src_lang.get_translation(tgt_lang)
try:
if batch:
return jsonify({"translatedText": [translator.translate(transliterate(text, target_lang=source_lang)) for text in q] })
else:
return jsonify({"translatedText": translator.translate(transliterate(q, target_lang=source_lang)) })
if batch:
return jsonify(
{
"translatedText": [
translator.translate(
transliterate(text, target_lang=source_lang)
)
for text in q
]
}
)
else:
return jsonify(
{
"translatedText": translator.translate(
transliterate(q, target_lang=source_lang)
)
}
)
except Exception as e:
abort(500, description="Cannot translate text: %s" % str(e))
@app.route("/detect", methods=['POST'])
@app.route("/detect", methods=["POST"])
@access_check
def detect():
"""
......@@ -393,7 +447,7 @@ def create_app(args):
properties:
error:
type: string
description: Error message
description: Error message
500:
description: Detection error
schema:
......@@ -427,7 +481,7 @@ def create_app(args):
if request.is_json:
json = get_json_dict(request)
q = json.get('q')
q = json.get("q")
else:
q = request.values.get("q")
......@@ -436,7 +490,6 @@ def create_app(args):
return jsonify(detect_languages(q))
@app.route("/frontend/settings")
@limiter.exempt
def frontend_settings():
......@@ -480,30 +533,37 @@ def create_app(args):
type: string
description: Human-readable language name (in English)
"""
return jsonify({'charLimit': args.char_limit,
'frontendTimeout': args.frontend_timeout,
'language': {
'source': {'code': frontend_argos_language_source.code, 'name': frontend_argos_language_source.name},
'target': {'code': frontend_argos_language_target.code, 'name': frontend_argos_language_target.name}}
})
return jsonify(
{
"charLimit": args.char_limit,
"frontendTimeout": args.frontend_timeout,
"language": {
"source": {
"code": frontend_argos_language_source.code,
"name": frontend_argos_language_source.name,
},
"target": {
"code": frontend_argos_language_target.code,
"name": frontend_argos_language_target.name,
},
},
}
)
swag = swagger(app)
swag['info']['version'] = "1.2"
swag['info']['title'] = "LibreTranslate"
swag["info"]["version"] = "1.2"
swag["info"]["title"] = "LibreTranslate"
@app.route("/spec")
@limiter.exempt
def spec():
return jsonify(swag)
SWAGGER_URL = '/docs' # URL for exposing Swagger UI (without trailing '/')
API_URL = '/spec'
SWAGGER_URL = "/docs" # URL for exposing Swagger UI (without trailing '/')
API_URL = "/spec"
# Call factory function to create our blueprint
swaggerui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL
)
swaggerui_blueprint = get_swaggerui_blueprint(SWAGGER_URL, API_URL)
app.register_blueprint(swaggerui_blueprint)
......
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
......@@ -7,11 +6,13 @@ banned = {}
active = False
threshold = -1
def clear_banned():
global banned
banned = {}
def setup(violations_threshold = 100):
def setup(violations_threshold=100):
global active
global threshold
......@@ -31,6 +32,7 @@ def report(request_ip):
banned[request_ip] = banned.get(request_ip, 0)
banned[request_ip] += 1
def is_banned(request_ip):
# More than X offences?
return active and banned.get(request_ip, 0) >= threshold
import os
from pathlib import Path
from argostranslate import settings, package, translate
import os, glob, shutil, zipfile
import app.language
import polyglot
from argostranslate import package, translate
import app.language
def boot(load_only=None):
try:
......@@ -12,6 +13,7 @@ def boot(load_only=None):
except Exception as e:
print("Cannot update models (normal if you're offline): %s" % str(e))
def check_and_install_models(force=False, load_only_lang_codes=None):
if len(package.get_installed_packages()) < 2 or force:
# Update package definitions from remote
......@@ -29,48 +31,63 @@ def check_and_install_models(force=False, load_only_lang_codes=None):
for pack in available_packages:
unavailable_lang_codes -= {pack.from_code, pack.to_code}
if unavailable_lang_codes:
raise ValueError('Unavailable language codes: %s.' % ','.join(sorted(unavailable_lang_codes)))
raise ValueError(
"Unavailable language codes: %s."
% ",".join(sorted(unavailable_lang_codes))
)
# Keep only the packages that have both from_code and to_code in our list.
available_packages = [
pack
for pack in available_packages
if pack.from_code in load_only_lang_codes
and pack.to_code in load_only_lang_codes
if pack.from_code in load_only_lang_codes and pack.to_code in load_only_lang_codes
]
if not available_packages:
raise ValueError('no available package')
raise ValueError("no available package")
print("Keep %s models" % len(available_packages))
# Download and install all available packages
for available_package in available_packages:
print("Downloading %s (%s) ..." % (available_package, available_package.package_version))
print(
"Downloading %s (%s) ..."
% (available_package, available_package.package_version)
)
download_path = available_package.download()
package.install_from_path(download_path)
# reload installed languages
app.language.languages = translate.load_installed_languages()
print("Loaded support for %s languages (%s models total)!" % (len(translate.load_installed_languages()), len(available_packages)))
print(
"Loaded support for %s languages (%s models total)!"
% (len(translate.load_installed_languages()), len(available_packages))
)
def check_and_install_transliteration(force=False):
# 'en' is not a supported transliteration language
transliteration_languages = [l.code for l in app.language.languages if l.code != "en"]
transliteration_languages = [
l.code for l in app.language.languages if l.code != "en"
]
# check installed
install_needed = []
if not force:
t_packages_path = Path(polyglot.polyglot_path) / "transliteration2"
for lang in transliteration_languages:
if not (t_packages_path / lang / f"transliteration.{lang}.tar.bz2").exists():
if not (
t_packages_path / lang / f"transliteration.{lang}.tar.bz2"
).exists():
install_needed.append(lang)
else:
install_needed = transliteration_languages
# install the needed transliteration packages
if install_needed:
print(f"Installing transliteration models for the following languages: {', '.join(install_needed)}")
print(
f"Installing transliteration models for the following languages: {', '.join(install_needed)}"
)
from polyglot.downloader import Downloader
downloader = Downloader()
for lang in install_needed:
......
......@@ -4,7 +4,6 @@ from argostranslate import translate
from polyglot.detect.base import Detector, UnknownLanguage
from polyglot.transliteration.base import Transliterator
languages = translate.load_installed_languages()
......@@ -24,24 +23,21 @@ def detect_languages(text):
for t in text:
try:
candidates.extend(Detector(t).languages)
except UnknownLanguage as e:
except UnknownLanguage:
pass
# total read bytes of the provided text
read_bytes_total = sum(c.read_bytes for c in candidates)
# only use candidates that are supported by argostranslate
candidate_langs = list(filter(lambda l: l.read_bytes != 0 and l.code in __lang_codes, candidates))
candidate_langs = list(
filter(lambda l: l.read_bytes != 0 and l.code in __lang_codes, candidates)
)
# this happens if no language could be detected
if not candidate_langs:
# use language "en" by default but with zero confidence
return [
{
'confidence': 0.0,
'language': "en"
}
]
return [{"confidence": 0.0, "language": "en"}]
# for multiple occurrences of the same language (can happen on batch detection)
# calculate the average confidence for each language
......@@ -65,15 +61,11 @@ def detect_languages(text):
candidate_langs = temp_average_list
# sort the candidates descending based on the detected confidence
candidate_langs.sort(key=lambda l: (l.confidence * l.read_bytes) / read_bytes_total, reverse=True)
candidate_langs.sort(
key=lambda l: (l.confidence * l.read_bytes) / read_bytes_total, reverse=True
)
return [
{
'confidence': l.confidence,
'language': l.code
}
for l in candidate_langs
]
return [{"confidence": l.confidence, "language": l.code} for l in candidate_langs]
def __transliterate_line(transliterator, line_text):
......@@ -97,9 +89,9 @@ def __transliterate_line(transliterator, line_text):
else:
# add back any stripped punctuation
if r_diff:
t_word = t_word + ''.join(r_diff)
t_word = t_word + "".join(r_diff)
if l_diff:
t_word = ''.join(l_diff) + t_word
t_word = "".join(l_diff) + t_word
new_text.append(t_word)
......
import argparse
import operator
from app.app import create_app
def main():
parser = argparse.ArgumentParser(description='LibreTranslate - Free and Open Source Translation API')
parser.add_argument('--host', type=str,
help='Hostname (%(default)s)', default="127.0.0.1")
parser.add_argument('--port', type=int,
help='Port (%(default)s)', default=5000)
parser.add_argument('--char-limit', default=-1, type=int, metavar="<number of characters>",
help='Set character limit (%(default)s)')
parser.add_argument('--req-limit', default=-1, type=int, metavar="<number>",
help='Set the default maximum number of requests per minute per client (%(default)s)')
parser.add_argument('--daily-req-limit', default=-1, type=int, metavar="<number>",
help='Set the default maximum number of requests per day per client, in addition to req-limit. (%(default)s)')
parser.add_argument('--req-flood-threshold', default=-1, type=int, metavar="<number>",
help='Set the maximum number of request limit offences per 4 weeks that a client can exceed before being banned. (%(default)s)')
parser.add_argument('--batch-limit', default=-1, type=int, metavar="<number of texts>",
help='Set maximum number of texts to translate in a batch request (%(default)s)')
parser.add_argument('--ga-id', type=str, default=None, metavar="<GA ID>",
help='Enable Google Analytics on the API client page by providing an ID (%(default)s)')
parser.add_argument('--debug', default=False, action="store_true",
help="Enable debug environment")
parser.add_argument('--ssl', default=None, action="store_true",
help="Whether to enable SSL")
parser.add_argument('--frontend-language-source', type=str, default="en", metavar="<language code>",
help='Set frontend default language - source (%(default)s)')
parser.add_argument('--frontend-language-target', type=str, default="es", metavar="<language code>",
help='Set frontend default language - target (%(default)s)')
parser.add_argument('--frontend-timeout', type=int, default=500, metavar="<milliseconds>",
help='Set frontend translation timeout (%(default)s)')
parser.add_argument('--api-keys', default=False, action="store_true",
help="Enable API keys database for per-user rate limits lookup")
parser.add_argument('--require-api-key-origin', type=str, default="",
help="Require use of an API key for programmatic access to the API, unless the request origin matches this domain")
parser.add_argument('--load-only', type=operator.methodcaller('split', ','),
metavar='<comma-separated language codes>',
help='Set available languages (ar,de,en,es,fr,ga,hi,it,ja,ko,pt,ru,zh)')
parser = argparse.ArgumentParser(
description="LibreTranslate - Free and Open Source Translation API"
)
parser.add_argument(
"--host", type=str, help="Hostname (%(default)s)", default="127.0.0.1"
)
parser.add_argument("--port", type=int, help="Port (%(default)s)", default=5000)
parser.add_argument(
"--char-limit",
default=-1,
type=int,
metavar="<number of characters>",
help="Set character limit (%(default)s)",
)
parser.add_argument(
"--req-limit",
default=-1,
type=int,
metavar="<number>",
help="Set the default maximum number of requests per minute per client (%(default)s)",
)
parser.add_argument(
"--daily-req-limit",
default=-1,
type=int,
metavar="<number>",
help="Set the default maximum number of requests per day per client, in addition to req-limit. (%(default)s)",
)
parser.add_argument(
"--req-flood-threshold",
default=-1,
type=int,
metavar="<number>",
help="Set the maximum number of request limit offences per 4 weeks that a client can exceed before being banned. (%(default)s)",
)
parser.add_argument(
"--batch-limit",
default=-1,
type=int,
metavar="<number of texts>",
help="Set maximum number of texts to translate in a batch request (%(default)s)",
)
parser.add_argument(
"--ga-id",
type=str,
default=None,
metavar="<GA ID>",
help="Enable Google Analytics on the API client page by providing an ID (%(default)s)",
)
parser.add_argument(
"--debug", default=False, action="store_true", help="Enable debug environment"
)
parser.add_argument(
"--ssl", default=None, action="store_true", help="Whether to enable SSL"
)
parser.add_argument(
"--frontend-language-source",
type=str,
default="en",
metavar="<language code>",
help="Set frontend default language - source (%(default)s)",
)
parser.add_argument(
"--frontend-language-target",
type=str,
default="es",
metavar="<language code>",
help="Set frontend default language - target (%(default)s)",
)
parser.add_argument(
"--frontend-timeout",
type=int,
default=500,
metavar="<milliseconds>",
help="Set frontend translation timeout (%(default)s)",
)
parser.add_argument(
"--api-keys",
default=False,
action="store_true",
help="Enable API keys database for per-user rate limits lookup",
)
parser.add_argument(
"--require-api-key-origin",
type=str,
default="",
help="Require use of an API key for programmatic access to the API, unless the request origin matches this domain",
)
parser.add_argument(
"--load-only",
type=operator.methodcaller("split", ","),
metavar="<comma-separated language codes>",
help="Set available languages (ar,de,en,es,fr,ga,hi,it,ja,ko,pt,ru,zh)",
)
args = parser.parse_args()
app = create_app(args)
......@@ -46,7 +107,14 @@ def main():
app.run(host=args.host, port=args.port)
else:
from waitress import serve
serve(app, host=args.host, port=args.port, url_scheme='https' if args.ssl else 'http')
serve(
app,
host=args.host,
port=args.port,
url_scheme="https" if args.ssl else "http",
)
if __name__ == "__main__":
main()
import argparse
from app.api_keys import Database
def manage():
parser = argparse.ArgumentParser(description='LibreTranslate Manage Tools')
subparsers = parser.add_subparsers(help='', dest='command', required=True, title="Command List")
keys_parser = subparsers.add_parser('keys', help='Manage API keys database')
keys_subparser = keys_parser.add_subparsers(help='', dest='sub_command', title="Command List")
keys_add_parser = keys_subparser.add_parser('add', help='Add API keys to database')
keys_add_parser.add_argument('req_limit',
type=int,
help='Request Limits (per second)')
keys_add_parser.add_argument('--key',
type=str,
default="auto",
required=False,
help='API Key')
keys_remove_parser = keys_subparser.add_parser('remove', help='Remove API keys to database')
keys_remove_parser.add_argument('key',
type=str,
help='API Key')
parser = argparse.ArgumentParser(description="LibreTranslate Manage Tools")
subparsers = parser.add_subparsers(
help="", dest="command", required=True, title="Command List"
)
keys_parser = subparsers.add_parser("keys", help="Manage API keys database")
keys_subparser = keys_parser.add_subparsers(
help="", dest="sub_command", title="Command List"
)
keys_add_parser = keys_subparser.add_parser("add", help="Add API keys to database")
keys_add_parser.add_argument(
"req_limit", type=int, help="Request Limits (per second)"
)
keys_add_parser.add_argument(
"--key", type=str, default="auto", required=False, help="API Key"
)
keys_remove_parser = keys_subparser.add_parser(
"remove", help="Remove API keys to database"
)
keys_remove_parser.add_argument("key", type=str, help="API Key")
args = parser.parse_args()
if args.command == 'keys':
if args.command == "keys":
db = Database()
if args.sub_command is None:
# Print keys
......@@ -36,10 +40,10 @@ def manage():
for item in keys:
print("%s: %s" % item)
elif args.sub_command == 'add':
elif args.sub_command == "add":
print(db.add(args.req_limit, args.key)[0])
elif args.sub_command == 'remove':
elif args.sub_command == "remove":
print(db.remove(args.key))
else:
parser.print_help()
exit(1)
\ No newline at end of file
exit(1)
from functools import wraps
class Limiter:
def exempt(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
def exempt(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
......@@ -3,5 +3,5 @@
from app.init import check_and_install_models, check_and_install_transliteration
if __name__ == "__main__":
check_and_install_models(force=True)
check_and_install_transliteration(force=True)
check_and_install_models(force=True)
check_and_install_transliteration(force=True)
[flake8]
exclude = .git,
.vscode,
.gitignore,
README.md,
venv,
test,
setup.py,
app/__init__.py
max-line-length = 136
ignore = E741
import pytest
from app.init import boot
from argostranslate import package
def test_boot_argos():
"""Test Argos translate models initialization"""
boot()
assert len(package.get_installed_packages()) > 2
\ No newline at end of file
assert len(package.get_installed_packages()) > 2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment