;/---------------------------
;|
;| MQTT Broker Framework
;| V1.14
;| 21.05.2024
;|
;| Added MQTTInfo Messages to signal, when thread has been started and when it has been ended
;|
;| Supports MQTT <= 3.1.1
;| http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
;|
;| only ONE Server supported!
;| Usually I prepare my codes for unforseen scenarios, but here I couldn't imagine,
;| that anyone needs more than one MQTT broker
;| But now, since I've added TLS functionality, it might have been better to
;| offer more than just one.
;| Then you could start one broker listening on non-TLS and one on TLS
;| But well... shit happens :)
;| It's still just a little tinkering with MQTT, the source is here
;| add what you need!
;| (or simply start it twice!)
;|
;| (c)HeX0R
;|
;| Had been done because of boredom within three days.
;| I saw too late, that there is MQTT V5.00 already.
;| => http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
;|
;| New in V1.10 (24.12.2022)
;| -Code clean-up, private procedures are now prefixed with _
;|
;| New in V1.11 (14.01.2023)
;| -prepared for TLS action
;| see TLS.pbi here: https://www.purebasic.fr/english/viewtopic.php?p=593738#p593738
;| You need a constant defined, which containes the path to TLS.pbi and is named: #TLS_PBI
;| I simply set that in the project settings (where needed)
;| If none is set, TLS is deactivated
;| Take also a look at new (TLS) parameters in InitServer()
;| You need also a compiled libtls-xx.dll or *.a
;| Or use one from idle here:
;| https://www.purebasic.fr/english/viewtopic.php?p=593079#p593079
;|
;| New in V1.12 (06.04.2024)
;| -handling payloads of size 0 correctly
;|
;| New in V1.13 (11.05.2024)
;| -dynamic delay, when no incoming network traffic, might need some fine tuning
;|
;| New in V1.14 (21.05.2024)
;| - added some missing #PB_ByteLength
;|
;\---------------------------
; ----------------------------------------------------------------------------
; "THE BEER-WARE LICENSE":
; <HeX0R@coderbu.de> wrote this file. as long as you retain this notice you
; can do whatever you want with this stuff. If we meet some day, and you think
; this stuff is worth it, you can buy me a beer in return
; or use this =>
; https://send-beer.to/hex0r/
; Or just go out and drink a few on your own/with your friends ;)
;=============================================================================
CompilerIf #PB_Compiler_Thread = 0
CompilerError "Please enable Thread-Safe Options!"
CompilerEndIf
XIncludeFile "MQTT_Common.pbi"
DeclareModule MQTT_BROKER
UseMD5Fingerprint()
Enumeration AccessFlags
#AccessFlag_NoEncryption ;passwords in clean text
#AccessFlag_PasswordMD5 ;passwords are md5 hashes
#AccessFlag_BothMD5 ;passwords and username are md5 hashes
;add more if needed to _IsClientAllowed()
EndEnumeration
;----public Structures
;{
Structure SERVER_ACCESS
Username.s
Password.s
Flag.b
EndStructure
Structure SERVER_TLS
Protocols.i ;set it to one (or more) of the #PB_Network_TLSxxx constants
CertFile.s ;path and filename, where the TLS certificate is
CertKey.s ;path and filename, where the public Key file is
CertCa.s ;path and filename of a cacerts file
;see => https://docs.microfocus.com/SM/9.52/Hybrid/Content/security/concepts/what_is_a_cacerts_file.htm
EndStructure
Structure LAST_WILL_SERVER
Flag.b
Topic.s
MessageBase64.s ;The WillMessage will be stored as base64, no matter if #Use_Base64_PayLoad is #True or #False
; I don't expect very huge WillMessages...
QoS.b
Retain.b
EndStructure
Structure SERVER_INIT
Port.i ;if empty, 1883 will be used
BindIP.s ;if empty, broker will listen on ALL ips
ClientTimeOUT.i ;if empty, new clients will timeout after 5s
LogWindow.i ;Window, where broker will send log data to
LogWindowEvent.i ;WindowEvent, broker will use, when sending data to LogWindow
;if both LogWindow and LogWindowEvent are empty, no messages will be send to anywhere
LogFile.s ;Broker will use that to log data into a file, or leave empty to log nothing
PersistantStoragePath.s ;Broker will store persistant data (when it went offline) in this path, or nowhere if nothing provided
InitialBufferSize.i ;if empty, initial buffer size will be 65536Bytes, the buffer will increase anyway, if packets are bigger
TLS.SERVER_TLS ;TLS encryption
List Access.SERVER_ACCESS() ;a list which contains allowed usernames/passwords.
;See SERVER_ACCESS structure and AccessFlags above.
;Keep the list empty to allow anonymous connections
EndStructure
;}
;----public Procedures
;{
Declare InitServer(*Config.SERVER_INIT, RunServerAlso.i = #False)
; use that to initialize the server first
; set the SERVER_INIT structure to fit your needs.
; When RunServerAlso is #True, it will also start-up the server (no need to call StartServer())
Declare StartServer() ;Broker will be started with that command
Declare StopServer() ;Broker will be stopped with that command
Declare DeInitServer() ;all resources of the server will be removed, it will also stop the server (in case it is still running)
Declare _PublishViaServer(Topic.s, *Payload, PayLoadLength.i, QoS = 0, PayLoadType = MQTT_Common::#PayloadType_UnicodeString)
; for testing purposes only!
; a broker usually doesn't publish on its own, only clients publish topics
Declare _ClearServerData() ;will remove resources, which had been stored for persistant sessions
; can be only called, when server is deinitialized.
Declare GetLastError() ;get the last occured error, see Errors Enumeration in MQTT_Common.pbi
;}
EndDeclareModule
Module MQTT_BROKER
EnableExplicit
UseModule MQTT_Common
CompilerIf Defined(TLS_PBI, #PB_Constant)
Global LIBTLSFILE$ = "libtls-26.dll"
IncludeFile #TLS_PBI
;https://hex0rs.coderbu.de/cgi-bin/hv.cgi?version=-1&id=93&ia=1917
CompilerEndIf
;----private Structures
;{
Structure connPACKETs Extends PACKET
; connPACKETS are only valid while a new client connected, until it had authenticated itself correctly.
;
Level.b ;revision level of the protocol, this module here can work with level <= 4, where 4 equals V3.1.1 of the protocol
ConnFlags.a ;The Connect Flags byte contains a number of parameters specifying the behavior of the MQTT connection.
; It also indicates the presence or absence of fields in the payload.
CleanSession.b ;This byte specifies the handling of the Session state.
; the Client and Server can store Session state to enable reliable messaging to continue across a sequence of Network Connections.
; This bit is used to control the lifetime of the Session state.
KeepAlive.l ;The Keep Alive is a time interval measured in seconds.
; Expressed as a 16-bit word, it is the maximum time interval that is permitted to elapse between
; the point at which the Client finishes transmitting one Control Packet and the point it starts sending the next.
; It is the responsibility of the Client to ensure that the interval between
; control packets being sent does not exceed the Keep Alive value.
ClientIdentifier.s ;The Client Identifier (ClientId) identifies the Client to the Server.
; It is part of the payload
Will.LAST_WILL_SERVER ;Willxxx... WTF?? => https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/
Password.s ;Password transmitted by Client to make sure it can connect to our server
Username.s ;Username transmitted by Client to make sure it can connect to our server
ClientID.i ;That's the OS network ID
TimeOUT.i ;Make sure, there are no corpses of connPACKETs
*Buffer ;Memory Buffer
BufferPos.i ;Memory Buffer current position
EndStructure
Structure Publishes
Topic.s
*Payload
PayLoadLength.i
PayLoadBase64.s
Qos.b
EndStructure
Structure Session ;one Session equals one MQTT client
ClientID.i
ClientIdentifier.s
Level.b ;<- might be useful in future, if we decide to support MQTT version 5 also
*Buffer
BufferPos.i
Persistant.b
Will.LAST_WILL_SERVER
KeepAlive.l
ConnFlags.a
LastActivity.q
SessionState.b
Map PacketIdentifiersUsed.b()
List Subscriptions.Filter()
List Packets.PACKET()
EndStructure
Structure _SERVER_
InitNetwork.i
IsInit.i
LogWindow.i
LogWindowEvent.i
LogFile.s
Mutex.i
Semaphore.i
PersistantStoragePath.s
ThreadID.i
StopIt.i
Port.i
BindIP.s
Error.i
TimeOUT.i
InitBufferSize.i
TLS.SERVER_TLS
List PublishToClient.Publishes() ;<- with those messages you can publish topics from the server to clients
; not really part of MQTT, it is used for testing purposes
List Sessions.Session() ;<- those are the established sessions from clients
List RetainedMessages.Publishes() ;<- store Published messages, which should retain
List Access.SERVER_ACCESS() ;<- keeps userdata and passwords which are allowed to connect
EndStructure
;}
Global SERVER._SERVER_
;----private Procedures
;{
Procedure _LogServerAction(ClientIdentifier.s, Type.i, Topic.s = "", *Payload = 0, PayLoadLength = 0, ErrorText.s = "", Error = 0, QoS = 0, Retain = 0, DUP = 0, PacketIdentifier = 0)
Protected *send.MQTT_EVENTDATA, Size, Strings.s, FID, i, Payload.s, *Buffer
;Will log incoming/outgoing events
;Can send messages to a window, and/or into a log file
If IsWindow(SERVER\LogWindow) And SERVER\LogWindowEvent
Strings = ClientIdentifier + #ESC$ + Topic + #ESC$ + ErrorText
Size = SizeOf(MQTT_EVENTDATA) + StringByteLength(Strings, #PB_UTF8) + 1
*send = AllocateMemory(Size)
*send\Type = Type
*send\Error = Error
*send\QoS = QoS
*send\Retain = Retain
*send\DUP = DUP
*send\PacketIdentifier = PacketIdentifier
*send\PayLoadLength = PayLoadLength
If PayLoadLength
*send\PayLoad = AllocateMemory(PayLoadLength)
CompilerIf #USE_BASE64_PAYLOAD
Base64Decoder(PeekS(*Payload), *send\PayLoad, PayLoadLength)
CompilerElse
CopyMemory(*Payload, *send\PayLoad, *send\PayLoadLength)
CompilerEndIf
EndIf
PokeS(*send + OffsetOf(MQTT_EVENTDATA\D), Strings, -1, #PB_UTF8)
PostEvent(SERVER\LogWindowEvent, SERVER\LogWindow, 0, 0, *send)
EndIf
If SERVER\LogFile
FID = OpenFile(#PB_Any, SERVER\LogFile, #PB_UTF8)
If FID
FileSeek(FID, Lof(FID))
Select Type
Case #MQTTEvent_ClientConnected
Strings = "Client [" + ClientIdentifier + "] connected!"
Case #MQTTEvent_ClientDisconnected
Strings = "Client [" + ClientIdentifier + "] disconnected!"
Case #MQTTEvent_InfoPublished
If PayLoadLength
CompilerIf #USE_BASE64_PAYLOAD
*Buffer = AllocateMemory(PayLoadLength)
If *Buffer
i = Base64Decoder(PeekS(*Payload), *Buffer, MemorySize(*Buffer))
Payload = PeekS(*Buffer, i, #PB_UTF8 | #PB_ByteLength)
FreeMemory(*Buffer)
EndIf
CompilerElse
Payload = PeekS(*Payload, PayLoadLength, #PB_UTF8 | #PB_ByteLength)
CompilerEndIf
EndIf
Strings = "Client [" + ClientIdentifier + "] Published Topic: " + Topic + ", Payload: " + Payload
Case #MQTTEvent_Subscription
Strings = "Client [" + ClientIdentifier + "] Subscribed to Topic: " + Topic
Case #MQTTEvent_Error
Strings = "[ERROR!] " + ErrorText
Case #MQTTEvent_Info
Strings = "[INFO!] " + ErrorText
EndSelect
WriteStringN(FID, FormatDate("[%dd.%mm %hh:%ii:%ss] ", Date()) + Strings)
CloseFile(FID)
EndIf
EndIf
EndProcedure
Procedure _SetGetUniqePacketIdentifiers(Set = -1, Remove = #False)
Protected Result.u, i, Size
;Multi procedure for unique packet identifiers.
;If Set > -1 it will add a packet identifier (sent by client)
;If Set > -1 AND Remove = #true, it will remove an identifier
;If Set = -1 and Remove = #false, it will return the next unused packet identifier (and store it in the map)
;packet identifiers have to be unique between broker and client, but NOT accross all clients!
With SERVER\Sessions()
If Set <> -1
If Remove
DeleteMapElement(\PacketIdentifiersUsed(), Str(Set))
Else
\PacketIdentifiersUsed(Str(Set)) = #True
EndIf
Else
Size = MapSize(\PacketIdentifiersUsed()) + 1
Dim w.u(Size)
ForEach \PacketIdentifiersUsed()
If Val(MapKey(\PacketIdentifiersUsed())) < Size
w(Val(MapKey(\PacketIdentifiersUsed()))) = #True
EndIf
Next
For i = 1 To Size
If W(i) = #False
Result = i
Break
EndIf
Next i
\PacketIdentifiersUsed(Str(Result)) = #True
EndIf
EndWith
ProcedureReturn Result
EndProcedure
Procedure _GetPacketSize(*Stream.HEADER, Size, *i.INTEGER = #False, *reallength.INTEGER = #False)
Protected i, Roll, Result
;get length of a received packet, or -1 if it is not complete, yet
;*Stream => is the Buffer received
;Size => is the length we received until now
;*i.INTEGER => will be set to the amount of bytes needed to store the length information
; MQTT uses a dynamical amount of bytes (1...4) to store the packetlength
;*reallength.INTEGER => will be set to the packet length in case Result = -1
; so, if Result = -1, but *reallength\i would be e.g. 100000
; we know, that this packet has 100000bytes, but is not complete, yet.
; we can also check then, if our receiving buffer would be huge enough for that packet and resize it if needed.
For i = 0 To 3
If Size < SizeOf(Header) + 1 + i
Result = -1
Break
EndIf
Result | ((*Stream\bytes[i] & $7F) << Roll)
If *Stream\bytes[i] & $80 = 0
i + 1
Break
EndIf
Roll + 7
Next i
If Result <> -1
If Size < SizeOf(HEADER) + i + Result
If *reallength
*reallength\i = Result
EndIf
Result = -1
EndIf
EndIf
If Result <> -1 And *i
*i\i = i
EndIf
ProcedureReturn Result
EndProcedure
Procedure _SetStreamLength(*Stream.HEADER, Length)
Protected Result, More
;more or less the opposite of _GetPacketSize() to set the length of a packet (when sending packets)
Repeat
If Length > 127
More = $80
Else
More = 0
EndIf
*Stream\bytes[Result] = (Length & $7F) | More
Length >> 7
Result + 1
Until Length = 0
ProcedureReturn Result
EndProcedure
Procedure _Conn_PacketDataReceived(*Stream.HEADER, Size, *P.connPACKETs)
Protected RemainingLength, CurrPos, L, Length
;When a MQTT client connected to a broker, the broker doesn't respond yet.
;It waits for the CONNECT packet from the client, which will contain username/password for authentication and some other flags
;This procedure will do nothing but handle those CONNECT packets.
*P\Flags = *Stream\PacketType & $F
*P\Type = (*Stream\PacketType & $F0) >> 4
*P\PacketState = #PacketState_Incoming
Length = _GetPacketSize(*Stream, Size, @CurrPos)
If Length = -1
ProcedureReturn 0
EndIf
RemainingLength = Length
If *P\Type <> #CONNECT
;New clients MUST send a connect packet first, and NOTHING else
ProcedureReturn 0
EndIf
If *P\Flags <> 0 Or *Stream\bytes[CurrPos + 1] <> 4 Or *Stream\bytes[CurrPos + 2] <> Asc("M") Or *Stream\bytes[CurrPos + 3] <> Asc("Q") Or
*Stream\bytes[CurrPos + 4] <> Asc("T") Or *Stream\bytes[CurrPos + 5] <> Asc("T")
ProcedureReturn 0
EndIf
*P\Level = *Stream\bytes[CurrPos + 6]
*P\ConnFlags = *Stream\bytes[CurrPos + 7]
*P\CleanSession = (*P\ConnFlags & $02) >> 1
*P\Will\Flag = (*P\ConnFlags & $04) >> 2
If *P\Will\Flag
If *P\ConnFlags & $18
;Will QoS
*P\Will\QoS = *P\ConnFlags >> 3
*P\Will\QoS = *P\Will\QoS & $03
EndIf
If *P\ConnFlags & $20
;Will Retain
*P\Will\Retain = *P\ConnFlags >> 5
*P\Will\Retain = *P\Will\Retain & $01
EndIf
EndIf
CurrPos + 8
RemainingLength - 8
*P\KeepAlive = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
RemainingLength - 2
CurrPos + 2
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
*P\ClientIdentifier = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
If *P\ConnFlags & $04
;Will Flag
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
*P\Will\Topic = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
If L > 0
*P\Will\MessageBase64 = Base64Encoder(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L)
CurrPos + L
RemainingLength - L
EndIf
EndIf
If *P\ConnFlags & $80
;Username
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
If L > 0
*P\Username = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
EndIf
EndIf
If *P\ConnFlags & $40
;Password
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
If L > 0
*P\Password = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
EndIf
EndIf
ProcedureReturn #True
EndProcedure
Procedure _PacketDataReceived(*Stream.HEADER, Size, *P.PACKET)
Protected RemainingLength, CurrPos, L, Topic.s, Length
;This procedure will pre-handle all other packets, which have been received from a client
*P\Flags = *Stream\PacketType & $F
*P\Type = (*Stream\PacketType & $F0) >> 4
*P\PacketState = #PacketState_Incoming
Length = _GetPacketSize(*Stream, Size, @CurrPos)
If Length = -1
ProcedureReturn 0
EndIf
*P\PayLoad\PayLoadBase64 = ""
RemainingLength = Length
ClearList(*P\tmpSubsc())
Select *P\Type
Case #PINGREQ, #DISCONNECT
;nothing to do
Case #PUBACK, #PUBREC, #PUBREL, #PUBCOMP, #SUBSCRIBE, #SUBACK, #UNSUBSCRIBE, #UNSUBACK
*P\PacketIdentifier = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
Select *P\Type
Case #SUBSCRIBE
While RemainingLength > 0
AddElement(*P\tmpSubsc())
*P\tmpSubsc()\Add = 1
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
*P\tmpSubsc()\Topic = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
*P\tmpSubsc()\QoS = *Stream\bytes[CurrPos] & $03
CurrPos + 1
RemainingLength - 1
Wend
Case #UNSUBSCRIBE
While RemainingLength > 0
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
Topic = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
AddElement(*P\tmpSubsc())
*P\tmpSubsc()\Topic = Topic
Wend
EndSelect
Case #PUBLISH
*P\Retain = *P\Flags & $01
*P\QoS = (*P\Flags & $06) >> 1
*P\DUP = (*P\Flags & $08) >> 3
L = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
*P\TopicName = PeekS(*Stream + OffsetOf(HEADER\bytes) + CurrPos, L, #PB_UTF8 | #PB_ByteLength)
CurrPos + L
RemainingLength - L
If *P\QoS = 1 Or *P\QoS = 2
*P\PacketIdentifier = *Stream\bytes[CurrPos + 1] + (*Stream\bytes[CurrPos] * 256)
CurrPos + 2
RemainingLength - 2
EndIf
If RemainingLength > 0
CompilerIf #USE_BASE64_PAYLOAD
*P\PayLoad\BufferLengh = RemainingLength
*P\PayLoad\PayLoadBase64 = Base64Encoder(*Stream + OffsetOf(HEADER\bytes) + CurrPos, RemainingLength)
CompilerElse
*P\PayLoad\BufferLengh = RemainingLength
*P\PayLoad\Buffer = AllocateMemory(RemainingLength)
CopyMemory(*Stream + OffsetOf(HEADER\bytes) + CurrPos, *P\PayLoad\Buffer, RemainingLength)
CompilerEndIf
EndIf
Default
;no other types should be possible here
ProcedureReturn 0
EndSelect
ProcedureReturn #True
EndProcedure
Procedure _Send_Connack(CID, SP, ReturnCode)
Protected CP.MINI_PACKET
;simple procedure to send a CONNACK (connection acknoledged) back to the clients
CP\a[0] = #CONNACK << 4
CP\a[1] = 2
CP\a[2] = SP
CP\a[3] = ReturnCode
SendNetworkData(CID, @CP, 4)
EndProcedure
Procedure _PublishViaServer(Topic.s, *Payload, PayLoadLength.i, QoS = 0, PayLoadType = #PayloadType_UnicodeString)
Protected *Buff, s.s
;for debugging purposes only
LockMutex(SERVER\Mutex)
LastElement(SERVER\PublishToClient())
AddElement(SERVER\PublishToClient())
SERVER\PublishToClient()\Topic = Topic
If PayLoadLength
CompilerIf #USE_BASE64_PAYLOAD
Select PayLoadType
Case #PayloadType_base64
SERVER\PublishToClient()\PayLoadBase64 = PeekS(*Payload)
SERVER\PublishToClient()\PayLoadLength = PayLoadLength
Case #PayloadType_UnicodeString
;MQTT uses UTF8 strings!
s = PeekS(*Payload, PayLoadLength)
*Buff = UTF8(s)
SERVER\PublishToClient()\PayLoadBase64 = Base64Encoder(*Buff, MemorySize(*Buff))
SERVER\PublishToClient()\PayLoadLength = StringByteLength(s, #PB_UTF8)
FreeMemory(*Buff)
Case #PayloadType_UTF8String
SERVER\PublishToClient()\PayLoadLength = MemoryStringLength(*Payload, #PB_UTF8 | #PB_ByteLength)
SERVER\PublishToClient()\PayLoadBase64 = Base64Encoder(*Payload, SERVER\PublishToClient()\PayLoadLength)
Case #PayloadType_Buffer
SERVER\PublishToClient()\PayLoadLength = PayLoadLength
SERVER\PublishToClient()\PayLoadBase64 = Base64Encoder(*Payload, PayLoadLength)
EndSelect
CompilerElse
Select PayLoadType
Case #PayloadType_base64
SERVER\PublishToClient()\Payload = AllocateMemory(PayLoadLength)
SERVER\PublishToClient()\PayLoadLength = Base64Decoder(PeekS(*Payload), SERVER\PublishToClient()\Payload, PayLoadLength)
Case #PayloadType_UnicodeString
;MQTT uses UTF8 strings!
s = PeekS(*Payload, PayLoadLength)
*Buff = UTF8(s)
SERVER\PublishToClient()\PayLoad = *Buff
SERVER\PublishToClient()\PayLoadLength = StringByteLength(s, #PB_UTF8)
Case #PayloadType_UTF8String
SERVER\PublishToClient()\PayLoadLength = MemoryStringLength(*Payload, #PB_UTF8 | #PB_ByteLength)
SERVER\PublishToClient()\PayLoad = AllocateMemory(SERVER\PublishToClient()\PayLoadLength)
CopyMemory(*Payload, SERVER\PublishToClient()\PayLoad, SERVER\PublishToClient()\PayLoadLength)
Case #PayloadType_Buffer
SERVER\PublishToClient()\PayLoadLength = PayLoadLength
SERVER\PublishToClient()\PayLoad = AllocateMemory(PayLoadLength)
CopyMemory(*Payload, SERVER\PublishToClient()\PayLoad, PayLoadLength)
EndSelect
CompilerEndIf
SERVER\PublishToClient()\QoS = QoS
EndIf
SignalSemaphore(SERVER\Semaphore)
UnlockMutex(SERVER\Mutex)
EndProcedure
Procedure _TopicMatches(Topic.s, FilterContainingWildcards.s)
Protected *T.CHARACTER, *F.CHARACTER, Result
;procedure to check if a subscription matches a published topic
;it includes the MQTT wildcard handling
If FilterContainingWildcards = "" Or Topic = ""
ProcedureReturn 0
EndIf
*T = @Topic
*F = @FilterContainingWildcards
While *T\c <> 0
If *F\c = '#'
Result = #True
Break
ElseIf *F\c = '+'
;skip Topic word
*F + SizeOf(CHARACTER)
While *T\c <> '/' And *T\c <> 0
*T + SizeOf(CHARACTER)
Wend
If *T\c = 0
Break
EndIf
EndIf
If *F\c <> *T\c
Break
EndIf
*F + SizeOf(CHARACTER)
*T + SizeOf(CHARACTER)
Wend
If Result = #False And *F\c = *T\c
Result = #True
EndIf
ProcedureReturn Result
EndProcedure
Procedure _ClearPacket()
;make sure the packet is cleared gracefully
CompilerIf #USE_BASE64_PAYLOAD = #False
If SERVER\Sessions()\Packets()\PayLoad\Buffer
FreeMemory(SERVER\Sessions()\Packets()\PayLoad\Buffer)
EndIf
CompilerEndIf
If SERVER\Sessions()\Packets()\NYS\Buffer
FreeMemory(SERVER\Sessions()\Packets()\NYS\Buffer)
EndIf
DeleteElement(SERVER\Sessions()\Packets())
EndProcedure
Procedure _AddSessionPacket(Type, Topic.s = "", *Payload = 0, PayloadLength = 0, QoS = 0, PacketIdentifier = 0, DUP = 0, Retain = 0)
;simple procedure to add an outgoing packet to the queue
PushListPosition(SERVER\Sessions()\Packets())
LastElement(SERVER\Sessions()\Packets())
AddElement(SERVER\Sessions()\Packets())
With SERVER\Sessions()\Packets()
\PacketState = #PacketState_OutgoingNotSendYet
\Type = Type
\PacketIdentifier = PacketIdentifier
\DUP = DUP
\TopicName = Topic
If PayloadLength > 0
CompilerIf #USE_BASE64_PAYLOAD
\PayLoad\BufferLengh = PayloadLength
\PayLoad\PayLoadBase64 = PeekS(*Payload)
CompilerElse
\PayLoad\Buffer = AllocateMemory(PayloadLength)
\PayLoad\BufferLengh = PayloadLength
CopyMemory(*Payload, \PayLoad\Buffer, PayloadLength)
CompilerEndIf
EndIf
\QoS = QoS
\Retain = Retain
EndWith
PopListPosition(SERVER\Sessions()\Packets())
EndProcedure
Procedure _IsSocketBlocked()
Protected Result
;in heavy network traffic times, we might see a WSAEWOULDBLOCK
CompilerSelect #PB_Compiler_OS
CompilerCase #PB_OS_Windows
If WSAGetLastError_() = #WSAEWOULDBLOCK
Result = #True
EndIf
CompilerCase #PB_OS_Linux
; If PeekL(errno_location()) = #EWOULDBLOCK
; Result = #True
; EndIf
CompilerEndSelect
ProcedureReturn Result
EndProcedure
Procedure _SendCommand()
Protected CP.MINI_PACKET, Result
Protected *Buff.HEADER, *Buff2, CurrPos, i, Size, *Payload
Protected Length, LengthShould, LengthIs
;main sending procedure
;will make sure, that packets get send completely
If SERVER\Sessions()\SessionState = #SessionState_inactive
If SERVER\Sessions()\Packets()\QoS = 0
;run out of luck
_ClearPacket()
EndIf
Else
With SERVER\Sessions()\Packets()
If \NYS\Buffer
*Buff = \NYS\Buffer
\NYS\Buffer = 0
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, *Buff, \NYS\BufferLength)
LengthShould = \NYS\BufferLength
Else
Select \Type
Case #PINGRESP
CP\a[0] = #PINGRESP << 4
CP\a[1] = 0
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, @CP, 2)
LengthShould = 2
Case #SUBACK
If \PayLoad\BufferLengh
CompilerIf #USE_BASE64_PAYLOAD
*Buff2 = AllocateMemory(StringByteLength(\PayLoad\PayLoadBase64))
If *Buff2
Size = Base64Decoder(\PayLoad\PayLoadBase64, *Buff2, 1024)
*Buff = AllocateMemory(6 + Size)
If *Buff
*Buff\PacketType = #SUBACK << 4
CurrPos = _SetStreamLength(*Buff, 2 + Size)
*Buff\bytes[CurrPos + 1] = \PacketIdentifier & $FF
*Buff\bytes[CurrPos] = (\PacketIdentifier & $FF00) >> 8
CurrPos + 2
i = CurrPos
CopyMemory(*Buff2, *Buff + OffsetOf(HEADER\bytes) + CurrPos, Size)
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, *Buff, 1 + CurrPos + Size)
LengthShould = 1 + CurrPos + Size
;FreeMemory(*Buff)
EndIf
FreeMemory(*Buff2)
EndIf
CompilerElse
Size = \PayLoad\BufferLengh
*Buff = AllocateMemory(6 + Size)
If *Buff
*Buff\PacketType = #SUBACK << 4
CurrPos = _SetStreamLength(*Buff, 2 + Size)
*Buff\bytes[CurrPos + 1] = \PacketIdentifier & $FF
*Buff\bytes[CurrPos] = (\PacketIdentifier & $FF00) >> 8
CurrPos + 2
i = CurrPos
CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, Size)
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, *Buff, 1 + CurrPos + Size)
LengthShould = 1 + CurrPos + Size
; FreeMemory(*Buff)
EndIf
CompilerEndIf
EndIf
Case #PUBACK, #PUBREC, #PUBCOMP, #UNSUBACK
CP\a[0] = \Type << 4
CP\a[1] = 2
CP\a[2] = (\PacketIdentifier & $FF00) >> 8
CP\a[3] = \PacketIdentifier & $FF
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, @CP, 4)
LengthShould = 4
Case #PUBREL
CP\a[0] = 2 | (\Type << 4)
CP\a[1] = 2
CP\a[2] = (\PacketIdentifier & $FF00) >> 8
CP\a[3] = \PacketIdentifier & $FF
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, @CP, 4)
LengthShould = 4
Case #PUBLISH
If \PayLoad\BufferLengh >= 0
i = \PayLoad\BufferLengh
EndIf
Length = StringByteLength(\TopicName, #PB_UTF8) + i + 2
If \QoS > 0
Length + 2
EndIf
*Buff = AllocateMemory(64 + Length)
If *Buff
*Buff\PacketType = \Type << 4
*Buff\PacketType = *Buff\PacketType | (\QoS << 1)
*Buff\PacketType = *Buff\PacketType | (\DUP << 3)
*Buff\PacketType = *Buff\PacketType | \Retain
CurrPos = _SetStreamLength(*Buff, Length)
Length = StringByteLength(\TopicName, #PB_UTF8)
*Buff\bytes[CurrPos + 1] = Length & $FF
*Buff\bytes[CurrPos] = (Length & $FF00) >> 8
CurrPos + 2
PokeS(*Buff + OffsetOf(HEADER\bytes) + CurrPos, \TopicName, -1, #PB_UTF8)
CurrPos + StringByteLength(\TopicName, #PB_UTF8)
If \Qos > 0
*Buff\bytes[CurrPos + 1] = \PacketIdentifier & $FF
*Buff\bytes[CurrPos] = (\PacketIdentifier & $FF00) >> 8
CurrPos + 2
EndIf
If i > 0
CompilerIf #USE_BASE64_PAYLOAD
Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
CompilerElse
CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
CompilerEndIf
EndIf
CurrPos + i
LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, *Buff, CurrPos + 1)
LengthShould = CurrPos + 1
; FreeMemory(*Buff)
EndIf
EndSelect
EndIf
If LengthIs = LengthShould
If LengthIs > 4
FreeMemory(*Buff)
EndIf
\NYS\Buffer = 0
Result = #SendFinished
ElseIf LengthIs = -1
If _IsSocketBlocked()
\NYS\Buffer = AllocateMemory(LengthShould)
\NYS\BufferLength = LengthShould
If LengthShould <= 4
CopyMemory(@CP, \NYS\Buffer, \NYS\BufferLength)
Else
CopyMemory(*Buff, \NYS\Buffer, \NYS\BufferLength)
FreeMemory(*Buff)
EndIf
Result = #SendNotFinished
Else
;freak out
Result = #SendFailed
If LengthShould > 4
FreeMemory(*Buff)
EndIf
\NYS\Buffer = 0
EndIf
Else
;packet not finished
\NYS\Buffer = AllocateMemory(LengthShould - LengthIs)
\NYS\BufferLength = LengthShould - LengthIs
If LengthShould <= 4
CopyMemory(@CP + LengthIs, \NYS\Buffer, \NYS\BufferLength)
Else
CopyMemory(*Buff + LengthIs, \NYS\Buffer, \NYS\BufferLength)
FreeMemory(*Buff)
EndIf
Result = #SendNotFinished
EndIf
EndWith
EndIf
ProcedureReturn Result
EndProcedure
Procedure _CheckSessionPackets()
Protected *Session, *P.PACKET, Topic.s, *Payload, PayLoadLength, QoS, QoSR, Identifier, Found, a$
Protected *Buffer, i, l, OwnIdentifier, DUP, Retain, PayLoadBase64.s, R
;quite huge procedure, where all the packets will be handled.
ForEach SERVER\Sessions()
If SERVER\Sessions()\SessionState = #SessionState_inactive
Continue
EndIf
ForEach SERVER\Sessions()\Packets()
With SERVER\Sessions()\Packets()
If \ReSendAt > 0 And \ReSendAt > ElapsedMilliseconds()
;not yet
Continue
EndIf
If \WaitForAnswerSince > 0
If \WaitForAnswerSince > ElapsedMilliseconds() + 2000
\WaitForAnswerSince = 0
\PacketState = #PacketState_OutgoingNotSendYet
Else
Continue
EndIf
EndIf
Select \Type
Case #SUBACK
If \PacketState = #PacketState_OutgoingNotSendYet
R = _SendCommand()
Select R
Case #SendFinished, #SendFailed
*P = @SERVER\Sessions()\Packets()
ForEach SERVER\Sessions()\Packets()
If @SERVER\Sessions()\Packets() <> *P And \PacketIdentifier = *P\PacketIdentifier
_ClearPacket()
EndIf
Next
ChangeCurrentElement(SERVER\Sessions()\Packets(), *P)
_SetGetUniqePacketIdentifiers(\PacketIdentifier, #True)
_ClearPacket()
If R = #SendFailed
Break
EndIf
Case #SendNotFinished
\ReSendAt = ElapsedMilliseconds() + 2000
Break
EndSelect
EndIf
Case #UNSUBACK
If \PacketState = #PacketState_OutgoingNotSendYet
R = _SendCommand()
Select R
Case #SendFinished, #SendFailed
*P = @SERVER\Sessions()\Packets()
ForEach SERVER\Sessions()\Packets()
If @SERVER\Sessions()\Packets() <> *P And \PacketIdentifier = *P\PacketIdentifier
_ClearPacket()
EndIf
Next
ChangeCurrentElement(SERVER\Sessions()\Packets(), *P)
_SetGetUniqePacketIdentifiers(\PacketIdentifier, #True)
_ClearPacket()
If R = #SendFailed
Break
EndIf
Case #SendNotFinished
\ReSendAt = ElapsedMilliseconds() + 2000
Break
EndSelect
EndIf
Case #PINGREQ
If SERVER\Sessions()\SessionState = #SessionState_active And \PacketState = #PacketState_Incoming
\Type = #PINGRESP
\PacketState = #PacketState_OutgoingNotSendYet
_SendCommand()
_ClearPacket()
EndIf
Case #PUBLISH
Select \PacketState
Case #PacketState_Incoming
Topic = \TopicName
*Payload = \PayLoad\Buffer
PayLoadLength = \PayLoad\BufferLengh
QoS = \QoS
DUP = \DUP
Retain = \Retain
Identifier = \PacketIdentifier
CompilerIf #USE_BASE64_PAYLOAD
PayLoadBase64 = \PayLoad\PayLoadBase64
*Payload = @PayLoadBase64
CompilerEndIf
If QoS > 0
_SetGetUniqePacketIdentifiers(Identifier)
EndIf
If \Retain
;find and delete existing message
ForEach SERVER\RetainedMessages()
If SERVER\RetainedMessages()\Topic = Topic
CompilerIf #USE_BASE64_PAYLOAD = #False
If SERVER\RetainedMessages()\PayLoadLength
FreeMemory(SERVER\RetainedMessages()\Payload)
EndIf
CompilerEndIf
DeleteElement(SERVER\RetainedMessages())
EndIf
Next
If PayLoadLength > 0
AddElement(SERVER\RetainedMessages())
SERVER\RetainedMessages()\Topic = Topic
SERVER\RetainedMessages()\Qos = Qos
SERVER\RetainedMessages()\PayLoadLength = PayLoadLength
CompilerIf #USE_BASE64_PAYLOAD
SERVER\RetainedMessages()\PayLoadBase64 = PayLoadBase64
CompilerElse
SERVER\RetainedMessages()\Payload = AllocateMemory(PayLoadLength)
CopyMemory(*Payload, SERVER\RetainedMessages()\Payload, PayLoadLength)
CompilerEndIf
EndIf
EndIf
If Qos = 0 Or Qos = 1
;we can publish that directly
PushListPosition(SERVER\Sessions()\Packets())
*Session = @SERVER\Sessions()
ForEach SERVER\Sessions()
If @SERVER\Sessions() <> *Session
ForEach SERVER\Sessions()\Subscriptions()
If SERVER\Sessions()\SessionState = #SessionState_active Or SERVER\Sessions()\Persistant
If _TopicMatches(Topic, SERVER\Sessions()\Subscriptions()\Topic)
;send publish command to client
QoSR = SERVER\Sessions()\Subscriptions()\QoS
If QoSR > QoS
QoSR = QoS
EndIf
If QoSR > 0
OwnIdentifier = _SetGetUniqePacketIdentifiers()
EndIf
_AddSessionPacket(#PUBLISH, Topic, *Payload, PayLoadLength, QoSR, OwnIdentifier)
EndIf
EndIf
Next
EndIf
Next
ChangeCurrentElement(SERVER\Sessions(), *Session)
PopListPosition(SERVER\Sessions()\Packets())
If QoS = 0
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_InfoPublished, Topic, *PayLoad, PayLoadLength, "", 0, QoS, Retain, DUP)
Else
_AddSessionPacket(#PUBACK, Topic, *Payload, PayLoadLength, QoS, Identifier, 0, 0)
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_InfoPublished, Topic, *PayLoad, PayLoadLength, "", 0, QoS, Retain, DUP, Identifier)
EndIf
_ClearPacket()
ElseIf QoS = 2
\PacketState = #PacketState_WaitForAnswer
;keep the original publish message
_AddSessionPacket(#PUBREC, "", 0, 0, 0, Identifier, 0, 0)
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_InfoPublished, Topic, *PayLoad, PayLoadLength, "", 0, QoS, Retain, DUP, Identifier)
EndIf
Case #PacketState_OutgoingNotSendYet
QoS = \QoS
Identifier = \PacketIdentifier
Select QoS
Case 0
Select _SendCommand()
Case #SendFinished
_ClearPacket()
Case #SendFailed
_ClearPacket()
Break
EndSelect
;nothing more to do
Case 1
Select _SendCommand()
Case #SendFinished
;send answer
\PacketState = #PacketState_WaitForAnswer
\WaitForAnswerSince = ElapsedMilliseconds()
_AddSessionPacket(#PUBACK, "", 0, 0, QoS, Identifier)
Case #SendNotFinished
;try again next time
Break
Case #SendFailed
_ClearPacket()
Break
EndSelect
Case 2
R = _SendCommand()
Select R
Case #SendFinished
\PacketState = #PacketState_WaitForAnswer
\WaitForAnswerSince = ElapsedMilliseconds()
_AddSessionPacket(#PUBREC, "", 0, 0, QoS, Identifier)
Case #SendNotFinished
\ReSendAt = ElapsedMilliseconds() + 2000
Case #SendFailed
_ClearPacket()
Break
EndSelect
EndSelect
EndSelect
Case #PUBACK ;QoS 1 reply
Select \PacketState
Case #PacketState_OutgoingNotSendYet
R = _SendCommand()
Select R
Case #SendFinished
_SetGetUniqePacketIdentifiers(\PacketIdentifier, #True)
_ClearPacket()
Case #SendNotFinished
Break
Case #SendFailed
Break
EndSelect
Case #PacketState_Incoming
;finished
;search for all packets with this identifier and remove them
*P = @SERVER\Sessions()\Packets()
ForEach SERVER\Sessions()\Packets()
If @SERVER\Sessions()\Packets() <> *P And \PacketIdentifier = *P\PacketIdentifier
_ClearPacket()
EndIf
Next
ChangeCurrentElement(SERVER\Sessions()\Packets(), *P)
_SetGetUniqePacketIdentifiers(\PacketIdentifier, #True)
_ClearPacket()
EndSelect
Case #PUBREC ;first QoS 2 reply
Select \PacketState
Case #PacketState_OutgoingNotSendYet
Select _SendCommand()
Case #SendFinished
\PacketState = #PacketState_WaitForAnswer
\WaitForAnswerSince = ElapsedMilliseconds()
Case #SendFailed, #SendNotFinished
Break
EndSelect
Case #PacketState_Incoming
*P = @SERVER\Sessions()\Packets()
ForEach SERVER\Sessions()\Packets()
If @SERVER\Sessions()\Packets() <> *P And \PacketIdentifier = *P\PacketIdentifier
_ClearPacket()
EndIf
Next
ChangeCurrentElement(SERVER\Sessions()\Packets(), *P)
\Type = #PUBREL
\PacketState = #PacketState_OutgoingNotSendYet
EndSelect
Case #PUBREL ;second QoS 2 reply
If \PacketState = #PacketState_Incoming
Identifier = \PacketIdentifier
;search for main publish packet
ForEach SERVER\Sessions()\Packets()
If \Type = #PUBLISH And \PacketIdentifier = Identifier And \PacketState = #PacketState_WaitForAnswer
Topic = \TopicName
*Payload = \PayLoad\Buffer
PayLoadLength = \PayLoad\BufferLengh
QoS = \QoS
Identifier = \PacketIdentifier
DUP = \DUP
Retain = \Retain
*P = @SERVER\Sessions()\Packets()
CompilerIf #USE_BASE64_PAYLOAD
PayLoadBase64 = \PayLoad\PayLoadBase64
*Payload = @PayLoadBase64
CompilerEndIf
Break
EndIf
Next
*Session = @SERVER\Sessions()
If *P
;now publish it to subscribers
ForEach SERVER\Sessions()
If @SERVER\Sessions() <> *Session
ForEach SERVER\Sessions()\Subscriptions()
If SERVER\Sessions()\SessionState = #SessionState_active Or SERVER\Sessions()\Persistant
If _TopicMatches(Topic, SERVER\Sessions()\Subscriptions()\Topic)
;send publish command to client
QoSR = SERVER\Sessions()\Subscriptions()\QoS
OwnIdentifier = _SetGetUniqePacketIdentifiers()
If QoSR > QoS
QoSR = QoS
EndIf
_AddSessionPacket(#PUBLISH, Topic, *Payload, PayLoadLength, QoSR, OwnIdentifier)
EndIf
EndIf
Next
EndIf
Next
EndIf
ChangeCurrentElement(SERVER\Sessions(), *Session)
ForEach SERVER\Sessions()\Packets()
If \PacketIdentifier = Identifier
_ClearPacket()
EndIf
Next
_AddSessionPacket(#PUBCOMP, "", 0, 0, QoS, Identifier)
ElseIf \PacketState = #PacketState_OutgoingNotSendYet
Select _SendCommand()
Case #SendFinished
\PacketState = #PacketState_WaitForAnswer
Case #SendNotFinished
\ReSendAt = ElapsedMilliseconds() + 2000
Break
Case #SendFailed
Break
EndSelect
EndIf
Case #PUBCOMP ;final QoS 2 reply
Identifier = \PacketIdentifier
If \PacketState = #PacketState_OutgoingNotSendYet
R = _SendCommand()
Else
R = #SendFinished
EndIf
If R = #SendFinished
ForEach SERVER\Sessions()\Packets()
If \PacketIdentifier = Identifier
_ClearPacket()
EndIf
Next
_SetGetUniqePacketIdentifiers(Identifier, #True)
Else
Break
EndIf
Case #DISCONNECT
;remove all packets
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_ClientDisconnected)
ClearList(SERVER\Sessions()\Packets())
CloseNetworkConnection(SERVER\Sessions()\ClientID)
SERVER\Sessions()\SessionState = #SessionState_inactive
SERVER\Sessions()\ClientID = #Null
Break
Case #SUBSCRIBE
;check if allowed, but who will decide that??
;we allow any description now
If \PacketState = #PacketState_Incoming
If ListSize(\tmpSubsc()) > 0
ForEach \tmpSubsc()
If \tmpSubsc()\Add
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_Subscription, \tmpSubsc()\Topic, 0, 0, "", 0, \tmpSubsc()\QoS)
;check if subscription is known already
Found = #False
ForEach SERVER\Sessions()\Subscriptions()
If \tmpSubsc()\Topic = SERVER\Sessions()\Subscriptions()\Topic
;yes, udate QoS only
SERVER\Sessions()\Subscriptions()\QoS = \tmpSubsc()\QoS
Found = #True
Break
EndIf
Next
If Found = 0
;no, add it
AddElement(SERVER\Sessions()\Subscriptions())
SERVER\Sessions()\Subscriptions()\Topic = \tmpSubsc()\Topic
SERVER\Sessions()\Subscriptions()\QoS = \tmpSubsc()\QoS
EndIf
EndIf
Next
;now send answer
*Buffer = AllocateMemory(ListSize(\tmpSubsc()))
i = 0
ForEach \tmpSubsc()
PokeB(*Buffer + i, \tmpSubsc()\QoS)
i + 1
Next
CompilerIf #USE_BASE64_PAYLOAD
a$ = Base64Encoder(*Buffer, MemorySize(*Buffer))
_AddSessionPacket(#SUBACK, "", @a$, MemorySize(*Buffer), 0, \PacketIdentifier)
CompilerElse
_AddSessionPacket(#SUBACK, "", *Buffer, MemorySize(*Buffer), 0, \PacketIdentifier)
CompilerEndIf
FreeMemory(*Buffer)
_SetGetUniqePacketIdentifiers(\PacketIdentifier)
;Successfully subscribed, now check for existing retained messages for those subscriptions
ForEach SERVER\RetainedMessages()
ForEach \tmpSubsc()
If _TopicMatches(SERVER\RetainedMessages()\Topic, \tmpSubsc()\Topic)
OwnIdentifier = 0
If SERVER\RetainedMessages()\QoS > 0
OwnIdentifier = _SetGetUniqePacketIdentifiers()
EndIf
CompilerIf #USE_BASE64_PAYLOAD
*Payload = @SERVER\RetainedMessages()\PayLoadBase64
CompilerElse
*Payload = SERVER\RetainedMessages()\Payload
CompilerEndIf
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_InfoPublished, SERVER\RetainedMessages()\Topic,
*Payload, SERVER\RetainedMessages()\PayLoadLength, "", 0, \tmpSubsc()\QoS, 1, 0, OwnIdentifier)
_AddSessionPacket(#PUBLISH, SERVER\RetainedMessages()\Topic, *Payload, SERVER\RetainedMessages()\PayLoadLength, SERVER\RetainedMessages()\Qos, OwnIdentifier, 0, 1)
Break
EndIf
Next
Next
ClearList(\tmpSubsc())
EndIf
EndIf
_ClearPacket()
Case #UNSUBSCRIBE
If \PacketState = #PacketState_Incoming
Identifier = \PacketIdentifier
ForEach \tmpSubsc()
If \tmpSubsc()\Add = 0
ForEach SERVER\Sessions()\Subscriptions()
If _TopicMatches(\tmpSubsc()\Topic, SERVER\Sessions()\Subscriptions()\Topic)
DeleteElement(SERVER\Sessions()\Subscriptions())
Break
EndIf
Next
EndIf
Next
_SetGetUniqePacketIdentifiers(\PacketIdentifier)
;now send answer
ClearList(\tmpSubsc())
_AddSessionPacket(#UNSUBACK, "", 0, 0, 0, Identifier)
EndIf
_ClearPacket()
Default
If \PacketState = #PacketState_OutgoingNotSendYet
Select _SendCommand()
Case #SendFinished
_SetGetUniqePacketIdentifiers(\PacketIdentifier, #True)
_ClearPacket()
Default
Break
EndSelect
EndIf
EndSelect
EndWith
Next
Next
EndProcedure
Procedure _IsClientAllowed(Username.s, Password.s)
Protected Result
;check if the client, who connected and has sent the CONNACK, is accepted
If ListSize(SERVER\Access()) = 0
;no access data provided, anyone is allowed to connect!
ProcedureReturn #True
EndIf
ForEach SERVER\Access()
Select SERVER\Access()\Flag
Case #AccessFlag_NoEncryption
;not hashed
If SERVER\Access()\Username = Username And SERVER\Access()\Password = Password
Result = #True
Break
EndIf
Case #AccessFlag_PasswordMD5
;password is a md5 hash
If SERVER\Access()\Username = Username
If SERVER\Access()\Password = ""
Result = #True
Break
ElseIf Fingerprint(@Password, StringByteLength(Password), #PB_Cipher_MD5) = Fingerprint(@SERVER\Access()\Password, StringByteLength(SERVER\Access()\Password), #PB_Cipher_MD5)
Result = #True
Break
EndIf
EndIf
Case #AccessFlag_BothMD5
If Fingerprint(@Username, StringByteLength(Username), #PB_Cipher_MD5) = Fingerprint(@SERVER\Access()\Username, StringByteLength(SERVER\Access()\Username), #PB_Cipher_MD5)
If SERVER\Access()\Password = ""
Result = #True
Break
ElseIf Fingerprint(@Password, StringByteLength(Password), #PB_Cipher_MD5) = Fingerprint(@SERVER\Access()\Password, StringByteLength(SERVER\Access()\Password), #PB_Cipher_MD5)
Result = #True
Break
EndIf
EndIf
EndSelect
Next
ProcedureReturn Result
EndProcedure
Procedure _CheckConnPackets(List ConnPackets.ConnPackets())
Protected Found, Found2
;handle CONNACK packets
;if anything is fine, that client will be added as a new Sessions() element
With ConnPackets()
ForEach ConnPackets()
If ConnPackets()\TimeOUT < ElapsedMilliseconds()
;timed out
CloseNetworkConnection(ConnPackets()\ClientID)
If ConnPackets()\Buffer
FreeMemory(ConnPackets()\Buffer)
EndIf
ClearList(ConnPackets()\tmpSubsc())
DeleteElement(ConnPackets())
ElseIf \PacketState = #PacketState_Incoming
Select \Type
Case #CONNECT
If _IsClientAllowed(\Username, \Password) = #False
_Send_Connack(\ClientID, 0, #ConnRefused_BadUsernameOrPassword)
CloseNetworkConnection(\ClientID)
FreeMemory(\Buffer)
DeleteElement(ConnPackets())
ElseIf \Level > 4
_Send_Connack(\ClientID, 0, #ConnRefused_UnacceptableProtocolVersion)
CloseNetworkConnection(\ClientID)
FreeMemory(\Buffer)
DeleteElement(ConnPackets())
Else
Found = 0
Found2 = 0
ForEach SERVER\Sessions()
If SERVER\Sessions()\ClientIdentifier = \ClientIdentifier
;we have an existing session!
If \CleanSession = 0
Found = 1
Else
Found2 = 1
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
ClearList(SERVER\Sessions()\Packets())
ClearList(SERVER\Sessions()\Subscriptions())
EndIf
Break
EndIf
Next
If Found = 0 And Found2 = 0
AddElement(SERVER\Sessions())
EndIf
SERVER\Sessions()\LastActivity = ElapsedMilliseconds()
SERVER\Sessions()\ClientID = \ClientID
SERVER\Sessions()\ClientIdentifier = \ClientIdentifier
SERVER\Sessions()\SessionState = #SessionState_active
SERVER\Sessions()\ConnFlags = \ConnFlags
SERVER\Sessions()\KeepAlive = \KeepAlive
SERVER\Sessions()\Will\QoS = \Will\QoS
SERVER\Sessions()\Will\Flag = \Will\Flag
SERVER\Sessions()\Will\MessageBase64 = \Will\MessageBase64
SERVER\Sessions()\Will\Retain = \Will\Retain
SERVER\Sessions()\Will\Topic = \Will\Topic
SERVER\Sessions()\Level = \Level
SERVER\Sessions()\Persistant = 1 - \CleanSession
If SERVER\Sessions()\Buffer = 0
SERVER\Sessions()\Buffer = AllocateMemory(SERVER\InitBufferSize)
EndIf
SERVER\Sessions()\BufferPos = 0
_Send_Connack(SERVER\Sessions()\ClientID, Found2, #ConnAccepted)
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_ClientConnected)
FreeMemory(\Buffer)
DeleteElement(ConnPackets())
EndIf
Default
CloseNetworkConnection(ConnPackets()\ClientID)
If ConnPackets()\Buffer
FreeMemory(ConnPackets()\Buffer)
EndIf
ClearList(ConnPackets()\tmpSubsc())
DeleteElement(ConnPackets())
EndSelect
EndIf
Next
EndWith
EndProcedure
Procedure _CheckForLastWill()
Protected *S.Session, Identifier.i, *PayLoad, L, *Buffer
;check if the disconnected client has a last will stored, and if so, publish it to all subscribers
If SERVER\Sessions()\Will\Topic
*S = SERVER\Sessions()
If *S\Will\MessageBase64
*Buffer = AllocateMemory(StringByteLength(*S\Will\MessageBase64))
L = Base64Decoder(*S\Will\MessageBase64, *Buffer, MemorySize(*Buffer))
CompilerIf #USE_BASE64_PAYLOAD
*PayLoad = @*S\Will\MessageBase64
CompilerElse
*PayLoad = *Buffer
CompilerEndIf
EndIf
ForEach SERVER\Sessions()
If @SERVER\Sessions() <> *S And SERVER\Sessions()\SessionState = #SessionState_active
ForEach SERVER\Sessions()\Subscriptions()
If _TopicMatches(*S\Will\Topic, SERVER\Sessions()\Subscriptions()\Topic)
If SERVER\Sessions()\Subscriptions()\QoS > 0
Identifier = _SetGetUniqePacketIdentifiers()
Else
Identifier = 0
EndIf
_AddSessionPacket(#PUBLISH, *S\Will\Topic, *PayLoad, L, SERVER\Sessions()\Subscriptions()\QoS, Identifier, 0, *S\Will\Retain)
EndIf
Next
EndIf
Next
ChangeCurrentElement(SERVER\Sessions(), *S)
If *Buffer
FreeMemory(*Buffer)
EndIf
EndIf
EndProcedure
Procedure _ServerThread(Dummy)
Protected SID, CID, Found, Pos, *P.PACKET
Protected i, j, k, l, Add, Length, *S.Session, *Payload
Protected LogTimer.q, RealLength, TLSMode, Mode, Packets
;here is all the magic :)
Mode = #PB_Network_TCP
CompilerIf Defined(TLS_PBI, #PB_Constant) And Defined(PB_Network_Extra, #PB_Constant)
If SERVER\TLS\Protocols & #PB_Network_TLSv1
Mode = Mode | (SERVER\TLS\Protocols & #PB_Network_TLSv1)
EndIf
CompilerEndIf
SID = CreateNetworkServer(#PB_Any, SERVER\Port, Mode, SERVER\BindIP)
If SID = 0
SERVER\Error = #Error_CantStartServer
_LogServerAction("BROKER", #MQTTEvent_Error, "", 0, 0, ErrorDescription(SERVER\Error), SERVER\Error)
ProcedureReturn
EndIf
_LogServerAction("", #MQTTEvent_Info, "", 0, 0, "Server Thread Started", #Info_ThreadStarted)
NewList ConnPackets.ConnPackets() ;<- those are packets not (yet) belonging to an established session
; we will do the whole connecting-handshake with those
; when finished, they will create a new session element
LogTimer = ElapsedMilliseconds()
Repeat
Select NetworkServerEvent(SID)
Case #PB_NetworkEvent_None
If Packets > 20
Delay(0)
ElseIf Packets > 10
Delay(2)
Else
Delay(5)
EndIf
Case #PB_NetworkEvent_Connect
;ConnPackets are more or less temp. packets
;As soon as they are logged in it will be integrated into a session
AddElement(ConnPackets())
ConnPackets()\ClientID = EventClient()
ConnPackets()\TimeOUT = ElapsedMilliseconds() + SERVER\TimeOUT
ConnPackets()\Buffer = AllocateMemory(SERVER\InitBufferSize)
ConnPackets()\PacketState = #PacketState_WaitForAnswer
Case #PB_NetworkEvent_Disconnect
Found = #False
ForEach ConnPackets()
If ConnPackets()\ClientID = EventClient()
_LogServerAction(ConnPackets()\ClientIdentifier, #MQTTEvent_ClientDisconnected)
FreeMemory(ConnPackets()\Buffer)
DeleteElement(ConnPackets())
Found = #True
Break
EndIf
Next
If Found = #False
ForEach SERVER\Sessions()
If SERVER\Sessions()\ClientID = EventClient()
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_ClientDisconnected)
_CheckForLastWill()
;error, close connection
;(removed, seems PB made the eventclient() invalid already)
;CloseNetworkConnection(SERVER\Sessions()\ClientID)
If SERVER\Sessions()\Persistant
SERVER\Sessions()\SessionState = #SessionState_inactive
SERVER\Sessions()\ClientID = #Null
Else
FreeMemory(SERVER\Sessions()\Buffer)
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
DeleteElement(SERVER\Sessions())
EndIf
Break
EndIf
Next
EndIf
Case #PB_NetworkEvent_Data
CID = EventClient()
Found = 0
;check not yet authorized clients first
With ConnPackets()
ForEach ConnPackets()
If \ClientID = CID
Found = 1
Break
EndIf
Next
If Found = 1
;get packet, here we only receive the packet, we will handle it down below outside of the network loop
Pos = ReceiveNetworkData(CID, \Buffer, MemorySize(\Buffer) - \BufferPos)
If Pos > -1
\BufferPos + Pos
;dynamically increase buffer if needed
If \BufferPos > MemorySize(\Buffer) - 256
\Buffer = ReAllocateMemory(\Buffer, MemorySize(\Buffer) + SERVER\InitBufferSize)
EndIf
Repeat
Length = _GetPacketSize(\Buffer, \BufferPos, @Add)
If Length = -1
Else
If _Conn_PacketDataReceived(\Buffer, \BufferPos, @ConnPackets())
If \BufferPos - Length - 1 - Add < 0
;client sends rubbish, we better freak out
SERVER\Error = #Error_LengthOfPacketIncorrect
_LogServerAction(\ClientIdentifier, #MQTTEvent_Error, "", 0, 0, ErrorDescription(SERVER\ERROR), SERVER\Error)
CloseNetworkConnection(CID)
FreeMemory(\Buffer)
DeleteElement(ConnPackets())
Break
EndIf
MoveMemory(\Buffer + Length + 1 + Add, \Buffer, \BufferPos - Length - 1 - Add)
\BufferPos - Length - 1 - Add
Else
;error, close connection
SERVER\Error = #Error_CorruptedPacketReceived
_LogServerAction(\ClientIdentifier, #MQTTEvent_Error, "", 0, 0, ErrorDescription(SERVER\ERROR), SERVER\Error)
CloseNetworkConnection(CID)
FreeMemory(\Buffer)
DeleteElement(ConnPackets())
Break
EndIf
EndIf
Until Length = -1
Else
;network error? handle it?
EndIf
EndIf
EndWith
With SERVER\Sessions()
If Found = 0
;now check for known clients
ForEach SERVER\Sessions()
If \SessionState = #SessionState_active
If \ClientID = CID
\LastActivity = ElapsedMilliseconds()
;this client is known and accepted already
Found = 2
Break
EndIf
EndIf
Next
EndIf
If Found = 0
;???
CloseNetworkConnection(CID)
ElseIf Found = 2
;get packet, here we only receive the packet, we will handle it down below outside of the network loop
Pos = ReceiveNetworkData(CID, \Buffer + \BufferPos, MemorySize(\Buffer) - \BufferPos)
If Pos > -1
\BufferPos + Pos
Repeat
RealLength = -1
Length = _GetPacketSize(\Buffer, \BufferPos, @Add, @RealLength)
If Length = -1
If RealLength <> -1
If MemorySize(\Buffer) < SizeOf(HEADER) + Add + RealLength
\Buffer = ReAllocateMemory(\Buffer, SizeOf(HEADER) + Add + RealLength + 64)
EndIf
EndIf
Else
LastElement(SERVER\Sessions()\Packets())
AddElement(SERVER\Sessions()\Packets())
If _PacketDataReceived(\Buffer, \BufferPos, @SERVER\Sessions()\Packets())
If \BufferPos - Length - 1 - Add < 0
;client sends rubbish, we better freak out
SERVER\Error = #Error_LengthOfPacketIncorrect
_LogServerAction(\ClientIdentifier, #MQTTEvent_Error, "", 0, 0, ErrorDescription(SERVER\ERROR), SERVER\Error)
CloseNetworkConnection(CID)
FreeMemory(SERVER\Sessions()\Buffer)
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
DeleteElement(SERVER\Sessions())
Break
EndIf
MoveMemory(\Buffer + Length + SizeOf(HEADER) + Add, \Buffer, \BufferPos - Length - SizeOf(HEADER) - Add)
\BufferPos - Length - SizeOf(HEADER) - Add
Else
;error, close connection
SERVER\Error = #Error_CorruptedPacketReceived
_LogServerAction(\ClientIdentifier, #MQTTEvent_Error, "", 0, 0, ErrorDescription(SERVER\ERROR), SERVER\Error)
CloseNetworkConnection(CID)
If SERVER\Sessions()\Persistant
SERVER\Sessions()\SessionState = #SessionState_inactive
SERVER\Sessions()\ClientID = #Null
Else
FreeMemory(SERVER\Sessions()\Buffer)
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
DeleteElement(SERVER\Sessions())
EndIf
Break
EndIf
EndIf
Until Length = -1
Else
;network error? handle it?
EndIf
EndIf
EndWith
EndSelect
;first check connection packets
_CheckConnPackets(ConnPackets())
;next, check packets from established sessions
_CheckSessionPackets()
;check for timed out sessions
Packets = 0
ForEach SERVER\Sessions()
CompilerIf #PB_Compiler_Debugger And #DEEP_DEBUG
If LogTimer + 2000 < ElapsedMilliseconds()
LogTimer = ElapsedMilliseconds()
Debug "Identifiers: " + Str(MapSize(SERVER\Sessions()\PacketIdentifiersUsed()))
Debug "Packets: " + Str(ListSize(SERVER\Sessions()\Packets()))
Debug "Retained Packets: " + Str(ListSize(SERVER\RetainedMessages()))
EndIf
CompilerEndIf
If SERVER\Sessions()\SessionState = #SessionState_active
If SERVER\Sessions()\LastActivity + (1500 * SERVER\Sessions()\KeepAlive) < ElapsedMilliseconds()
;timed out
_CheckForLastWill()
SERVER\Error = #Error_TimedOut
_LogServerAction(SERVER\Sessions()\ClientIdentifier, #MQTTEvent_Error, "", 0, 0, ErrorDescription(SERVER\ERROR), SERVER\Error)
CloseNetworkConnection(SERVER\Sessions()\ClientID)
SERVER\Sessions()\ClientID = #Null
SERVER\Sessions()\SessionState = #SessionState_inactive
Else
Packets + ListSize(SERVER\Sessions()\Packets())
EndIf
EndIf
Next
;Broker should not publish directly to clients.
;I've added that only for testing purpose
If TrySemaphore(SERVER\Semaphore)
LockMutex(SERVER\Mutex)
If FirstElement(SERVER\PublishToClient())
ForEach SERVER\Sessions()
If SERVER\Sessions()\ClientID And SERVER\Sessions()\SessionState = #SessionState_active
ForEach SERVER\Sessions()\Subscriptions()
If _TopicMatches(SERVER\PublishToClient()\Topic, SERVER\Sessions()\Subscriptions()\Topic)
If SERVER\PublishToClient()\QoS > 0
l = _SetGetUniqePacketIdentifiers()
EndIf
CompilerIf #USE_BASE64_PAYLOAD
*Payload = @SERVER\PublishToClient()\PayLoadBase64
CompilerElse
*Payload = SERVER\PublishToClient()\Payload
CompilerEndIf
_AddSessionPacket(#PUBLISH, SERVER\PublishToClient()\Topic, *Payload, SERVER\PublishToClient()\PayLoadLength, SERVER\PublishToClient()\QoS, l)
EndIf
Next
EndIf
Next
CompilerIf #USE_BASE64_PAYLOAD = #False
If SERVER\PublishToClient()\Payload
FreeMemory(SERVER\PublishToClient()\Payload)
EndIf
CompilerEndIf
DeleteElement(SERVER\PublishToClient())
EndIf
UnlockMutex(SERVER\Mutex)
EndIf
Until SERVER\StopIt = #True
;save session data
ForEach SERVER\Sessions()
If SERVER\Sessions()\ClientID
CloseNetworkConnection(SERVER\Sessions()\ClientID)
SERVER\Sessions()\ClientID = #Null
EndIf
If SERVER\Sessions()\Persistant = #False
FreeMemory(SERVER\Sessions()\Buffer)
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
ClearList(SERVER\Sessions()\Subscriptions())
ClearList(SERVER\Sessions()\Packets())
ClearMap(SERVER\Sessions()\PacketIdentifiersUsed())
DeleteElement(SERVER\Sessions())
EndIf
Next
ForEach ConnPackets()
If ConnPackets()\ClientID
CloseNetworkConnection(ConnPackets()\ClientID)
EndIf
If ConnPackets()\Buffer
FreeMemory(ConnPackets()\Buffer)
EndIf
Next
ClearList(ConnPackets())
CloseNetworkServer(SID)
_LogServerAction("", #MQTTEvent_Info, "", 0, 0, "Server Thread Ended", #Info_ThreadEnded)
EndProcedure
Procedure _ClearServerData()
;can only be used, when no server is currently running.
;it will delete all persistant messages and stored sessions.
;the server will then behave as been started for the first time
If SERVER\ThreadID And IsThread(SERVER\ThreadID)
SERVER\Error = #Error_UseStopServerFirst
ProcedureReturn #False
EndIf
If SERVER\PersistantStoragePath = "" Or FileSize(SERVER\PersistantStoragePath) > -2
ProcedureReturn #True
;that directory does not exist, which means, we are clean already, therefore we return #True
EndIf
If FileSize(SERVER\PersistantStoragePath + "sessions.json") > 0
DeleteFile(SERVER\PersistantStoragePath + "sessions.json")
EndIf
If FileSize(SERVER\PersistantStoragePath + "retainmsg.json") > 0
DeleteFile(SERVER\PersistantStoragePath + "retainmsg.json")
EndIf
ProcedureReturn #True
EndProcedure
Procedure _InitDefault(Value.i, Def.i)
If Value = 0
ProcedureReturn Def
EndIf
ProcedureReturn Value
EndProcedure
;}
;----public Procedures
;{
Procedure StartServer()
Protected i, a$, JSON
If SERVER\isInit = #False
SERVER\Error = #Error_UseInitServerFirst
ProcedureReturn #False ;InitServer first
EndIf
If SERVER\ThreadID And IsThread(SERVER\ThreadID)
SERVER\Error = #Error_UseStopServerFirst
ProcedureReturn #False ;StopServer first
EndIf
If ListSize(SERVER\Sessions()) > 0
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
ClearList(SERVER\Sessions()\Packets())
ClearList(SERVER\Sessions())
EndIf
ClearList(SERVER\RetainedMessages())
;read old Session data (if exists)
If SERVER\PersistantStoragePath
If FileSize(SERVER\PersistantStoragePath + "sessions.json") > 0
JSON = LoadJSON(#PB_Any, SERVER\PersistantStoragePath + "sessions.json")
If JSON
ExtractJSONList(JSONValue(JSON), SERVER\Sessions())
FreeJSON(JSON)
EndIf
EndIf
ForEach SERVER\Sessions()
SERVER\Sessions()\SessionState = #SessionState_inactive
SERVER\Sessions()\Buffer = AllocateMemory(SERVER\InitBufferSize)
SERVER\Sessions()\BufferPos = 0
SERVER\Sessions()\ClientID = 0
CompilerIf #USE_BASE64_PAYLOAD = #False
ForEach SERVER\Sessions()\Packets()
If SERVER\Sessions()\Packets()\PayLoad\BufferLengh
SERVER\Sessions()\Packets()\PayLoad\Buffer = AllocateMemory(SERVER\Sessions()\Packets()\PayLoad\BufferLengh)
Base64Decoder(SERVER\Sessions()\Packets()\PayLoad\PayLoadBase64, SERVER\Sessions()\Packets()\PayLoad\Buffer, SERVER\Sessions()\Packets()\PayLoad\BufferLengh)
SERVER\Sessions()\Packets()\PayLoad\PayLoadBase64 = ""
EndIf
Next
CompilerEndIf
Next
If FileSize(SERVER\PersistantStoragePath + "retainmsg.json") > 0
JSON = LoadJSON(#PB_Any, SERVER\PersistantStoragePath + "retainmsg.json")
If JSON
ExtractJSONList(JSONValue(JSON), SERVER\RetainedMessages())
FreeJSON(JSON)
CompilerIf #USE_BASE64_PAYLOAD = #False
ForEach SERVER\RetainedMessages()
If SERVER\RetainedMessages()\PayLoadBase64
SERVER\RetainedMessages()\Payload = AllocateMemory(SERVER\RetainedMessages()\PayLoadLength)
Base64Decoder(SERVER\RetainedMessages()\PayLoadBase64, SERVER\RetainedMessages()\Payload, SERVER\RetainedMessages()\PayLoadLength)
SERVER\RetainedMessages()\PayLoadBase64 = ""
EndIf
Next
CompilerEndIf
EndIf
EndIf
EndIf
CompilerIf Defined(TLS_PBI, #PB_Constant) And Defined(PB_Network_Extra, #PB_Constant)
If SERVER\TLS\Protocols & #PB_Network_TLSv1
Init_TLS(SERVER\TLS\CertFile, SERVER\TLS\CertKey, SERVER\TLS\CertCa)
EndIf
CompilerEndIf
SERVER\StopIt = #False
SERVER\ThreadID = CreateThread(@_ServerThread(), 0)
ProcedureReturn SERVER\ThreadID
EndProcedure
Procedure InitServer(*Config.SERVER_INIT, RunServerAlso.i = #False)
Protected a$, i, Result
If SERVER\InitNetwork = #False
CompilerIf #PB_Compiler_Version < 600
If InitNetwork() = 0
SERVER\Error = #Error_NoNetworkAvailable
ProcedureReturn 0
EndIf
CompilerEndIf
SERVER\InitNetwork = #True
EndIf
If SERVER\isInit = #True
SERVER\Error = #Error_UseDeInitServerFirst
ProcedureReturn 0
EndIf
SERVER\isInit = #True
SERVER\Port = _InitDefault(*Config\Port, 1883)
SERVER\BindIP = *Config\BindIP
SERVER\InitBufferSize = _InitDefault(*Config\InitialBufferSize, $10000)
SERVER\LogFile = *Config\LogFile
SERVER\PersistantStoragePath = *Config\PersistantStoragePath
SERVER\TimeOUT = _InitDefault(*Config\ClientTimeOUT, 5000)
SERVER\LogWindowEvent = *Config\LogWindowEvent
SERVER\LogWindow = *Config\LogWindow
SERVER\Mutex = CreateMutex()
SERVER\Semaphore = CreateSemaphore()
SERVER\TLS\Protocols = *Config\TLS\Protocols
SERVER\TLS\CertFile = *Config\TLS\CertFile
SERVER\TLS\CertKey = *Config\TLS\CertKey
SERVER\TLS\CertCa = *Config\TLS\CertCa
If *Config\LogWindowEvent = 0
SERVER\LogWindow = -1
EndIf
If SERVER\PersistantStoragePath And FileSize(SERVER\PersistantStoragePath) <> -2
;Make sure directory exists
a$ = StringField(SERVER\PersistantStoragePath, 1, #PS$)
For i = 1 To CountString(SERVER\PersistantStoragePath, #PS$) - 1
a$ + #PS$
If FileSize(a$) <> -2
CreateDirectory(a$)
EndIf
a$ + StringField(SERVER\PersistantStoragePath, i + 1, #PS$)
Next i
EndIf
CopyList(*Config\Access(), SERVER\Access())
Result = #True
If RunServerAlso
Result = StartServer()
EndIf
ProcedureReturn Result
EndProcedure
Procedure StopServer()
Protected JSON
If SERVER\ThreadID And IsThread(SERVER\ThreadID)
SERVER\StopIt = #True
If WaitThread(SERVER\ThreadID, 1000) = 0
KillThread(SERVER\ThreadID)
EndIf
SERVER\ThreadID = 0
EndIf
If SERVER\PersistantStoragePath
If ListSize(SERVER\Sessions()) = 0
If FileSize(SERVER\PersistantStoragePath + "sessions.json") > 0
DeleteFile(SERVER\PersistantStoragePath + "sessions.json")
EndIf
Else
CompilerIf #USE_BASE64_PAYLOAD = #False
ForEach SERVER\Sessions()
ForEach SERVER\Sessions()\Packets()
If SERVER\Sessions()\Packets()\PayLoad\Buffer
SERVER\Sessions()\Packets()\PayLoad\PayLoadBase64 = Base64Encoder(SERVER\Sessions()\Packets()\PayLoad\Buffer, SERVER\Sessions()\Packets()\PayLoad\BufferLengh)
FreeMemory(SERVER\Sessions()\Packets()\PayLoad\Buffer)
SERVER\Sessions()\Packets()\PayLoad\Buffer = 0
EndIf
Next
Next
CompilerEndIf
JSON = CreateJSON(#PB_Any)
InsertJSONList(JSONValue(JSON), SERVER\Sessions())
SaveJSON(JSON, SERVER\PersistantStoragePath + "sessions.json")
FreeJSON(JSON)
EndIf
If ListSize(SERVER\RetainedMessages()) = 0
CompilerIf #USE_BASE64_PAYLOAD = #False
ForEach SERVER\RetainedMessages()
If SERVER\RetainedMessages()\Payload
SERVER\RetainedMessages()\PayLoadBase64 = Base64Encoder(SERVER\RetainedMessages()\Payload, SERVER\RetainedMessages()\PayLoadLength)
FreeMemory(SERVER\RetainedMessages()\Payload)
SERVER\RetainedMessages()\Payload = 0
EndIf
Next
CompilerEndIf
If FileSize(SERVER\PersistantStoragePath + "retainmsg.json") > 0
DeleteFile(SERVER\PersistantStoragePath + "retainmsg.json")
EndIf
Else
JSON = CreateJSON(#PB_Any)
InsertJSONList(JSONValue(JSON), SERVER\RetainedMessages())
SaveJSON(JSON, SERVER\PersistantStoragePath + "retainmsg.json")
FreeJSON(JSON)
EndIf
EndIf
;clean up
ForEach SERVER\Sessions()
If SERVER\Sessions()\Buffer
FreeMemory(SERVER\Sessions()\Buffer)
EndIf
ClearList(SERVER\Sessions()\Subscriptions())
ForEach SERVER\Sessions()\Packets()
_ClearPacket()
Next
ClearList(SERVER\Sessions()\Packets())
Next
ClearList(SERVER\Sessions())
ClearList(SERVER\RetainedMessages())
EndProcedure
Procedure DeInitServer()
StopServer()
SERVER\isInit = #False
If SERVER\Mutex
FreeMutex(SERVER\Mutex)
SERVER\Mutex = 0
EndIf
If SERVER\Semaphore
FreeSemaphore(SERVER\Semaphore)
SERVER\Semaphore = 0
EndIf
EndProcedure
Procedure GetLastError()
ProcedureReturn SERVER\Error
EndProcedure
;}
EndModule
;--------------###########---------------
; EOF
;--------------###########---------------
CompilerIf #PB_Compiler_IsMainFile
; Quick example, with a log window
; for all who own those cool shelly plugs (https://www.amazon.de/gp/product/B07TCQ7BFN/ref=ppx_yo_dt_b_asin_title_o06_s00?ie=UTF8&psc=1)
; => they can be switched on and off with this example.
; (need to activate MQTT support in their settings)
; set a certificate and keyfile to enable TLS
; remember to also set #TLS_PBI to the path and filename of TLS.pbi
; which can be found here: https://www.purebasic.fr/english/viewtopic.php?p=593738#p593738
; you also need a compiled libtls-xx.dll or *.a
; idle offers a compiled 3.5.0 here: https://www.purebasic.fr/english/viewtopic.php?p=593079#p593079
Global TLS_CERTIFICATE$ = ""
Global TLS_KEYFILE$ = ""
#LOG_ANYTHING = #True
CompilerIf Defined(PB_Network_TLSv1, #PB_Constant) = 0
#PB_Network_TLSv1_0 = $10
#PB_Network_TLSv1_1 = $20
#PB_Network_TLSv1_2 = $40
#PB_Network_TLSv1_3 = $80
#PB_Network_TLSv1 = #PB_Network_TLSv1_0 | #PB_Network_TLSv1_1 | #PB_Network_TLSv1_2 | #PB_Network_TLSv1_3
#PB_Network_TLS_DEFAULT = #PB_Network_TLSv1_2 | #PB_Network_TLSv1_3
#PB_Network_KeepAlive = $1000
#PB_Network_Extra = #PB_Network_TLSv1 | #PB_Network_KeepAlive
CompilerEndIf
Enumeration #PB_Event_FirstCustomValue
#MQTT_Event
EndEnumeration
Enumeration
#MQTT_LogWindow
EndEnumeration
Enumeration
#Editor_Log
#Combo_Shellies
#Button_SwitchShelly
EndEnumeration
Procedure example_LogIT(Text.s)
If Text
Text = FormatDate("%hh:%ii:%ss", Date()) + " " + Text
EndIf
AddGadgetItem(#Editor_Log, -1, Text)
CompilerSelect #PB_Compiler_OS
CompilerCase #PB_OS_Windows
Select GadgetType(#Editor_Log)
Case #PB_GadgetType_ListView
SendMessage_(GadgetID(#Editor_Log), #LB_SETTOPINDEX, CountGadgetItems(#Editor_Log) - 1, #Null)
Case #PB_GadgetType_ListIcon
SendMessage_(GadgetID(#Editor_Log), #LVM_ENSUREVISIBLE, CountGadgetItems(#Editor_Log) - 1, #False)
Case #PB_GadgetType_Editor
SendMessage_(GadgetID(#Editor_Log), #EM_SCROLLCARET, #SB_BOTTOM, 0)
EndSelect
CompilerCase #PB_OS_Linux
Protected *Adjustment.GtkAdjustment
*Adjustment = gtk_scrolled_window_get_vadjustment_(gtk_widget_get_parent_(GadgetID(#Editor_Log)))
*Adjustment\value = *Adjustment\upper
gtk_adjustment_value_changed_(*Adjustment)
CompilerEndSelect
EndProcedure
Procedure example_OnEvent_ComboShellies()
Protected i
i = GetGadgetState(#Combo_Shellies)
If i = -1
DisableGadget(#Button_SwitchShelly, 1)
SetGadgetText(#Button_SwitchShelly, "")
Else
DisableGadget(#Button_SwitchShelly, 0)
If GetGadgetItemData(#Combo_Shellies, i)
SetGadgetText(#Button_SwitchShelly, "Switch off")
Else
SetGadgetText(#Button_SwitchShelly, "Switch on")
EndIf
EndIf
EndProcedure
Procedure example_OnEvent_MQTT_LogData()
Protected *Values.MQTT_Common::MQTT_EVENTDATA, a$
Protected *Buffer, i, no, ClientIdentifier.s, Type, Payload.s
Protected Topic.s, ErrorText.s, Error, QoS, DUP, Retain, RetMsg.s, Identifier
*Values = EventData()
If *Values
a$ = PeekS(*Values + OffsetOf(MQTT_Common::MQTT_EVENTDATA\D), -1, #PB_UTF8)
ClientIdentifier = StringField(a$, 1, #ESC$)
Topic = StringField(a$, 2, #ESC$)
ErrorText = ReplaceString(StringField(a$, 3, #ESC$), "{CLIENT}", ClientIdentifier)
Type = *Values\Type
Error = *Values\Error
QoS = *Values\QoS
DUP = *Values\DUP
Retain = *Values\Retain
Identifier = *Values\PacketIdentifier
If *Values\PayLoadLength
Payload = PeekS(*Values\PayLoad, *Values\PayLoadLength, #PB_UTF8 | #PB_ByteLength)
FreeMemory(*Values\PayLoad)
EndIf
FreeMemory(*Values)
;handle it...
Select Type
Case MQTT_Common::#MQTTEvent_ClientConnected
example_LogIT("Client [" + ClientIdentifier + "] connected!")
Case MQTT_Common::#MQTTEvent_ClientDisconnected
example_LogIT("Client [" + ClientIdentifier + "] disconnected!")
Case MQTT_Common::#MQTTEvent_InfoPublished
;for shelly plugs
If #LOG_ANYTHING Or Left(Topic, 9) = "shellies/"
no = #True
For i = 0 To CountGadgetItems(#Combo_Shellies) - 1
If GetGadgetItemText(#Combo_Shellies, i) = ClientIdentifier
no = #False
If Payload = "on"
SetGadgetItemData(#Combo_Shellies, i, 1)
Else
SetGadgetItemData(#Combo_Shellies, i, 0)
EndIf
Break
EndIf
Next i
If no
i = CountGadgetItems(#Combo_Shellies)
AddGadgetItem(#Combo_Shellies, -1, ClientIdentifier)
If Payload = "on"
SetGadgetItemData(#Combo_Shellies, i, 1)
Else
SetGadgetItemData(#Combo_Shellies, i, 0)
EndIf
SetGadgetState(#Combo_Shellies, i)
EndIf
example_OnEvent_ComboShellies()
EndIf
If Retain
RetMsg = "(Retained) "
EndIf
example_LogIT("Client [" + ClientIdentifier + "] Published " + RetMsg + "Topic: " + Topic + ", Payload: " + Payload +
" (Q=" + Str(QoS) + ", R=" + Str(Retain) + ", D=" + Str(DUP) + ", M=" + Str(Identifier) + ")")
Case MQTT_Common::#MQTTEvent_Subscription
example_LogIT("Client [" + ClientIdentifier + "] Subscribed to Topic: " + Topic + " (Q=" + Str(QoS) + ")")
Case MQTT_Common::#MQTTEvent_Error
example_LogIT("[ERROR] " + ErrorText)
EndSelect
EndIf
EndProcedure
Procedure example_OnEvent_ResizeWindow()
ResizeGadget(#Editor_Log, #PB_Ignore, #PB_Ignore, WindowWidth(#MQTT_LogWindow) - 10, WindowHeight(#MQTT_LogWindow) - 40)
ResizeGadget(#Button_SwitchShelly, WindowWidth(#MQTT_LogWindow) / 2 - 40, WindowHeight(#MQTT_LogWindow) - 30, #PB_Ignore, #PB_Ignore)
ResizeGadget(#Combo_Shellies, #PB_Ignore, WindowHeight(#MQTT_LogWindow) - 30, WindowWidth(#MQTT_LogWindow) / 2 - 60, #PB_Ignore)
EndProcedure
Procedure example_OnEvent_Button()
Protected i, Topic.s, *Payload, L
i = GetGadgetState(#Combo_Shellies)
If i > -1
Topic = "shellies/" + GetGadgetItemText(#Combo_Shellies, i) + "/relay/0/command"
If GetGadgetItemData(#Combo_Shellies, i) = 1
*Payload = UTF8("off")
L = 3
Else
*Payload = UTF8("on")
L = 2
EndIf
MQTT_BROKER::_PublishViaServer(Topic, *Payload, L, 0, MQTT_Common::#PayloadType_Buffer)
FreeMemory(*Payload)
EndIf
EndProcedure
Procedure example_main()
Protected BrokerConfig.MQTT_BROKER::SERVER_INIT
OpenWindow(#MQTT_LogWindow, 0, 0, 800, 400, "MQTT Broker Log", #PB_Window_SystemMenu | #PB_Window_SizeGadget | #PB_Window_ScreenCentered | #PB_Window_MinimizeGadget)
EditorGadget(#Editor_Log, 5, 5, 790, 360, #PB_Editor_ReadOnly)
ComboBoxGadget(#Combo_Shellies, 5, 370, 150, 25)
ButtonGadget(#Button_SwitchShelly, 360, 370, 80, 25, "")
DisableGadget(#Button_SwitchShelly, 1)
BindGadgetEvent(#Button_SwitchShelly, @example_OnEvent_Button())
BindGadgetEvent(#Combo_Shellies, @example_OnEvent_ComboShellies())
BindEvent(#PB_Event_SizeWindow, @example_OnEvent_ResizeWindow())
;Set Broker parameters
BrokerConfig\LogWindow = #MQTT_LogWindow
BrokerConfig\LogWindowEvent = #MQTT_Event
;BrokerConfig\PersistantStoragePath = GetUserDirectory(#PB_Directory_ProgramData) + "hex0r" + #PS$ + "MQTT" + #PS$
;BrokerConfig\LogFile = "I:\mqtt.log"
AddElement(BrokerConfig\Access())
BrokerConfig\Access()\Username = "aaaaa"
BrokerConfig\Access()\Password = "bbbbb"
BrokerConfig\Access()\Flag = MQTT_BROKER::#AccessFlag_NoEncryption
CompilerIf Defined(TLS_PBI, #PB_Constant)
If TLS_CERTIFICATE$ And TLS_KEYFILE$
BrokerConfig\Port = 8883
BrokerConfig\TLS\Protocols = #PB_Network_TLS_DEFAULT
BrokerConfig\TLS\CertFile = TLS_CERTIFICATE$
BrokerConfig\TLS\CertKey = TLS_KEYFILE$
EndIf
CompilerEndIf
If MQTT_BROKER::InitServer(@BrokerConfig, #True)
example_LogIT("Listen Server started, waiting for incoming data...")
BindEvent(#MQTT_Event, @example_OnEvent_MQTT_LogData(), #MQTT_LogWindow)
example_OnEvent_ResizeWindow()
Repeat : Until WaitWindowEvent() = #PB_Event_CloseWindow
MQTT_BROKER::DeInitServer()
EndIf
EndProcedure
example_main()
CompilerEndIf