oddstr13:node:piuploadtest
- testnode.py
#!/usr/bin/env python
# -*- coding: utf8 -*-
#
# Copyright: Odd Stråbø <oddstr13 at openshell dot no>
# License: MIT - http://opensource.org/licenses/MIT
# Description: This is a simple https://ukhas.net node for RPi.
# It uploads CPU temperature.
# Modified: 2015-10-19
#
import time
from decimal import Decimal as dec
import requests
# Disable insecure platform warning for requests
import requests.packages.urllib3
requests.packages.urllib3.disable_warnings()
# Node ID announced in every packet; the script entry point hands it to Node().
NAME = "OSPi1"
# Pause between two uploads; passed to time.sleep(), so it is in seconds.
DELAY = 60
# HTTP endpoint the packets are POSTed to.
UKHASNET_API_UPLOAD = "https://ukhas.net/api/upload"
# File read by getTemp(); the number it contains is divided by 1000 there.
TEMPERATURE_FILE = "/sys/devices/virtual/thermal/thermal_zone0/temp"


def _encodeTemperature(value):
    """Encode a temperature reading as 'T' followed by its string form."""
    return 'T' + str(value)


def _encodeZombie(value):
    """Encode the zombie flag as 'Z' followed by its integer form."""
    return 'Z' + str(int(value))


# Maps every field name accepted by Node.send() to the function that turns
# the field's value into its packet representation.
types = {
    "temperature": {"encode": _encodeTemperature},
    "zombie": {"encode": _encodeZombie},
}
def getTemp(path=None):
    """Return the temperature stored in a sysfs-style thermal file.

    path -- file to read; when omitted, TEMPERATURE_FILE is used.

    The file must contain a single number. It is divided by 1000 before being
    returned (presumably millidegrees -> degrees Celsius; confirm for the
    target board). The result is a decimal.Decimal.

    Raises IOError/OSError when the file cannot be read and
    decimal.InvalidOperation when its content is not a number.
    """
    if path is None:
        path = TEMPERATURE_FILE
    # The context manager closes the handle right away instead of leaving it
    # to the garbage collector; this function is called on every upload cycle.
    with open(path) as handle:
        raw = handle.read()
    # Drop the trailing newline before parsing.
    return dec(raw.strip()) / 1000
class Node(object):
    """A UKHASnet node that assembles packets and uploads them over HTTP.

    A packet is the concatenation of: the TTL digit, a sequence letter, the
    encoded data fields, and the node name in square brackets.
    """

    name = None
    # Starts at -1 so the very first sequence character is 'a' (98 - 1 = 97).
    # Afterwards the counter cycles through 0..24, i.e. 'b'..'z'.
    seq = -1
    ttl = 3
    # Seconds to wait for the upload endpoint. Without a timeout a stalled
    # connection would block send() forever.
    timeout = 30

    def __init__(self, name):
        """Create a node called `name`.

        Raises ValueError when `name` is longer than 16 characters.
        """
        if len(name) > 16:
            raise ValueError("Max Node ID length is 16 bytes.")
        self.name = name

    def getSequence(self):
        """Return the current sequence character and advance the counter."""
        c = chr(98 + self.seq)
        self.seq = (self.seq + 1) % 25
        return c

    def getTTL(self):
        """Return the TTL as a string, capped at 9 to stay a single digit."""
        return str(min(self.ttl, 9))

    def getName(self):
        """Return the node name (ID)."""
        return self.name

    def _buildPacket(self, data):
        """Return the packet string for the `data` mapping (field -> value).

        Raises ValueError when a field name has no entry in `types`.
        """
        # Encode the fields first so that a rejected packet does not consume
        # a sequence character.
        fields = []
        for kind, value in data.items():
            spec = types.get(kind)
            if spec is None:
                raise ValueError("Unknown data type: %r" % (kind,))
            fields.append(spec["encode"](value))
        parts = [self.getTTL(), self.getSequence()]
        parts.extend(fields)
        parts.extend(('[', self.getName(), ']'))
        return ''.join(parts)

    def send(self, **data):
        """Upload one packet to UKHASnet.

        Each keyword argument is a data field; its name must be a key of the
        module-level `types` table. The packet and the server's response text
        are printed.

        Returns the "result" entry of the JSON response.
        Raises ValueError for an unknown field name or a non-JSON response,
        and requests.RequestException on network failure or timeout.
        """
        packet = self._buildPacket(data)
        print(packet)
        res = requests.post(UKHASNET_API_UPLOAD, {
            'origin': self.getName(),
            'data': packet,
        }, timeout=self.timeout)
        print(res.text)
        return res.json().get("result")
def main():
    """Upload the zombie flag and the CPU temperature every DELAY seconds.

    Runs until interrupted. A failed upload is reported and retried on the
    next cycle, so a transient network problem does not stop the node.
    """
    me = Node(NAME)
    while True:
        temperature = getTemp()
        try:
            me.send(zombie=True, temperature=temperature)
        except (requests.RequestException, ValueError) as err:
            # RequestException: connection problems and timeouts.
            # ValueError: the server answered with something that is not JSON.
            print("Upload failed: %s" % (err,))
        time.sleep(DELAY)


if __name__ == "__main__":
    main()
oddstr13/node/piuploadtest.txt · Last modified: 2020/06/27 23:02 (external edit)