Python, ANT+, a dude and his Bike..., by @mstafford

I've had an idea floating around in my head recently -- mostly inspired from my buddies that are into cycling, and my endeavouring to learn some Python. Now that I think about it -- I've learned quite a lot of python over the last year or so... Picking a project and running with it is a pretty great way to learn (bad habits).

Regardless.

  • I've got a bike a home w/ a neat indoor trainer set up. Means I can get sweaty while going precisely nowhere, with the added benefit of terrifying my dog the whole time;
  • Said bicycle has ANT+ speed and cadence sensors, and I've got a comparable heart-rate monitor as well;
  • I've got an ANT+ USB dongle that acts as a receiver; and
  • I recently discovered THIS python library that implements the ANT protocols:

Python implementation of the ANT, ANT+, and ANT-FS protocols. For more information about ANT, see http://www.thisisant.com/.

ANT
python
Tape that shit on
ANT+
Python
Bike and Sensors

Lets play!

Add the afforementioned python library to your projects requirements.txt file: ```

requirements.txt

git+https://github.com/SamyCookie/python-ant Install: pip install -r requirements.txt```

Enjoy:

Start up a new python file called cadence.py , and copy/paste the following: ``` import math import re import sys import time

from ant.core import driver, node, event, message from ant.core.constants import *

NETKEY = b'\xB9\xA5\x21\xFB\xBD\x72\xC3\x45'

class ANTListener(event.EventCallback): last speed time = 0 last speed revs = 0 now speed revs = 0

fivesec speed revs = 0

tensec speed revs = 0

last_cadence_time = 0
last_cadence_revs = 0
now_cadence_revs = 0

fives cadence revs = 0

tens cadence revs = 0

wheel_diameter = 0.673 ## Wheel diameter in m

def process(self, msg, _channel):
    if isinstance(msg, message.ChannelBroadcastDataMessage):
        self.last_cadence_revs = self.now_cadence_revs
        self.last_speed_revs = self.now_speed_revs
        ## Process Cadence Cumulative Revolutions
        ## ctimeLSB = msg.payload[1],ctimeMSB = msg.payload[2],cadenceLSB = msg.payload[3], cadenceMSB = msg.payload[4]
        ctime = int(format(msg.payload[2],'#010b')+re.sub('0b','',format(msg.payload[1],'#010b')),2)
        self.now_cadence_revs = int(format(msg.payload[4],'#010b')+re.sub('0b','',format(msg.payload[3],'#010b')),2)
        ## Precess Speed Cumulative Revolutions
        ## stimeLSB = msg.payload[5],stimeMSB = msg.payload[6],speedLSB = msg.payload[7],speedMSB = msg.payload[8]
        stime = int(format(msg.payload[6],'#010b')+re.sub('0b','',format(msg.payload[5],'#010b')),2)
        self.now_speed_revs = int(format(msg.payload[8],'#010b')+re.sub('0b','',format(msg.payload[7],'#010b')),2)

        speed_rev_delta = self.now_speed_revs - self.last_speed_revs
        speed_time_delta = stime - self.last_speed_time
        cadence_rev_delta = self.now_cadence_revs - self.last_cadence_revs
        cadence_time_delta = ctime - self.last_speed_time
        speed = (speed_rev_delta/speed_time_delta)*1024*(self.wheel_diameter*math.pi*60*60/1000) ## rev/sec x 0.673*math.pi m/rev x 1km/1000m x 60s/min x 60min/hr
        cadence = (cadence_rev_delta/cadence_time_delta)*1024*60

        print("Cadence Timestamps: ",ctime/1024,"|| Cadence Revolutions: ",self.now_cadence_revs)
        print("Cadence Rev Delta: ", cadence_rev_delta," Cadence Time Delta", cadence_time_delta," cadence: ",cadence," RPM")
        print("Speed Timestamps: ",stime/1024,"|| Speed Revolutions: ",self.now_speed_revs)
        print("Speed Rev Delta: ", speed_rev_delta," Speed Time Delta", speed_time_delta," Speed: ",speed," km/hr")
        print("=============")
        self.last_speed_time = stime
        self.last_cadence_time = ctime


antDevice = '/dev/ttyUSB0'
antVendor=0x0fcf
# antProduct=0x1009
antProduct=0x1008
stick = driver.USB2Driver(idVendor=antVendor, idProduct=antProduct)
antnode = node.Node(stick)
antnode.start()
channel = antnode.getFreeChannel()

def setup(self):
    # Start shit up
    #stick = driver.USB2Driver()
    if not self.antnode.running:
        self.antnode.start()

    # Setup channel
    net = node.Network(name='N:ANT+', key=NETKEY)
    self.antnode.setNetworkKey(0, net)
    self.channel.name = 'C:HRM'
    self.channel.assign(net, CHANNEL_TYPE_TWOWAY_RECEIVE)
    self.channel.setID(121, 0, 0)
    self.channel.searchTimeout = TIMEOUT_NEVER
    self.channel.period = 8070
    self.channel.frequency = 57
    self.channel.open()

def start_listen(self):
    if not self.antnode.running:
        self.setup()
    # Setup callback
    # Note: We could also register an event listener for non-channel events by
    # calling registerEventListener() on antnode rather than channel.
    self.channel.registerCallback(ANTListener())

def stop_listen(self):
    # Shutdown
    self.channel.close()
    self.channel.unassign()
    self.antnode.stop()

```

The above script generally does the following: * setup() preps the ANT stick for the appropriate idVendor and idProduct, and finds a free channel to work on; * Currently, we set up a channel only for our speed/cadence sensors; * You can find your device type (if you're a linux user) via lsusb . * start_listen() starts the device listening for broadcast events from any sensors matching our specified type; * stop_listen() stops the device and closes the channel and antnode -- freeing the USB device to use again; * For whatever reason, I end up having to unplug / re-plug the device to get it connected again.. whatever... * Listens for broadcast events on the channel

Some notes on how this stuff works:

  • Interpreting / handling the data was a massive pain in the ass... for whatever reason, the data is sent out in byte-arrays and required a fair deal of converting back and forth between binary and integer formatting...
  • Had to learn a bit about MSB and LSB bit numbering... That was fun;
  • If your wheels are a different size, you'll probably want to update the wheel_diameter value -- mine is 673mm;
  • Script spits out your instantaneous cadence in RPM's, and your instantaneous speed in km/hr -- if you want your shit Americanized, you can 'GIT OUT:

So assuming that you've punched in the appropriate antProduct listed in your lsusb , and updated for any difference in wheel size -- you SHOULD be up and rolling:

Open up your terminal: source env/bin/activate python import cadence listener = cadence.ANTListener() listener.setup() listener.start_listen()

you SHOULD start seeing outputs like the following: ``` Cadence Timestamps: 6.95703125 || Cadence Revolutions: 285 Cadence Rev Delta: 4 Cadence Time Delta 2742 cadence: 89.62800875273523 RPM Speed Timestamps: 7.2412109375 || Speed Revolutions: 1581

Speed Rev Delta: 8 Speed Time Delta 3033 Speed: 20.55819452018244 km/hr

Cadence Timestamps: 9.6435546875 || Cadence Revolutions: 289 Cadence Rev Delta: 4 Cadence Time Delta 2460 cadence: 99.90243902439025 RPM Speed Timestamps: 10.1884765625 || Speed Revolutions: 1589

Speed Rev Delta: 8 Speed Time Delta 3018 Speed: 20.66037242535233 km/hr

Cadence Timestamps: 13.0166015625 || Cadence Revolutions: 294 Cadence Rev Delta: 5 Cadence Time Delta 2896 cadence: 106.07734806629834 RPM Speed Timestamps: 13.140625 || Speed Revolutions: 1597

Speed Rev Delta: 8 Speed Time Delta 3023 Speed: 20.62620045640534 km/hr

Cadence Timestamps: 15.744140625 || Cadence Revolutions: 298 Cadence Rev Delta: 4 Cadence Time Delta 2666 cadence: 92.18304576144037 RPM Speed Timestamps: 16.130859375 || Speed Revolutions: 1605

Speed Rev Delta: 8 Speed Time Delta 3062 Speed: 20.363489216104945 km/hr

Cadence Timestamps: 18.486328125 || Cadence Revolutions: 302 Cadence Rev Delta: 4 Cadence Time Delta 2412 cadence: 101.8905472636816 RPM Speed Timestamps: 18.7626953125 || Speed Revolutions: 1612 Speed Rev Delta: 7 Speed Time Delta 2695 Speed: 20.24448181159524 km/hr ```


So that's it that's all! That's how you can whip up a quick and dirty ANT+ speed/cadence reader...

Next steps are up to you -- I've got some cool ideas for what I want to accomplish... Images below paint a bit of a story for what I'm thinking...

Godot
Zwift Cycling Game
xhaust.me
Open Source Game Engine
Inspiration
Community

What projects / ideas are you working on? Do you like to have some kind of creative fire burning? I feel like most people here are pretty creative -- that's kind of the "early adopter" crowd that Steem has attracted.

Do you think you'd play a cycling game where you can meetup digitally and race other Steemians / Exhaust-o-nauts? I probably would, so long as I didn't have to go up against @run.vince.run or @jgrieco. This would probably be a fun way to have another "Solstice Race" event...

9.356 HBD

0

0

2

0

SHOUT-OUTS!

avatar
@raserrano:

I think I would in fact I stopped cycling when the rainy season started, I'm happy to say that is mostly gone which means I'll be able to enjoy cycling around more but I would def play at night or maybe when rain is too bad.


avatar
@steevc:

I can see that was too cool to resist. I really ought to look into what I can do with Python as I use it at work. Maybe I can automate some of my posts a bit.