Initial Commit

This commit is contained in:
2018-08-27 21:22:50 -07:00
commit 15546c9782
137 changed files with 10509 additions and 0 deletions

View File

@@ -0,0 +1,127 @@
#===========================================================================
#
# Log on/off packets
#
#===========================================================================
import logging
import socket
import time
# Import the base header and the struct type codes we're using.
from .Header import *
from .. import util
#===========================================================================
class LogOn ( Header ):
_fields = Header._fields + [
( uint4, 'command' ),
( uint4, 'group' ),
( uint4, 'timeout' ),
( uint4, 'time' ),
( uint4, 'unknown1' ),
( '12s', "password" ),
( uint4, 'trailer' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
#------------------------------------------------------------------------
def __init__( self, group, password ):
assert( len( password ) <= 12 )
assert( group == "USER" or group == "INSTALLER" )
Header.__init__( self )
if group == "USER":
self.group = 0x00000007
passOffset = 0x88
else: # installer
self.group = 0x0000000A
passOffset = 0xBB
self.password = ""
# Loop over each character and encode it as hex and offset by
# the group code offset.
for i in range( 12 ):
if i < len( password ):
c = int( password[i].encode( 'hex' ), 16 ) + passOffset
# Pad out to 12 bytes w/ the group code offset.
else:
c = passOffset
# Turn the hex code back to a character.
self.password += chr( c )
self.destCtrl = 0x0100
self.srcCtrl = 0x0100
self.command = 0xFFFD040C
self.timeout = 0x00000384 # 900 sec
self.time = int( time.time() )
self.unknown1 = 0x00
self.trailer = 0x00
#------------------------------------------------------------------------
def send( self, sock ):
# Pack ourselves into the message structure.
bytes = self.struct.pack( self )
if self._log.isEnabledFor( logging.DEBUG ):
self._log.debug( "Send: LogOn packet\n" + util.hex.dump( bytes ) )
try:
# Send the message and receive the response back.
sock.send( bytes )
bytes = sock.recv( 4096 )
except socket.timeout as e:
msg = "Can't log on - time out error"
self._log.error( msg )
util.Error.raiseException( e, msg )
if self._log.isEnabledFor( logging.DEBUG ):
self._log.debug( "Recv: LogOn reply\n" + util.hex.dump( bytes ) )
# Overwrite our fields w/ the reply data.
self.struct.unpack( self, bytes )
if self.error:
raise util.Error( "Error trying to log on to the SMA inverter. "
"Group/password failed." )
#------------------------------------------------------------------------
#===========================================================================
class LogOff ( Header ):
_fields = Header._fields + [
( uint4, 'command' ),
( uint4, 'unknown1' ),
( uint4, 'trailer' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
#------------------------------------------------------------------------
def __init__( self ):
Header.__init__( self )
self.destCtrl = 0x0300
self.srcCtrl = 0x0300
self.command = 0xFFFD010E
self.unknown1 = 0xFFFFFFFF
self.trailer = 0x00
#------------------------------------------------------------------------
def send( self, sock ):
bytes = self.struct.pack( self )
if self._log.isEnabledFor( logging.DEBUG ):
self._log.debug( "Send: LogOff packet\n" + util.hex.dump( bytes ) )
sock.send( bytes )
#------------------------------------------------------------------------
#===========================================================================

View File

@@ -0,0 +1,92 @@
#===========================================================================
#
# Common data packet structures. Used for requests and replies.
#
#===========================================================================
from .. import util
#===========================================================================
# Struct type codes
uint1 = "B"
uint2 = "H"
uint4 = "I"
uint8 = "Q"
int1 = "b"
int2 = "h"
int4 = "i"
int8 = "q"
#==============================================================================
class Header:
_fields = [
# Header fields
( uint4, 'hdrMagic' ),
( uint4, 'hdrUnknown1' ),
( uint4, 'hdrUnknown2' ),
( uint1, 'packetHi' ), # packet length in little endian hi word
( uint1, 'packetLo' ), # packet length in little endian low word
( uint4, 'signature' ),
( uint1, 'wordLen' ), # int( packetLen / 4 )
( uint1, 'hdrUnknown3' ),
# Common packet fields
( uint2, 'destId', ),
( uint4, 'destSerial', ),
( uint2, 'destCtrl', ),
( uint2, 'srcId', ),
( uint4, 'srcSerial', ),
( uint2, 'srcCtrl', ),
( uint2, 'error', ),
( uint2, 'fragmentId', ),
( uint1, 'packetId', ),
( uint1, 'baseUnknown', ),
]
_hdrSize = 20 # bytes for the header fields.
_nextPacketId = 0
#------------------------------------------------------------------------
def __init__( self ):
assert( self.struct )
self.hdrMagic = 0x00414D53
self.hdrUnknown1 = 0xA0020400
self.hdrUnknown2 = 0x01000000
self.signature = 0x65601000
self.hdrUnknown3 = 0xA0
# NOTE: self.struct must be created by the derived class. That
# allows this to compute the correct packet length and encode it.
packetLen = len( self.struct ) - self._hdrSize
self.packetHi = ( packetLen >> 8 ) & 0xFF
self.packetLo = packetLen & 0xFF
self.wordLen = int( packetLen / 4 )
# Any destination - we send to a specific IP address so this
# isn't important.
self.destId = 0xFFFF
self.destSerial = 0xFFFFFFFF
self.destCtrl = 0x00
self.srcId = 0x7d # TODO change this?
self.srcSerial = 0x334657B0 # TODO change this?
self.srcCtrl = 0x00
self.error = 0
self.fragmentId = 0
self.baseUnknown = 0x80
# Packet id is 1 byte so roll over at 256.
self._nextPacketId += 1
if self._nextPacketId == 256:
self._nextPacketId = 1
self.packetId = self._nextPacketId
self._log = util.log.get( "sma" )
#------------------------------------------------------------------------
def bytes( self ):
return self.struct.pack( self )
#------------------------------------------------------------------------
#===========================================================================

View File

@@ -0,0 +1,240 @@
#===========================================================================
#
# Primary SMA API.
#
#===========================================================================
import socket
from .. import util
from . import Auth
from . import Reply
from . import Request
#==============================================================================
class Link:
"""SMA WebConnection link
Units: Watt, Watt-hours, C, seconds
l = Link( '192.168.1.14' )
print l.acTotalEnergy()
See also: report for common requests.
"""
def __init__( self, ip, port=9522, group="USER", password="0000",
connect=True, timeout=120, decode=True, raw=False ):
if group != "USER" and group != "INSTALLER":
raise util.Error( "Invalid group '%s'. Valid groups are 'USER' "
"'INSTALLER'." % group )
self.ip = ip
self.port = port
self.group = group
self.password = password
self.timeout = timeout
self.decode = decode
self.raw = raw
self.socket = None
if connect:
self.open()
#---------------------------------------------------------------------------
def info( self ):
p = Request.Data( command=0x58000200, first=0x00821E00, last=0x008220FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.StringItem( "name", 40, timeVar="timeWake" ),
Reply.AttrItem( "type", 40 ),
Reply.AttrItem( "model", 40 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def status( self ):
p = Request.Data( command=0x51800200, first=0x00214800, last=0x002148FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.AttrItem( "status", 32, timeVar="time" ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def gridRelayStatus( self ):
p = Request.Data( command=0x51800200, first=0x00416400, last=0x004164FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.AttrItem( "gridStatus", 32, timeVar="timeOff" ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def temperature( self ):
"""Return the inverter temp in deg C (or 0 if unavailable)."""
p = Request.Data( command=0x52000200, first=0x00237700, last=0x002377FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I32Item( "temperature", 16, mult=0.01 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def version( self ):
"""Return the inverter software version string."""
p = Request.Data( command=0x58000200, first=0x00823400, last=0x008234FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.VersionItem( "version" ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def acTotalEnergy( self ):
p = Request.Data( command=0x54000200, first=0x00260100, last=0x002622FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I64Item( "totalEnergy", 16, mult=1.0, timeVar="timeLast" ),
Reply.I64Item( "dailyEnergy", 16, mult=1.0 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def acTotalPower( self ):
p = Request.Data( command=0x51000200, first=0x00263F00, last=0x00263FFF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I32Item( "acPower", 28, mult=1.0, timeVar="timeOff" ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def acPower( self ):
p = Request.Data( command=0x51000200, first=0x00464000, last=0x004642FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I32Item( "acPower1", 28, mult=1.0, timeVar="timeOff" ),
Reply.I32Item( "acPower2", 28, mult=1.0 ),
Reply.I32Item( "acPower3", 28, mult=1.0 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def acMaxPower( self ):
p = Request.Data( command=0x51000200, first=0x00411E00, last=0x004120FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.U32Item( "acMaxPower1", 28, mult=1.0, timeVar="time" ),
Reply.U32Item( "acMaxPower2", 28, mult=1.0 ),
Reply.U32Item( "acMaxPower3", 28, mult=1.0 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def operationTime( self ):
p = Request.Data( command=0x54000200, first=0x00462E00, last=0x00462FFF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I64Item( "operationTime", 16, mult=1.0, timeVar="timeLast" ),
Reply.I64Item( "feedTime", 16, mult=1.0 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def dcPower( self ):
p = Request.Data( command=0x53800200, first=0x00251E00, last=0x00251EFF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I32Item( "dcPower1", 28, mult=1.0, timeVar="timeOff" ),
Reply.I32Item( "dcPower2", 28, mult=1.0 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def dcVoltage( self ):
p = Request.Data( command=0x53800200, first=0x00451F00, last=0x004521FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.I32Item( "dcVoltage1", 28, mult=0.01, timeVar="timeOff" ),
Reply.I32Item( "dcVoltage2", 28, mult=0.01 ),
Reply.I32Item( "dcCurrent1", 28, mult=0.001 ),
Reply.I32Item( "dcCurrent2", 28, mult=0.001 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def acVoltage( self ):
p = Request.Data( command=0x51000200, first=0x00464800, last=0x004652FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.U32Item( "acVoltage1", 28, mult=0.01, timeVar="timeOff" ),
Reply.U32Item( "acVoltage2", 28, mult=0.01 ),
Reply.U32Item( "acVoltage3", 28, mult=0.01 ),
Reply.U32Item( "acGridVoltage", 28, mult=0.01 ),
Reply.U32Item( "unknown1", 28, mult=0.01 ),
Reply.U32Item( "unknown2", 28, mult=0.01 ),
Reply.U32Item( "acCurrent1", 28, mult=0.001 ),
Reply.U32Item( "acCurrent2", 28, mult=0.001 ),
Reply.U32Item( "acCurrent3", 28, mult=0.001 ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def gridFrequency( self ):
p = Request.Data( command=0x51000200, first=0x00465700, last=0x004657FF )
bytes = p.send( self.socket )
decoder = Reply.Value( [
Reply.U32Item( "frequency", 28, mult=0.01, timeVar="timeOff" ),
] )
return self._return( bytes, decoder )
#---------------------------------------------------------------------------
def __del__( self ):
self.close()
#---------------------------------------------------------------------------
def open( self ):
if self.socket:
return
self.socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
self.socket.settimeout( self.timeout )
try:
self.socket.connect( ( self.ip, self.port ) )
p = Auth.LogOn( self.group, self.password )
p.send( self.socket )
except:
if self.socket:
self.socket.close()
self.socket = None
raise
#---------------------------------------------------------------------------
def close( self ):
if not self.socket:
return
p = Auth.LogOff()
try:
p.send( self.socket )
finally:
self.socket.close()
self.socket = None
#---------------------------------------------------------------------------
def __enter__( self ):
return self
#---------------------------------------------------------------------------
def __exit__( self, type, value, traceback ):
self.close()
#---------------------------------------------------------------------------
def _return( self, bytes, decoder ):
if self.decode:
return decoder.decode( bytes, self.raw )
else:
return ( bytes, decoder )
#==============================================================================

View File

@@ -0,0 +1,225 @@
#===========================================================================
#
# Data reply packets
#
#===========================================================================
import struct
from .. import util
# Import the base header and the struct type codes we're using.
from .Header import *
from . import tags
#===========================================================================
class Value ( Header, util.Data ):
_fields = Header._fields + [
( uint4, 'command' ),
( uint4, 'first' ),
( uint4, 'last' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
def __init__( self, decoders ):
self.values = []
self._decoders = decoders
Header.__init__( self )
util.Data.__init__( self )
def __call__( self, bytes, raw=False, obj=None ):
return self.decode( bytes, raw, obj )
def decode( self, bytes, raw=False, obj=None ):
# Unpack the base data.
self.struct.unpack( self, bytes )
offset = len( self.struct )
for d in self._decoders:
offset += d.decodeItem( self, bytes, offset )
if raw:
return self
r = obj if obj is not None else util.Data()
for item in self.values:
for varFrom, varTo in item._variables:
value = getattr( item, varFrom )
setattr( r, varTo, value )
return r
#==============================================================================
class BaseItem ( util.Data ):
_fields = [
( uint1, 'mppNum' ),
( uint2, 'msgType' ),
( uint1, 'dataType' ),
( uint4, 'time' ),
]
_baseSize = 8 # bytes
def __init__( self, var, size, timeVar=None ):
self.variable = var
self._size = size
self._variables = []
if timeVar:
self._variables.append( ( 'time', timeVar ) )
util.Data.__init__( self )
def decodeBase( self, obj, attrVal ):
setattr( obj, self.variable, attrVal )
obj.values.append( self )
return self._size
#==============================================================================
class StringItem ( BaseItem ):
def __init__( self, var, size, timeVar=None ):
BaseItem.__init__( self, var, size, timeVar )
self._fields = BaseItem._fields + [
( '%ds' % ( size - BaseItem._baseSize ), "bytes" ),
]
self._struct = util.NamedStruct( 'LITTLE_ENDIAN', self._fields )
self._variables.append( ( 'value', var ) )
def decodeItem( self, obj, bytes, offset ):
self._struct.unpack( self, bytes, offset )
s = self.bytes
idx = s.find( "\0" )
if idx != -1:
s = s[:idx]
self.value = s
return BaseItem.decodeBase( self, obj, self.value )
#==============================================================================
class AttrItem ( BaseItem ):
struct = util.NamedStruct( 'LITTLE_ENDIAN', BaseItem._fields )
def __init__( self, var, size, timeVar=None ):
BaseItem.__init__( self, var, size, timeVar )
self._numAttr = ( size - BaseItem._baseSize ) / 4
self._variables.append( ( 'value', var ) )
def decodeItem( self, obj, bytes, offset ):
self.struct.unpack( self, bytes, offset )
offset += len( self.struct )
# Little endian, 4 individual unsigned bytes
s = struct.Struct( '<BBBB' )
for i in range( self._numAttr ):
# x[0], x[1], x[2] are the attribute code
# x[3] == 1 if the attribute is active or 0 if not
# Only 1 active attribute per block
x = s.unpack_from( bytes, offset )
if x[3] == 1:
self.code = x[2]*65536 + x[1]*256 + x[0]
self.value = tags.values.get( self.code, "Unknown" )
break
offset += s.size
return BaseItem.decodeBase( self, obj, self.value )
#==============================================================================
class BaseInt ( BaseItem ):
def __init__( self, var, size, mult=1.0, timeVar=None ):
BaseItem.__init__( self, var, size, timeVar )
self._mult = mult
self._variables.append( ( 'value', var ) )
def decodeItem( self, obj, bytes, offset ):
self.struct.unpack( self, bytes, offset )
if self.value == self.nan:
self.value = 0
else:
self.value *= self._mult
return BaseItem.decodeBase( self, obj, self.value )
#==============================================================================
class I32Item ( BaseInt ):
_fields = BaseInt._fields + [
( int4, 'value' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
nan = -0x80000000 # SMA int32 NAN value
#==============================================================================
class U32Item ( BaseInt ):
_fields = BaseInt._fields + [
( uint4, 'value' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
nan = 0xFFFFFFFF # SMA uint32 NAN value
#==============================================================================
class I64Item ( BaseInt ):
_fields = BaseInt._fields + [
( int8, 'value' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
nan = -0x8000000000000000 # SMA int64 NAN value
#==============================================================================
class U64Item ( BaseInt ):
_fields = BaseInt._fields + [
( uint8, 'value' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
def __init__( self, size, var, mult=1.0 ):
# 0xFFFFFFFFFFFFFFFF == SMA uint64 NAN value
BaseInt.__init__( self, 0xFFFFFFFFFFFFFFFF, size, var, mult )
#==============================================================================
class VersionItem ( BaseItem ):
_fields = BaseItem._fields + [
( uint4, 'value' ),
( uint4, 'value2' ),
( uint4, 'value3' ),
( uint4, 'value4' ),
( uint1, 'verType' ),
( uint1, 'verBuild' ),
( uint1, 'verMinor' ),
( uint1, 'verMajor' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
def __init__( self, var ):
BaseItem.__init__( self, var, 24 )
self._variables.append( ( 'version', var ) )
def decodeItem( self, obj, bytes, offset ):
self.struct.unpack( self, bytes, offset )
if self.verType > 5:
relType = str( self.verType )
else:
relType = "NEABRS"[self.verType]
self.version = '%02x.%02x.%02x.%s' % ( self.verMajor, self.verMinor,
self.verBuild, relType )
return BaseItem.decodeBase( self, obj, self.version )
#==============================================================================

View File

@@ -0,0 +1,58 @@
#===========================================================================
#
# Data request packet
#
#===========================================================================
import logging
import socket
# Import the base header and the struct type codes we're using.
from .Header import *
from .. import util
#===========================================================================
class Data ( Header ):
_fields = Header._fields + [
( uint4, 'command' ),
( uint4, 'first' ),
( uint4, 'last' ),
( uint4, 'trailer' ),
]
struct = util.NamedStruct( 'LITTLE_ENDIAN', _fields )
#------------------------------------------------------------------------
def __init__( self, command, first, last ):
Header.__init__( self )
self.command = command
self.first = first
self.last = last
self.trailer = 0x00
#------------------------------------------------------------------------
def send( self, sock ):
bytes = self.struct.pack( self )
if self._log.isEnabledFor( logging.DEBUG ):
self._log.debug( "Send: Request.Data packet\n" +
util.hex.dump( bytes ) )
# Send the request and read the reply back in.
try:
sock.send( bytes )
reply = sock.recv( 4096 )
except socket.timeout as e:
msg = "Data request failed - time out error"
self._log.error( msg )
util.Error.raiseException( e, msg )
if self._log.isEnabledFor( logging.DEBUG ):
self._log.debug( "Recv: Reply packet\n" + util.hex.dump( reply ) )
return reply
#------------------------------------------------------------------------
#===========================================================================

View File

@@ -0,0 +1,28 @@
#===========================================================================
#
# SMA inverter module
#
#===========================================================================
__doc__ = """Read SMA Solar inverter data.
See Link and report for the main programming interfaces.
See cmdLine for a main program to poll the inverter and send out MQTT
messges.
"""
#===========================================================================
from . import Auth
from . import cmdLine
from . import config
from .Header import Header
from .Link import Link
from . import Reply
from . import report
from . import Request
from . import start
from . import tags
#===========================================================================

View File

@@ -0,0 +1,49 @@
#===========================================================================
#
# Command line processing
#
#===========================================================================
import argparse
from .. import broker
from . import config
from . import start
#===========================================================================
def run( args ):
"""Parse command line arguments to poll the inverter.
= INPUTS
- args [str]: List of command line arguments. [0] should be the
program name.
"""
p = argparse.ArgumentParser( prog=args[0],
description="SMA inverter reader" )
p.add_argument( "-c", "--configDir", metavar="configDir",
default="/var/config/tHome",
help="T-Home configuration directory." )
p.add_argument( "-l", "--log", metavar="logFile",
default=None, help="Logging file to use. Input 'stdout' "
"to log to the screen." )
p.add_argument( "--debug", default=False, action="store_true",
help="Debugging - no sending, just print" )
c = p.parse_args( args[1:] )
if c.debug:
c.log = "stdout"
# Parse the sma and broker config files.
cfg = config.parse( c.configDir )
log = config.log( cfg, c.log )
if c.debug:
log.setLevel( 10 )
# Create the MQTT client and connect it to the broker.
client = broker.connect( c.configDir, log )
start.start( cfg, client, debug=c.debug )
#===========================================================================

View File

@@ -0,0 +1,49 @@
#===========================================================================
#
# Config file
#
#===========================================================================
__doc__ = """Config file parsing.
"""
from .. import util
from ..util import config as C
#===========================================================================
# Config file section name and defaults.
configEntries = [
# ( name, converter function, default value )
C.Entry( "host", str ),
C.Entry( "port", int, 9522 ),
C.Entry( "group", str, "USER" ),
C.Entry( "password", str, "0000" ),
C.Entry( "logFile", util.path.expand ),
C.Entry( "logLevel", int, 20 ), # INFO
C.Entry( "pollPower", int, 10 ),
C.Entry( "pollEnergy", int, 600 ), # 10 minutes
C.Entry( "pollFull", int, 0 ), # disabled
C.Entry( "mqttEnergy", str, "elec/solar/meter" ),
C.Entry( "mqttPower", str, "elec/solar/instant" ),
C.Entry( "mqttFull", str, "elec/solar/detail" ),
C.Entry( "lat", float ),
C.Entry( "lon", float ),
C.Entry( "timePad", float, 600 ), # 10 minutes
]
#===========================================================================
def parse( configDir, configFile='sma.py' ):
return C.readAndCheck( configDir, configFile, configEntries )
#===========================================================================
def log( config, logFile=None ):
if not logFile:
logFile = config.logFile
return util.log.get( "sma", config.logLevel, logFile )
#===========================================================================

View File

@@ -0,0 +1,99 @@
#===========================================================================
#
# Report functions
#
#===========================================================================
import time
from ..util import Data
from .Link import Link
#===========================================================================
def power( *args, **kwargs ):
"""Return instantaneous AC and DC power generation.
Inputs are the same as Link() constructor:
obj = report.instant( '192.168.1.15' )
print obj
"""
with Link( *args, **kwargs ) as link:
link.decode = False
link.raw = False
dcBytes, dc = link.dcPower()
acBytes, ac = link.acTotalPower()
now = time.time()
obj = dc.decode( dcBytes )
obj.update( ac.decode( acBytes ) )
obj.time = now
obj.dcPower = obj.dcPower1 + obj.dcPower2
return obj
#===========================================================================
def energy( *args, **kwargs ):
"""Return instantaneous power and total energy status.
Get instantaneous AC and DC power generation and energy created for
the day.
Inputs are the same as Link() constructor:
obj = report.energy( '192.168.1.15' )
print obj
"""
with Link( *args, **kwargs ) as link:
link.decode = False
dcBytes, dc = link.dcPower()
acBytes, ac = link.acTotalPower()
totBytes, total = link.acTotalEnergy()
now = time.time()
obj = dc.decode( dcBytes )
obj.update( ac.decode( acBytes ) )
obj.update( total.decode( totBytes ) )
obj.time = now
obj.dcPower = obj.dcPower1 + obj.dcPower2
return obj
#===========================================================================
def full( *args, **kwargs ):
"""Return all possible fields.
Inputs are the same as Link() constructor:
obj = report.full( '192.168.1.15' )
print obj
"""
funcs = [
Link.info,
Link.status,
Link.gridRelayStatus,
Link.temperature,
Link.version,
Link.acTotalEnergy,
Link.acTotalPower,
Link.acPower,
Link.acMaxPower,
Link.operationTime,
Link.dcPower,
Link.dcVoltage,
Link.acVoltage,
Link.gridFrequency,
]
with Link( *args, **kwargs ) as link:
link.decode = False
results = [ f( link ) for f in funcs ]
now = time.time()
obj = Data()
for bytes, decoder in results:
obj.update( decoder.decode( bytes ) )
obj.time = now
obj.dcPower = obj.dcPower1 + obj.dcPower2
return obj
#===========================================================================

View File

@@ -0,0 +1,207 @@
#===========================================================================
#
# Main report processing
#
#===========================================================================
import astral
import calendar
import datetime
import json
import time
from .. import util
from . import report
#===========================================================================
def start( config, client, debug=False ):
fromts = datetime.datetime.fromtimestamp
log = util.log.get( "sma" )
linkArgs = { "ip" : config.host, "port" : config.port,
"group" : config.group, "password" : config.password, }
reports = []
if config.pollFull > 0:
reports.append( util.Data( lbl="full", func=msgFull,
poll=config.pollFull, nextT=0 ) )
if config.pollEnergy > 0:
reports.append( util.Data( lbl="energy", func=msgEnergy,
poll=config.pollEnergy, nextT=0 ) )
if config.pollPower > 0:
reports.append( util.Data( lbl="power", func=msgPower,
poll=config.pollPower, nextT=0 ) )
if not reports:
log.error( "No reports specified, all poll times <= 0" )
return
t0 = time.time()
useRiseSet = ( config.lat is not None and config.lon is not None )
log.info( "Startup time : %s" % fromts( t0 ) )
if useRiseSet:
timeRise = sunRise( config.lat, config.lon )
timeSet = sunSet( config.lat, config.lon )
log.info( "Today's sun rise: %s" % fromts( timeRise ) )
log.info( "Today's sun set : %s" % fromts( timeSet ) )
else:
timeRise = 0
timeSet = 3e9 # jan, 2065
# Current time is before todays sun rise. Start reporting at the
# rise time and stop reporting at the set time.
if t0 < timeRise:
timeBeg = timeRise - config.timePad
timeEnd = timeSet + config.timePad
log.info( "Before sun rise, sleeping until %s" % fromts( timeBeg ) )
# Current time is after todays sun set. Start reporting at
# tomorrows rise time and stop reporting at tomorrows set time.
elif t0 > timeSet:
timeBeg = sunRise( config.lat, config.lon, +1 ) - config.timePad
timeEnd = sunSet( config.lat, config.lon, +1 ) + config.timePad
log.info( "After sun set, sleeping until %s" % fromts( timeBeg ) )
# Current time is between todays rise and set. Start reporting
# immediately and stop reporting at the set time.
else:
timeBeg = t0
timeEnd = timeSet + config.timePad
log.info( "Sun up, run until sunset %s" % fromts( timeEnd ) )
_initTimes( reports, timeBeg )
while True:
nextReport = min( reports, key=lambda x: x.nextT )
dt = nextReport.nextT - t0
if dt > 0:
log.info( "Sleeping %s sec for report %s" % ( dt, nextReport.lbl ) )
time.sleep( dt )
log.info( "Running report : %s" % nextReport.lbl )
try:
nextReport.func( client, linkArgs, config, log )
except:
log.exception( "Report failed to run" )
nextReport.nextT += nextReport.poll
t0 = time.time()
# Time is now after the end time for today.
if t0 > timeEnd:
# If the last report wasn't the largest one, run the largest
# report one last time. This gives us a final tally of
# energy production for example.
if nextReport != reports[0]:
reports[0].func( client, linkArgs, config, log )
timeBeg = sunRise( config.lat, config.lon, +1 ) - config.timePad
timeEnd = sunSet( config.lat, config.lon, +1 ) + config.timePad
_initTimes( reports, timeBeg )
log.info( "After sun set, sleeping until %s" % fromts( timeBeg ) )
#===========================================================================
def sunRise( lat, lon, dayOffset=0 ):
t = datetime.date.today() + datetime.timedelta( days=dayOffset )
a = astral.Astral()
utc = a.sunrise_utc( t, lat, lon )
# Convert the UTC datetime to a UNIX time stamp.
return calendar.timegm( utc.timetuple() )
#===========================================================================
def sunSet( lat, lon, dayOffset=0 ):
t = datetime.date.today() + datetime.timedelta( days=dayOffset )
a = astral.Astral()
utc = a.sunset_utc( t, lat, lon )
# Convert the UTC datetime to a UNIX time stamp.
return calendar.timegm( utc.timetuple() )
#===========================================================================
def msgPower( client, linkArgs, config, log ):
data = report.power( **linkArgs )
msg = _buildPowerMsg( data, log )
payload = json.dumps( msg )
log.info( "Publish: %s: %s" % ( config.mqttPower, payload ) )
client.publish( config.mqttPower, payload )
#===========================================================================
def msgEnergy( client, linkArgs, config, log ):
data = report.energy( **linkArgs )
msgs = [
# ( topic name, msg dict )
( config.mqttPower, _buildPowerMsg( data, log ) ),
( config.mqttEnergy, _buildEnergyMsg( data, log ) ),
]
for topic, data in msgs:
payload = json.dumps( data )
log.info( "Publish: %s: %s" % ( topic, payload ) )
client.publish( topic, payload )
#===========================================================================
def msgFull( client, linkArgs, config, log ):
data = report.full( **linkArgs )
msgs = [
# ( topic name, msg dict )
( config.mqttPower, _buildPowerMsg( data, log ) ),
( config.mqttEnergy, _buildEnergyMsg( data, log ) ),
( config.mqttFull, _buildFullMsg( data, log ) ),
]
for topic, data in msgs:
payload = json.dumps( data )
client.publish( topic, payload )
#===========================================================================
def _buildPowerMsg( data, log ):
msg = {
"time" : data["time"],
"acPower" : data["acPower"], # in W
"dcPower" : data["dcPower"], # in W
}
log.info( "AC power: %(acPower)s W", msg )
return msg
#===========================================================================
def _buildEnergyMsg( data, log ):
msg = {
"time" : data["time"],
"dailyEnergy" : data["dailyEnergy"] / 1000.0, # Wh -> kWh
"totalEnergy" : data["totalEnergy"] / 1000.0, # Wh -> kWh
}
log.info( "Daily energy: %(dailyEnergy)s kWh", msg )
return msg
#===========================================================================
def _buildFullMsg( data, log ):
return data.__dict__
#===========================================================================
def _initTimes( reports, timeBeg ):
# Set the first time to be the begin time + the polling interval.
for r in reports:
r.nextT = timeBeg + r.poll
# Run the biggest priority report once right at startup.
reports[0].nextT = timeBeg
#===========================================================================

3431
src/python/tHome/sma/tags.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,16 @@
class FakeSocket:
def __init__( self, reply ):
self.sent = None
self.reply = reply
self.closed = False
def send( self, bytes ):
self.sent = bytes
def recv( self, bufLen ):
return self.reply
def close( self ):
self.closed = True

View File

@@ -0,0 +1,51 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestAcMaxPower ( T.util.test.Case ) :
def test_acMaxPower( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 7A 00 10 60 65 1E 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
0B 80 01 02 00 51 01 00 00 00
03 00 00 00 01 1E 41 00 82 22
AF 53 88 13 00 00 88 13 00 00
88 13 00 00 88 13 00 00 01 00
00 00 01 1F 41 00 82 22 AF 53
88 13 00 00 88 13 00 00 00 00
00 00 88 13 00 00 00 00 00 00
01 20 41 00 82 22 AF 53 88 13
00 00 88 13 00 00 00 00 00 00
88 13 00 00 00 00 00 00 00 00
00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.acMaxPower()
l.decode = False
buf, decoder = l.acMaxPower()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
acMaxPower1 = 5000.0,
acMaxPower2 = 5000.0,
acMaxPower3 = 5000.0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,51 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestAcPower ( T.util.test.Case ) :
def test_acPower( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 7A 00 10 60 65 1E 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
10 80 01 02 00 51 07 00 00 00
09 00 00 00 01 40 46 40 86 22
AF 53 B5 07 00 00 B5 07 00 00
B5 07 00 00 B5 07 00 00 01 00
00 00 01 41 46 40 86 22 AF 53
B5 07 00 00 B5 07 00 00 B5 07
00 00 B5 07 00 00 01 00 00 00
01 42 46 40 86 22 AF 53 00 00
00 80 00 00 00 80 00 00 00 80
00 00 00 80 01 00 00 00 00 00
00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.acPower()
l.decode = False
buf, decoder = l.acPower()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
acPower1 = 1973.0,
acPower2 = 1973.0,
acPower3 = 0.0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,44 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestAcTotalEnergy ( T.util.test.Case ) :
def test_acTotalEnergy( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 46 00 10 60 65 11 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
0C 80 01 02 00 54 00 00 00 00
01 00 00 00 01 01 26 00 85 22
AF 53 D0 6A 09 00 00 00 00 00
01 22 26 00 82 22 AF 53 2F 3B
00 00 00 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.acTotalEnergy()
l.decode = False
buf, decoder = l.acTotalEnergy()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
totalEnergy = 617168.0,
dailyEnergy = 15151.0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,43 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestAcTotalPower ( T.util.test.Case ) :
def test_acTotalPower( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 42 00 10 60 65 10 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
12 80 01 02 00 51 00 00 00 00
00 00 00 00 01 3F 26 40 86 22
AF 53 6A 0F 00 00 6A 0F 00 00
6A 0F 00 00 6A 0F 00 00 01 00
00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.acTotalPower()
l.decode = False
buf, decoder = l.acTotalPower()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
acPower = 3946.0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,73 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestAcVoltage ( T.util.test.Case ) :
def test_acVoltage( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 01 22 00 10 60 65 48 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
11 80 01 02 00 51 0A 00 00 00
12 00 00 00 01 48 46 00 86 22
AF 53 A6 2F 00 00 A6 2F 00 00
A6 2F 00 00 A6 2F 00 00 01 00
00 00 01 49 46 00 86 22 AF 53
9B 2F 00 00 9B 2F 00 00 9B 2F
00 00 9B 2F 00 00 01 00 00 00
01 4A 46 00 86 22 AF 53 FF FF
FF FF FF FF FF FF FF FF FF FF
FF FF FF FF 01 00 00 00 01 4B
46 00 86 22 AF 53 43 5F 00 00
43 5F 00 00 43 5F 00 00 43 5F
00 00 01 00 00 00 01 4C 46 00
86 22 AF 53 FF FF FF FF FF FF
FF FF FF FF FF FF FF FF FF FF
01 00 00 00 01 4D 46 00 86 22
AF 53 FF FF FF FF FF FF FF FF
FF FF FF FF FF FF FF FF 01 00
00 00 01 50 46 00 86 22 AF 53
35 3F 00 00 35 3F 00 00 35 3F
00 00 35 3F 00 00 01 00 00 00
01 51 46 00 86 22 AF 53 35 3F
00 00 35 3F 00 00 35 3F 00 00
35 3F 00 00 01 00 00 00 01 52
46 00 86 22 AF 53 FF FF FF FF
FF FF FF FF FF FF FF FF FF FF
FF FF 01 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.acVoltage()
l.decode = False
buf, decoder = l.acVoltage()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
acVoltage1 = 121.98,
acVoltage2 = 121.87,
acVoltage3 = 0,
acGridVoltage = 243.87,
unknown1 = 0,
unknown2 = 0,
acCurrent1 = 16.181,
acCurrent2 = 16.181,
acCurrent3 = 0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,47 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestDcPower ( T.util.test.Case ) :
def test_dcPower( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 5E 00 10 60 65 17 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
0E 80 01 02 80 53 00 00 00 00
01 00 00 00 01 1E 25 40 85 22
AF 53 13 08 00 00 13 08 00 00
13 08 00 00 13 08 00 00 01 00
00 00 02 1E 25 40 85 22 AF 53
21 08 00 00 21 08 00 00 21 08
00 00 21 08 00 00 01 00 00 00
00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.dcPower()
l.decode = False
buf, decoder = l.dcPower()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
dcPower1 = 2067.0,
dcPower2 = 2081.0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,54 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestDcVoltage ( T.util.test.Case ) :
def test_dcVoltage( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 96 00 10 60 65 25 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
0F 80 01 02 80 53 02 00 00 00
05 00 00 00 01 1F 45 40 85 22
AF 53 B1 5E 00 00 B1 5E 00 00
B1 5E 00 00 B1 5E 00 00 01 00
00 00 02 1F 45 40 85 22 AF 53
D5 5E 00 00 D5 5E 00 00 D5 5E
00 00 D5 5E 00 00 01 00 00 00
01 21 45 40 85 22 AF 53 51 21
00 00 51 21 00 00 51 21 00 00
51 21 00 00 01 00 00 00 02 21
45 40 85 22 AF 53 7D 21 00 00
7D 21 00 00 7D 21 00 00 7D 21
00 00 01 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.dcVoltage()
l.decode = False
buf, decoder = l.dcVoltage()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
dcVoltage1 = 242.41,
dcVoltage2 = 242.77,
dcCurrent1 = 8.529,
dcCurrent2 = 8.573,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,43 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestGridFrequency ( T.util.test.Case ) :
def test_gridFrequency( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 42 00 10 60 65 10 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
13 80 01 02 00 51 13 00 00 00
13 00 00 00 01 57 46 00 86 22
AF 53 6C 17 00 00 6C 17 00 00
6C 17 00 00 6C 17 00 00 01 00
00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.gridFrequency()
l.decode = False
buf, decoder = l.gridFrequency()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
frequency = 59.96,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,44 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestGridRelayStatus ( T.util.test.Case ) :
def test_GridRelayStatus( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 4E 00 10 60 65 13 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
0A 80 01 02 80 51 06 00 00 00
06 00 00 00 01 64 41 08 85 22
AF 53 33 00 00 01 37 01 00 00
FD FF FF 00 FE FF FF 00 00 00
00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.gridRelayStatus()
l.decode = False
buf, decoder = l.gridRelayStatus()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
gridStatus = 'Closed',
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,58 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestInfo ( T.util.test.Case ) :
def test_info( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 C6 00 10 60 65 31 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
05 80 01 02 00 58 00 00 00 00
03 00 00 00 01 1E 82 10 82 19
AF 53 53 4E 3A 20 31 39 31 33
30 30 36 30 34 38 00 00 10 00
00 00 10 00 00 00 00 00 00 00
00 00 00 00 01 1F 82 08 82 19
AF 53 41 1F 00 01 42 1F 00 00
FE FF FF 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00
00 00 00 00 01 20 82 08 82 19
AF 53 EE 23 00 00 EF 23 00 00
F0 23 00 00 F1 23 00 01 F2 23
00 00 F3 23 00 00 F4 23 00 00
F5 23 00 00 01 20 82 08 82 19
AF 53 FE FF FF 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False, raw=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.info()
l.decode = False
buf, decoder = l.info()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
name = 'SN: 1913006048',
model = 'SB 5000TLUS-22',
type = 'Solar Inverter',
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,44 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestOperationTime ( T.util.test.Case ) :
def test_OperationTime( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 46 00 10 60 65 11 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
0D 80 01 02 00 54 03 00 00 00
04 00 00 00 01 2E 46 00 85 22
AF 53 00 FA 0E 00 00 00 00 00
01 2F 46 00 85 22 AF 53 42 97
0E 00 00 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.operationTime()
l.decode = False
buf, decoder = l.operationTime()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
operationTime = 981504.0,
feedTime = 956226.0,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,9 @@
#!/usr/bin/env python
import tHome as T
#T.util.log.get( 'sma', 10, 'stdout' )
r = T.sma.report.energy( ip="192.168.1.14" )
print "=================="
print r

View File

@@ -0,0 +1,9 @@
#!/usr/bin/env python
import tHome as T
#T.util.log.get( 'sma', 10, 'stdout' )
r = T.sma.report.full( ip="192.168.1.14" )
print "=================="
print r

View File

@@ -0,0 +1,9 @@
#!/usr/bin/env python
import tHome as T
#T.util.log.get( 'sma', 10, 'stdout' )
r = T.sma.report.power( ip="192.168.1.14" )
print "=================="
print r

View File

@@ -0,0 +1,44 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestStatus ( T.util.test.Case ) :
def test_status( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 4E 00 10 60 65 13 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
08 80 01 02 80 51 00 00 00 00
00 00 00 00 01 48 21 08 82 22
AF 53 23 00 00 00 2F 01 00 00
33 01 00 01 C7 01 00 00 FE FF
FF 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.status()
l.decode = False
buf, decoder = l.status()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
status = 'Ok',
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,43 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestTemp ( T.util.test.Case ) :
def test_temp( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 42 00 10 60 65 10 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
09 80 01 02 00 52 00 00 00 00
00 00 00 00 01 77 23 40 44 22
AF 53 AC 1B 00 00 B6 1B 00 00
B2 1B 00 00 B2 1B 00 00 01 00
00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.temperature()
l.decode = False
buf, decoder = l.temperature()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
temperature = 70.84,
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================

View File

@@ -0,0 +1,44 @@
import unittest
from FakeSocket import FakeSocket
import tHome as T
#===========================================================================
#===========================================================================
class TestVersion ( T.util.test.Case ) :
def test_version( self ):
reply = """
53 4D 41 00 00 04 02 A0 00 00
00 01 00 4E 00 10 60 65 13 90
7D 00 AB 94 40 3B 00 A0 F7 00
E0 27 06 72 00 00 00 00 00 00
04 80 01 02 00 58 07 00 00 00
07 00 00 00 01 34 82 00 94 19
AF 53 00 00 00 00 00 00 00 00
FE FF FF FF FE FF FF FF 04 06
60 02 04 06 60 02 00 00 00 00
00 00 00 00 00 00 00 00
"""
l = T.sma.Link( "fake", connect=False )
try:
l.socket = FakeSocket( T.util.hex.toBytes( reply ) )
o1 = l.version()
l.decode = False
buf, decoder = l.version()
o2 = decoder( buf )
finally:
l.socket = None
right = T.util.Data(
version = '02.60.06.R',
)
print o1
for k in right.keys():
r = right[k]
self.eq( getattr( o1, k ), r, k )
self.eq( getattr( o2, k ), r, k )
#===========================================================================