From 56704d9a543f858652a3a35a1e94d6fa4cc8ae68 Mon Sep 17 00:00:00 2001 From: nmred Date: Fri, 5 May 2017 10:08:09 +0000 Subject: [PATCH] add Administrative API --- .php_cs.cache | 2 +- example/protocol/DescribeGroups.php | 117 ++++++++++ example/protocol/LeaveGroup.php | 118 ++++++++++ example/protocol/ListGroup.php | 116 ++++++++++ src/Kafka/Protocol.php | 6 +- src/Kafka/Protocol/DescribeGroups.php | 201 ++++++++++++++++++ src/Kafka/Protocol/LeaveGroup.php | 79 +++++++ src/Kafka/Protocol/ListGroup.php | 99 +++++++++ .../KafkaTest/Protocol/DescribeGroupsTest.php | 133 ++++++++++++ tests/KafkaTest/Protocol/LeaveGroupTest.php | 136 ++++++++++++ tests/KafkaTest/Protocol/ListGroupTest.php | 96 +++++++++ tests/run-tests.php | 6 +- 12 files changed, 1102 insertions(+), 7 deletions(-) create mode 100644 example/protocol/DescribeGroups.php create mode 100644 example/protocol/LeaveGroup.php create mode 100644 example/protocol/ListGroup.php create mode 100644 src/Kafka/Protocol/DescribeGroups.php create mode 100644 src/Kafka/Protocol/LeaveGroup.php create mode 100644 src/Kafka/Protocol/ListGroup.php create mode 100644 tests/KafkaTest/Protocol/DescribeGroupsTest.php create mode 100644 tests/KafkaTest/Protocol/LeaveGroupTest.php create mode 100644 tests/KafkaTest/Protocol/ListGroupTest.php diff --git a/.php_cs.cache b/.php_cs.cache index bed2b7d7..154e568f 100644 --- a/.php_cs.cache +++ b/.php_cs.cache @@ -1 +1 @@ -{"php":"5.5.26","version":"2.1.0","rules":{"encoding":true,"full_opening_tag":true,"blank_line_after_namespace":true,"braces":true,"class_definition":true,"elseif":true,"function_declaration":true,"indentation_type":true,"line_ending":true,"lowercase_constants":true,"lowercase_keywords":true,"method_argument_space":true,"no_closing_tag":true,"no_spaces_after_function_name":true,"no_spaces_inside_parenthesis":true,"no_trailing_whitespace":true,"no_trailing_whitespace_in_comment":true,"single_blank_line_at_eof":true,"single_class_element_per_statement":["property"],"single_import_per_statement":true,"single_line_after_imports":true,"switch_case_semicolon_to_colon":true,"switch_case_space":true,"visibility_required":true},"hashes":{"src\/Kafka\/Exception.php":1887794156,"src\/Kafka\/Exception\/NotSupported.php":2083554785,"src\/Kafka\/Exception\/OutOfRange.php":613717588,"src\/Kafka\/Exception\/Protocol.php":2787269416,"src\/Kafka\/Exception\/Socket.php":2743402340,"src\/Kafka\/Exception\/SocketConnect.php":2883015980,"src\/Kafka\/Exception\/SocketEOF.php":1195314501,"src\/Kafka\/Exception\/SocketTimeout.php":1079612001,"src\/Kafka\/Exception\/Config.php":2153825859,"src\/Kafka\/Protocol\/CommitOffset.php":3711823135,"src\/Kafka\/Protocol\/Fetch.php":4166817029,"src\/Kafka\/Protocol\/FetchOffset.php":2615769477,"src\/Kafka\/Protocol\/GroupCoordinator.php":3030713974,"src\/Kafka\/Protocol\/Heartbeat.php":3269486857,"src\/Kafka\/Protocol\/JoinGroup.php":332592611,"src\/Kafka\/Protocol\/Metadata.php":1609652106,"src\/Kafka\/Protocol\/Offset.php":3262177831,"src\/Kafka\/Protocol\/Produce.php":1894065123,"src\/Kafka\/Protocol\/Protocol.php":476565724,"src\/Kafka\/Protocol\/SyncGroup.php":3584049845,"src\/Kafka\/Protocol.php":87023264,"src\/Kafka\/SingletonTrait.php":3424400384,"src\/Kafka\/Socket.php":1512067958,"src\/Kafka\/ConsumerConfig.php":2320744655,"src\/Kafka\/ProducerConfig.php":1925922788,"src\/Kafka\/Config.php":913412482,"src\/Kafka\/Broker.php":598094486,"src\/Kafka\/Consumer.php":674526766,"src\/Kafka\/Consumer\/Assignment.php":626810662,"src\/Kafka\/Consumer\/Process.php":511720065,"src\/Kafka\/Consumer\/State.php":1179310365,"src\/Kafka\/LoggerTrait.php":1234965748,"src\/Kafka\/Producer.php":2558655402,"src\/Kafka\/Producer\/State.php":3349306988,"src\/Kafka\/Producer\/Process.php":3384630694,"tests\/KafkaMock\/Protocol\/Encoder.php":1383970351,"tests\/KafkaTest\/ClientTest.php":347003966,"tests\/KafkaTest\/Produce\/ProduceTest.php":868593207,"tests\/KafkaTest\/Protocol\/DecoderTest.php":1953905898,"tests\/KafkaTest\/Protocol\/EncoderTest.php":118406869,"tests\/TestConfiguration.php":1872544484,"tests\/_autoload.php":2725147420,"tests\/run-tests.php":3681452361,"tests\/Bootstrap.php":3397345411,"tests\/KafkaTest\/Protocol\/GroupCoordinatorTest.php":3938006454,"tests\/KafkaTest\/Base\/ProtocolTest.php":3926873193,"tests\/KafkaTest\/Base\/BrokerTest.php":3957314909,"tests\/KafkaTest\/Base\/ConsumerConfigTest.php":901088941,"tests\/KafkaTest\/Base\/ProducerConfigTest.php":1821493122,"tests\/KafkaTest\/Protocol\/ProduceTest.php":3670305798,"tests\/KafkaTest\/Protocol\/OffsetTest.php":3799352786,"tests\/KafkaTest\/Protocol\/FetchTest.php":2392957613,"tests\/KafkaTest\/Protocol\/CommitOffsetTest.php":3999089971,"tests\/KafkaTest\/Protocol\/SyncGroupTest.php":1695142498,"tests\/KafkaTest\/Protocol\/HeartbeatTest.php":3012473440,"tests\/KafkaTest\/Protocol\/FetchOffsetTest.php":1945252662,"tests\/KafkaTest\/Protocol\/MetadataTest.php":1723320331,"tests\/KafkaTest\/Protocol\/JoinGroupTest.php":3735777277}} \ No newline at end of file +{"php":"5.5.26","version":"2.1.0","rules":{"encoding":true,"full_opening_tag":true,"blank_line_after_namespace":true,"braces":true,"class_definition":true,"elseif":true,"function_declaration":true,"indentation_type":true,"line_ending":true,"lowercase_constants":true,"lowercase_keywords":true,"method_argument_space":true,"no_closing_tag":true,"no_spaces_after_function_name":true,"no_spaces_inside_parenthesis":true,"no_trailing_whitespace":true,"no_trailing_whitespace_in_comment":true,"single_blank_line_at_eof":true,"single_class_element_per_statement":["property"],"single_import_per_statement":true,"single_line_after_imports":true,"switch_case_semicolon_to_colon":true,"switch_case_space":true,"visibility_required":true},"hashes":{"src\/Kafka\/Exception.php":1887794156,"src\/Kafka\/Exception\/NotSupported.php":2083554785,"src\/Kafka\/Exception\/OutOfRange.php":613717588,"src\/Kafka\/Exception\/Protocol.php":2787269416,"src\/Kafka\/Exception\/Socket.php":2743402340,"src\/Kafka\/Exception\/SocketConnect.php":2883015980,"src\/Kafka\/Exception\/SocketEOF.php":1195314501,"src\/Kafka\/Exception\/SocketTimeout.php":1079612001,"src\/Kafka\/Exception\/Config.php":2153825859,"src\/Kafka\/Protocol\/CommitOffset.php":3711823135,"src\/Kafka\/Protocol\/Fetch.php":4166817029,"src\/Kafka\/Protocol\/FetchOffset.php":2615769477,"src\/Kafka\/Protocol\/GroupCoordinator.php":3030713974,"src\/Kafka\/Protocol\/Heartbeat.php":3269486857,"src\/Kafka\/Protocol\/JoinGroup.php":332592611,"src\/Kafka\/Protocol\/Metadata.php":1609652106,"src\/Kafka\/Protocol\/Offset.php":3262177831,"src\/Kafka\/Protocol\/Produce.php":1894065123,"src\/Kafka\/Protocol\/Protocol.php":476565724,"src\/Kafka\/Protocol\/SyncGroup.php":3584049845,"src\/Kafka\/Protocol.php":338507625,"src\/Kafka\/SingletonTrait.php":3424400384,"src\/Kafka\/Socket.php":1512067958,"src\/Kafka\/ConsumerConfig.php":2320744655,"src\/Kafka\/ProducerConfig.php":1925922788,"src\/Kafka\/Config.php":913412482,"src\/Kafka\/Broker.php":598094486,"src\/Kafka\/Consumer.php":674526766,"src\/Kafka\/Consumer\/Assignment.php":626810662,"src\/Kafka\/Consumer\/Process.php":511720065,"src\/Kafka\/Consumer\/State.php":1179310365,"src\/Kafka\/LoggerTrait.php":1234965748,"src\/Kafka\/Producer.php":2558655402,"src\/Kafka\/Producer\/State.php":3349306988,"src\/Kafka\/Producer\/Process.php":3384630694,"tests\/KafkaMock\/Protocol\/Encoder.php":1383970351,"tests\/KafkaTest\/ClientTest.php":347003966,"tests\/KafkaTest\/Produce\/ProduceTest.php":868593207,"tests\/KafkaTest\/Protocol\/DecoderTest.php":1953905898,"tests\/KafkaTest\/Protocol\/EncoderTest.php":118406869,"tests\/TestConfiguration.php":1872544484,"tests\/_autoload.php":2725147420,"tests\/run-tests.php":4228663547,"tests\/Bootstrap.php":3397345411,"tests\/KafkaTest\/Protocol\/GroupCoordinatorTest.php":3938006454,"tests\/KafkaTest\/Base\/ProtocolTest.php":3926873193,"tests\/KafkaTest\/Base\/BrokerTest.php":3957314909,"tests\/KafkaTest\/Base\/ConsumerConfigTest.php":901088941,"tests\/KafkaTest\/Base\/ProducerConfigTest.php":1821493122,"tests\/KafkaTest\/Protocol\/ProduceTest.php":3670305798,"tests\/KafkaTest\/Protocol\/OffsetTest.php":3799352786,"tests\/KafkaTest\/Protocol\/FetchTest.php":2392957613,"tests\/KafkaTest\/Protocol\/CommitOffsetTest.php":3999089971,"tests\/KafkaTest\/Protocol\/SyncGroupTest.php":1695142498,"tests\/KafkaTest\/Protocol\/HeartbeatTest.php":3012473440,"tests\/KafkaTest\/Protocol\/FetchOffsetTest.php":1945252662,"tests\/KafkaTest\/Protocol\/MetadataTest.php":1723320331,"tests\/KafkaTest\/Protocol\/JoinGroupTest.php":3735777277,"src\/Kafka\/Protocol\/LeaveGroup.php":3263763308,"src\/Kafka\/Protocol\/ListGroup.php":3673950913,"src\/Kafka\/Protocol\/DescribeGroups.php":1369242879,"tests\/KafkaTest\/Protocol\/LeaveGroupTest.php":1567634470,"tests\/KafkaTest\/Protocol\/DescribeGroupsTest.php":759837642,"tests\/KafkaTest\/Protocol\/ListGroupTest.php":3375737744}} \ No newline at end of file diff --git a/example/protocol/DescribeGroups.php b/example/protocol/DescribeGroups.php new file mode 100644 index 00000000..29a83e64 --- /dev/null +++ b/example/protocol/DescribeGroups.php @@ -0,0 +1,117 @@ + 'test', + 'session_timeout' => 6000, + 'rebalance_timeout' => 6000, + 'member_id' => '', + 'data' => array( + array( + 'protocol_name' => 'group', + 'version' => 0, + 'subscription' => array('test'), + 'user_data' => '', + ), + ), + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, substr($data, 4)); + $this->group = $result; + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // {{{ protected function syncGroup() + + protected function syncGroup() { + $this->joinGroup(); + $data = array( + 'group_id' => 'test', + 'generation_id' => $this->group['generationId'], + 'member_id' => $this->group['memberId'], + 'data' => array( + array( + 'version' => 0, + 'member_id' => $this->group['memberId'], + 'assignments' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + 0 + ), + ), + ), + ), + ), + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, substr($data, 4)); + //echo json_encode($result); + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // {{{ public function run() + + public function run() { + $this->joinGroup(); + $this->syncGroup(); + $data = array( + 'test' + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::DESCRIBE_GROUPS_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::DESCRIBE_GROUPS_REQUEST, substr($data, 4)); + echo json_encode($result); + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // }}} +} + +$describe = new DescribeGroups(); +$describe->run(); + diff --git a/example/protocol/LeaveGroup.php b/example/protocol/LeaveGroup.php new file mode 100644 index 00000000..de7801a9 --- /dev/null +++ b/example/protocol/LeaveGroup.php @@ -0,0 +1,118 @@ + 'test', + 'session_timeout' => 6000, + 'rebalance_timeout' => 6000, + 'member_id' => '', + 'data' => array( + array( + 'protocol_name' => 'group', + 'version' => 0, + 'subscription' => array('test'), + 'user_data' => '', + ), + ), + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, substr($data, 4)); + $this->group = $result; + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // {{{ protected function syncGroup() + + protected function syncGroup() { + $this->joinGroup(); + $data = array( + 'group_id' => 'test', + 'generation_id' => $this->group['generationId'], + 'member_id' => $this->group['memberId'], + 'data' => array( + array( + 'version' => 0, + 'member_id' => $this->group['memberId'], + 'assignments' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + 0 + ), + ), + ), + ), + ), + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, substr($data, 4)); + //echo json_encode($result); + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // {{{ public function run() + + public function run() { + $this->joinGroup(); + $this->syncGroup(); + $data = array( + 'group_id' => 'test', + 'member_id' => $this->group['memberId'], + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::LEAVE_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::LEAVE_GROUP_REQUEST, substr($data, 4)); + echo json_encode($result); + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // }}} +} + +$leave = new LeaveGroup(); +$leave->run(); + diff --git a/example/protocol/ListGroup.php b/example/protocol/ListGroup.php new file mode 100644 index 00000000..829a4dd6 --- /dev/null +++ b/example/protocol/ListGroup.php @@ -0,0 +1,116 @@ + 'test', + 'session_timeout' => 6000, + 'rebalance_timeout' => 6000, + 'member_id' => '', + 'data' => array( + array( + 'protocol_name' => 'group', + 'version' => 0, + 'subscription' => array('test'), + 'user_data' => '', + ), + ), + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, substr($data, 4)); + $this->group = $result; + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // {{{ protected function syncGroup() + + protected function syncGroup() { + $this->joinGroup(); + $data = array( + 'group_id' => 'test', + 'generation_id' => $this->group['generationId'], + 'member_id' => $this->group['memberId'], + 'data' => array( + array( + 'version' => 0, + 'member_id' => $this->group['memberId'], + 'assignments' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + 0 + ), + ), + ), + ), + ), + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, substr($data, 4)); + //echo json_encode($result); + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // {{{ public function run() + + public function run() { + $this->joinGroup(); + $this->syncGroup(); + $data = array( + ); + + $protocol = \Kafka\Protocol::init('0.9.1.0'); + $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::LIST_GROUPS_REQUEST, $data); + + $socket = new \Kafka\Socket('127.0.0.1', '9192'); + $socket->SetonReadable(function($data) { + $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4)); + $result = \Kafka\Protocol::decode(\Kafka\Protocol::LIST_GROUPS_REQUEST, substr($data, 4)); + echo json_encode($result); + Amp\stop(); + }); + + $socket->connect(); + $socket->write($requestData); + Amp\run(function () use ($socket, $requestData) { + }); + } + + // }}} + // }}} +} + +$list = new ListGroup(); +$list->run(); + diff --git a/src/Kafka/Protocol.php b/src/Kafka/Protocol.php index 96cda55b..9fa5706a 100644 --- a/src/Kafka/Protocol.php +++ b/src/Kafka/Protocol.php @@ -103,10 +103,10 @@ public static function init($version, $logger = null) \Kafka\Protocol\Protocol::GROUP_COORDINATOR_REQUEST => 'GroupCoordinator', \Kafka\Protocol\Protocol::JOIN_GROUP_REQUEST => 'JoinGroup', \Kafka\Protocol\Protocol::HEART_BEAT_REQUEST => 'Heartbeat', - //\Kafka\Protocol\Protocol::LEAVE_GROUP_REQUEST => '', + \Kafka\Protocol\Protocol::LEAVE_GROUP_REQUEST => 'LeaveGroup', \Kafka\Protocol\Protocol::SYNC_GROUP_REQUEST => 'SyncGroup', - //\Kafka\Protocol\Protocol::DESCRIBE_GROUPS_REQUEST => '', - //\Kafka\Protocol\Protocol::LIST_GROUPS_REQUEST => '', + \Kafka\Protocol\Protocol::DESCRIBE_GROUPS_REQUEST => 'DescribeGroups', + \Kafka\Protocol\Protocol::LIST_GROUPS_REQUEST => 'ListGroup', ); $namespace = '\\Kafka\\Protocol\\'; diff --git a/src/Kafka/Protocol/DescribeGroups.php b/src/Kafka/Protocol/DescribeGroups.php new file mode 100644 index 00000000..e44814a1 --- /dev/null +++ b/src/Kafka/Protocol/DescribeGroups.php @@ -0,0 +1,201 @@ +requestHeader('kafka-php', self::DESCRIBE_GROUPS_REQUEST, self::DESCRIBE_GROUPS_REQUEST); + $data = self::encodeArray($payloads, array($this, 'encodeString'), self::PACK_INT16); + + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ public function decode() + + /** + * decode group response + * + * @access public + * @return array + */ + public function decode($data) + { + $offset = 0; + $groups = $this->decodeArray(substr($data, $offset), array($this, 'describeGroup')); + $offset += $groups['length']; + + return $groups['data']; + } + + // }}} + // {{{ protected function describeGroup() + + /** + * decode describe group response + * + * @access protected + * @return array + */ + protected function describeGroup($data) + { + $offset = 0; + $errorCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2)); + $offset += 2; + $groupId = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $groupId['length']; + $state = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $state['length']; + $protocolType = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $protocolType['length']; + $protocol = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $protocol['length']; + + $members = $this->decodeArray(substr($data, $offset), array($this, 'describeMember')); + $offset += $members['length']; + + return array( + 'length' => $offset, + 'data' => array( + 'errorCode' => $errorCode, + 'groupId' => $groupId['data'], + 'state' => $state['data'], + 'protocolType' => $protocolType['data'], + 'protocol' => $protocol['data'], + 'members' => $members['data'] + ) + ); + } + + // }}} + // {{{ protected function describeMember() + + /** + * decode describe members response + * + * @access protected + * @return array + */ + protected function describeMember($data) + { + $offset = 0; + $memberId = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $memberId['length']; + $clientId = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $clientId['length']; + $clientHost = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $clientHost['length']; + $metadata = $this->decodeString(substr($data, $offset), self::BIT_B32); + $offset += $metadata['length']; + $assignment = $this->decodeString(substr($data, $offset), self::BIT_B32); + $offset += $assignment['length']; + + $memberAssignment = $assignment['data']; + $memberAssignmentOffset = 0; + $version = self::unpack(self::BIT_B16_SIGNED, substr($memberAssignment, $memberAssignmentOffset, 2)); + $memberAssignmentOffset += 2; + $partitionAssignments = $this->decodeArray(substr($memberAssignment, $memberAssignmentOffset), + array($this, 'describeResponsePartition')); + $memberAssignmentOffset += $partitionAssignments['length']; + $userData = $this->decodeString(substr($memberAssignment, $memberAssignmentOffset), self::BIT_B32); + + $metaData = $metadata['data']; + $metaOffset = 0; + $version = self::unpack(self::BIT_B16, substr($metaData, $metaOffset, 2)); + $metaOffset += 2; + $topics = $this->decodeArray(substr($metaData, $metaOffset), array($this, 'decodeString'), self::BIT_B16); + $metaOffset += $topics['length']; + $metaUserData = $this->decodeString(substr($metaData, $metaOffset), self::BIT_B32); + + + return array( + 'length' => $offset, + 'data' => array( + 'memberId' => $memberId['data'], + 'clientId' => $clientId['data'], + 'clientHost' => $clientHost['data'], + 'metadata' => array( + 'version' => $version, + 'topics' => $topics['data'], + 'userData' => $metaUserData['data'], + ), + 'assignment' => array( + 'version' => $version, + 'partitions' => $partitionAssignments['data'], + 'userData' => $userData['data'] + ) + ) + ); + } + + // }}} + // {{{ protected function describeResponsePartition() + + /** + * decode describe group partition response + * + * @access protected + * @return array + */ + protected function describeResponsePartition($data) + { + $offset = 0; + $topicName = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $topicName['length']; + $partitions = $this->decodePrimitiveArray(substr($data, $offset), self::BIT_B32); + $offset += $partitions['length']; + + return array( + 'length' => $offset, + 'data' => array( + 'topicName' => $topicName['data'], + 'partitions' => $partitions['data'], + ) + ); + } + + // }}} + // }}} +} diff --git a/src/Kafka/Protocol/LeaveGroup.php b/src/Kafka/Protocol/LeaveGroup.php new file mode 100644 index 00000000..c6add5cf --- /dev/null +++ b/src/Kafka/Protocol/LeaveGroup.php @@ -0,0 +1,79 @@ +requestHeader('kafka-php', self::LEAVE_GROUP_REQUEST, self::LEAVE_GROUP_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeString($payloads['member_id'], self::PACK_INT16); + + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ public function decode() + + /** + * decode group response + * + * @access public + * @return array + */ + public function decode($data) + { + $errorCode = self::unpack(self::BIT_B16_SIGNED, substr($data, 0, 2)); + + return array( + 'errorCode' => $errorCode, + ); + } + + // }}} + // }}} +} diff --git a/src/Kafka/Protocol/ListGroup.php b/src/Kafka/Protocol/ListGroup.php new file mode 100644 index 00000000..6f710375 --- /dev/null +++ b/src/Kafka/Protocol/ListGroup.php @@ -0,0 +1,99 @@ +requestHeader('kafka-php', self::LIST_GROUPS_REQUEST, self::LIST_GROUPS_REQUEST); + $data = self::encodeString($header, self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ public function decode() + + /** + * decode group response + * + * @access public + * @return array + */ + public function decode($data) + { + $offset = 0; + $errorCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2)); + $offset += 2; + $groups = $this->decodeArray(substr($data, $offset), array($this, 'listGroup')); + + return array( + 'errorCode' => $errorCode, + 'groups' => $groups['data'], + ); + } + + // }}} + // {{{ protected function listGroup() + + /** + * decode list group response + * + * @access protected + * @return array + */ + protected function listGroup($data) + { + $offset = 0; + $groupId = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $groupId['length']; + $protocolType = $this->decodeString(substr($data, $offset), self::BIT_B16); + $offset += $protocolType['length']; + + return array( + 'length' => $offset, + 'data' => array( + 'groupId' => $groupId['data'], + 'protocolType' => $protocolType['data'], + ) + ); + } + + // }}} + // }}} +} diff --git a/tests/KafkaTest/Protocol/DescribeGroupsTest.php b/tests/KafkaTest/Protocol/DescribeGroupsTest.php new file mode 100644 index 00000000..d829cb56 --- /dev/null +++ b/tests/KafkaTest/Protocol/DescribeGroupsTest.php @@ -0,0 +1,133 @@ +describe)) { + $this->describe = new \Kafka\Protocol\DescribeGroups('0.9.0.1'); + } + } + + // }}} + // {{{ public function testEncode() + + /** + * testEncode + * + * @access public + * @return void + */ + public function testEncode() + { + $data = array( + 'test' + ); + + $test = $this->describe->encode($data); + $this->assertEquals(\bin2hex($test), '0000001d000f00000000000f00096b61666b612d70687000000001000474657374'); + } + + // }}} + // {{{ public function testEncodeString() + + /** + * testEncodeString + * + * @access public + * @return void + */ + public function testEncodeString() + { + $data = 'test'; + + $test = $this->describe->encode($data); + $this->assertEquals(\bin2hex($test), '0000001d000f00000000000f00096b61666b612d70687000000001000474657374'); + } + + // }}} + // {{{ public function testEncodeEmptyArray() + + /** + * testEncodeEmptyArray + * + * @access public + * @return void + */ + public function testEncodeEmptyArray() + { + $data = array( + ); + + $test = $this->describe->encode($data); + $this->assertEquals(\bin2hex($test), '00000017000f00000000000f00096b61666b612d70687000000000'); + } + + // }}} + // {{{ public function testDecode() + + /** + * testDecode + * + * @access public + * @return void + */ + public function testDecode() + { + $data = '0000000100000004746573740006537461626c650008636f6e73756d6572000567726f757000000001002e6b61666b612d7068702d34646133393366622d333763662d343263632d393064642d37626636626133316664333000096b61666b612d706870000a2f3132372e302e302e31000000100000000000010004746573740000000000000018000000000001000474657374000000010000000000000000'; + $test = $this->describe->decode(\hex2bin($data)); + $result = '[{"errorCode":0,"groupId":"test","state":"Stable","protocolType":"consumer","protocol":"group","members":[{"memberId":"kafka-php-4da393fb-37cf-42cc-90dd-7bf6ba31fd30","clientId":"kafka-php","clientHost":"\/127.0.0.1","metadata":{"version":0,"topics":["test"],"userData":""},"assignment":{"version":0,"partitions":[{"topicName":"test","partitions":[0]}],"userData":""}}]}]'; + $this->assertEquals(json_encode($test), $result); + } + + // }}} + // }}} +} diff --git a/tests/KafkaTest/Protocol/LeaveGroupTest.php b/tests/KafkaTest/Protocol/LeaveGroupTest.php new file mode 100644 index 00000000..ab936b3a --- /dev/null +++ b/tests/KafkaTest/Protocol/LeaveGroupTest.php @@ -0,0 +1,136 @@ +leave)) { + $this->leave = new \Kafka\Protocol\LeaveGroup('0.9.0.1'); + } + } + + // }}} + // {{{ public function testEncode() + + /** + * testEncode + * + * @access public + * @return void + */ + public function testEncode() + { + $data = array( + 'group_id' => 'test', + 'member_id' => 'kafka-php-eb19c0ea-4b3e-4ed0-bada-c873951c8eea' + ); + + $test = $this->leave->encode($data); + $this->assertEquals(\bin2hex($test), '00000049000d00000000000d00096b61666b612d706870000474657374002e6b61666b612d7068702d65623139633065612d346233652d346564302d626164612d633837333935316338656561'); + } + + // }}} + // {{{ public function testEncodeNoGroupId() + + /** + * testEncodeNoGroupId + * + * @expectedException \Kafka\Exception\Protocol + * @expectedExceptionMessage given leave group data invalid. `group_id` is undefined. + * @access public + * @return void + */ + public function testEncodeNoGroupId() + { + $data = array(); + + $test = $this->leave->encode($data); + } + + // }}} + // {{{ public function testEncodeNoMemberId() + + /** + * testEncodeNoMemberId + * + * @expectedException \Kafka\Exception\Protocol + * @expectedExceptionMessage given leave group data invalid. `member_id` is undefined. + * @access public + * @return void + */ + public function testEncodeNoMemberId() + { + $data = array( + 'group_id' => 'test', + ); + + $test = $this->leave->encode($data); + } + + // }}} + // {{{ public function testDecode() + + /** + * testDecode + * + * @access public + * @return void + */ + public function testDecode() + { + $test = $this->leave->decode(\hex2bin('0000')); + $result = '{"errorCode":0}'; + $this->assertEquals(json_encode($test), $result); + } + + // }}} + // }}} +} diff --git a/tests/KafkaTest/Protocol/ListGroupTest.php b/tests/KafkaTest/Protocol/ListGroupTest.php new file mode 100644 index 00000000..7af67fc2 --- /dev/null +++ b/tests/KafkaTest/Protocol/ListGroupTest.php @@ -0,0 +1,96 @@ +list)) { + $this->list = new \Kafka\Protocol\ListGroup('0.9.0.1'); + } + } + + // }}} + // {{{ public function testEncode() + + /** + * testEncode + * + * @access public + * @return void + */ + public function testEncode() + { + $data = array( + ); + + $test = $this->list->encode($data); + $this->assertEquals(\bin2hex($test), '00000013001000000000001000096b61666b612d706870'); + } + + // }}} + // {{{ public function testDecode() + + /** + * testDecode + * + * @access public + * @return void + */ + public function testDecode() + { + $test = $this->list->decode(\hex2bin('0000000000010004746573740008636f6e73756d6572')); + $result = '{"errorCode":0,"groups":[{"groupId":"test","protocolType":"consumer"}]}'; + $this->assertEquals(json_encode($test), $result); + } + + // }}} + // }}} +} diff --git a/tests/run-tests.php b/tests/run-tests.php index 9ffc4184..f6e952c1 100755 --- a/tests/run-tests.php +++ b/tests/run-tests.php @@ -91,9 +91,9 @@ echo "$component:\n"; system("$phpunit_bin $phpunit_opts $tmp_coverage " . escapeshellarg(__DIR__ . '/' . $component), $c_result); echo "\n\n"; - if ($c_result == 2) { - $c_result = 0; - } + if ($c_result == 2) { + $c_result = 0; + } if ($c_result) { $result = $c_result; }