verify getbot2

This commit is contained in:
DomNomNom 2025-02-15 09:37:43 +13:00
parent 5f7abca758
commit bea22dfa70
2 changed files with 90 additions and 34 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
credentials.json
credentials2.json
credentials.py
store

View File

@ -81,6 +81,8 @@ from nio import (
LocalProtocolError,
LoginResponse,
ToDeviceError,
ToDeviceMessage,
UnknownToDeviceEvent,
)
# file to store credentials in case you want to run program multiple times
@ -88,6 +90,7 @@ CONFIG_FILE = "credentials.json" # login credentials JSON file
# directory to store persistent data for end-to-end encryption
STORE_PATH = "./store/" # local directory
class Callbacks:
"""Class to pass client to callback methods."""
@ -100,7 +103,38 @@ class Callbacks:
try:
client = self.client
if isinstance(event, KeyVerificationStart): # first step
if event.source['type'] == 'm.key.verification.request':
"""First step in new flow: receive a request proposing
a set of verification methods, and in this case respond
saying we only support SAS verification.
"""
print(
"Got verification request. "
"Waiting for other device to accept SAS method..."
)
if 'm.sas.v1' not in event.source['content']['methods']:
print(
"Other device does not support SAS authentication. "
f"Methods: {event.source['content']['methods']}."
)
return
assert client.device_id is not None
assert client.user_id is not None
txid = event.source['content']['transaction_id']
ready_event = ToDeviceMessage(
type = 'm.key.verification.ready',
recipient = event.sender,
recipient_device = event.source['content']['from_device'],
content = {
'from_device': client.device_id,
'methods': ['m.sas.v1'],
'transaction_id': txid,
},
)
resp = await client.to_device(ready_event, txid)
if isinstance(resp, ToDeviceError):
print(f"to_device failed with {resp}")
elif isinstance(event, KeyVerificationStart): # first step
"""first step: receive KeyVerificationStart
KeyVerificationStart(
source={'content':
@ -195,6 +229,23 @@ class Callbacks:
resp = await client.confirm_short_auth_string(event.transaction_id)
if isinstance(resp, ToDeviceError):
print(f"confirm_short_auth_string failed with {resp}")
# Extra step in new flow: once we have completed the SAS
# verification successfully, send a 'done' to-device event
# to the other device to assert that the verification was
# successful.
done_message = ToDeviceMessage(
type = 'm.key.verification.done',
recipient = event.sender,
recipient_device = sas.other_olm_device.device_id,
content = {
'transaction_id': sas.transaction_id,
},
)
resp = await client.to_device(done_message, sas.transaction_id)
if isinstance(resp, ToDeviceError):
client.log.error(f"'done' failed with {resp}")
elif yn.lower() == "n": # no, don't match, reject
print(
"No match! Device will NOT be verified "
@ -243,6 +294,11 @@ class Callbacks:
resp = await client.to_device(todevice_msg)
if isinstance(resp, ToDeviceError):
print(f"to_device failed with {resp}")
elif event.source['type'] == 'm.key.verification.done':
# Final step, other device acknowledges verification success.
txid = event.source['content']['transaction_id']
sas = client.key_verifications[txid]
print(
f"sas.we_started_it = {sas.we_started_it}\n"
f"sas.sas_accepted = {sas.sas_accepted}\n"
@ -265,6 +321,7 @@ class Callbacks:
except BaseException:
print(traceback.format_exc())
def write_details_to_disk(resp: LoginResponse, homeserver) -> None:
"""Write the required login details to disk.
@ -289,6 +346,7 @@ def write_details_to_disk(resp: LoginResponse, homeserver) -> None:
f,
)
async def login() -> AsyncClient:
"""Handle login with or without stored credentials."""
# Configuration options for the AsyncClient
@ -301,26 +359,21 @@ async def login() -> AsyncClient:
# If there are no previously-saved credentials, we'll use the password
if not os.path.exists(CONFIG_FILE):
homeserver = "https://matrix.domnomnom.com"
user_id = "@getbot2:matrix.domnomnom.com"
room_id = "!sZpfYzLsRbnIOKJlPH:matrix.domnomnom.com"
device_name = "whitebox-nio"
print(
"First time use. Did not find credential file. Asking for "
"homeserver, user, and password to create credential file."
)
homeserver = "https://matrix.example.org"
homeserver = input(f"Enter your homeserver URL: [{homeserver}] ")
# print(
# "First time use. Did not find credential file. Asking for "
# "homeserver, user, and password to create credential file."
# )
# homeserver = "https://matrix.example.org"
# homeserver = input(f"Enter your homeserver URL: [{homeserver}] ")
if not (homeserver.startswith("https://") or homeserver.startswith("http://")):
homeserver = "https://" + homeserver
# if not (homeserver.startswith("https://") or homeserver.startswith("http://")):
# homeserver = "https://" + homeserver
user_id = "@user:example.org"
user_id = input(f"Enter your full user ID: [{user_id}] ")
# user_id = "@user:example.org"
# user_id = input(f"Enter your full user ID: [{user_id}] ")
# device_name = "matrix-nio"
# device_name = input(f"Choose a name for this device: [{device_name}] ")
device_name = "matrix-nio"
device_name = input(f"Choose a name for this device: [{device_name}] ")
if not os.path.exists(STORE_PATH):
os.makedirs(STORE_PATH)
@ -352,7 +405,7 @@ async def login() -> AsyncClient:
# Otherwise the config file exists, so we'll use the stored credentials
else:
# open the file in read-only mode
async with aiofiles.open(CONFIG_FILE) as f:
async with aiofiles.open(CONFIG_FILE, "r") as f:
contents = await f.read()
config = json.loads(contents)
# Initialize the matrix client based on credentials from file
@ -373,12 +426,13 @@ async def login() -> AsyncClient:
return client
async def main() -> None:
"""Login and wait for and perform emoji verify."""
client = await login()
# Set up event callbacks
callbacks = Callbacks(client)
client.add_to_device_callback(callbacks.to_device_callback, (KeyVerificationEvent,))
client.add_to_device_callback(callbacks.to_device_callback, (KeyVerificationEvent, UnknownToDeviceEvent))
# Sync encryption keys with the server
# Required for participating in encrypted rooms
if client.should_upload_keys:
@ -390,6 +444,7 @@ async def main() -> None:
)
await client.sync_forever(timeout=30000, full_state=True)
try:
asyncio.run(main())
except Exception: