Hi, I have two Digi XBee 3s. One is a coordinator connected to a laptop via USB, and the second is set up as a remote sensor: it has a potentiometer on it and sends the value to the coordinator.
They communicate over 2.4 GHz RF. In XCTU, I can see that the correct API packets are being sent from the remote sensor to the coordinator. I can open communication with the coordinator via Python:
from digi.xbee.devices import XBeeDevice
xbee = XBeeDevice("COM7", 115200)
xbee.open()
I want to be able to view the incoming data (API packets) via Python (or some other way), and preferably save it to .csv files. What packages/libraries do I need?
Can anyone provide some code for this?
Also, I would prefer to receive data directly from my end device, connected to a computer via Bluetooth (so only one XBee is required). Is that possible?
Thanks. When I try to run any of the sample projects from the Digi MicroPython PyCharm plugin, I get this: “Error Compiling Project: ‘mpy-cross’ was not found in the Python interpreter configured for the project.” This happens even though mpy-cross is installed. Any ideas?
Here’s my code that works. It receives the data that a remote XBee 3 sends to my local XBee 3 (plugged in via USB). The API packets are handled as strings of hexadecimal data. The script converts each hex sample to an ordinary decimal number, then converts that to a mV value (the data), and saves everything to a .csv file.
import serial
import csv
from datetime import datetime

# Open the serial port (check which COM port with Device Manager)
ser = serial.Serial('COM7', 115200)

# Wait for user input to start
input("Press Enter to start reading data...")

# Open the CSV file for writing
with open('hexdata.csv', 'w', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    # Write a header row
    csvwriter.writerow(['Timestamp', 'Hexadecimal String',
                        'Sample 1 (Hex)', 'Sample 2 (Hex)', 'Sample 3 (Hex)', 'Sample 4 (Hex)', 'Sample 5 (Hex)',
                        'Sample 1 (Dec)', 'Sample 2 (Dec)', 'Sample 3 (Dec)', 'Sample 4 (Dec)', 'Sample 5 (Dec)',
                        'Sample 1 (mV)', 'Sample 2 (mV)', 'Sample 3 (mV)', 'Sample 4 (mV)', 'Sample 5 (mV)'])
    try:
        print("Reading data... Press Ctrl+C to stop.")
        # Continuously read from the serial port
        while True:
            # Read one packet: 22 bytes, which is 44 characters once written as hex
            incoming_data = ser.read(22)
            # ser.read() returns raw bytes; .hex() turns them into a hexadecimal string
            hex_string = incoming_data.hex()
            # Only handle chunks that start with the API start delimiter '7e'
            if hex_string.startswith('7e'):
                # Get current timestamp, trimmed from microseconds to milliseconds
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                # Extract sample bytes (samples start after 11 bytes, which is 22 characters)
                # The remote is set to 5 samples per API packet, 2 bytes (4 characters) each
                sample_start_index = 11 * 2
                sample_bytes_hex = [
                    hex_string[sample_start_index:sample_start_index+4],
                    hex_string[sample_start_index+4:sample_start_index+8],
                    hex_string[sample_start_index+8:sample_start_index+12],
                    hex_string[sample_start_index+12:sample_start_index+16],
                    hex_string[sample_start_index+16:sample_start_index+20]
                ]
                # Convert the hexadecimal sample values to decimal numbers
                sample_bytes_dec = [int(sample, 16) for sample in sample_bytes_hex]
                # Convert the decimal sample values to millivolts
                # The number coming in represents the voltage as a value between 0 and 1023.
                # 3.22 mV per step corresponds to a full scale of about 3294 mV (1023 * 3.22);
                # a 3925 mV full scale would instead need 3925 / 1023 = about 3.84 mV per step.
                mv_conversion_factor = 3.22
                sample_bytes_mv = [round(sample_dec * mv_conversion_factor, 2) for sample_dec in sample_bytes_dec]
                # Prepare row with timestamp, hex string, and all sample values grouped by type
                row = [timestamp, hex_string]
                row.extend(sample_bytes_hex)
                row.extend(sample_bytes_dec)
                row.extend(sample_bytes_mv)
                # Write the row to the CSV file
                csvwriter.writerow(row)
                csvfile.flush()  # Ensure data is written to the file immediately
                # Optional: print sample data to the console
                # print(f"Timestamp: {timestamp}, Received data: {hex_string}, Samples (Hex): {sample_bytes_hex}, Samples (Dec): {sample_bytes_dec}, Samples (mV): {sample_bytes_mv}")
    except KeyboardInterrupt:
        print("\nData reading stopped by user.")
    finally:
        # Close the serial port
        ser.close()
        print("Serial port closed.")
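One thing to watch with fixed 22-byte reads: if the port is opened mid-packet, or a packet of a different length arrives, every later read can be misaligned and gets skipped by the '7e' check. Below is a minimal sketch of one way to resynchronise. It assumes API mode without escaping (AP = 1) and the same layout as the script above (5 two-byte samples starting at byte 11). The names extract_frames, samples_mv and MAX_FRAME_DATA are made up for this illustration and are not part of pyserial or the Digi library. It relies on the standard XBee API frame layout: start delimiter 0x7E, a 2-byte length, the frame data, then a checksum chosen so that frame data plus checksum sums to 0xFF in the low 8 bits.

```python
from typing import Iterator, List

SAMPLE_OFFSET = 11     # assumption: samples start at byte 11, as in the script above
SAMPLE_COUNT = 5       # assumption: 5 samples per packet, as in the script above
MV_PER_STEP = 3.22     # same conversion factor as the script above
MAX_FRAME_DATA = 256   # hypothetical sanity cap on the length field


def extract_frames(buffer: bytearray) -> Iterator[bytes]:
    """Yield complete, checksum-valid API frames and remove them from buffer.

    Bytes belonging to an incomplete frame are left in buffer for the next call.
    """
    while True:
        start = buffer.find(b"\x7e")
        if start < 0:
            buffer.clear()           # no start delimiter: discard the noise
            return
        del buffer[:start]           # drop anything before the delimiter
        if len(buffer) < 3:
            return                   # length field not fully received yet
        length = int.from_bytes(buffer[1:3], "big")
        if length > MAX_FRAME_DATA:
            del buffer[:1]           # implausible length: not a real frame start
            continue
        total = 3 + length + 1       # delimiter + length + frame data + checksum
        if len(buffer) < total:
            return                   # wait for the rest of the frame
        frame = bytes(buffer[:total])
        if sum(frame[3:]) & 0xFF == 0xFF:
            del buffer[:total]
            yield frame
        else:
            del buffer[:1]           # bad checksum: resync on the next 0x7E


def samples_mv(frame: bytes) -> List[float]:
    """Convert the 2-byte big-endian samples in a frame to millivolts."""
    raw = frame[SAMPLE_OFFSET:SAMPLE_OFFSET + 2 * SAMPLE_COUNT]
    values = [int.from_bytes(raw[i:i + 2], "big") for i in range(0, len(raw), 2)]
    return [round(value * MV_PER_STEP, 2) for value in values]
```

In the read loop this would take the place of ser.read(22): keep a bytearray, extend it with whatever ser.read(ser.in_waiting or 1) returns, and loop over extract_frames(buffer), passing each frame to samples_mv before writing the CSV row.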