; MQTTCommon.pbi
; (c)HeX0R 22.06.2025
; Version 1.09
;
; 1.01: (13.10.2022)
; Added unsubscribe events for Client
;
; 1.02: (20.10.2022)
; Added #MQTTEvent_Info signals
; Removed #Error_ThreadEnded and replaced it with #Info_ThreadEnded
;
; 1.03: (28.11.2022)
; Just improved the readability a little bit
;
; 1.04: (24.12.2022)
; Just improved the readability a little bit
;
; 1.05 (11.01.2023)
; prepared for TLS support
;
; 1.06 (22.02.2023)
; Added 4 Integers to the MQTT_EVENTDATA structure
;
; 1.07 (06.04.2023)
; Added #Error_SendingFailed
;
; 1.08 (25.04.2024)
; Just added some comments to the error constants
;
; 1.09 (22.06.2025)
; Added better network error handling
;
; ----------------------------------------------------------------------------
; "THE BEER-WARE LICENSE":
; <HeX0R@coderbu.de> wrote this file. as long as you retain this notice you
; can do whatever you want with this stuff. If we meet some day, and you think
; this stuff is worth it, you can buy me a beer in return
; or use this =>
; https://send-beer.to/hex0r/
; Or just go out and drink a few on your own/with your friends ;)
;=============================================================================
DeclareModule MQTT_Common
#USE_BASE64_PAYLOAD = #True
;the framework can use (internally) base64 decoded payloads, or memory buffers.
;while base64 should be o.k. usually, MQTT also allows to send huge binary data, even files are allowed (max.: 268,435,455bytes).
;in that case the base64 strings the broker will store, can get quite huge and the en/decoding procedure might also need much time.
;then you better switch to binary storage, maybe also, when you run it on a RaspBi due to it's less memory capacities.
;binary storage is not fully tested, it might lead to memory leaks (although I didn't recognize it until now).
#DEEP_DEBUG = #False ;for debugging purposes only, to see, if all packets and identifiers are being removed successfully
;some Networkerror definitions
#WSA_IO_INCOMPLETE = 996
#WSA_IO_PENDING = 997
CompilerSelect #PB_Compiler_OS
CompilerCase #PB_OS_Windows
#WSAEINTR = 10004
#WSAEMFILE = 10024
#WSAEWOULDBLOCK = 10035
#WSAEINPROGRESS = 10036
#WSAEALREADY = 10037
CompilerCase #PB_OS_Linux
#WSAEINTR = 4 ; EINTR
#WSAEMFILE = 17 ; ENOFILE
#WSAEWOULDBLOCK = 11 ; Eagain
#WSAEINPROGRESS = 115 ; EINPROGRESS
#WSAEALREADY = 114 ; EALREADY
CompilerCase #PB_OS_MacOS
#WSAEINTR = 4 ; EINTR
#WSAEMFILE = 24 ; EMFILE
#WSAEWOULDBLOCK = 35 ; EWOULDBLOCK = EAGAIN
#WSAEINPROGRESS = 36 ; EINPROGRESS
#WSAEALREADY = 37 ; EALREADY
CompilerEndSelect
Enumeration ControlPacketTypes
#Reserved1
#CONNECT
#CONNACK
#PUBLISH
#PUBACK
#PUBREC
#PUBREL
#PUBCOMP
#SUBSCRIBE
#SUBACK
#UNSUBSCRIBE
#UNSUBACK
#PINGREQ
#PINGRESP
#DISCONNECT
#Reserved2
EndEnumeration
CompilerIf #PB_Compiler_Debugger
Global Dim DTypes.s(#Reserved2) ;only used for debugging purposes, not needed for final product
DTypes(#Reserved1) = "Reserved!"
DTypes(#CONNECT) = "CONNECT"
DTypes(#CONNACK) = "CONNACK"
DTypes(#PUBLISH) = "PUBLISH"
DTypes(#PUBACK) = "PUBACK"
DTypes(#PUBREC) = "PUBREC"
DTypes(#PUBREL) = "PUBREL"
DTypes(#PUBCOMP) = "PUBCOMP"
DTypes(#SUBSCRIBE) = "SUBSCRIBE"
DTypes(#SUBACK) = "SUBACK"
DTypes(#UNSUBSCRIBE) = "UNSUBSCRIBE"
DTypes(#UNSUBACK) = "UNSUBACK"
DTypes(#PINGREQ) = "PINGREQ"
DTypes(#PINGRESP) = "PINGRESP"
DTypes(#DISCONNECT) = "DISCONNECT"
CompilerEndIf
;----Enumeration ConnectionReturnValues
Enumeration ConnectionReturnValues ;values are defined in the MQTT description, I just wanted to give them more understandable names
;internal Server Messages, the according messages you'll receive on the client side are mentioned in the comments
#ConnAccepted ;all fine
#ConnRefused_UnacceptableProtocolVersion ;see #Error_UnsupportedProtocolVersion below
#ConnRefused_IdentifierRejected ;see #Error_UnsupportedIdentifier below
#ConnRefused_ServerUnavailable ;see #Error_MQTTServiceUnavailable below
#ConnRefused_BadUsernameOrPassword ;see #Error_BadUsernameOrPassword below
#ConnRefused_NotAuthorized ;see #Error_NotAuthorizedToConnect below
EndEnumeration
;----Enumeration SessionStates
Enumeration SessionStates
#SessionState_inactive
#SessionState_active
EndEnumeration
;----Enumeration PacketState
Enumeration PacketState
#PacketState_Incoming
#PacketState_WaitForAnswer
#PacketState_OutgoingNotSendYet
EndEnumeration
;----Enumeration Errors
Enumeration Errors
#Error_None
#Error_CantStartServer ;SERVER: CreateNetworkServer() failed, maybe port in use? Note: Server thread will end
#Error_CantConnect ;CLIENT: OpenNetworkConnection() failed, wrong IP/URL or Port, or Broker not online? Note: Client thread will end
#Error_BeingDisconnected ;CLIENT: We've received a #PB_NetworkEvent_Disconnect (out of a sudden), Client thread will end
#Error_WrongAnswerReceived ;CLIENT: We've received an invalid MQTT packet, according to the MQTT specs, connection will be closed and thread will end
#Error_TimedOut ;CLIENT: We didn't receive a response from the broker in time, Connection will be closed and Client thread will end
;SERVER: The Client didn't respond in time, connection will be closed and Client removed
#Error_UnsupportedIdentifier ;CLIENT: The broker didn't accept our identifier (while connection handshake) for whatever reason, connection will be closed and thread will end
#Error_UseStopServerFirst ;SERVER: You might have tried to start a server, but it was already running
#Error_UseDeInitServerFirst ;SERVER: You need to call DeInitServer() first, before re-using InitServer()
#Error_NoNetworkAvailable ;SERVER: [deprecated] InitNetwork() failed (only for PB prior to 6.00)
#Error_UseInitServerFirst ;SERVER: Tried to start a server before setting its parameters (via InitServer())
#Error_MQTTServiceUnavailable ;CLIENT: Answer from broker (while connection handshake) for whatever reason, connection will be closed and thread will end
#Error_UnsupportedProtocolVersion ;CLIENT: Answer from broker (while connection handshake), maybe it is a MQTT 5.00 broker only, connection will be closed and thread will end
#Error_BadUsernameOrPassword ;CLIENT: Answer from broker (while connection handshake), username or password were wrong, connection will be closed and thread will end
#Error_NotAuthorizedToConnect ;CLIENT: Answer from broker (while connection handshake), maybe we are banned? Connection will be closed and thread will end
#Error_CorruptedPacketReceived ;SERVER: Corrupted MQTT packet received from client, connection will be closed and client removed.
#Error_LengthOfPacketIncorrect ;CLIENT: User tried to send a packet which exceeds the MQTT payload limit (268435455 Bytes)
;SERVER: Something is wrong with the packet we received from a client, connection will be closed and client removed
#Error_SendingFailed ;CLIENT: Sending a packet failed (for whatever reason)
EndEnumeration
;----some missing API constants (needed for additional TLS settings)
;{
CompilerIf #PB_Compiler_OS = #PB_OS_Windows
;https://github.com/tpn/winsdk-10/blob/master/Include/10.0.16299.0/shared/ws2ipdef.h
#TCP_EXPEDITED_1122 = 2
#TCP_KEEPALIVE = 3
#TCP_MAXSEG = 4
#TCP_MAXRT = 5
#TCP_STDURG = 6
#TCP_NOURG = 7
#TCP_ATMARK = 8
#TCP_NOSYNRETRIES = 9
#TCP_TIMESTAMPS = 10
#TCP_OFFLOAD_PREFERENCE = 11
#TCP_CONGESTION_ALGORITHM = 12
#TCP_DELAY_FIN_ACK = 13
#TCP_MAXRTMS = 14
#TCP_FASTOPEN = 15
#TCP_KEEPCNT = 16
#TCP_KEEPIDLE = #TCP_KEEPALIVE
#TCP_KEEPINTVL = 17
CompilerElse
;https://github.com/leostratus/netinet/blob/master/tcp.h
#TCP_MAXSEG = 2 ;/* set maximum segment size */
#TCP_NOPUSH = 4 ;/* don't push last block of write */
#TCP_NOOPT = 8 ;/* don't use TCP options */
#TCP_MD5SIG = 16 ;/* use MD5 digests (RFC2385) */
#TCP_INFO = 32 ;/* retrieve tcp_info Structure */
#TCP_CONGESTION = 64 ;/* get/set congestion control algorithm */
#TCP_KEEPINIT = 128 ;/* N, time To establish connection */
#TCP_KEEPIDLE = 256 ;/* L,N,X start keeplives after this period */
#TCP_KEEPINTVL = 512 ;/* L,N interval between keepalives */
#TCP_KEEPCNT = 1024 ;/* L,N number of keepalives before close */
CompilerEndIf
;}
;----Constants in preparation to TLS
CompilerIf Defined(PB_Network_TLSv1, #PB_Constant) = 0
#PB_Network_TLSv1_0 = $10
#PB_Network_TLSv1_1 = $20
#PB_Network_TLSv1_2 = $40
#PB_Network_TLSv1_3 = $80
#PB_Network_TLSv1 = #PB_Network_TLSv1_0 | #PB_Network_TLSv1_1 | #PB_Network_TLSv1_2 | #PB_Network_TLSv1_3
CompilerEndIf
;----Enumeration Info
Enumeration Info
#Info_ThreadStarted
#Info_ThreadEnded ;<- can be used for an easy way to know that a servcer/client shut-off/disconnected
EndEnumeration
;----Enumeration EventTypes
Enumeration EventTypes
#MQTTEvent_ClientConnected ;<- Broker only
#MQTTEvent_ClientDisconnected ;<- Broker only
#MQTTEvent_InfoPublished ;<- Broker only
#MQTTEvent_Subscription ;<- Broker only
#MQTTEvent_SuccessfullyConnected ;<- Client only
#MQTTEvent_SuccessfullyDisconnected ;<- Client only
#MQTTEvent_SubscriptionSuccessfull ;<- Client only
#MQTTEvent_SubscriptionDenied ;<- Client only
#MQTTEvent_UnsubscriptionSuccessfull ;<- Client only
#MQTTEvent_UnsibscriptionDenied ;<- Client only
#MQTTEvent_PublishIncoming ;<- Client only
#MQTTEvent_PublishingSuccessfull ;<- Client only
#MQTTEvent_PublishingDenied ;<- Client only
#MQTTEvent_PingSent ;<- Client only
#MQTTEvent_PingReplyReceived ;<- Client only
#MQTTEvent_Error ;<- Client AND Broker
#MQTTEvent_Info ;<- Client and Broker
EndEnumeration
;----Enumeration PayloadType
Enumeration PayloadType
#PayloadType_UnicodeString
#PayloadType_UTF8String
#PayloadType_base64
#PayloadType_Buffer
EndEnumeration
;----Enumeration SendResults
Enumeration SendResults
#SendFailed = -1
#SendNotFinished
#SendFinished
EndEnumeration
;----Structure HEADER
Structure HEADER
PacketType.b
bytes.a[0]
EndStructure
;----Structure MQTT_EVENTDATA
Structure MQTT_EVENTDATA ;used to communicate with the Event Window
Type.u
PacketIdentifier.u
Error.u
QoS.b
DUP.b
Retain.b
*PayLoad
PayLoadLength.i
Reserved.i[4]
D.b[0]
EndStructure
;----Structure Filter
Structure Filter
Topic.s
QoS.b
EndStructure
;----Structure Filter_tmp
Structure Filter_tmp Extends Filter
Add.b
EndStructure
;----Structure PAYLOAD
Structure PAYLOAD
*Buffer
BufferLengh.i
PayLoadBase64.s
EndStructure
;----Structure NOTYETSENT
Structure NOTYETSENT
*Buffer
BufferLength.i
EndStructure
;----Structure PACKET
Structure PACKET
Type.w ;part of the header of any packet => type of this packet, see enumeration ControlPacketTypes above
Flags.w ;part of the header of any packet => not often really used in MQTT, just make sure, it fits the requirements
PacketIdentifier.u ;used in PUBLISH packets with higher QoS and in SUBSCRIBE packets
QoS.b ;Quality of Service code
DUP.b ;used in PUBLISH packets, to indicate this is not the first time the server was sending that packet (DUPlicate)
Retain.b ;when set to 1 the server should keep the message, to send it to clients which are currently disconnected.
; as soon as they connect and re-subscribe, the retained messages will be sent to them.
TopicName.s ;Topic name of Published items
PacketState.b ;incoming, outgoing or waiting for a reply, see enumeration PacketState above
ReSendAt.q ;in case we didn't got a usable answer, we set a new time for resending it
WaitForAnswerSince.q ;holds the time, since when we are waiting for a reply
NYS.NOTYETSENT ;added this to handle huge payloads also.
; huge payloads can not be sent in one piece, therefore we have to store the remaining data here
PayLoad.PAYLOAD
; Content of ... whatever type this packet is
; MQTT supports binary content also, although most clients are sending UTF8 strings.
; to remain compatible, this framework will store the Payload as base64 encoded strings
; because the MQTT protocol V3.1.1 has no flag to show us, what kind of data it really is.
List tmpSubsc.Filter_tmp() ;temp. Filters being transmitted for subscriptions
; They will be integrated into Sessions()\Subscriptions() then
EndStructure
;----Structure MINI_PACKET
Structure MINI_PACKET ;comes in handy, for very small packets we have to send (like #PINGRESP e.g.)
a.a[4]
EndStructure
Declare.s ErrorDescription(Error)
Declare GetNetworkError()
CompilerIf #PB_Compiler_OS = #PB_OS_Linux
ImportC ""
errno_location() As "__errno_location"
EndImport
CompilerEndIf
EndDeclareModule
Module MQTT_Common
Procedure.s ErrorDescription(Error)
Protected Result.s
Select Error
Case #Error_None
Result = "No error!"
Case #Error_CantStartServer
Result = "Unable to start the Broker! (Listenport in use?)"
Case #Error_CantConnect
Result = "Can't connect to Broker!"
Case #Error_BeingDisconnected
Result = "Somehow we have been disconnected!"
Case #Error_WrongAnswerReceived
Result = "Wrong Answer from {CLIENT} received!"
Case #Error_TimedOut
Result = "{CLIENT} Timed out!"
Case #Error_UnsupportedIdentifier
Result = "This ClientIdentifier ({CLIENT}) is not allowed!"
Case #Error_UseStopServerFirst
Result = "Use StopServer() first!"
Case #Error_UseDeInitServerFirst
Result = "Use DeInitServer() first!"
Case #Error_NoNetworkAvailable
Result = "Can't initialize network!"
Case #Error_UseInitServerFirst
Result = "Use InitServer() first!"
Case #Error_MQTTServiceUnavailable
Result = "MQTT Service is unavailable!"
Case #Error_UnsupportedProtocolVersion
Result = "MQTT Protocol version not accepted!"
Case #Error_BadUsernameOrPassword
Result = "Username and/or Password wrong!"
Case #Error_NotAuthorizedToConnect
Result = "You are not authorized to connect to that broker!"
Case #Error_CorruptedPacketReceived
Result = "Corrupted Packet from {CLIENT} received!"
Case #Error_LengthOfPacketIncorrect
Result = "Invalid Packetlength from {CLIENT} received!"
Case #Error_SendingFailed
Result = "Sending Packet failed!"
EndSelect
ProcedureReturn Result
EndProcedure
Procedure GetNetworkError()
Protected Result
CompilerIf #PB_Compiler_OS = #PB_OS_Windows
Result = WSAGetLastError_()
CompilerElse
CompilerIf #PB_Compiler_Backend = #PB_Backend_C
! #include "errno.h"
! extern int errno;
! v_error = errno;
CompilerElse
Result = PeekL(__errno_location())
CompilerEndIf
CompilerEndIf
ProcedureReturn Result
EndProcedure
EndModule