From b200cf3580cb1d4714864852fe0a33ec2d419695 Mon Sep 17 00:00:00 2001 From: evi1cg <6007471+Ridter@users.noreply.github.com.> Date: Wed, 8 Jan 2025 17:48:07 +0800 Subject: [PATCH] fixed RPC_C_AUTHN_NETLOGON auth error --- impacket/dcerpc/v5/nrpc.py | 32 +++++++++++++++++++++++++++++++- impacket/dcerpc/v5/rpcrt.py | 15 +++++++++------ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/impacket/dcerpc/v5/nrpc.py b/impacket/dcerpc/v5/nrpc.py index 94ed6cb19..3d801e15b 100644 --- a/impacket/dcerpc/v5/nrpc.py +++ b/impacket/dcerpc/v5/nrpc.py @@ -1769,7 +1769,7 @@ def SIGN(data, confounder, sequenceNum, key, aes = False): signature['SequenceNumber'] = encryptSequenceNumberRC4(deriveSequenceNumber(sequenceNum), signature['Checksum'], key) return signature else: - signature = NL_AUTH_SIGNATURE() + signature = NL_AUTH_SHA2_SIGNATURE() signature['SignatureAlgorithm'] = NL_SIGNATURE_HMAC_SHA256 if confounder == '': signature['SealAlgorithm'] = NL_SEAL_NOT_ENCRYPTED @@ -1844,6 +1844,36 @@ def UNSEAL(data, auth_data, key, aes = False): plain = cipher.decrypt(data) return plain, cfounder +def toCompressedUtf8String(domain_name): + if domain_name is None: + raise ValueError("domain_name cannot be None") + + MAX_LABEL_LENGTH = 63 + + buf = bytearray() + labels = domain_name.split('.') + + for label in labels: + label_bytes = label.encode('utf-8') + if len(label_bytes) > MAX_LABEL_LENGTH: + raise ValueError("Label exceeded max length of 63 bytes.") + buf.append(len(label_bytes)) + buf.extend(label_bytes) + buf.append(0) + + return bytes(buf) + +def createNlAuthMessage(clientComputerName, domainName): + auth = NL_AUTH_MESSAGE() + auth['MessageType'] = NL_AUTH_MESSAGE_REQUEST + if '.' in domainName: + auth['Flags'] = NL_AUTH_MESSAGE_NETBIOS_HOST | NL_AUTH_MESSAGE_DNS_DOMAIN + auth['Buffer'] = b(clientComputerName) + b'\x00' + toCompressedUtf8String(domainName) + else: + auth['Flags'] = NL_AUTH_MESSAGE_NETBIOS_DOMAIN | NL_AUTH_MESSAGE_NETBIOS_HOST + auth['Buffer'] = b(domainName) + b'\x00' + b(clientComputerName) + b'\x00' + return auth + def getSSPType1(workstation='', domain='', signingRequired=False): auth = NL_AUTH_MESSAGE() diff --git a/impacket/dcerpc/v5/rpcrt.py b/impacket/dcerpc/v5/rpcrt.py index ab5e4b5b5..2deb3b24d 100644 --- a/impacket/dcerpc/v5/rpcrt.py +++ b/impacket/dcerpc/v5/rpcrt.py @@ -921,9 +921,11 @@ def __init__(self, transport): self.__cipher = None self.__confounder = b'' self.__gss = None + self.__aseandsha = False - def set_session_key(self, session_key): + def set_session_key(self, session_key, aesandsha = False): self.__sessionKey = session_key + self.__aseandsha = aesandsha def get_session_key(self): return self.__sessionKey @@ -1006,7 +1008,8 @@ def bind(self, iface_uuid, alter = 0, bogus_binds = 0, transfer_syntax = ('8a885 use_ntlmv2=self._transport.doesSupportNTLMv2()) elif self.__auth_type == RPC_C_AUTHN_NETLOGON: from impacket.dcerpc.v5 import nrpc - auth = nrpc.getSSPType1(self.__username[:-1], self.__domain, signingRequired=True) + #auth = nrpc.getSSPType1(self.__username[:-1], self.__domain, signingRequired=True) + auth = nrpc.createNlAuthMessage(self.__username[:-1], self.__domain) elif self.__auth_type == RPC_C_AUTHN_GSS_NEGOTIATE: self.__cipher, self.__sessionKey, auth = kerberosv5.getKerberosType1(self.__username, self.__password, self.__domain, self.__lmhash, @@ -1200,7 +1203,7 @@ def _transport_send(self, rpc_packet, forceWriteAndx = 0, forceRecv = 0): self.__clientSealingHandle) elif self.__auth_type == RPC_C_AUTHN_NETLOGON: from impacket.dcerpc.v5 import nrpc - sealedMessage, signature = nrpc.SEAL(plain_data, self.__confounder, self.__sequence, self.__sessionKey, False) + sealedMessage, signature = nrpc.SEAL(plain_data, self.__confounder, self.__sequence, self.__sessionKey, self.__aseandsha) elif self.__auth_type == RPC_C_AUTHN_GSS_NEGOTIATE: sealedMessage, signature = self.__gss.GSS_Wrap(self.__sessionKey, plain_data, self.__sequence) @@ -1227,7 +1230,7 @@ def _transport_send(self, rpc_packet, forceWriteAndx = 0, forceRecv = 0): self.__confounder, self.__sequence, self.__sessionKey, - False) + self.__aseandsha) elif self.__auth_type == RPC_C_AUTHN_GSS_NEGOTIATE: signature = self.__gss.GSS_GetMIC(self.__sessionKey, plain_data, self.__sequence) @@ -1374,7 +1377,7 @@ def recv(self): answer, cfounder = nrpc.UNSEAL(answer, auth_data[len(sec_trailer):], self.__sessionKey, - False) + self.__aseandsha) self.__sequence += 1 elif self.__auth_type == RPC_C_AUTHN_GSS_NEGOTIATE: if self.__sequence > 0: @@ -1406,7 +1409,7 @@ def recv(self): self.__confounder, self.__sequence, self.__sessionKey, - False) + self.__aseandsha) self.__sequence += 1 elif self.__auth_type == RPC_C_AUTHN_GSS_NEGOTIATE: # Do NOT increment the sequence number when Signing Kerberos