diff --git a/misp_stix_converter/__init__.py b/misp_stix_converter/__init__.py index 9a8c6ec..08838e8 100644 --- a/misp_stix_converter/__init__.py +++ b/misp_stix_converter/__init__.py @@ -159,6 +159,10 @@ def main(): '-cg', '--cluster_sharing_group', type=int, default=None, help='Galaxy Clusters sharing group ID in case of External STIX 2 content.' ) + import_parser.add_argument( + '-t', '--title', type=str, default=None, + help='Title prefix to add to the MISP Event `info` field.' + ) import_parser.add_argument( '-p', '--producer', help=( diff --git a/misp_stix_converter/misp2stix/misp_to_stix21.py b/misp_stix_converter/misp2stix/misp_to_stix21.py index 3d3741e..5c66ef5 100644 --- a/misp_stix_converter/misp2stix/misp_to_stix21.py +++ b/misp_stix_converter/misp2stix/misp_to_stix21.py @@ -132,7 +132,7 @@ def _parse_event_data(self): list(object_refs) if object_refs else self._handle_empty_note_refs() ) - self._append_SDO(Note(**note_args)) + self._append_SDO(self._create_note(note_args)) self._handle_analyst_data(note_args['id'], event_report) else: self._id_parsing_function = { @@ -199,7 +199,7 @@ def _handle_empty_object_refs(self, object_id: str, timestamp: datetime): 'created_by_ref': self.identity_id, 'object_refs': [object_id], 'content': 'This MISP Event is empty and contains no attribute, object, galaxy or tag.' } - self._append_SDO(Note(**note_args)) + self._append_SDO(self._create_note(note_args)) def _handle_markings(self, object_args: dict, markings: tuple): marking_ids = [] @@ -236,7 +236,9 @@ def _handle_note_data(self, note, object_id: str): } if note.get('language'): note_args['lang'] = note['language'] - getattr(self, self._results_handling_function)(Note(**note_args)) + getattr(self, self._results_handling_function)( + self._create_note(note_args) + ) def _handle_object_analyst_data( self, misp_object: Union[MISPObject, dict], object_id: str): @@ -799,7 +801,7 @@ def _parse_annotation_object( values[0] if isinstance(values, list) and len(values) == 1 else values ) - self._append_SDO(Note(**note_args)) + self._append_SDO(self._create_note(note_args)) self._handle_object_analyst_data(misp_object, note_id) def _parse_asn_object_observable( @@ -1750,6 +1752,12 @@ def _create_malware(malware_args: dict) -> Malware: malware_args['is_family'] = False return Malware(**malware_args) + @staticmethod + def _create_note(note_args: dict) -> Note: + if any(ref.startswith('x-misp-') for ref in note_args['object_refs']): + note_args['allow_custom'] = True + return Note(**note_args) + def _create_observed_data(self, args: dict, observables: list): args['object_refs'] = [observable.id for observable in observables] getattr(self, self._results_handling_function)(ObservedData(**args)) diff --git a/misp_stix_converter/misp_stix_converter.py b/misp_stix_converter/misp_stix_converter.py index 7f968d4..08918c7 100644 --- a/misp_stix_converter/misp_stix_converter.py +++ b/misp_stix_converter/misp_stix_converter.py @@ -677,7 +677,8 @@ def stix_2_to_misp(filename: _files_type, output_name: Optional[_files_type]=None, producer: Optional[str] = None, sharing_group_id: Optional[int] = None, - single_event: Optional[bool] = False) -> dict: + single_event: Optional[bool] = False, + title: Optional[str] = None) -> dict: if isinstance(filename, str): filename = Path(filename).resolve() try: @@ -686,7 +687,7 @@ def stix_2_to_misp(filename: _files_type, return {'errors': [f'{filename} - {error.__str__()}']} parser, args = _get_stix2_parser( _from_misp(bundle.objects), distribution, sharing_group_id, - producer, galaxies_as_tags, organisation_uuid, + title, producer, galaxies_as_tags, organisation_uuid, cluster_distribution, cluster_sharing_group_id ) stix_parser = parser(*args) @@ -720,7 +721,8 @@ def stix2_to_misp_instance( organisation_uuid: Optional[str] = MISP_org_uuid, producer: Optional[str] = None, sharing_group_id: Optional[int] = None, - single_event: Optional[bool] = False) -> dict: + single_event: Optional[bool] = False, + title: Optional[str] = None) -> dict: if isinstance(filename, str): filename = Path(filename).resolve() try: @@ -729,7 +731,7 @@ def stix2_to_misp_instance( return {'errors': [f'{filename} - {error.__str__()}']} parser, args = _get_stix2_parser( _from_misp(bundle.objects), distribution, sharing_group_id, - producer, galaxies_as_tags, organisation_uuid, + title, producer, galaxies_as_tags, organisation_uuid, cluster_distribution, cluster_sharing_group_id ) stix_parser = parser(*args) @@ -1104,7 +1106,7 @@ def _process_stix_to_misp_files(args) -> dict: galaxies_as_tags=args.galaxies_as_tags, output_dir=args.output_dir, organisation_uuid=args.org_uuid, output_name=args.output_name, producer=args.producer, sharing_group_id=args.sharing_group, - single_event=args.single_event + single_event=args.single_event, title=args.title ) if traceback.pop('success', 0) == 1: success.extend(traceback.pop('results')) @@ -1141,7 +1143,8 @@ def _process_stix_to_misp_instance(misp: PyMISP, args) -> dict: debug=args.debug, distribution=args.distribution, galaxies_as_tags=args.galaxies_as_tags, organisation_uuid=args.org_uuid, producer=args.producer, - sharing_group_id=args.sharing_group, single_event=args.single_event + sharing_group_id=args.sharing_group, + single_event=args.single_event, title=args.title ) if traceback.pop('success', 0) == 1: success.extend(traceback.pop('results')) diff --git a/misp_stix_converter/stix2misp/external_stix2_to_misp.py b/misp_stix_converter/stix2misp/external_stix2_to_misp.py index 5625b1e..7d238d8 100644 --- a/misp_stix_converter/stix2misp/external_stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/external_stix2_to_misp.py @@ -22,13 +22,14 @@ class ExternalSTIX2toMISPParser(STIX2toMISPParser): def __init__(self, distribution: Optional[int] = 0, sharing_group_id: Optional[int] = None, + title: Optional[str] = None, producer: Optional[str] = None, galaxies_as_tags: Optional[bool] = False, organisation_uuid: Optional[str] = MISP_org_uuid, cluster_distribution: Optional[int] = 0, cluster_sharing_group_id: Optional[int] = None): super().__init__( - distribution, sharing_group_id, producer, galaxies_as_tags + distribution, sharing_group_id, title, producer, galaxies_as_tags ) self._set_cluster_distribution( self._sanitise_distribution(cluster_distribution), diff --git a/misp_stix_converter/stix2misp/importparser.py b/misp_stix_converter/stix2misp/importparser.py index cd86d92..31ddaf6 100644 --- a/misp_stix_converter/stix2misp/importparser.py +++ b/misp_stix_converter/stix2misp/importparser.py @@ -72,7 +72,8 @@ def _load_json_file(path): class STIXtoMISPParser(metaclass=ABCMeta): def __init__(self, distribution: int, sharing_group_id: Union[int, None], - producer: Union[str, None], galaxies_as_tags: bool): + title: Union[str, None], producer: Union[str, None], + galaxies_as_tags: bool): self._identifier: str self.__relationship_types: dict @@ -83,6 +84,7 @@ def __init__(self, distribution: int, sharing_group_id: Union[int, None], self.__sharing_group_id = self._sanitise_sharing_group_id( sharing_group_id ) + self.__title = title self.__producer = producer self.__galaxies_as_tags = self._sanitise_galaxies_as_tags( galaxies_as_tags @@ -133,6 +135,10 @@ def _sanitise_sharing_group_id( def distribution(self) -> int: return self.__distribution + @property + def event_title(self) -> Union[str, None]: + return self.__title + @property def errors(self) -> dict: return self.__errors diff --git a/misp_stix_converter/stix2misp/internal_stix2_to_misp.py b/misp_stix_converter/stix2misp/internal_stix2_to_misp.py index 869301c..d4247c2 100644 --- a/misp_stix_converter/stix2misp/internal_stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/internal_stix2_to_misp.py @@ -28,10 +28,11 @@ class InternalSTIX2toMISPParser(STIX2toMISPParser): def __init__(self, distribution: Optional[int] = 0, sharing_group_id: Optional[int] = None, + title: Optional[str] = None, producer: Optional[str] = None, galaxies_as_tags: Optional[bool] = False): super().__init__( - distribution, sharing_group_id, producer, galaxies_as_tags + distribution, sharing_group_id, title, producer, galaxies_as_tags ) self._mapping = InternalSTIX2toMISPMapping # parsers diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index 9b87c69..2d7f497 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -157,9 +157,6 @@ _MARKING_DEFINITION_TYPING = Union[ MarkingDefinition_v20, MarkingDefinition_v21 ] -_MISP_FEATURES_TYPING = Union[ - MISPAttribute, MISPEvent, MISPObject -] _OBSERVED_DATA_PARSER_TYPING = Union[ ExternalSTIX2ObservedDataConverter, InternalSTIX2ObservedDataConverter ] @@ -206,9 +203,10 @@ class STIX2toMISPParser(STIXtoMISPParser, metaclass=ABCMeta): def __init__(self, distribution: int, sharing_group_id: Union[int, None], - producer: Union[str, None], galaxies_as_tags: bool): + title: Union[str, None], producer: Union[str, None], + galaxies_as_tags: bool): super().__init__( - distribution, sharing_group_id, producer, galaxies_as_tags + distribution, sharing_group_id, title, producer, galaxies_as_tags ) self._creators: set = set() self._mapping: Union[ @@ -319,10 +317,12 @@ def event_tags(self) -> list: @property def generic_info_field(self) -> str: - return ( - f'STIX {self.stix_version} Bundle ({self._identifier}) ' - 'imported with the MISP-STIX import feature.' - ) + message = f'STIX {self.stix_version} Bundle ({self._identifier})' + if self.event_title is not None: + message = f'{self.event_title} {message}' + if self.producer is not None: + message += f' produced by {self.producer}' + return f'{message} and converted with the MISP-STIX import feature.' @property def identity_parser(self) -> _IDENTITY_PARSER_TYPING: @@ -1154,7 +1154,7 @@ def _create_misp_event( misp_event = MISPEvent(force_timestamps=True) self._sanitise_object_uuid(misp_event, stix_object.id) event_args = { - 'info': getattr(stix_object, 'name', self.generic_info_field), + 'info': self._generate_info_field(stix_object), 'distribution': self.distribution, 'timestamp': self._timestamp_from_date(stix_object.modified) } @@ -1202,6 +1202,14 @@ def _handle_tags_from_stix_fields(self, stix_object: _SDO_TYPING): def _extract_uuid(object_id: str) -> str: return object_id.split('--')[-1] + def _generate_info_field(self, stix_object: _GROUPING_REPORT_TYPING) -> str: + if hasattr(stix_object, 'name'): + title = stix_object.name + if self.event_title is not None: + title = f'{self.event_title} {title}' + return title + return self.generic_info_field + def _handle_creator(self, reference: str) -> str: if reference in getattr(self, '_identity', {}): return self._identity[reference].name