From 489045cefe2782fb410979ed527926deeff6d372 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 19 Nov 2024 16:48:09 +0100 Subject: [PATCH 1/6] fix: [stix2 import] Fixed missing method name --- misp_stix_converter/stix2misp/external_stix2_to_misp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misp_stix_converter/stix2misp/external_stix2_to_misp.py b/misp_stix_converter/stix2misp/external_stix2_to_misp.py index 7719f94..c4149ea 100644 --- a/misp_stix_converter/stix2misp/external_stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/external_stix2_to_misp.py @@ -114,7 +114,7 @@ def malware_parser(self) -> ExternalSTIX2MalwareConverter: @property def observable_object_parser(self) -> STIX2ObservableObjectConverter: if not hasattr(self, '_observable_object_parser'): - self._set_observable_object_parser() + self._observable_object_parser = STIX2ObservableObjectConverter(self) return self._observable_object_parser @property From 74a8d7c9e28d113c955cbfbde9b0fa5cb215e2e7 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 21 Nov 2024 09:46:42 +0100 Subject: [PATCH 2/6] fix: [stix2 import] Better parsing for observables referenced in malwares objects - Merged parsing methods common to standalone observable objects & observable referenced by malware objects, in order to improve the conversion as observable referenced by malware objects were missing for instance the file references parsing --- .../stix2_observable_objects_converter.py | 407 ++++++++---------- 1 file changed, 179 insertions(+), 228 deletions(-) diff --git a/misp_stix_converter/stix2misp/converters/stix2_observable_objects_converter.py b/misp_stix_converter/stix2misp/converters/stix2_observable_objects_converter.py index c580550..3e4b50b 100644 --- a/misp_stix_converter/stix2misp/converters/stix2_observable_objects_converter.py +++ b/misp_stix_converter/stix2misp/converters/stix2_observable_objects_converter.py @@ -34,41 +34,16 @@ ] -class STIX2ObservableObjectConverter(ExternalSTIX2ObservableConverter): - def __init__(self, main: 'ExternalSTIX2toMISPParser'): - self._set_main_parser(main) - self._mapping = ExternalSTIX2ObservableMapping - - def _create_misp_attribute( - self, attribute_type: str, observable: DomainName, - feature: Optional[str] = 'value', comment: Optional[str] = None, - **kwargs: Dict[str, str]) -> MISPAttribute: - return { - 'value': getattr(observable, feature), 'type': attribute_type, - **kwargs, **self.main_parser._sanitise_attribute_uuid( - observable.id, comment=comment - ) - } - - def _create_misp_object_from_observable_object( - self, name: str, observable: _OBSERVABLE_TYPING) -> MISPObject: - misp_object = MISPObject( - name, force_timestamps=True, - misp_objects_path_custom=_MISP_OBJECTS_PATH - ) - self.main_parser._sanitise_object_uuid( - misp_object, observable.get('id') - ) - return misp_object +class STIX2SampleObervableParser(metaclass=ABCMeta): def _parse_artifact_observable_object( - self, artifact_ref: str) -> MISPObject: + self, artifact_ref: str, *args) -> MISPObject: observable = self._fetch_observable(artifact_ref) if observable['used'].get(self.event_uuid, False): return observable['misp_object'] artifact = observable['observable'] - artifact_object = self._create_misp_object_from_observable_object( - 'artifact', artifact + artifact_object = self._create_misp_object_from_observable( + 'artifact', artifact, *args ) for attribute in self._parse_artifact_observable(artifact): artifact_object.add_attribute(**attribute) @@ -79,47 +54,8 @@ def _parse_artifact_observable_object( observable['misp_object'] = misp_object return misp_object - def _parse_as_observable_object(self, as_ref: str) -> _MISP_CONTENT_TYPING: - observable = self._fetch_observable(as_ref) - if observable['used'].get(self.event_uuid, False): - return observable.get( - 'misp_object', observable.get('misp_attribute') - ) - autonomous_system = observable['observable'] - attributes = tuple( - self._parse_ip_addresses_belonging_to_AS(autonomous_system.id) - ) - observable['used'][self.event_uuid] = True - if attributes: - AS_object = self._create_misp_object_from_observable_object( - 'asn', autonomous_system - ) - value = f'AS{autonomous_system.number}' - AS_object.add_attribute( - 'asn', value, - uuid=self.main_parser._create_v5_uuid( - f'{autonomous_system.id} - asn - {value}' - ) - ) - for attribute in attributes: - AS_object.add_attribute(**attribute) - misp_object = self.main_parser._add_misp_object( - AS_object, autonomous_system - ) - observable['misp_object'] = misp_object - return misp_object - attribute = { - 'type': 'AS', 'value': f'AS{autonomous_system.number}', - **self.main_parser._sanitise_attribute_uuid(autonomous_system.id) - } - misp_attribute = self.main_parser._add_misp_attribute( - attribute, autonomous_system - ) - observable['misp_attribute'] = misp_attribute - return misp_attribute - def _parse_directory_observable_object( - self, directory_ref: str, child: Optional[str] = None + self, directory_ref: str, *args, child: Optional[str] = None ) -> _MISP_CONTENT_TYPING: observable = self._fetch_observable(directory_ref) if observable['used'].get(self.event_uuid, False): @@ -135,8 +71,8 @@ def _parse_directory_observable_object( ) ) if force_object: - directory_object = self._create_misp_object_from_observable_object( - 'directory', directory + directory_object = self._create_misp_object_from_observable( + 'directory', directory, *args ) for attribute in attributes: directory_object.add_attribute(**attribute) @@ -173,6 +109,170 @@ def _parse_directory_observable_object( observable['misp_object'] = file_object return misp_object + def _parse_file_observable_object(self, file_ref: str, *args) -> MISPObject: + observable = self._fetch_observable(file_ref) + if observable['used'].get(self.event_uuid, False): + return observable['misp_object'] + _file = observable['observable'] + file_object = self._create_misp_object_from_observable( + 'file', _file, *args + ) + for attribute in self._parse_file_observable(_file): + file_object.add_attribute(**attribute) + observable['used'][self.event_uuid] = True + misp_object = self.main_parser._add_misp_object(file_object, _file) + observable['misp_object'] = misp_object + if hasattr(_file, 'content_ref'): + artifact_object = self._parse_artifact_observable_object( + _file.content_ref + ) + artifact_object.add_reference(misp_object.uuid, 'content-of') + if hasattr(_file, 'parent_directory_ref'): + self._parse_directory_observable_object( + _file.parent_directory_ref, child=_file.id + ) + if hasattr(_file, 'extensions'): + extensions = _file.extensions + if extensions.get('archive-ext'): + archive_ext = extensions['archive-ext'] + if hasattr(archive_ext, 'comment'): + comment = archive_ext.comment + if hasattr(misp_object, 'comment'): + comment = f'{comment} - {misp_object.comment}' + misp_object.comment = comment + for contains_ref in archive_ext.contains_refs: + object_type = contains_ref.split('--')[0] + contained_object = getattr( + self, + f'_parse_{object_type}_observable_object' + )( + contains_ref + ) + misp_object.add_reference(contained_object.uuid, 'contains') + if extensions.get('windows-pebinary-ext'): + pe_object = self._parse_file_pe_extension_observable_object( + _file + ) + misp_object.add_reference(pe_object.uuid, 'includes') + return misp_object + + def _parse_file_pe_extension_observable_object( + self, observable: File) -> MISPObject: + extension = observable.extensions['windows-pebinary-ext'] + pe_object = self._create_misp_object('pe') + pe_object.from_dict( + uuid=self.main_parser._create_v5_uuid( + f'{observable.id} - windows-pebinary-ext' + ) + ) + attributes = self._parse_pe_extension_observable( + extension, f'{observable.id} - windows-pebinary-ext' + ) + for attribute in attributes: + pe_object.add_attribute(**attribute) + misp_object = self.main_parser._add_misp_object(pe_object, observable) + if hasattr(extension, 'sections'): + for index, section in enumerate(extension.sections): + section_object = self._create_misp_object('pe-section') + section_object.from_dict( + uuid=self.main_parser._create_v5_uuid( + f'{observable.id} - section #{index}' + ) + ) + attributes = self._parse_pe_section_observable( + section, f'{observable.id} - section #{index}' + ) + for attribute in attributes: + section_object.add_attribute(**attribute) + self.main_parser._add_misp_object(section_object, observable) + misp_object.add_reference(section_object.uuid, 'includes') + return misp_object + + def _parse_software_observable_object( + self, software_ref: str, *args) -> MISPObject: + observable = self._fetch_observable(software_ref) + if observable['used'].get(self.event_uuid, False): + return observable['misp_object'] + software = observable['observable'] + software_object = self._create_misp_object_from_observable( + 'software', software, *args + ) + for attribute in self._parse_generic_observable(software, 'software'): + software_object.add_attribute(**attribute) + observable['used'][self.event_uuid] = True + misp_object = self.main_parser._add_misp_object( + software_object, software + ) + observable['misp_object'] = misp_object + return misp_object + +class STIX2ObservableObjectConverter( + STIX2SampleObervableParser, ExternalSTIX2ObservableConverter): + def __init__(self, main: 'ExternalSTIX2toMISPParser'): + self._set_main_parser(main) + self._mapping = ExternalSTIX2ObservableMapping + + def _create_misp_attribute( + self, attribute_type: str, observable: DomainName, + feature: Optional[str] = 'value', comment: Optional[str] = None, + **kwargs: Dict[str, str]) -> MISPAttribute: + return { + 'value': getattr(observable, feature), 'type': attribute_type, + **kwargs, **self.main_parser._sanitise_attribute_uuid( + observable.id, comment=comment + ) + } + + def _create_misp_object_from_observable( + self, name: str, observable: _OBSERVABLE_TYPING) -> MISPObject: + misp_object = MISPObject( + name, force_timestamps=True, + misp_objects_path_custom=_MISP_OBJECTS_PATH + ) + self.main_parser._sanitise_object_uuid( + misp_object, observable.get('id') + ) + return misp_object + + def _parse_as_observable_object(self, as_ref: str) -> _MISP_CONTENT_TYPING: + observable = self._fetch_observable(as_ref) + if observable['used'].get(self.event_uuid, False): + return observable.get( + 'misp_object', observable.get('misp_attribute') + ) + autonomous_system = observable['observable'] + attributes = tuple( + self._parse_ip_addresses_belonging_to_AS(autonomous_system.id) + ) + observable['used'][self.event_uuid] = True + if attributes: + AS_object = self._create_misp_object_from_observable( + 'asn', autonomous_system + ) + value = f'AS{autonomous_system.number}' + AS_object.add_attribute( + 'asn', value, + uuid=self.main_parser._create_v5_uuid( + f'{autonomous_system.id} - asn - {value}' + ) + ) + for attribute in attributes: + AS_object.add_attribute(**attribute) + misp_object = self.main_parser._add_misp_object( + AS_object, autonomous_system + ) + observable['misp_object'] = misp_object + return misp_object + attribute = { + 'type': 'AS', 'value': f'AS{autonomous_system.number}', + **self.main_parser._sanitise_attribute_uuid(autonomous_system.id) + } + misp_attribute = self.main_parser._add_misp_attribute( + attribute, autonomous_system + ) + observable['misp_attribute'] = misp_attribute + return misp_attribute + def _parse_domain_observable_object( self, domain_ref: str) -> _MISP_CONTENT_TYPING: observable = self._fetch_observable(domain_ref) @@ -182,7 +282,7 @@ def _parse_domain_observable_object( ) domain_name = observable['observable'] if hasattr(domain_name, 'resolves_to_refs'): - domain_object = self._create_misp_object_from_observable_object( + domain_object = self._create_misp_object_from_observable( 'domain-ip', domain_name ) for attribute in self._parse_domain_observable(domain_name): @@ -267,7 +367,7 @@ def _parse_email_message_observable_object( if observable['used'].get(self.event_uuid, False): return observable['misp_object'] email_message = observable['observable'] - email_object = self._create_misp_object_from_observable_object( + email_object = self._create_misp_object_from_observable( 'email', email_message ) for attribute in self._parse_email_observable(email_message): @@ -300,85 +400,6 @@ def _parse_email_message_observable_object( observable['misp_object'] = misp_object return misp_object - def _parse_file_observable_object(self, file_ref: str) -> MISPObject: - observable = self._fetch_observable(file_ref) - if observable['used'].get(self.event_uuid, False): - return observable['misp_object'] - _file = observable['observable'] - file_object = self._create_misp_object_from_observable_object( - 'file', _file - ) - for attribute in self._parse_file_observable(_file): - file_object.add_attribute(**attribute) - observable['used'][self.event_uuid] = True - misp_object = self.main_parser._add_misp_object(file_object, _file) - observable['misp_object'] = misp_object - if hasattr(_file, 'content_ref'): - artifact_object = self._parse_artifact_observable_object( - _file.content_ref - ) - artifact_object.add_reference(misp_object.uuid, 'content-of') - if hasattr(_file, 'parent_directory_ref'): - self._parse_directory_observable_object( - _file.parent_directory_ref, child=_file.id - ) - if hasattr(_file, 'extensions'): - extensions = _file.extensions - if extensions.get('archive-ext'): - archive_ext = extensions['archive-ext'] - if hasattr(archive_ext, 'comment'): - comment = archive_ext.comment - if hasattr(misp_object, 'comment'): - comment = f'{comment} - {misp_object.comment}' - misp_object.comment = comment - for contains_ref in archive_ext.contains_refs: - object_type = contains_ref.split('--')[0] - contained_object = getattr( - self, - f'_parse_{object_type}_observable_object' - )( - contains_ref - ) - misp_object.add_reference(contained_object.uuid, 'contains') - if extensions.get('windows-pebinary-ext'): - pe_object = self._parse_file_pe_extension_observable_object( - _file - ) - misp_object.add_reference(pe_object.uuid, 'includes') - return misp_object - - def _parse_file_pe_extension_observable_object( - self, observable: File) -> MISPObject: - extension = observable.extensions['windows-pebinary-ext'] - pe_object = self._create_misp_object('pe') - pe_object.from_dict( - uuid=self.main_parser._create_v5_uuid( - f'{observable.id} - windows-pebinary-ext' - ) - ) - attributes = self._parse_pe_extension_observable( - extension, f'{observable.id} - windows-pebinary-ext' - ) - for attribute in attributes: - pe_object.add_attribute(**attribute) - misp_object = self.main_parser._add_misp_object(pe_object, observable) - if hasattr(extension, 'sections'): - for index, section in enumerate(extension.sections): - section_object = self._create_misp_object('pe-section') - section_object.from_dict( - uuid=self.main_parser._create_v5_uuid( - f'{observable.id} - section #{index}' - ) - ) - attributes = self._parse_pe_section_observable( - section, f'{observable.id} - section #{index}' - ) - for attribute in attributes: - section_object.add_attribute(**attribute) - self.main_parser._add_misp_object(section_object, observable) - misp_object.add_reference(section_object.uuid, 'includes') - return misp_object - def _parse_ip_addresses_belonging_to_AS(self, AS_id: str): for content in self.main_parser._observable.values(): observable = content['observable'] @@ -435,7 +456,7 @@ def _parse_network_traffic_observable_object( return observable['misp_object'] network_traffic = observable['observable'] name = self._parse_network_traffic_observable_fields(network_traffic) - network_object = self._create_misp_object_from_observable_object( + network_object = self._create_misp_object_from_observable( name, network_traffic ) feature = f"_parse_{name.replace('-', '_')}_observable" @@ -477,7 +498,7 @@ def _parse_process_observable_object(self, process_ref: str) -> MISPObject: if observable['used'].get(self.event_uuid, False): return observable['misp_object'] process = observable['observable'] - process_object = self._create_misp_object_from_observable_object( + process_object = self._create_misp_object_from_observable( 'process', process ) for attribute in self._parse_process_observable(process): @@ -517,7 +538,7 @@ def _parse_registry_key_observable_object(self, registry_key_ref: str): if observable['used'].get(self.event_uuid, False): return observable['misp_object'] registry_key = observable['observable'] - registry_key_object = self._create_misp_object_from_observable_object( + registry_key_object = self._create_misp_object_from_observable( 'registry-key', registry_key ) for attribute in self._parse_registry_key_observable(registry_key): @@ -545,24 +566,6 @@ def _parse_registry_key_observable_object(self, registry_key_ref: str): self.main_parser._add_misp_object(value_object, registry_key) return misp_object - def _parse_software_observable_object( - self, software_ref: str) -> MISPObject: - observable = self._fetch_observable(software_ref) - if observable['used'].get(self.event_uuid, False): - return observable['misp_object'] - software = observable['observable'] - software_object = self._create_misp_object_from_observable_object( - 'software', software - ) - for attribute in self._parse_generic_observable(software, 'software'): - software_object.add_attribute(**attribute) - observable['used'][self.event_uuid] = True - misp_object = self.main_parser._add_misp_object( - software_object, software - ) - observable['misp_object'] = misp_object - return misp_object - def _parse_url_observable_object(self, url_ref: str) -> MISPAttribute: observable = self._fetch_observable(url_ref) if observable['used'].get(self.event_uuid, False): @@ -581,7 +584,7 @@ def _parse_user_account_observable_object( if observable['used'].get(self.event_uuid, False): return observable['misp_object'] user_account = observable['observable'] - user_account_object = self._create_misp_object_from_observable_object( + user_account_object = self._create_misp_object_from_observable( 'user-account', user_account ) for attribute in self._parse_user_account_observable(user_account): @@ -598,7 +601,7 @@ def _parse_x509_observable_object(self, x509_ref: str) -> MISPObject: if observable['used'].get(self.event_uuid, False): return observable['misp_object'] x509 = observable['observable'] - x509_object = self._create_misp_object_from_observable_object( + x509_object = self._create_misp_object_from_observable( 'x509', x509 ) for attribute in self._parse_x509_observable(x509): @@ -609,7 +612,7 @@ def _parse_x509_observable_object(self, x509_ref: str) -> MISPObject: return misp_object -class STIX2SampleObservableConverter(metaclass=ABCMeta): +class STIX2SampleObservableConverter(STIX2SampleObervableParser): def __init__(self, main: _MAIN_CONVERTER_TYPING): self._main_converter = main @@ -634,58 +637,6 @@ def _create_misp_object_from_observable( misp_object.from_dict(**self._main_converter._parse_timeline(malware)) return misp_object - def _parse_artifact_observable_object( - self, artifact_ref: str, malware: Malware) -> MISPObject: - observable = self.main_parser._observable[artifact_ref] - if observable['used'][self.event_uuid]: - return observable['misp_object'] - artifact = observable['observable'] - artifact_object = self._create_misp_object_from_observable( - 'artifact', artifact, malware - ) - for attribute in self._parse_artifact_observable(artifact): - artifact_object.add_attribute(**attribute) - observable['used'][self.event_uuid] = True - misp_object = self._main_parser._add_misp_object( - artifact_object, artifact - ) - observable['misp_object'] = misp_object - return misp_object - - def _parse_file_observable_object( - self, file_ref: str, malware: Malware) -> MISPObject: - observable = self.main_parser._observable[file_ref] - if observable['used'][self.event_uuid]: - return observable['misp_object'] - _file = observable['observable'] - file_object = self._create_misp_object_from_observable( - 'file', _file, malware - ) - for attribute in self._parse_file_observable(_file): - file_object.add_attribute(**attribute) - observable['used'][self.event_uuid] = True - misp_object = self.main_parser._add_misp_object(file_object, _file) - observable['misp_object'] = misp_object - return misp_object - - def _parse_software_observable_object( - self, software_ref: str, malware: Malware) -> MISPObject: - observable = self.main_parser._observable[software_ref] - if observable['used'][self.event_uuid]: - return observable['misp_object'] - software = observable['observable'] - software_object = self._create_misp_object_from_observable( - 'software', software, malware - ) - for attribute in self._parse_generic_observable(software, 'software'): - software_object.add_attribute(**attribute) - observable['used'][self.event_uuid] = True - misp_object = self.main_parser._add_misp_object( - software_object, software - ) - observable['misp_object'] = misp_object - return misp_object - class ExternalSTIX2SampleObservableConverter( STIX2SampleObservableConverter, ExternalSTIX2ObservableConverter): From a28b07312f2cd984e6804ea37aed94dd556903c2 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Mon, 25 Nov 2024 12:13:37 +0100 Subject: [PATCH 3/6] fix: [stix2 import] Avoiding KeyError exceptions while parsing standalone STIX 2.1 observable objects --- misp_stix_converter/stix2misp/external_stix2_to_misp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misp_stix_converter/stix2misp/external_stix2_to_misp.py b/misp_stix_converter/stix2misp/external_stix2_to_misp.py index c4149ea..6ede5b5 100644 --- a/misp_stix_converter/stix2misp/external_stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/external_stix2_to_misp.py @@ -225,7 +225,7 @@ def _handle_unparsed_content(self): return super()._handle_unparsed_content() unparsed_content = defaultdict(list) for object_id, content in self._observable.items(): - if content['used'][self.misp_event.uuid]: + if content['used'].get(self.misp_event.uuid, True): continue unparsed_content[content['observable'].type].append(object_id) for observable_type in self._mapping.observable_object_types(): From 6badfd6eb3783744fa2314bcea6dcf1062c49695 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Fri, 29 Nov 2024 12:15:32 +0100 Subject: [PATCH 4/6] fix: [stix2 import] Avoiding issues with event tags variable when we are parsing STIX documents with no report or grouping --- misp_stix_converter/stix2misp/stix2_to_misp.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index f5f05be..4275d66 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -191,8 +191,8 @@ def _parse_stix_bundle(self): ############################################################################ @property - def event_tags(self) -> list: - return self.__event_tags + def event_tags(self) -> set: + return getattr(self, '_event_tags', set()) @property def generic_info_field(self) -> str: @@ -422,7 +422,7 @@ def _handle_object(self, object_type: str, object_ref: str): def _handle_misp_event_tags( self, misp_event: MISPEvent, stix_object: _GROUPING_REPORT_TYPING): - self.__event_tags = set() + self._event_tags = set() for marking in self._handle_tags_from_stix_fields(stix_object): if isinstance(marking, str): misp_event.add_tag(marking) From 01baf4bfa7e18f0dbbcc0eccb8feca0aeadaa31f Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Fri, 29 Nov 2024 12:16:50 +0100 Subject: [PATCH 5/6] fix: [stix2 import] Utilising the set of creator id references to skip parsing identity objects that are mentioned is STIX objects with the `created_by_ref` field --- .../converters/stix2_identity_converter.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/misp_stix_converter/stix2misp/converters/stix2_identity_converter.py b/misp_stix_converter/stix2misp/converters/stix2_identity_converter.py index 48439bc..d6587f3 100644 --- a/misp_stix_converter/stix2misp/converters/stix2_identity_converter.py +++ b/misp_stix_converter/stix2misp/converters/stix2_identity_converter.py @@ -104,16 +104,17 @@ def __init__(self, main: 'ExternalSTIX2toMISPParser'): self._mapping = ExternalSTIX2IdentityMapping def parse(self, identity_ref: str): - identity = self.main_parser._get_stix_object(identity_ref) - if getattr(identity, 'identity_class', None) == 'class': - self._parse_galaxy(identity, 'sector') - else: - name = self._fetch_identity_object_name(identity) - getattr(self, f'_parse_{name}_object')(identity) - misp_object = self._create_misp_object(name, identity) - for attribute in self._generic_parser(identity, name): - misp_object.add_attribute(**attribute) - self.main_parser._add_misp_object(misp_object, identity) + if identity_ref not in self.main_parser._creators: + identity = self.main_parser._get_stix_object(identity_ref) + if getattr(identity, 'identity_class', None) == 'class': + self._parse_galaxy(identity, 'sector') + else: + name = self._fetch_identity_object_name(identity) + getattr(self, f'_parse_{name}_object')(identity) + misp_object = self._create_misp_object(name, identity) + for attribute in self._generic_parser(identity, name): + misp_object.add_attribute(**attribute) + self.main_parser._add_misp_object(misp_object, identity) @staticmethod def _fetch_identity_object_name(identity: _IDENTITY_TYPING) -> str: From 765bc26beed31395f66fdaf2a75a0ee0c7d128c4 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Fri, 29 Nov 2024 17:46:58 +0100 Subject: [PATCH 6/6] add: [stix2 import] Adding to the Event the information on the producer using the `producer` galaxy - Looks for now at the `created_by_ref` identity when it is provided with the report or grouping, or at the list of creators gathered through all the loaded STIX objects when there is no report or grouping in a given Bundle - Would need additional processing in some complex cases with multiple producers and no producer is set by the user using the corresponding argument --- misp_stix_converter/stix2misp/stix2_to_misp.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index 4275d66..22c03ab 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -962,6 +962,9 @@ def _create_generic_event(self) -> MISPEvent: misp_event.from_dict(**event_args) if self.producer is not None: misp_event.add_tag(f'misp-galaxy:producer="{self.producer}"') + elif len(self._creators) == 1: + producer = self._handle_creator(tuple(self._creators)[0]) + misp_event.add_tag(f'misp-galaxy:producer="{producer}"') return misp_event def _create_misp_event( @@ -989,6 +992,9 @@ def _create_misp_event( self._add_analyst_data(misp_event, reference) if self.producer is not None: misp_event.add_tag(f'misp-galaxy:producer="{self.producer}"') + elif hasattr(stix_object, 'created_by_ref'): + producer = self._handle_creator(stix_object.created_by_ref) + misp_event.add_tag(f'misp-galaxy:producer="{producer}"') self._handle_misp_event_tags(misp_event, stix_object) return misp_event