diff --git a/__pycache__/farmbot_api.cpython-312.pyc b/__pycache__/farmbot_api.cpython-312.pyc new file mode 100644 index 0000000..cee5c8a Binary files /dev/null and b/__pycache__/farmbot_api.cpython-312.pyc differ diff --git a/__pycache__/farmbot_broker.cpython-312.pyc b/__pycache__/farmbot_broker.cpython-312.pyc new file mode 100644 index 0000000..86aa60d Binary files /dev/null and b/__pycache__/farmbot_broker.cpython-312.pyc differ diff --git a/__pycache__/farmbot_util_PORT.cpython-312.pyc b/__pycache__/farmbot_util_PORT.cpython-312.pyc new file mode 100644 index 0000000..23a4196 Binary files /dev/null and b/__pycache__/farmbot_util_PORT.cpython-312.pyc differ diff --git a/__pycache__/fbapi.cpython-312.pyc b/__pycache__/fbapi.cpython-312.pyc new file mode 100644 index 0000000..6f31910 Binary files /dev/null and b/__pycache__/fbapi.cpython-312.pyc differ diff --git a/__pycache__/fbbro.cpython-312.pyc b/__pycache__/fbbro.cpython-312.pyc new file mode 100644 index 0000000..ad281af Binary files /dev/null and b/__pycache__/fbbro.cpython-312.pyc differ diff --git a/farmbot_broker.py b/farmbot_broker.py deleted file mode 100644 index 69b8537..0000000 --- a/farmbot_broker.py +++ /dev/null @@ -1,52 +0,0 @@ -# farmbot_BROKER.py - -import sys -import json - -import paho.mqtt.client as mqtt - -from farmbot_api import FarmbotAPI - -class FarmbotBroker(): - def __init__(self): - self.api = FarmbotAPI() - - self.token = None - self.client = None - - # BROKER - # ├── connect() - # ├── disconnect() - # │ - # └── publish() - - def connect(self): - self.api.check_token() - - self.client = mqtt.Client() - self.client.username_pw_set( - username=self.token['token']['unencoded']['bot'], - password=self.token['token']['encoded'] - ) - - self.client.connect( - self.token['token']['unencoded']['mqtt'], - port=1883, - keepalive=60 - ) - - self.client.loop_start() - - def disconnect(self): - if self.client is not None: - self.client.loop_stop() - self.client.disconnect() - - def publish(self, message): - if self.client is None: - self.connect() - - self.client.publish( - f'bot/{self.token["token"]["unencoded"]["bot"]}/from_clients', - payload=json.dumps(message) - ) diff --git a/farmbot_util_PORT.py b/farmbot_util_PORT.py index 378934b..e99a076 100644 --- a/farmbot_util_PORT.py +++ b/farmbot_util_PORT.py @@ -1,9 +1,8 @@ -# farmbot_utilities.py +import sys +import os -import json - -from farmbot_broker import FarmbotBroker -from farmbot_api import FarmbotAPI +from fbbro import FarmbotBroker +from fbapi import FarmbotAPI RPC_REQUEST = { "kind": "rpc_request", @@ -17,8 +16,8 @@ def __init__(self): self.broker = FarmbotBroker() self.api = FarmbotAPI() - self.token = None - self.error = None + self.token = self.api.token + self.error = self.api.error self.echo = True # Choose whether functions print return statement self.verbose = True # Choose how much detail in return statement @@ -87,6 +86,36 @@ def read_status(self): self.broker.publish(status_message) # return ... + def read_sensor(self, id, mode, label='---'): + read_sensor_message = { + **RPC_REQUEST, + "body": [{ + "kind": "read_pin", + "args": { + "pin_mode": mode, + "label": label, + "pin_number": { + "kind": "named_pin", + "args": { + "pin_type": "Peripheral", + "pin_id": id + } + } + } + }] + } + + self.broker.publish(read_sensor_message) + # return ... + + def env(self, id=None, field=None, new_val=None): + if id is None: + data = self.api.get('farmware_envs', id=None) + print(data) + else: + data = self.api.get('farmware_envs', id) + print(data) + # MESSAGES # ├── [✅] log() # ├── [✅] message() @@ -240,6 +269,85 @@ def control_servo(self, pin, angle): self.broker.publish(control_servo_message) # return ... + def control_servo(self, pin, angle): + if angle < 0 or angle > 180: + return print("ERROR: Servo angle constrained to 0-180 degrees.") + else: + control_servo_message = { + **RPC_REQUEST, + "body": { + "kind": "set_servo_angle", + "args": { + "pin_number": pin, + "pin_value": angle # From 0 to 180 + } + } + } + + self.broker.publish(control_servo_message) + # return ... + + def control_peripheral(self, id, value, mode=None): + if mode is None: + peripheral_str = self.get_info('peripherals', id) + mode = peripheral_str['mode'] + + control_peripheral_message = { + **RPC_REQUEST, + "body": { + "kind": "write_pin", + "args": { + "pin_value": value, # Controls ON/OFF or slider value from 0-255 + "pin_mode": mode, # Controls digital (0) or analog (1) mode + "pin_number": { + "kind": "named_pin", + "args": { + "pin_type": "Peripheral", + "pin_id": id + } + } + } + } + } + + self.broker.publish(control_peripheral_message) + # return ... + + def toggle_peripheral(self, id): + toggle_peripheral_message = { + **RPC_REQUEST, + "body": [{ + "kind": "toggle_pin", + "args": { + "pin_number": { + "kind": "named_pin", + "args": { + "pin_type": "Peripheral", + "pin_id": id + } + } + } + }] + } + + self.broker.publish(toggle_peripheral_message) + # return ... + + def on(self, id): + peripheral_str = self.get_info('peripherals', id) + mode = peripheral_str['mode'] + + if mode == 1: + self.control_peripheral(id, 255) + elif mode == 0: + self.control_peripheral(id, 1) + + # return ... + + def off(self, id): + self.control_peripheral(id, 0) + # return ... + def take_photo(self): take_photo_message = { **RPC_REQUEST, diff --git a/farmbot_api.py b/fbapi.py similarity index 92% rename from farmbot_api.py rename to fbapi.py index 7120934..3c1b077 100644 --- a/farmbot_api.py +++ b/fbapi.py @@ -1,6 +1,5 @@ -# farmbot_API.py - import sys +import os import json import requests @@ -9,19 +8,17 @@ def __init__(self): self.token = None self.error = None - # API - # ├── token_handling() - # ├── request_handling() - # │ - # ├── get_token() - # ├── check_token() - # │ - # ├── request() - # │ - # ├── get() - # ├── post() - # ├── patch() - # └── delete() + # token_handling() --> errors for token + # request_handling() --> errors for request + + # get_token() + # check_token() + + # request() + # get() --> get endpoint info + # post() --> overwrite/new endpoint info + # patch() --> edit endpoint info + # delete() --> delete endpoint info def token_handling(self, response): # Handle HTTP status codes diff --git a/fbbro.py b/fbbro.py new file mode 100644 index 0000000..b810a16 --- /dev/null +++ b/fbbro.py @@ -0,0 +1,90 @@ +from fbapi import FarmbotAPI +from datetime import datetime +import paho.mqtt.client as mqtt + +import json + +class BrokerConnect(): + def __init__(self): + self.api = FarmbotAPI() + + self.token = self.api.token + self.client = None + + # connect() --> establish connection to message broker + # disconnect() --> disconnect from message broker + + # publish() + + # on_connect() --> subscribe to channel + # on_message() --> print channel/message + + # def on_connect(self, client, *_args): + # # subscribe to all channels + # self.client.subscribe(f"bot/{self.token['token']['unencoded']['bot']}/#") + # print('connected') + + # def on_message(self, _client, _userdata, msg): + # print('-' * 100) + # # print channel + # print(f'{msg.topic} ({datetime.now().strftime("%Y-%m-%d %H:%M:%S")})\n') + # # print message + # print(json.dumps(json.loads(msg.payload), indent=4)) + + # def on_connect(client, *_args): +# # subscribe to all channels +# client.subscribe(f"bot/{TOKEN['token']['unencoded']['bot']}/#") +# print('connected') + + def status_connect(self, client, *_args): + # Subscribe to specific channel + device_info = self.api.get('device') + device_id = device_info['id'] + + client.subscribe("bot/device_4652/status") + print('connected via status_connect()') + + def on_message(self, _client, _userdata, msg): + print('-' * 100) + # print channel + print("Channel:") + print(f'{msg.topic} ({datetime.now().strftime("%Y-%m-%d %H:%M:%S")})\n') + # print message + print("Message:") + print(json.dumps(json.loads(msg.payload), indent=4)) + + def connect(self): + print(self.api.token) + + self.api.check_token() + + self.client = mqtt.Client() + self.client.username_pw_set( + username=self.token['token']['unencoded']['bot'], + password=self.token['token']['encoded'] + ) + + self.client.connect( + self.token['token']['unencoded']['mqtt'], + port=1883, + keepalive=60 + ) + + # self.client.on_connect = status_connect + # self.client.on_message = on_message + + self.client.loop_start() + + def disconnect(self): + if self.client is not None: + self.client.loop_stop() + self.client.disconnect() + + def publish(self, message): + if self.client is None: + self.connect() + + self.client.publish( + f'bot/{self.token["token"]["unencoded"]["bot"]}/from_clients', + payload=json.dumps(message) + ) diff --git a/test_api.py b/test_api.py new file mode 100644 index 0000000..aa60d91 --- /dev/null +++ b/test_api.py @@ -0,0 +1,64 @@ +import unittest +from unittest.mock import patch, MagicMock +from farmbot_util_PORT import Farmbot + +class TestFarmbot(unittest.TestCase): + + def setUp(self): + self.farmbot = Farmbot() + + @patch('farmbot_util_PORT.FarmbotAPI.get_token') + def test_get_token(self, mock_get_token): + mock_get_token.return_value = 'fake_token' + self.farmbot.get_token('test@example.com', 'password123') + self.assertEqual(self.farmbot.token, 'fake_token') + mock_get_token.assert_called_once_with('test@example.com', 'password123', 'https://my.farm.bot') + + @patch('farmbot_util_PORT.FarmbotAPI.get_info') + def test_get_info(self, mock_get_info): + mock_get_info.return_value = {'info': 'fake_info'} + result = self.farmbot.get_info() + self.assertEqual(result, {'info': 'fake_info'}) + mock_get_info.assert_called_once() + + @patch('farmbot_util_PORT.FarmbotAPI.set_info') + def test_set_info(self, mock_set_info): + self.farmbot.set_info('label', 'value') + mock_set_info.assert_called_once_with('label', 'value') + + @patch('farmbot_util_PORT.FarmbotAPI.log') + def test_log(self, mock_log): + self.farmbot.log('message', 'info') + mock_log.assert_called_once_with('message', 'info') + + @patch('farmbot_util_PORT.FarmbotAPI.safe_z') + def test_safe_z(self, mock_safe_z): + mock_safe_z.return_value = 10 + result = self.farmbot.safe_z() + self.assertEqual(result, 10) + mock_safe_z.assert_called_once() + + @patch('farmbot_util_PORT.FarmbotAPI.garden_size') + def test_garden_size(self, mock_garden_size): + mock_garden_size.return_value = {'x': 1000, 'y': 2000} + result = self.farmbot.garden_size() + self.assertEqual(result, {'x': 1000, 'y': 2000}) + mock_garden_size.assert_called_once() + + @patch('farmbot_util_PORT.FarmbotAPI.group') + def test_group(self, mock_group): + sequences = ['seq1', 'seq2'] + mock_group.return_value = {'grouped': True} + result = self.farmbot.group(sequences) + self.assertEqual(result, {'grouped': True}) + mock_group.assert_called_once_with(sequences) + + @patch('farmbot_util_PORT.FarmbotAPI.curve') + def test_curve(self, mock_curve): + mock_curve.return_value = {'curve': True} + result = self.farmbot.curve('seq', 0, 0, 10, 10, 5, 5) + self.assertEqual(result, {'curve': True}) + mock_curve.assert_called_once_with('seq', 0, 0, 10, 10, 5, 5) + +if __name__ == '__main__': + unittest.main()