User Tools

Site Tools


oddstr13:node:piuploadtest
testnode.py
#!/usr/bin/env python
# -*- coding: utf8 -*-
#
# Copyright:   Odd Stråbø <oddstr13 at openshell dot no>
# License:     MIT - http://opensource.org/licenses/MIT
# Description: This is a simple https://ukhas.net node for RPi.
#              It uploads CPU temperature.
# Modified:    2015-10-19
#
 
import time
from decimal import Decimal as dec
import requests
 
# Disable insecure platform warning for requests
import requests.packages.urllib3
requests.packages.urllib3.disable_warnings()
 
NAME = "OSPi1"
DELAY = 60
 
UKHASNET_API_UPLOAD = "https://ukhas.net/api/upload"
 
TEMPERATURE_FILE = "/sys/devices/virtual/thermal/thermal_zone0/temp"
 
 
types = {
    "temperature": {
        "encode": lambda x: 'T' + str(x)
    },
    "zombie": {
        "encode": lambda x: 'Z' + str(int(x))
    }
}
 
 
def getTemp():
    return dec(open(TEMPERATURE_FILE).read()) / 1000
 
 
class Node(object):
    name = None
    seq = -1
    ttl = 3
 
    def __init__(self, name):
        if len(name) > 16:
            raise ValueError("Max Node ID length is 16 bytes.")
        self.name = name
 
    def getSequence(self):
        """Returns sequence character, and increment it."""
        c = chr(98 + self.seq)
        self.seq += 1
        self.seq %= 25
        return c
 
    def getTTL(self):
        """Returns TTL as string."""
        return str(min(self.ttl, 9))
 
    def getName(self):
        """Returns node name(ID)."""
        return self.name
 
    def send(self, **data):
        """Uploads packet to UKHASnet."""
        packet = []
        packet.append( self.getTTL() )
        packet.append( self.getSequence() )
 
        for type, value in data.items():
            packet.append( types.get(type).get("encode")(value) )
 
        packet.append( '[' )
        packet.append( self.getName() )
        packet.append( ']' )
 
        print(''.join(packet))
        res = requests.post(UKHASNET_API_UPLOAD, {
            'origin': self.getName(),
            'data': ''.join(packet)
        })
        print(res.text)
        return res.json().get("result")
 
 
if __name__ == "__main__":
    me = Node(NAME)
 
    while True:
        me.send(zombie=True, temperature=getTemp())
        time.sleep(DELAY)
oddstr13/node/piuploadtest.txt · Last modified: 2015/11/08 08:27 by oddstr13