import time

from boofuzz import *
class EmptyConnection(TCPSocketConnection):
    """Boofuzz connection that publishes each payload instead of using a socket.

    ``open`` and ``close`` are no-ops and ``recv`` always yields empty bytes;
    ``send`` hands the rendered fuzz data to ``client.publish`` on a fixed
    topic.

    NOTE(review): ``client`` is not defined in the visible source. It is
    presumably an MQTT client (the request is named "MQTT Fuzz") that must be
    created and connected at module level before fuzzing starts -- confirm.
    """

    def __init__(self, host, port):
        """Forward ``host`` and ``port`` to the base class; no socket is opened here."""
        super().__init__(host, port)

    def open(self):
        """Do nothing: this connection never opens a TCP socket."""

    def close(self):
        """Do nothing: there is no socket to close."""

    def recv(self, max_bytes=1024):
        """Return empty bytes; no response is ever read back.

        Args:
            max_bytes: Ignored. The parameter is named ``max_bytes`` because
                boofuzz's ``Target.recv`` passes the size by that keyword
                (verify against the installed boofuzz version).

        Returns:
            Always ``b""``.
        """
        return b""

    def send(self, data):
        """Print the fuzz payload and publish it on the fixed topic.

        Args:
            data: Rendered request payload supplied by boofuzz.

        Returns:
            ``len(data)``, reported as the number of bytes sent, which is what
            the boofuzz connection interface expects from ``send``.
        """
        print(f"Sending fuzz data: {data}")
        client.publish("v1/event/computer/resource/000c29963b29", data)
        return len(data)
# Wire the publish-only connection into a boofuzz target and session.
# The host/port values are passed to the base class only; EmptyConnection
# never opens a socket.
empty_connection = EmptyConnection("localhost", 0)
target = Target(connection=empty_connection)
session = Session(target=target)

try:
    # Define the request and attach it to the session exactly once.
    # Re-running s_initialize with the same name on every loop iteration makes
    # boofuzz reject the duplicate request, which previously ended the loop
    # through the error handler on its second pass.
    s_initialize("MQTT Fuzz")
    with s_block('{"body":'):
        s_static('{"body":')
        s_static('{"cpu":')
        s_static('0.004999999888241291')
        s_static(',"mem":')
        s_string('0.244140625', fuzzable=True)
        s_static(',"network":{"receive":')
        s_string('14.853154182434082', fuzzable=True)
        s_static(',"send":')
        s_string('9.8922290802001953', fuzzable=True)
        s_static('},"state":"')
        s_string('normal', fuzzable=True)
        # NOTE(review): the next three fields use s_string, so boofuzz will
        # mutate these JSON delimiters as well (s_string is fuzzable by
        # default) -- confirm this is intended rather than s_static.
        s_string('","volumes":{"/dev/sda2":')
        s_string('7')
        s_string('}},"seq":')
        # The millisecond timestamp is evaluated once, when the request is
        # defined, and is then sent unchanged in every test case.
        s_static(str(int(round(time.time() * 1000))))
        s_static('}')
    session.connect(s_get("MQTT Fuzz"))

    # Run fuzzing rounds back to back, pausing one second between rounds,
    # until something raises.
    while True:
        session.fuzz()
        time.sleep(1)
except Exception as e:
    # Any failure during setup or fuzzing is reported and ends the script.
    print(f"Error during fuzzing: {e}")