-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathutils.py
144 lines (106 loc) · 4.7 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
from maltego_trx.maltego import MaltegoEntity
from lxml import html
from settings import bot_token
def message_is_forwarded_from_another_chat(message, username):
if hasattr(message, "forward_from_chat") \
and message.forward_from_chat is not None \
and message.forward_from_chat.username != username:
return True
return False
def make_http_request(url, method="GET", params=None, retries=3, backoff_factor=0.3, timeout=10):
try:
session = requests.Session()
retry_strategy = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504]
)
session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
try:
response = session.request(method, url, params=params, timeout=timeout)
response.raise_for_status()
return response.json()
except ValueError:
return response.content
except requests.RequestException as e:
logging.error(f"HTTP request failed for {url}: {e}")
return None
def fetch_web_info(username):
photo = None
full_name = None
response_data = make_http_request(f"https://t.me/{username}")
tree = html.fromstring(response_data)
images = tree.cssselect("img.tgme_page_photo_image")
if images:
photo = images[0].get("src")
title = tree.cssselect('.tgme_page_title span')
if title:
full_name = title[0].text_content()
return {"full_name": full_name, "photo": photo}
def create_maltego_entity(entity, obj):
identity = obj.username if obj.username else obj.id
entity = MaltegoEntity(entity, identity)
exclude_keys = ["raw", "photo", "_client", "usernames"]
if isinstance(obj, dict):
attributes = obj.items()
elif hasattr(obj, "__dict__"):
attributes = vars(obj).items()
elif hasattr(obj, "__slots__"):
attributes = [(attr, getattr(obj, attr)) for attr in obj.__slots__]
else:
raise ValueError("Unsupported object type. Must be dict or object with __dict__ or __slots__.")
for key, value in attributes:
if key is not None and value and key not in exclude_keys:
entity.addProperty(
f"properties.{key}",
value=value
)
return entity
def process_profile_entity(profile):
if profile.username:
profile_entity = MaltegoEntity("interlinked.telegram.UserProfile", value=profile.username)
user_info = fetch_web_info(profile.username)
profile_entity.addProperty("properties.photo", value=user_info["photo"])
else:
profile_entity = MaltegoEntity("interlinked.telegram.UserProfile", value=profile.id)
profile_entity.addProperty("properties.id", value=profile.id)
if profile.phone_number:
profile_entity.addProperty("properties.phone", value=profile.phone_number)
profile_entity.addProperty("properties.first_name", value=profile.first_name)
if profile.last_name:
profile_entity.addProperty("properties.last_name", value=profile.last_name)
return profile_entity
class MediaFetcher:
def __init__(self):
self.bot_token = bot_token
self.base_url = "https://api.telegram.org"
def get_media_file_id(self, name, media_type="sticker"):
url = (
f"{self.base_url}/bot{self.bot_token}/getStickerSet"
if media_type == "sticker"
else f"{self.base_url}/bot{self.bot_token}/getCustomEmojiStickers"
)
params = {"name": name} if media_type == "sticker" else {"custom_emoji_ids": [name]}
response_data = make_http_request(url, params=params)
if response_data:
if media_type == "sticker":
return response_data["result"]["stickers"][0]["thumbnail"].get("file_id")
elif media_type == "emoji":
return response_data["result"][0]["thumbnail"]
return None
def get_file_path(self, file_id):
url = f"{self.base_url}/bot{self.bot_token}/getFile"
params = {"file_id": file_id}
response_data = make_http_request(url, params=params)
return response_data["result"].get("file_path") if response_data else None
def get_media_preview_url(self, name, file_id=None, media_type="sticker"):
file_id = self.get_media_file_id(name, media_type=media_type) if file_id is None else file_id
if file_id:
file_path = self.get_file_path(file_id)
return f"{self.base_url}/file/bot{self.bot_token}/{file_path}" if file_path else None
return None
media_fetcher = MediaFetcher()