From 56139aa17b8703c0c17eca280f8c20faa5bc0d03 Mon Sep 17 00:00:00 2001 From: taloric Date: Wed, 26 Jun 2024 10:32:00 +0800 Subject: [PATCH] fix: link grpc frame by request_id --- app/app/application/l7_flow_tracing.py | 66 ++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index ca04158..18b3211 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -1180,6 +1180,19 @@ def set_parent_relation(self): # 这里做一个简单的处理,当相邻两个 Span 都是同类 SYS Span 时不要按照 TCP Seq 来设置他们的 Parent 关系。 continue else: + # if self.spans[i].parent has parent, possibly it's c-p attach to s-p in `try_attach_client_sys_span_via_sys_span` + # usually, c-p in [0] index and will not try to attach parent here + # but in grpc _RESPONSE_X mode, sort order would reverse and make c-p not in [0] index + # for those scenarios, prioritize tcp_seq connection, and clean c-p's index in s-p's childs + + # 如果 self.spans[i] 已有 parent,很大概率是 c-p 在 `try_attach_client_sys_span_via_sys_span` 过程中关联上了 s-p + # 通常情况下,c-p 一般在[0]索引,不会在这里尝试关联 parent + # 而目前在 grpc _RESPONSE_X 模式下,会反转顺序,让 c-p 排序在末端,导致在这里尝试再关联 parent + # 对此类情况,认为 tcp_seq 关联优先级更高,允许关联,并清理 s-p childs 中的 c-p,否则结果中会重复 + if self.spans[i].parent is not None: + self.spans[i].parent.flow['childs'].remove( + self.spans[i].get_flow_index()) + self.spans[i].set_parent(self.spans[i - 1], "trace mounted due to tcp_seq") @@ -1188,6 +1201,7 @@ def _sort_network_spans(self): 对网络span进行排序,排序规则: 1. 按照TAP_SIDE_RANKS进行排序 2. 按采集器分组排序,与入口 span 同一个采集器的前移,出口 span 同一个采集器的后移,组内按 start_time 排序 + 通常情况下 client-side 是 ingress, server-side 是 egress """ sorted_spans = sorted( self.spans, @@ -1211,11 +1225,31 @@ def _sort_network_spans(self): egress_agent = sorted_spans[i].flow['vtap_id'] break + # sort rank for ingress & egress agent + ingress_rank = 0 # up for ingress + egreass_rank = 2 # down for egress + # `flow_field_conflict` confirm `l7_protocol` and `request_type` are same in a network_span_set, so get first is enough + # `flow_field_conflict` 确保了 `l7_protocol` `request_type` 在同一个 network_span_set 中一定相等,取首个即可 + if len(sorted_spans) > 0 and sorted_spans[0].flow['l7_protocol'] in [ + L7_PROTOCOL_GRPC, L7_PROTOCOL_HTTP2 + ]: + # in `grpc` protocol, _HEADER and _DATA frame is unidirectional flow which identified as `session` + # but in fact, when 'req_tcp_seq'=0, it's a 'response', from server-side to client-side, request_type=_RESPONSE_DATA/HEADER + # so we need to reverse ingress and egress here to re-sort network spans + + # 在 grpc 中, _HEADER 和 _DATA frame 是被标记为 session 的单向流 + # 但实际上,如果 req_tcp_seq=0,说明这其实是一个 response,方向为 server-side -> client-side,request_type=_RESPONSE_DATA/HEADER + # 对此类情况,应要反转 ingress 和 egress 排序 + if not sorted_spans[0].flow['req_tcp_seq'] and \ + sorted_spans[0].flow['type'] == L7_FLOW_TYPE_SESSION: + ingress_rank = 2 + egress_agent = 0 + for i in range(len(sorted_spans)): if sorted_spans[i].flow['vtap_id'] == ingress_agent: - sorted_spans[i].flow['agent_rank'] = 0 + sorted_spans[i].flow['agent_rank'] = ingress_rank elif sorted_spans[i].flow['vtap_id'] == egress_agent: - sorted_spans[i].flow['agent_rank'] = 2 + sorted_spans[i].flow['agent_rank'] = egreass_rank else: sorted_spans[i].flow['agent_rank'] = 1 @@ -1300,6 +1334,9 @@ def get_req_tcp_seq(self) -> int: def get_resp_tcp_seq(self) -> int: return self.flow.get('resp_tcp_seq', 0) + def get_request_id(self) -> int: + return self.flow.get('request_id', 0) + def time_range_cover(self, other_sys_span: 'SpanNode') -> bool: return self.flow['start_time_us'] <= other_sys_span.flow[ 'start_time_us'] and self.flow[ @@ -2222,6 +2259,8 @@ def _connect_process_and_networks(process_roots: List[SpanNode], ps_child_span_id = ps_child.get_span_id() ps_child_parent_span_id = ps_child.get_parent_span_id() for net_parent in network_leafs: + if ps_child.get_parent_id() >= 0: + continue if _same_span_set(ps_child, net_parent, 'network_span_set') \ or _same_span_set(ps_child, net_parent, 'process_span_set'): continue @@ -2253,6 +2292,8 @@ def _connect_process_and_networks(process_roots: List[SpanNode], ps_child_span_id = ps_child.get_span_id() ps_child_parent_span_id = ps_child.get_parent_span_id() for ps_parent in process_leafs: + if ps_child.get_parent_id() >= 0: + continue if _same_span_set(ps_child, ps_parent, 'network_span_set') \ or _same_span_set(ps_child, ps_parent, 'process_span_set'): continue @@ -2304,10 +2345,14 @@ def _connect_process_and_networks(process_roots: List[SpanNode], continue net_child_index = net_child.get_flow_index() net_child_span_id = net_child.get_span_id() - net_child_x_request_id_0 = net_child.get_x_request_id_0 + net_child_x_request_id_0 = net_child.get_x_request_id_0() net_child_x_request_id_1 = net_child.get_x_request_id_1() + net_child_request_id = net_child.get_request_id() + net_child_l7_protocol = net_child.flow['l7_protocol'] net_child_response_duration = net_child.flow['response_duration'] for net_parent in network_leafs: + if net_child.get_parent_id() >= 0: + continue if _same_span_set(net_child, net_parent, 'network_span_set') \ or _same_span_set(net_child, net_parent, 'process_span_set'): continue @@ -2324,7 +2369,6 @@ def _connect_process_and_networks(process_roots: List[SpanNode], # net_child, net_parent) # process_span_list.append(fake_pss) # flows.extend(fake_pss.spans) - elif (net_child_x_request_id_0 and net_child_x_request_id_0 == net_parent.get_x_request_id_0()) \ or (net_child_x_request_id_1 and net_child_x_request_id_1 == net_parent.get_x_request_id_1()) \ or (net_child_span_id and net_child_span_id == net_parent.get_span_id()): @@ -2348,6 +2392,20 @@ def _connect_process_and_networks(process_roots: List[SpanNode], network_match_parent[ net_child_index] = net_parent.get_flow_index() + elif net_child_request_id and net_child_request_id == net_parent.get_request_id() \ + and net_child_l7_protocol in [L7_PROTOCOL_HTTP2, L7_PROTOCOL_GRPC] \ + and net_child_l7_protocol == net_parent.flow['l7_protocol'] \ + and net_child_response_duration <= net_parent.flow['response_duration']: + # grpc protocol: request_id get from `stream_id`, means different network_span_set share same stream, it should be connected + # but other protocol may re-use request_id, so only support grpc now + # net_child.response_duration <= net_parent.response_duration for case both duration is 0 + # grpc 的 request_id 来源于 `stream_id`, 意味着不同的 network_span_set 在同一个 stream 里,应被连接 + # 但其他协议的 request_id 有可能短时内被多次重用,容易误连接,比如 MySQL 的 StatementID,所以目前仅支持 grpc + # net_child.response_duration <= net_parent.response_duration 用于双方时延为0 的情况 + # ref: https://www.deepflow.io/docs/zh/features/l7-protocols/http/#http2 + net_child.set_parent( + net_parent, "net_span mounted due to grpc request_id") + for child, parent in network_match_parent.items(): # FIXME: 生成一个 pseudo net span,待前端修改后再开放此代码,注意处理时延计算 # fake_pss = _generate_pseudo_process_span_set(flow_index_to_span[child],