Compare commits

..

3 Commits

Author SHA1 Message Date
DomNomNomVR 171c1fae66 add tests because of feedback 2025-03-09 23:27:45 +13:00
DomNomNomVR 488d1e8bff revert to older effect_of_N style 2025-03-09 20:11:48 +13:00
DomNomNomVR 6d1a6907d2 changes from voice chat 2025-03-09 20:09:42 +13:00
6 changed files with 131 additions and 129 deletions
+2 -1
View File
@@ -1 +1,2 @@
from .gear import *
from .gear import Gear
from .effect import Effect
+12 -7
View File
@@ -1,16 +1,15 @@
from typing import Any, Hashable, TypeVar, Callable
from typing import Any, Hashable, Optional, TypeVar, Callable
from gears.effect import effect_of
from gears.gear import Gear
S = TypeVar("S", bound=Hashable)
T = TypeVar("T", bound=Hashable)
def connect(
def connect[S: Hashable, T: Hashable](
source: Gear[S],
to_target_val: Callable[[S], T],
to_source: Callable[[S, T], S],
# i_know_what_im_doing: Optional[bool] = None,
i_know_what_im_doing: Optional[bool] = True,
) -> Gear[T]:
target = Gear(to_target_val(source()))
@@ -23,7 +22,10 @@ def connect(
def set_target_val(source_val: S):
nonlocal entry_guard
if entry_guard:
return
if i_know_what_im_doing:
return
else:
raise ValueError("You have a bad bidirectional mapping")
entry_guard = True
try:
target.set(to_target_val(source_val))
@@ -34,7 +36,10 @@ def connect(
def set_source_val(target_val: T):
nonlocal entry_guard
if entry_guard:
return
if i_know_what_im_doing:
return
else:
raise ValueError("You have a bad bidirectional mapping")
entry_guard = True
try:
source.set(to_source(source(), target_val))
+30 -21
View File
@@ -1,31 +1,40 @@
from typing import Any, Callable, cast
from gears import Gear
from .wrap_aware_tuple import WrapAwareTuple
from typing import Any, Generic, Hashable, TypeVar, Callable, TypeVarTuple
class Effect[*Ts]:
def __init__(self, fn: Callable[[*Ts], None], gears: WrapAwareTuple[Gear, *Ts]):
for gear in gears:
gear.effects.append(self)
class Effect:
def __init__(self, fn: Callable[..., None], *gears: Gear):
self._fn = fn
self._gears = gears
for gear in gears:
gear.effects.append(self)
self.on_change()
self.on_change(gears[0]())
def on_change(self):
self._fn(*self._gears.unwrap(lambda x: x()))
def on_change(self, _value: Any):
self._fn(*(gear() for gear in self._gears))
def effect_of[*Ts, *G_Ts](
*gears: Gear[Any]
) -> Callable[[Callable[[*Ts], None]], Effect]:
# No deduction, no tears, only cast now
ts_gears = cast(WrapAwareTuple[Gear, *Ts], WrapAwareTuple.prewrapped(gears))
return effect_of_typechecked(ts_gears)
def effect_of_typechecked[*Ts](
gears: WrapAwareTuple[Gear, *Ts]
) -> Callable[[Callable[[*Ts], None]], Effect]:
def decorator(fn: Callable[[*Ts], None]) -> Effect:
return Effect(fn, gears)
def effect_of[T0: Hashable](g0: Gear[T0]) -> Callable[[Callable[[T0], None]], Effect]:
def decorator(fn: Callable[[T0], None]) -> Effect:
return Effect(fn, g0)
return decorator
def effect_of_2[T0: Hashable, T1: Hashable](
g0: Gear[T0], g1: Gear[T1]
) -> Callable[[Callable[[T0, T1], None]], Effect]:
def decorator(fn: Callable[[T0, T1], None]) -> Effect:
return Effect(fn, g0, g1)
return decorator
def effect_of_3[T0: Hashable, T1: Hashable, T2: Hashable](
g0: Gear[T0], g1: Gear[T1], g2: Gear[T2]
) -> Callable[[Callable[[T0, T1, T2], None]], Effect]:
def decorator(fn: Callable[[T0, T1, T2], None]) -> Effect:
return Effect(fn, g0, g1, g2)
return decorator
+8 -8
View File
@@ -1,14 +1,11 @@
from typing import Any, Hashable, TypeVar, TYPE_CHECKING
from typing import Any, Generic, Hashable, TypeVar, Callable
T = TypeVar("T", bound=Hashable)
if TYPE_CHECKING:
from gears.effect import Effect
class Gear[T]():
class Gear[T: Hashable]:
def __init__(self, value: T):
self._value = value
self.effects: list['Effect'] = []
self.effects = []
self.connection_effects = []
def get(self) -> T:
return self._value
@@ -17,8 +14,11 @@ class Gear[T]():
return self.get()
def set(self, value: T):
print("setty: ", value)
if value == self._value:
return
self._value = value
for effect in self.connection_effects:
effect.on_change(value)
for effect in self.effects:
effect.on_change()
effect.on_change(value)
-90
View File
@@ -1,90 +0,0 @@
from typing import cast, Callable, reveal_type, TYPE_CHECKING
# no-op "Wrap" type
type Identity[X] = X
class WrapAwareTuple[Wrap, *Ts](tuple[Wrap, ...]):
'''
Wrap is a wrapper type around each of the types in Ts, in order.
For example:
a: Wrapped[Wrap, int, str, float] = Wrapped(Wrap[int], Wrap[str], Wrap[float])
This lets us keep track of type transformations, starting from the Identity mapping.
'''
@staticmethod
def id[*NewTs](*args: *NewTs) -> 'WrapAwareTuple[Identity, *NewTs]':
'''
Create a new WrapAwareTuple, using the Identity psuedo-wrapper.
'''
return WrapAwareTuple[Identity, *NewTs](args)
@staticmethod
def prewrapped[NewWrap, *NewTs](args: tuple[NewWrap, ...]) -> 'WrapAwareTuple[NewWrap, *NewTs]':
'''
Create a new WrapAwareTuple, using the given wrapper type.
'''
return WrapAwareTuple[NewWrap, *NewTs](args)
def wrap[NewWrap](self, mapper: Callable[[Wrap], NewWrap]) -> 'WrapAwareTuple[NewWrap, *Ts]':
'''
Wrap all existing elements with a new wrapping function.
This function must unwrap the current wrapping type, if non-Identity, and apply a new one.
To pop the wrapped value from a tuple wrapper:
>>> Wrapped[tuple, int](3).wrap[Identity](lambda x: x[0])
To wrap an unwrapped value into a tuple:
>>> Wrapped[Identity, int](3).wrap(tuple)
'''
# pyright supports Union[*Ts], but we still can't do applicative functor things with it
return WrapAwareTuple[NewWrap, *Ts](mapper(v) for v in self)
def unwrap(self, mapper: Callable[[Wrap], Identity]) -> tuple[*Ts]:
'''
Unwrap all elements in the tuple, returning a normal tuple.
This is the same as calling wrap with the appropriate wrapper, except we simplify the signature and
return type for this case.
'''
return cast(tuple[*Ts], self.wrap(mapper))
def list_fmap[*Ts](t: tuple[*Ts]):
match t:
case (head, *tail):
return ([head], *list_fmap(tuple(tail)))
case _:
return ()
if TYPE_CHECKING:
def _fmap_scratchpad():
reveal_type(list_fmap((1,"s",3.0)))
# Type of "list_fmap((1, "s", 3))" is "tuple[list[*tuple[int, str, float]], *tuple[list[*tuple[Unknown, ...]] | Unknown, ...]] | tuple[()]"
# ...wanted to get "tuple[list[int], list[str], list[float]]"; RIP
def _wrapaware_scratchpad():
atom = WrapAwareTuple.id(3)
reveal_type(atom)
# Type of "atom" is "Wrapped[Unknown, int]"
# (variable) atom: Wrapped[Identity[Unknown], int]
compound_3 = WrapAwareTuple.id(3, "str", 9.0)
reveal_type(compound_3)
# Type of "compound_3" is "Wrapped[Unknown, int, str, float]"
# (variable) compound_3: Wrapped[Identity[Unknown], int, str, float]
tupled_compound_3 = compound_3.wrap(tuple)
reveal_type(tupled_compound_3)
# Type of "tupled_compound_3" is "Wrapped[tuple[Unknown, ...], int, str, float]"
# (variable) tupled_compound_3: Wrapped[tuple[Identity[Unknown], ...], int, str, float]
restored_compound_3 = tupled_compound_3.wrap(lambda x: x[0])
reveal_type(restored_compound_3)
# Type of "restored_compound_3" is "Wrapped[Unknown, int, str, float]"
# (variable) restored_compound_3: Wrapped[Unknown, int, str, float]
+79 -2
View File
@@ -1,9 +1,11 @@
from dataclasses import dataclass
from typing import NamedTuple
from unittest.mock import MagicMock
import pytest
from gears import Gear
from gears.connections import connect
from gears.effect import effect_of
from gears.effect import Effect, effect_of, effect_of_2
def test_get_set():
@@ -28,6 +30,9 @@ def test_basic_effect():
assert last_arg == "E"
g.set("hello")
assert last_arg == "hello"
def test_effect_of_2():
g1 = Gear("E")
@@ -35,7 +40,7 @@ def test_effect_of_2():
last_arg: tuple[str, int] | None = None
@effect_of(g1, g2)
@effect_of_2(g1, g2)
def value_changed(v1: str, v2: int):
nonlocal last_arg
last_arg = (v1, v2)
@@ -54,15 +59,87 @@ def test_connect():
assert my_str() == "321"
# def test_dualis():
# val_both: Gear[tuple[int, int]] = Gear((5, 6))
# val0 = connect(val_both, lambda both: both[0], lambda both, val0: (val0, both[1]))
# val1 = connect(val_both, lambda both: both[1], lambda both, val1: (both[0], val1))
def test_watts():
volts = Gear(3.0)
amps = Gear(5.0)
@effect_of_2(volts, amps)
def on_watts_changed(volts: float, amps: float):
watts = volts * amps
if watts > 100:
raise ValueError("OMG ITS BURNING")
volts.set(5.0)
with pytest.raises(ValueError):
volts.set(50.0)
def test_led():
battery_voltage = Gear(3.0)
switch: Gear[bool] = Gear(True) # true means circuit closed
# ohms_circuit = Gear(1.0)
ohms_circuit = connect(
switch, lambda x: 1.0 if x else 100000000.0, lambda _, y: y < 100
)
log: list[str] = []
@effect_of_2(battery_voltage, ohms_circuit)
def on_led_inputs_changed(volts: float, ohms: float):
amps = volts / ohms
if amps > 1:
print("LED ACTIVE!")
log.append("LED ACTIVE!")
else:
print("LED INACTIVE!")
log.append("LED INACTIVE!")
# Effect(on_led_inputs_changed, battery_voltage, led_ohms)
assert log == ["LED ACTIVE!"]
switch.set(False)
assert log == ["LED ACTIVE!", "LED INACTIVE!"]
def test_connect_multiplication():
mul1 = Gear(10.0)
mul2 = connect(mul1, lambda x: x * 2.0, lambda _, y: y / 2.0)
mul4 = connect(mul2, lambda x: x * 2.0, lambda _, y: y / 2.0)
mul3 = connect(mul1, lambda x: x * 3.0, lambda _, y: y / 3.0)
assert mul2() == 20
assert mul3() == 30
mul2.set(30)
assert mul1() == 15
assert mul3() == 45
assert mul4() == 60
# mul1
# |> mul2
# |> mul4
# |> mul3
#
class Person:
name: str
class HouseHold:
people: list[Person]
class NameEntry:
def __init__(self, name: Gear[str]):
# self.name = name
textBox = TextBox(...)
effect_of(name)(textBox.setValue)
textBox.onChange = name.set
def test_connect_property():