# -*- coding:utf-8 -*-
"""Input validators, a timestamp output field and small string/request helpers.

The validator callables in this module follow one convention: they return the
accepted (possibly normalised) value, and raise ``ValueError`` with a
human-readable message when the input is rejected.
"""
import random
import re
import string
import subprocess
import uuid
from datetime import datetime
from hashlib import sha256
from zoneinfo import available_timezones

from flask_restful import fields


def run(script):
    """Run ``script`` through the shell after sourcing ``/root/.bashrc``.

    Returns the ``(exitcode, output)`` tuple produced by
    ``subprocess.getstatusoutput``.
    """
    # NOTE(review): ``script`` is concatenated into a shell command line, so
    # callers must never pass untrusted input here.
    # NOTE(review): ``source`` is a bash builtin; getstatusoutput runs the
    # command via the default shell -- confirm that shell supports it.
    return subprocess.getstatusoutput('source /root/.bashrc && ' + script)


class TimestampField(fields.Raw):
    """Output field that renders a value as whole seconds since the epoch."""

    def format(self, value):
        """Return ``value.timestamp()`` truncated to ``int``.

        Assumes ``value`` exposes ``timestamp()`` (e.g. ``datetime``).
        """
        return int(value.timestamp())


def email(email):
    """Return ``email`` if it looks like an e-mail address.

    Raises:
        ValueError: if ``email`` does not match the address pattern.
    """
    # local-part @ one-or-more dotted labels, ending in a label of 2+ chars.
    pattern = r"^[\w\.-]+@([\w-]+\.)+[\w-]{2,}$"
    if re.match(pattern, email) is not None:
        return email

    raise ValueError(f'{email} is not a valid email.')


def uuid_value(value):
    """Return ``value`` normalised to the canonical UUID string form.

    The empty string is accepted and returned unchanged.

    Raises:
        ValueError: if ``value`` cannot be parsed as a UUID.
    """
    if value == '':
        return str(value)

    try:
        return str(uuid.UUID(value))
    except (AttributeError, TypeError, ValueError):
        # uuid.UUID raises AttributeError/TypeError for non-string input
        # (e.g. int, None); report those as validation failures as well.
        raise ValueError(f'{value} is not a valid uuid.')


def timestamp_value(timestamp):
    """Return ``timestamp`` converted to a non-negative ``int``.

    Raises:
        ValueError: if ``timestamp`` is not convertible to ``int`` or is
            negative.
    """
    error = f'{timestamp} is not a valid timestamp.'
    try:
        int_timestamp = int(timestamp)
    except (TypeError, ValueError):
        # TypeError covers inputs such as None, mirroring _get_float.
        raise ValueError(error)

    if int_timestamp < 0:
        raise ValueError(error)
    return int_timestamp


class str_len(object):
    """ Restrict input to a value whose length does not exceed max_length """

    def __init__(self, max_length, argument='argument'):
        self.max_length = max_length
        # Name of the argument, used only in the error message.
        self.argument = argument

    def __call__(self, value):
        """Return ``value`` unchanged, or raise ``ValueError`` if too long."""
        if len(value) > self.max_length:
            error = ('Invalid {arg}: {val}. {arg} cannot exceed length {length}'
                     .format(arg=self.argument, val=value, length=self.max_length))
            raise ValueError(error)

        return value


class float_range(object):
    """ Restrict input to an float in a range (inclusive) """

    def __init__(self, low, high, argument='argument'):
        self.low = low
        self.high = high
        # Name of the argument, used only in the error message.
        self.argument = argument

    def __call__(self, value):
        """Return ``value`` as ``float``; raise ``ValueError`` if out of range."""
        value = _get_float(value)
        if value < self.low or value > self.high:
            error = ('Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}'
                     .format(arg=self.argument, val=value, lo=self.low, hi=self.high))
            raise ValueError(error)

        return value


class datetime_string(object):
    """ Restrict input to a string parseable with the given strptime format """

    def __init__(self, format, argument='argument'):
        self.format = format
        # Name of the argument, used only in the error message.
        self.argument = argument

    def __call__(self, value):
        """Return ``value`` unchanged if it parses with ``self.format``.

        Raises:
            ValueError: if ``datetime.strptime`` rejects ``value``.
        """
        try:
            datetime.strptime(value, self.format)
        except ValueError:
            # The keyword must be ``format`` to match the ``{format}``
            # placeholder; a mismatched name raises KeyError here instead of
            # the intended ValueError.
            error = ('Invalid {arg}: {val}. {arg} must be conform to the format {format}'
                     .format(arg=self.argument, val=value, format=self.format))
            raise ValueError(error)

        return value


def _get_float(value):
    """Convert ``value`` to ``float``, raising ``ValueError`` on failure."""
    try:
        return float(value)
    except (TypeError, ValueError):
        raise ValueError('{0} is not a valid float'.format(value))


def timezone(timezone_string):
    """Return ``timezone_string`` if it is a key known to ``zoneinfo``.

    Raises:
        ValueError: if ``timezone_string`` is empty/falsy or not listed by
            ``available_timezones()``.
    """
    if timezone_string and timezone_string in available_timezones():
        return timezone_string

    raise ValueError(f'{timezone_string} is not a valid timezone.')


def generate_string(n):
    """Return a random string of ``n`` ASCII letters and digits.

    Uses the ``random`` module, which is not a cryptographically secure
    source.
    """
    letters_digits = string.ascii_letters + string.digits
    return "".join(random.choice(letters_digits) for _ in range(n))


def get_remote_ip(request):
    """Return the client IP for ``request``.

    Order of preference: the ``CF-Connecting-IP`` header, the first
    ``X-Forwarded-For`` header value, then ``request.remote_addr``.
    """
    cf_ip = request.headers.get('CF-Connecting-IP')
    if cf_ip:
        return cf_ip

    forwarded = request.headers.getlist("X-Forwarded-For")
    if forwarded:
        return forwarded[0]

    return request.remote_addr


def generate_text_hash(text: str) -> str:
    """Return the SHA-256 hex digest of ``str(text)`` plus the suffix 'None'."""
    # NOTE(review): the literal 'None' suffix is part of the hashed input;
    # it is kept as-is because changing it would change every digest.
    hash_text = str(text) + 'None'
    return sha256(hash_text.encode()).hexdigest()