Compare commits

...

3 Commits

Author SHA1 Message Date
DomNomNomVR 171c1fae66 add tests because of feedback 2025-03-09 23:27:45 +13:00
DomNomNomVR 488d1e8bff revert to older effect_of_N style 2025-03-09 20:11:48 +13:00
DomNomNomVR 6d1a6907d2 changes from voice chat 2025-03-09 20:09:42 +13:00
6 changed files with 131 additions and 129 deletions
+2 -1
View File
@@ -1 +1,2 @@
from .gear import * from .gear import Gear
from .effect import Effect
+12 -7
View File
@@ -1,16 +1,15 @@
from typing import Any, Hashable, TypeVar, Callable from typing import Any, Hashable, Optional, TypeVar, Callable
from gears.effect import effect_of from gears.effect import effect_of
from gears.gear import Gear from gears.gear import Gear
S = TypeVar("S", bound=Hashable)
T = TypeVar("T", bound=Hashable)
def connect[S: Hashable, T: Hashable](
def connect(
source: Gear[S], source: Gear[S],
to_target_val: Callable[[S], T], to_target_val: Callable[[S], T],
to_source: Callable[[S, T], S], to_source: Callable[[S, T], S],
# i_know_what_im_doing: Optional[bool] = None,
i_know_what_im_doing: Optional[bool] = True,
) -> Gear[T]: ) -> Gear[T]:
target = Gear(to_target_val(source())) target = Gear(to_target_val(source()))
@@ -23,7 +22,10 @@ def connect(
def set_target_val(source_val: S): def set_target_val(source_val: S):
nonlocal entry_guard nonlocal entry_guard
if entry_guard: if entry_guard:
return if i_know_what_im_doing:
return
else:
raise ValueError("You have a bad bidirectional mapping")
entry_guard = True entry_guard = True
try: try:
target.set(to_target_val(source_val)) target.set(to_target_val(source_val))
@@ -34,7 +36,10 @@ def connect(
def set_source_val(target_val: T): def set_source_val(target_val: T):
nonlocal entry_guard nonlocal entry_guard
if entry_guard: if entry_guard:
return if i_know_what_im_doing:
return
else:
raise ValueError("You have a bad bidirectional mapping")
entry_guard = True entry_guard = True
try: try:
source.set(to_source(source(), target_val)) source.set(to_source(source(), target_val))
+30 -21
View File
@@ -1,31 +1,40 @@
from typing import Any, Callable, cast
from gears import Gear from gears import Gear
from .wrap_aware_tuple import WrapAwareTuple from typing import Any, Generic, Hashable, TypeVar, Callable, TypeVarTuple
class Effect[*Ts]:
def __init__(self, fn: Callable[[*Ts], None], gears: WrapAwareTuple[Gear, *Ts]): class Effect:
for gear in gears: def __init__(self, fn: Callable[..., None], *gears: Gear):
gear.effects.append(self)
self._fn = fn self._fn = fn
self._gears = gears self._gears = gears
for gear in gears:
gear.effects.append(self)
self.on_change() self.on_change(gears[0]())
def on_change(self): def on_change(self, _value: Any):
self._fn(*self._gears.unwrap(lambda x: x())) self._fn(*(gear() for gear in self._gears))
def effect_of[*Ts, *G_Ts](
*gears: Gear[Any]
) -> Callable[[Callable[[*Ts], None]], Effect]:
# No deduction, no tears, only cast now
ts_gears = cast(WrapAwareTuple[Gear, *Ts], WrapAwareTuple.prewrapped(gears))
return effect_of_typechecked(ts_gears)
def effect_of_typechecked[*Ts]( def effect_of[T0: Hashable](g0: Gear[T0]) -> Callable[[Callable[[T0], None]], Effect]:
gears: WrapAwareTuple[Gear, *Ts] def decorator(fn: Callable[[T0], None]) -> Effect:
) -> Callable[[Callable[[*Ts], None]], Effect]: return Effect(fn, g0)
def decorator(fn: Callable[[*Ts], None]) -> Effect:
return Effect(fn, gears) return decorator
def effect_of_2[T0: Hashable, T1: Hashable](
g0: Gear[T0], g1: Gear[T1]
) -> Callable[[Callable[[T0, T1], None]], Effect]:
def decorator(fn: Callable[[T0, T1], None]) -> Effect:
return Effect(fn, g0, g1)
return decorator
def effect_of_3[T0: Hashable, T1: Hashable, T2: Hashable](
g0: Gear[T0], g1: Gear[T1], g2: Gear[T2]
) -> Callable[[Callable[[T0, T1, T2], None]], Effect]:
def decorator(fn: Callable[[T0, T1, T2], None]) -> Effect:
return Effect(fn, g0, g1, g2)
return decorator return decorator
+8 -8
View File
@@ -1,14 +1,11 @@
from typing import Any, Hashable, TypeVar, TYPE_CHECKING from typing import Any, Generic, Hashable, TypeVar, Callable
T = TypeVar("T", bound=Hashable)
if TYPE_CHECKING: class Gear[T: Hashable]:
from gears.effect import Effect
class Gear[T]():
def __init__(self, value: T): def __init__(self, value: T):
self._value = value self._value = value
self.effects: list['Effect'] = [] self.effects = []
self.connection_effects = []
def get(self) -> T: def get(self) -> T:
return self._value return self._value
@@ -17,8 +14,11 @@ class Gear[T]():
return self.get() return self.get()
def set(self, value: T): def set(self, value: T):
print("setty: ", value)
if value == self._value: if value == self._value:
return return
self._value = value self._value = value
for effect in self.connection_effects:
effect.on_change(value)
for effect in self.effects: for effect in self.effects:
effect.on_change() effect.on_change(value)
-90
View File
@@ -1,90 +0,0 @@
from typing import cast, Callable, reveal_type, TYPE_CHECKING
# no-op "Wrap" type
type Identity[X] = X
class WrapAwareTuple[Wrap, *Ts](tuple[Wrap, ...]):
'''
Wrap is a wrapper type around each of the types in Ts, in order.
For example:
a: Wrapped[Wrap, int, str, float] = Wrapped(Wrap[int], Wrap[str], Wrap[float])
This lets us keep track of type transformations, starting from the Identity mapping.
'''
@staticmethod
def id[*NewTs](*args: *NewTs) -> 'WrapAwareTuple[Identity, *NewTs]':
'''
Create a new WrapAwareTuple, using the Identity psuedo-wrapper.
'''
return WrapAwareTuple[Identity, *NewTs](args)
@staticmethod
def prewrapped[NewWrap, *NewTs](args: tuple[NewWrap, ...]) -> 'WrapAwareTuple[NewWrap, *NewTs]':
'''
Create a new WrapAwareTuple, using the given wrapper type.
'''
return WrapAwareTuple[NewWrap, *NewTs](args)
def wrap[NewWrap](self, mapper: Callable[[Wrap], NewWrap]) -> 'WrapAwareTuple[NewWrap, *Ts]':
'''
Wrap all existing elements with a new wrapping function.
This function must unwrap the current wrapping type, if non-Identity, and apply a new one.
To pop the wrapped value from a tuple wrapper:
>>> Wrapped[tuple, int](3).wrap[Identity](lambda x: x[0])
To wrap an unwrapped value into a tuple:
>>> Wrapped[Identity, int](3).wrap(tuple)
'''
# pyright supports Union[*Ts], but we still can't do applicative functor things with it
return WrapAwareTuple[NewWrap, *Ts](mapper(v) for v in self)
def unwrap(self, mapper: Callable[[Wrap], Identity]) -> tuple[*Ts]:
'''
Unwrap all elements in the tuple, returning a normal tuple.
This is the same as calling wrap with the appropriate wrapper, except we simplify the signature and
return type for this case.
'''
return cast(tuple[*Ts], self.wrap(mapper))
def list_fmap[*Ts](t: tuple[*Ts]):
match t:
case (head, *tail):
return ([head], *list_fmap(tuple(tail)))
case _:
return ()
if TYPE_CHECKING:
def _fmap_scratchpad():
reveal_type(list_fmap((1,"s",3.0)))
# Type of "list_fmap((1, "s", 3))" is "tuple[list[*tuple[int, str, float]], *tuple[list[*tuple[Unknown, ...]] | Unknown, ...]] | tuple[()]"
# ...wanted to get "tuple[list[int], list[str], list[float]]"; RIP
def _wrapaware_scratchpad():
atom = WrapAwareTuple.id(3)
reveal_type(atom)
# Type of "atom" is "Wrapped[Unknown, int]"
# (variable) atom: Wrapped[Identity[Unknown], int]
compound_3 = WrapAwareTuple.id(3, "str", 9.0)
reveal_type(compound_3)
# Type of "compound_3" is "Wrapped[Unknown, int, str, float]"
# (variable) compound_3: Wrapped[Identity[Unknown], int, str, float]
tupled_compound_3 = compound_3.wrap(tuple)
reveal_type(tupled_compound_3)
# Type of "tupled_compound_3" is "Wrapped[tuple[Unknown, ...], int, str, float]"
# (variable) tupled_compound_3: Wrapped[tuple[Identity[Unknown], ...], int, str, float]
restored_compound_3 = tupled_compound_3.wrap(lambda x: x[0])
reveal_type(restored_compound_3)
# Type of "restored_compound_3" is "Wrapped[Unknown, int, str, float]"
# (variable) restored_compound_3: Wrapped[Unknown, int, str, float]
+79 -2
View File
@@ -1,9 +1,11 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import NamedTuple from typing import NamedTuple
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest
from gears import Gear from gears import Gear
from gears.connections import connect from gears.connections import connect
from gears.effect import effect_of from gears.effect import Effect, effect_of, effect_of_2
def test_get_set(): def test_get_set():
@@ -28,6 +30,9 @@ def test_basic_effect():
assert last_arg == "E" assert last_arg == "E"
g.set("hello")
assert last_arg == "hello"
def test_effect_of_2(): def test_effect_of_2():
g1 = Gear("E") g1 = Gear("E")
@@ -35,7 +40,7 @@ def test_effect_of_2():
last_arg: tuple[str, int] | None = None last_arg: tuple[str, int] | None = None
@effect_of(g1, g2) @effect_of_2(g1, g2)
def value_changed(v1: str, v2: int): def value_changed(v1: str, v2: int):
nonlocal last_arg nonlocal last_arg
last_arg = (v1, v2) last_arg = (v1, v2)
@@ -54,15 +59,87 @@ def test_connect():
assert my_str() == "321" assert my_str() == "321"
# def test_dualis():
# val_both: Gear[tuple[int, int]] = Gear((5, 6))
# val0 = connect(val_both, lambda both: both[0], lambda both, val0: (val0, both[1]))
# val1 = connect(val_both, lambda both: both[1], lambda both, val1: (both[0], val1))
def test_watts():
volts = Gear(3.0)
amps = Gear(5.0)
@effect_of_2(volts, amps)
def on_watts_changed(volts: float, amps: float):
watts = volts * amps
if watts > 100:
raise ValueError("OMG ITS BURNING")
volts.set(5.0)
with pytest.raises(ValueError):
volts.set(50.0)
def test_led():
battery_voltage = Gear(3.0)
switch: Gear[bool] = Gear(True) # true means circuit closed
# ohms_circuit = Gear(1.0)
ohms_circuit = connect(
switch, lambda x: 1.0 if x else 100000000.0, lambda _, y: y < 100
)
log: list[str] = []
@effect_of_2(battery_voltage, ohms_circuit)
def on_led_inputs_changed(volts: float, ohms: float):
amps = volts / ohms
if amps > 1:
print("LED ACTIVE!")
log.append("LED ACTIVE!")
else:
print("LED INACTIVE!")
log.append("LED INACTIVE!")
# Effect(on_led_inputs_changed, battery_voltage, led_ohms)
assert log == ["LED ACTIVE!"]
switch.set(False)
assert log == ["LED ACTIVE!", "LED INACTIVE!"]
def test_connect_multiplication(): def test_connect_multiplication():
mul1 = Gear(10.0) mul1 = Gear(10.0)
mul2 = connect(mul1, lambda x: x * 2.0, lambda _, y: y / 2.0) mul2 = connect(mul1, lambda x: x * 2.0, lambda _, y: y / 2.0)
mul4 = connect(mul2, lambda x: x * 2.0, lambda _, y: y / 2.0)
mul3 = connect(mul1, lambda x: x * 3.0, lambda _, y: y / 3.0) mul3 = connect(mul1, lambda x: x * 3.0, lambda _, y: y / 3.0)
assert mul2() == 20 assert mul2() == 20
assert mul3() == 30 assert mul3() == 30
mul2.set(30) mul2.set(30)
assert mul1() == 15 assert mul1() == 15
assert mul3() == 45 assert mul3() == 45
assert mul4() == 60
# mul1
# |> mul2
# |> mul4
# |> mul3
#
class Person:
name: str
class HouseHold:
people: list[Person]
class NameEntry:
def __init__(self, name: Gear[str]):
# self.name = name
textBox = TextBox(...)
effect_of(name)(textBox.setValue)
textBox.onChange = name.set
def test_connect_property(): def test_connect_property():