Skip to content

Commit

Permalink
Add some semblance of type safety to built-in classes
Browse files Browse the repository at this point in the history
  • Loading branch information
angelcaru committed May 3, 2024
1 parent 983f0af commit d025f24
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 117 deletions.
50 changes: 32 additions & 18 deletions core/builtin_classes/base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from core.errors import *
from core.datatypes import *
from core.parser import RTResult
from core.builtin_funcs import BuiltInFunction
from core.builtin_funcs import BuiltInFunction, args

from typing import Callable, Any

Expand All @@ -16,7 +16,7 @@ def __init__(self, name: str, instance_class: BuiltInObjectMeta) -> None:
self.instance_class = instance_class

def create(self, args: list[Value]) -> RTResult[BaseInstance]:
inst = BuiltInInstance(self)
inst = BuiltInInstance(self, self.instance_class(self))
return RTResult[BaseInstance]().success(inst.set_context(self.context).set_pos(self.pos_start, self.pos_end))

def init(self, inst: BaseInstance, args: list[Value], kwargs: dict[str, Value]) -> RTResult[None]:
Expand All @@ -43,20 +43,31 @@ def __repr__(self) -> str:


class BuiltInInstance(BaseInstance):
instance_class: BuiltInObjectMeta
obj: BuiltInObject

def __init__(self, parent_class: BuiltInClass) -> None:
def __init__(self, parent_class: BuiltInClass, obj: BuiltInObject) -> None:
super().__init__(parent_class, parent_class.instance_class.__symbol_table__)
self.instance_class = parent_class.instance_class
self.obj = obj
self.symbol_table.set("this", self)

def bind_method(self, method: BaseFunction) -> RTResult[BaseFunction]:
assert isinstance(method, BuiltInFunction)
assert method.func is not None

@args(method.func.arg_names, method.func.defaults)
def new_func(ctx: Context) -> RTResult[Value]:
assert method.func is not None
return method.func(self.obj, ctx)

return RTResult[BaseFunction]().success(BuiltInFunction(method.name, new_func))

def operator(self, operator: str, *args: Value) -> ResultTuple:
try:
op = self.instance_class.__operators__[operator]
op = type(self.obj).__operators__[operator]
except KeyError:
return None, self.illegal_operation(*args)
res = RTResult[Value]()
value = res.register(op(self, list(args)))
value = res.register(op(self.obj, list(args)))
if res.should_return():
assert res.error is not None
return None, res.error
Expand All @@ -66,7 +77,7 @@ def operator(self, operator: str, *args: Value) -> ResultTuple:

class BuiltInObjectMeta(type):
__symbol_table__: SymbolTable
__operators__: dict[str, Callable[[BuiltInInstance, list[Value]], RTResult[Value]]]
__operators__: dict[str, Callable[[BuiltInObject, list[Value]], RTResult[Value]]]

def __new__(cls, class_name: str, bases: tuple[type, ...], attrs: dict[str, Any]) -> BuiltInObjectMeta:
if class_name == "BuiltInObject":
Expand All @@ -90,7 +101,10 @@ def __new__(cls, class_name: str, bases: tuple[type, ...], attrs: dict[str, Any]


class BuiltInObject(metaclass=BuiltInObjectMeta):
pass
parent_class: BuiltInClass

def __init__(self, parent_class: BuiltInClass) -> None:
self.parent_class = parent_class


# Decorators for methods and operators
Expand Down Expand Up @@ -128,20 +142,20 @@ def wrapper(self, args):
if len(args) > len(types):
return res.failure(
RTError(
self.pos_start,
self.pos_end,
self.parent_class.pos_start,
self.parent_class.pos_end,
f"{len(args) - len(types)} too many args passed into {full_func_name}",
self.context,
self.parent_class.context,
)
)

if len(args) < len(types) - len(list(filter(lambda default: default is not None, defaults))):
return res.failure(
RTError(
self.pos_start,
self.pos_end,
self.parent_class.pos_start,
self.parent_class.pos_end,
f"{(len(types) - len(list(filter(lambda default: default is not None, defaults)))) - len(args)} too few args passed into {full_func_name}",
self.context,
self.parent_class.context,
)
)

Expand All @@ -153,10 +167,10 @@ def wrapper(self, args):
if not isinstance(arg, typ):
return res.failure(
RTError(
self.pos_start,
self.pos_end,
self.parent_class.pos_start,
self.parent_class.pos_end,
f"Expected {typ.__name__} for argument {i} (0-based) of {full_func_name}, got {arg.__class__.__name__} instead",
self.context,
self.parent_class.context,
)
)
real_args.append(arg)
Expand Down
42 changes: 22 additions & 20 deletions core/builtin_classes/file_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
from core.builtin_funcs import args
from core.builtin_classes.base_classes import BuiltInObject, operator, check, method

from typing import IO


class FileObject(BuiltInObject):
file: IO[str]

@operator("__constructor__")
@check([String, String], [None, String("r")])
def constructor(self, path, mode):
def constructor(self, path: String, mode: String) -> RTResult[Value]:
allowed_modes = [None, "r", "w", "a", "r+", "w+", "a+"] # Allowed modes for opening files
res = RTResult()
res = RTResult[Value]()
if mode.value not in allowed_modes:
return res.failure(RTError(mode.pos_start, mode.pos_end, f"Invalid mode '{mode.value}'", mode.context))
try:
Expand All @@ -23,18 +27,18 @@ def constructor(self, path, mode):

@args(["count"], [Number(-1)])
@method
def read(ctx):
res = RTResult()
self = ctx.symbol_table.get("this")
def read(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
count = ctx.symbol_table.get("count")
assert count is not None
if not isinstance(count, Number):
return res.failure(RTError(count.pos_start, count.pos_end, "Count must be a number", count.context))

try:
if count.value == -1:
value = self.file.read()
else:
value = self.file.read(count.value)
value = self.file.read(int(count.value))
return res.success(String(value))
except OSError as e:
return res.failure(
Expand All @@ -44,7 +48,7 @@ def read(ctx):
@args([])
@method
def readline(ctx):
res = RTResult()
res = RTResult[Value]()
self = ctx.symbol_table.get("this")
try:
value = self.file.readline()
Expand All @@ -54,21 +58,21 @@ def readline(ctx):

@args([])
@method
def readlines(ctx):
res = RTResult()
self = ctx.symbol_table.get("this")
def readlines(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
try:
value = self.file.readlines()
return res.success(Array([String(line) for line in value]))
except OSError as e:
return res.failure(RTError(None, None, f"Could not read from file: {e.strerror}", None))
pos = Position(-1, -1, -1, "<idk>", "<idk>")
return res.failure(RTError(pos, pos, f"Could not read from file: {e.strerror}", ctx))

@args(["data"])
@method
def write(ctx):
res = RTResult()
self = ctx.symbol_table.get("this")
def write(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
data = ctx.symbol_table.get("data")
assert data is not None
if not isinstance(data, String):
return res.failure(RTError(data.pos_start, data.pos_end, "Data must be a string", data.context))

Expand All @@ -82,15 +86,13 @@ def write(ctx):

@args([])
@method
def close(ctx):
res = RTResult()
self = ctx.symbol_table.get("this")
def close(self, _ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
self.file.close()
return res.success(Null.null())

@args([])
@method
def is_closed(ctx):
res = RTResult()
self = ctx.symbol_table.get("this")
def is_closed(self, _ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
return res.success(Boolean(self.file.closed))
16 changes: 10 additions & 6 deletions core/builtin_classes/json_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
class JSONObject(BuiltInObject):
@operator("__constructor__")
@check([], [])
def constructor(self):
return RTResult().success(Null.null())
def constructor(self) -> RTResult[Value]:
return RTResult[Value]().success(Null.null())

@args(["radon_object"])
@method
def dumps(ctx):
res = RTResult()
def dumps(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
radon_object = ctx.symbol_table.get("radon_object")
assert radon_object is not None
try:
return res.success(String(json.dumps(deradonify(radon_object))))
except Exception as e:
Expand All @@ -28,9 +29,12 @@ def dumps(ctx):

@args(["radon_string"])
@method
def loads(ctx):
res = RTResult()
def loads(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
radon_string = ctx.symbol_table.get("radon_string")
assert radon_string is not None
if not isinstance(radon_string, String):
return res.failure(RTError(radon_string.pos_start, radon_string.pos_end, "Cannot loads a non-string", ctx))
try:
return res.success(
radonify(
Expand Down
74 changes: 57 additions & 17 deletions core/builtin_classes/requests_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@
class RequestsObject(BuiltInObject):
@operator("__constructor__")
@check([], [])
def constructor(self):
return RTResult().success(Null.null())
def constructor(self) -> RTResult[Value]:
return RTResult[Value]().success(Null.null())

@args(["url", "headers"], [None, HashMap({})])
@method
def get(ctx):
res = RTResult()
def get(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
if not isinstance(url, String):
return res.failure(RTError(url.pos_start, url.pos_end, "Expected String", ctx))
headers = ctx.symbol_table.get("headers")
assert headers is not None
if not isinstance(headers, HashMap):
return res.failure(RTError(headers.pos_start, headers.pos_end, "Expected HashMap", ctx))
try:
req = urllib.request.Request(url.value, headers=deradonify(headers))
req = urllib.request.Request(url.value, headers=deradonify(headers)) # type: ignore
with urllib.request.urlopen(req) as response:
response_data = response.read().decode("utf-8")
return res.success(radonify(response_data, url.pos_start, url.pos_end, url.context))
Expand All @@ -32,14 +38,25 @@ def get(ctx):

@args(["url", "data", "headers"], [None, HashMap({}), HashMap({})])
@method
def post(ctx):
res = RTResult()
def post(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
if not isinstance(url, String):
return res.failure(RTError(url.pos_start, url.pos_end, "Expected String", ctx))
data = ctx.symbol_table.get("data")
assert data is not None
headers = ctx.symbol_table.get("headers")
assert headers is not None
if not isinstance(data, HashMap):
return res.failure(RTError(data.pos_start, data.pos_end, "Expected HashMap", ctx))
if not isinstance(headers, HashMap):
return res.failure(RTError(headers.pos_start, headers.pos_end, "Expected HashMap", ctx))
try:
req = urllib.request.Request(
url.value, data=json.dumps(deradonify(data)).encode("utf-8"), headers=deradonify(headers)
url.value,
data=json.dumps(deradonify(data)).encode("utf-8"),
headers=deradonify(headers), # type: ignore
)
with urllib.request.urlopen(req) as response:
response_data = response.read().decode("utf-8")
Expand All @@ -49,14 +66,24 @@ def post(ctx):

@args(["url", "data", "headers"], [None, HashMap({}), HashMap({})])
@method
def put(ctx):
res = RTResult()
def put(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
if not isinstance(url, String):
return res.failure(RTError(url.pos_start, url.pos_end, "Expected String", ctx))
data = ctx.symbol_table.get("data")
assert data is not None
headers = ctx.symbol_table.get("headers")
assert headers is not None
if not isinstance(headers, HashMap):
return res.failure(RTError(headers.pos_start, headers.pos_end, "Expected HashMap", ctx))
try:
req = urllib.request.Request(
url.value, data=json.dumps(deradonify(data)).encode("utf-8"), headers=deradonify(headers), method="PUT"
url.value,
data=json.dumps(deradonify(data)).encode("utf-8"),
headers=deradonify(headers), # type: ignore
method="PUT",
)
with urllib.request.urlopen(req) as response:
response_data = response.read().decode("utf-8")
Expand All @@ -66,12 +93,18 @@ def put(ctx):

@args(["url", "headers"], [None, HashMap({})])
@method
def delete(ctx):
res = RTResult()
def delete(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
if not isinstance(url, String):
return res.failure(RTError(url.pos_start, url.pos_end, "Expected String", ctx))
headers = ctx.symbol_table.get("headers")
assert headers is not None
if not isinstance(headers, HashMap):
return res.failure(RTError(headers.pos_start, headers.pos_end, "Expected HashMap", ctx))
try:
req = urllib.request.Request(url.value, headers=deradonify(headers), method="DELETE")
req = urllib.request.Request(url.value, headers=deradonify(headers), method="DELETE") # type: ignore
with urllib.request.urlopen(req) as response:
response_data = response.read().decode("utf-8")
return res.success(radonify(response_data, url.pos_start, url.pos_end, url.context))
Expand All @@ -80,16 +113,23 @@ def delete(ctx):

@args(["url", "data", "headers"], [None, HashMap({}), HashMap({})])
@method
def patch(ctx):
res = RTResult()
def patch(self, ctx: Context) -> RTResult[Value]:
res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
if not isinstance(url, String):
return res.failure(RTError(url.pos_start, url.pos_end, "Expected String", ctx))
data = ctx.symbol_table.get("data")
assert data is not None
headers = ctx.symbol_table.get("headers")
assert headers is not None
if not isinstance(headers, HashMap):
return res.failure(RTError(headers.pos_start, headers.pos_end, "Expected HashMap", ctx))
try:
req = urllib.request.Request(
url.value,
data=json.dumps(deradonify(data)).encode("utf-8"),
headers=deradonify(headers),
headers=deradonify(headers), # type: ignore
method="PATCH",
)
with urllib.request.urlopen(req) as response:
Expand Down
Loading

0 comments on commit d025f24

Please sign in to comment.