diff --git a/Package.swift b/Package.swift index 4c18f2b..0d6f77c 100644 --- a/Package.swift +++ b/Package.swift @@ -35,10 +35,12 @@ let package = Package( .target( name: "sma2mqttLibrary", dependencies: [ + "CNative", .product(name: "BinaryCoder", package: "BinaryCoder"), .product(name: "AsyncHTTPClient", package: "async-http-client"), .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "JLog", package: "JLog"), + ], resources: [ .copy("Obis/Resources/obisdefinition.json"), @@ -47,6 +49,10 @@ let package = Package( .copy("SMAPacket/Resources/SMANetPacketDefinitions.json"), ] ), + .target( + name: "CNative", + dependencies: [] + ), .testTarget( name: "sma2mqttTests", dependencies: [ diff --git a/Sources/CNative/include/select.h b/Sources/CNative/include/select.h new file mode 100644 index 0000000..7dbcf3c --- /dev/null +++ b/Sources/CNative/include/select.h @@ -0,0 +1,7 @@ +#include + +__BEGIN_DECLS + +extern void SWIFT_FD_SET(int d, fd_set *set); + +__END_DECLS diff --git a/Sources/CNative/select.c b/Sources/CNative/select.c new file mode 100644 index 0000000..3f715d5 --- /dev/null +++ b/Sources/CNative/select.c @@ -0,0 +1,5 @@ +#include "select.h" + +void SWIFT_FD_SET(int d, fd_set *set) { + FD_SET(d, &(*set)); +} diff --git a/Sources/sma2mqtt/sma2mqtt.swift b/Sources/sma2mqtt/sma2mqtt.swift index da14f7f..df2f3ff 100644 --- a/Sources/sma2mqtt/sma2mqtt.swift +++ b/Sources/sma2mqtt/sma2mqtt.swift @@ -51,13 +51,10 @@ extension JLog.Level: ExpressibleByArgument {} "ac-side/grid-measurements/power:1", "ac-side/measured-values/daily-yield:30", - "immediate/feedin:1", - "immediate/usage:1", - "battery/state-of-charge:20", "battery/battery/temperature:30", - "battery/battery/battery-charge/battery-charge:20", -// "*:0", // all once + "battery/battery-charge/battery-charge:20", +// "*:600", // all once ] func run() async throws { diff --git a/Sources/sma2mqttLibrary/DataObjects/PublishedValue.swift b/Sources/sma2mqttLibrary/DataObjects/PublishedValue.swift index ed79094..a2332fe 100644 --- a/Sources/sma2mqttLibrary/DataObjects/PublishedValue.swift +++ b/Sources/sma2mqttLibrary/DataObjects/PublishedValue.swift @@ -23,11 +23,13 @@ public struct PublishedValue: Encodable public func encode(to encoder: Encoder) throws { - enum CodingKeys: String, CodingKey { case unit, value, scale, id, prio, write, event } + enum CodingKeys: String, CodingKey { case unit, value, scale, id, prio, write, event,date } var container = encoder.container(keyedBy: CodingKeys.self) let objectDefinition = tagTranslator.smaObjectDefinitions[objectID] let compacted = values.compactMap { $0 } + try container.encode(objectID, forKey: .id) +// try container.encode(Date(), forKey: .date) switch compacted.first { diff --git a/Sources/sma2mqttLibrary/SMADevice.swift b/Sources/sma2mqttLibrary/SMADevice.swift index 8310846..c8357f9 100644 --- a/Sources/sma2mqttLibrary/SMADevice.swift +++ b/Sources/sma2mqttLibrary/SMADevice.swift @@ -22,6 +22,7 @@ public actor SMADevice struct QueryObject: Hashable { let objectid: String + let path:String let interval: Int } @@ -38,6 +39,8 @@ public actor SMADevice var objectsToQueryContinously = [String: QueryObject]() var objectsToQueryNext = [QueryElement]() + var lastRequestSentDate = Date.distantPast + let minimumRequestInterval = 1.0 / 20.0 // 1 / maximumRequestsPerSecond let requestAllObjects: Bool @@ -47,6 +50,7 @@ public actor SMADevice let httpClient: HTTPClient private var sessionid: String? + let udpReceiver: UDPReceiver let udpEmitter: UDPEmitter? var udpSystemId: UInt16 = 0xFFFF var udpSerial: UInt32 = 0xFFFF_FFFF @@ -60,7 +64,7 @@ public actor SMADevice private var refreshTask: Task? private var tagTranslator = SMATagTranslator.shared - public init(address: String, userright: UserRight = .user, password: String = "00000", publisher: SMAPublisher? = nil, refreshInterval: Int = 30, interestingPaths: [String: Int] = [:], requestAllObjects: Bool = false, udpEmitter: UDPEmitter? = nil) async throws + public init(address: String, userright: UserRight = .user, password: String = "00000", publisher: SMAPublisher? = nil, refreshInterval: Int = 30, interestingPaths: [String: Int] = [:], requestAllObjects: Bool = false, bindAddress:String = "0.0.0.0", udpEmitter: UDPEmitter? = nil) async throws { self.address = address self.userright = userright @@ -72,6 +76,10 @@ public actor SMADevice self.interestingPaths = interestingPaths self.requestAllObjects = requestAllObjects self.udpEmitter = udpEmitter + + self.udpReceiver = try UDPReceiver(bindAddress: bindAddress, listenPort: 0) + + name = address hasDeviceName = false @@ -114,7 +122,7 @@ public extension SMADevice return nil } - JLog.trace("received udp packet:\(data.hexDump)") + JLog.debug("\(address):received udp packet:\(data.hexDump)") let smaPacket: SMAPacket @@ -199,7 +207,7 @@ public extension SMADevice { if obisvalue.mqtt != .invisible { - try? await publisher?.publish(to: name + "/" + obisvalue.topic, payload: obisvalue.json, qos: .atLeastOnce, retain: obisvalue.mqtt == .retained) + try? await publisher?.publish(to: name + "/" + obisvalue.topic, payload: obisvalue.json, qos: .atMostOnce, retain: obisvalue.mqtt == .retained) } } @@ -236,12 +244,16 @@ public extension SMADevice { let objectToQuery = try getNextRequest() - let timeToWait = objectToQuery.nextReadDate.timeIntervalSinceNow + let nextReadDate = max( objectToQuery.nextReadDate , lastRequestSentDate + minimumRequestInterval ) + + let timeToWait = nextReadDate.timeIntervalSinceNow if timeToWait > 0 { try await Task.sleep(for: .seconds(timeToWait)) } + lastRequestSentDate = Date() + try await udpQueryObject(objectID: objectToQuery.objectid) } @@ -267,7 +279,9 @@ public extension SMADevice } JLog.trace("\(address): sending udp packet:\(packetToSend)") - await udpEmitter?.sendPacket(data: [UInt8](packetToSend.hexStringToData()), address: address, port: 9522) + let packet = try await udpReceiver.sendReceivePacket(data: [UInt8](packetToSend.hexStringToData()), address: address, port: 9522, receiveTimeout: 1.0) + + let _ = await receivedUDPData(packet.data) } } @@ -335,12 +349,13 @@ extension SMADevice name = deviceName } + try await getInformationDictionary(atPath: "/dyn/getDashValues.json") + try await getInformationDictionary(atPath: "/dyn/getAllOnlValues.json") + for objectid in tagTranslator.smaObjectDefinitions.keys { addObjectToQueryContinouslyIfNeeded(objectid: objectid) } -// try await getInformationDictionary(atPath: "/dyn/getDashValues.json") -// try await getInformationDictionary(atPath: "/dyn/getAllOnlValues.json") try? await logout() } @@ -436,11 +451,15 @@ extension SMADevice return nil } - func objectIdIsInteresting(_ objectid: String) -> Int? + func objectIdIsInteresting(_ objectid: String) -> (path:String,interval:Int?) { let path = "/" + (tagTranslator.objectsAndPaths[objectid]?.path ?? "unkown-id-\(objectid)") - return pathIsInteresting(path) + let interval = pathIsInteresting(path) + + JLog.debug("\(address):\(objectid) \(path) interval:\( interval ?? -1 )") + + return (path:path,interval:interval) } @discardableResult @@ -448,9 +467,18 @@ extension SMADevice { JLog.trace("\(address):working on objectId:\(objectid)") - if let interval = objectIdIsInteresting(objectid) + let (path,interval) = objectIdIsInteresting(objectid) + + if let interval { - let queryObject = objectsToQueryContinously[objectid] ?? QueryObject(objectid: objectid, interval: interval) + if let inuse = objectsToQueryContinously.values.first(where:{ $0.path == path }) + { + JLog.notice("\(address): Won't query objectid:\(objectid) - object with same path:\(inuse.objectid) path:\(inuse.path)") + return false + } + JLog.debug("\(address): adding to objectsToQueryContinously objectid:\(objectid) path:\(path) interval:\(interval)") + + let queryObject = objectsToQueryContinously[objectid] ?? QueryObject(objectid: objectid, path:path, interval: interval) if interval <= queryObject.interval { @@ -498,7 +526,7 @@ extension SMADevice if hasDeviceName, addObjectToQueryContinouslyIfNeeded(objectid: objectId.key) { - try await publisher?.publish(to: mqttPath, payload: singleValue.json, qos: .atMostOnce, retain: true) + try await publisher?.publish(to: mqttPath, payload: singleValue.json, qos: .atMostOnce, retain: false) } } catch diff --git a/Sources/sma2mqttLibrary/SMALighthouse.swift b/Sources/sma2mqttLibrary/SMALighthouse.swift index 25ca5cc..e31dbb5 100644 --- a/Sources/sma2mqttLibrary/SMALighthouse.swift +++ b/Sources/sma2mqttLibrary/SMALighthouse.swift @@ -85,7 +85,7 @@ public actor SMALighthouse JLog.debug("Got new SMA Device with remoteAddress:\(remoteAddress)") - let task = Task { try await SMADevice(address: remoteAddress, userright: .user, password: password, publisher: mqttPublisher, interestingPaths: interestingPaths, udpEmitter: mcastReceiver) } + let task = Task { try await SMADevice(address: remoteAddress, userright: .user, password: password, publisher: mqttPublisher, interestingPaths: interestingPaths, bindAddress: bindAddress, udpEmitter: mcastReceiver) } smaDeviceCache[remoteAddress] = .inProgress(task) do diff --git a/Sources/sma2mqttLibrary/SMAPacket/SMAPacketGenerator.swift b/Sources/sma2mqttLibrary/SMAPacket/SMAPacketGenerator.swift index 2417f36..42998a3 100644 --- a/Sources/sma2mqttLibrary/SMAPacket/SMAPacketGenerator.swift +++ b/Sources/sma2mqttLibrary/SMAPacket/SMAPacketGenerator.swift @@ -21,6 +21,8 @@ extension SMAPacketGenerator return try generateCommandPacket(packetcounter: packetcounter, command: command, dstSystemId: dstSystemId, dstSerial: dstSerial) } + private static let localhostname = Host.current().names.filter({ $0 != "localhost" }).sorted(by: { $0.count < $1.count }).first ?? "localhost" + static func generateCommandPacket(packetcounter: Int, command: String, dstSystemId: UInt16 = 0xFFFF, dstSerial: UInt32 = 0xFFFF_FFFF) throws -> String { let jobid = String(format: "%02x", 1) @@ -30,7 +32,7 @@ extension SMAPacketGenerator let dstSysidString = String(format: "%02x%02x", dstSystemId & 0xFF, (dstSystemId & 0xFF00) >> 8) let dstSerialString = String(format: "%02x%02x%02x%02x", dstSerial & 0xFF, (dstSerial >> 8) & 0xFF, (dstSerial >> 16) & 0xFF, (dstSerial >> 24) & 0xFF) - let ownid = String(format: "%04x", generateRandomNumber()) + let ownidString = String(format: "%04x", Self.localhostname.hashValue & 0xFFFF ) let header = """ 534d 4100 0004 02a0 0000 0001 @@ -41,7 +43,7 @@ extension SMAPacketGenerator A0 \(dstSysidString) \(dstSerialString) 00 01 - 1234 \(ownid) 4321 00 + 1234 \(ownidString) 4321 00 \(jobid) \(result) \(remainingpackets) diff --git a/Sources/sma2mqttLibrary/Tools/Extensions.swift b/Sources/sma2mqttLibrary/Tools/Extensions.swift index 2ddee3c..b289810 100644 --- a/Sources/sma2mqttLibrary/Tools/Extensions.swift +++ b/Sources/sma2mqttLibrary/Tools/Extensions.swift @@ -8,6 +8,7 @@ public extension UInt32 { var ipv4String: String { "\(self >> 24).\(self >> 16 & #if os(Linux) public let NSEC_PER_SEC: Int64 = 1_000_000_000 + public let USEC_PER_SEC: Int64 = 1_000_000 #endif public extension Data diff --git a/Sources/sma2mqttLibrary/Tools/MQTTPublisher.swift b/Sources/sma2mqttLibrary/Tools/MQTTPublisher.swift index 73fd668..d6da633 100644 --- a/Sources/sma2mqttLibrary/Tools/MQTTPublisher.swift +++ b/Sources/sma2mqttLibrary/Tools/MQTTPublisher.swift @@ -48,6 +48,7 @@ public actor MQTTPublisher: SMAPublisher { _ = self.mqttClient.connect() } + JLog.debug("publish:\(topic) payload:\(payload)") _ = self.mqttClient.publish(to: topic, payload: byteBuffer, qos: qos, retain: retain) } } diff --git a/Sources/sma2mqttLibrary/Tools/UDPReceiver.swift b/Sources/sma2mqttLibrary/Tools/UDPReceiver.swift new file mode 100644 index 0000000..3d3a25f --- /dev/null +++ b/Sources/sma2mqttLibrary/Tools/UDPReceiver.swift @@ -0,0 +1,199 @@ +// +// MutlicastReceiver.swift +// + +import Foundation +import JLog + +import CNative + +//public struct Packet +//{ +// let data: Data +// let sourceAddress: String +//} + +private enum UDPReceiverError: Error +{ + case socketCreationFailed(Int32) + case socketOptionReuseAddressFailed(Int32) + case socketOptionBroadcastFailed(Int32) + case socketOptionPreventMulticastLoopbackFailed(Int32) + case socketBindingFailed(Int32) + case multicastJoinFailed(Int32) + case receiveError(Int32) + case invalidSocketAddress + case invalidReceiveBuffer + case addressStringConversionFailed(Int32) + case timeoutErrorOuter + case timeoutErrorRecv +} + +class UDPReceiver: UDPEmitter +{ + private let socketFileDescriptor: Int32 + private let bufferSize: Int + private var isListening: Bool = true + private var readSet = fd_set() + + + init(bindAddress: String, listenPort: UInt16, bufferSize: Int = 65536) throws + { + JLog.debug("bindAddress:\(bindAddress) listenPort:\(listenPort)") + self.bufferSize = bufferSize + + socketFileDescriptor = socket(AF_INET, SOCK_DGRAM_VALUE, 0) // IPPROTO_UDP) // 0 , IPPROTO_MTP + guard socketFileDescriptor != -1 else { throw UDPReceiverError.socketCreationFailed(errno) } + + var reuseAddress: Int32 = 1 + guard setsockopt(socketFileDescriptor, SOL_SOCKET, SO_REUSEADDR, &reuseAddress, socklen_t(MemoryLayout.size)) != -1 + else + { + throw UDPReceiverError.socketOptionReuseAddressFailed(errno) + } + + var enableBroadcast: Int32 = 1 + guard setsockopt(socketFileDescriptor, SOL_SOCKET, SO_BROADCAST, &enableBroadcast, socklen_t(MemoryLayout.size)) != -1 + else + { + throw UDPReceiverError.socketOptionBroadcastFailed(errno) + } + + var preventReceivingOwnPacket: Int32 = 0 + guard setsockopt(socketFileDescriptor, Int32(IPPROTO_IP), IP_MULTICAST_LOOP, &preventReceivingOwnPacket, socklen_t(MemoryLayout.size)) != -1 + else + { + throw UDPReceiverError.socketOptionPreventMulticastLoopbackFailed(errno) + } + + var socketAddress = sockaddr_in() + socketAddress.sin_family = sa_family_t(AF_INET) + socketAddress.sin_port = listenPort.bigEndian + socketAddress.sin_addr.s_addr = inet_addr(bindAddress) // INADDR_ANY +// socketAddress.sin_addr.s_addr = INADDR_ANY // INADDR_ANY + + guard bind(socketFileDescriptor, sockaddr_cast(&socketAddress), socklen_t(MemoryLayout.size)) != -1 + else + { + throw UDPReceiverError.socketBindingFailed(errno) + } + + JLog.debug("Started listening on \(bindAddress)") + } + + private nonisolated func sockaddr_cast(_ ptr: UnsafeMutablePointer) -> UnsafeMutablePointer { UnsafeMutableRawPointer(ptr).assumingMemoryBound(to: sockaddr.self) } + + deinit + { + if socketFileDescriptor != -1 { close(socketFileDescriptor) } +// receiveBuffer?.deallocate() + } + + func startListening() + { + guard !isListening else { return } + isListening = true + } + + func stopListening() { isListening = false } + + func shutdown() + { + isListening = false + close(socketFileDescriptor) + } + + func receiveNextPacket(from address:String = "0.0.0.0",port:UInt16 = 0,timeout:Double) async throws -> Packet + { + + let socket = socketFileDescriptor + return try await withUnsafeThrowingContinuation + { continuation in + DispatchQueue.global().async + { + func sockaddr_cast(_ ptr: UnsafeMutablePointer) -> UnsafeMutablePointer { UnsafeMutableRawPointer(ptr).assumingMemoryBound(to: sockaddr.self) } + var socketAddress = sockaddr_in() + socketAddress.sin_family = sa_family_t(AF_INET) + socketAddress.sin_port = port.bigEndian + socketAddress.sin_addr.s_addr = inet_addr(address) + + var socketAddressLength = socklen_t(MemoryLayout.size) + JLog.debug("recvfrom") + + let seconds = time_t(timeout) + var timeout = timeval(tv_sec: seconds, tv_usec: suseconds_t( (timeout - Double(seconds)) * Double(USEC_PER_SEC) )) + var readset:fd_set = fd_set() + + SWIFT_FD_SET(socket, &readset) + + let rv = select(socket+1, &readset, nil, nil, &timeout) + guard rv > 0 else { return continuation.resume(throwing: UDPReceiverError.timeoutErrorRecv) } + + let receiveBuffer = UnsafeMutablePointer.allocate(capacity: self.bufferSize) + + let bytesRead = recvfrom(self.socketFileDescriptor, receiveBuffer, self.bufferSize, 0, sockaddr_cast(&socketAddress), &socketAddressLength) + guard bytesRead != -1 else { continuation.resume(throwing: UDPReceiverError.receiveError(errno)); return } + + var addr = socketAddress.sin_addr // sa.sin_addr + var addrBuffer = [CChar](repeating: 0, count: Int(INET_ADDRSTRLEN)) + guard let addrString = inet_ntop(AF_INET, &addr, &addrBuffer, socklen_t(INET_ADDRSTRLEN)) else { continuation.resume(throwing: UDPReceiverError.addressStringConversionFailed(errno)); return } + + let packet = Packet(data: Data(bytes: receiveBuffer, count: bytesRead), sourceAddress: String(cString: addrString)) + receiveBuffer.deallocate() + + continuation.resume(returning: packet) + } + } + } + + func sendPacket(data: [UInt8], address: String, port: UInt16) + { + var destinationAddress = sockaddr_in() + + destinationAddress.sin_family = sa_family_t(AF_INET) + destinationAddress.sin_port = port.bigEndian + destinationAddress.sin_addr.s_addr = inet_addr(address) + + let genericPointer = withUnsafePointer(to: &destinationAddress) + { + UnsafeRawPointer($0).assumingMemoryBound(to: sockaddr.self) + } + + let sent = sendto(socketFileDescriptor, data, data.count, 0, genericPointer, socklen_t(MemoryLayout.size)) + + if sent < 0 + { + JLog.error("Could not sent Packet") + } + else + { + JLog.debug("Sent Packet successfull to:\(address):\(port) sent:\(sent) == \(data.count)") + } + } + + + func sendReceivePacket(data: [UInt8], address: String, port: UInt16,receiveTimeout:Double) async throws -> Packet + { + + sendPacket(data: data, address: address, port: port) + let startDate = Date() + + let endTime = Date.init(timeIntervalSinceNow: receiveTimeout) + + while( endTime.timeIntervalSinceNow > 0 ) + { + let packet = try await receiveNextPacket(from:address,port:port,timeout:receiveTimeout) + + if packet.sourceAddress == address + { + JLog.debug("\(address):answered in \(startDate.timeIntervalSinceNow * -1000.0)ms") + return packet + } + else + { + JLog.notice("received packet from\(packet.sourceAddress) expected from:\(address) - ignoring") + } + } + throw UDPReceiverError.timeoutErrorRecv + } +}