-
Notifications
You must be signed in to change notification settings - Fork 436
/
Copy pathTeraSdk.py
1145 lines (915 loc) · 38.8 KB
/
TeraSdk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
Tera Python SDK. It needs a libtera_c.so
TODO(taocipian) __init__.py
"""
from ctypes import CFUNCTYPE, POINTER
from ctypes import byref, cdll, string_at
from ctypes import c_bool, c_char_p, c_void_p
from ctypes import c_uint32, c_int32, c_int64, c_ubyte, c_uint64
class Status(object):
""" status code """
# C++ tera.h ErrorCode
OK = 0
NotFound = 1
BadParam = 2
System = 3
Timeout = 4
Busy = 5
NoQuota = 6
NoAuth = 7
Unknown = 8
NotImpl = 9
reason_list_ = ["ok", "not found", "bad parameter",
"unknown error", "request timeout", "busy",
"no quota", "operation not permitted", "unknown error",
"not implemented"]
def __init__(self, c):
""" init """
self.c_ = c
if c < 0 or c > len(Status.reason_list_) - 1:
self.reason_ = "bad status code"
else:
self.reason_ = Status.reason_list_[c]
def GetReasonString(self):
"""
Returns:
(string) status string
"""
return Status.reason_list_[self.c_]
def GetReasonNumber(self):
"""
Returns:
(long) status code
"""
return self.c_
class ScanDescriptor(object):
""" scan操作描述符
scan出[start_key, end_key)范围内的所有数据,每个cell默认返回最新的1个版本
"""
def __init__(self, start_key):
"""
Args:
start_key(string): scan操作的起始位置,scan结果包含start_key
"""
self.desc = lib.tera_scan_descriptor(start_key,
c_uint64(len(start_key)))
def Destroy(self):
"""
销毁这个scan_descriptor,释放底层资源,以后不得再使用这个对象
"""
lib.tera_scan_descriptor_destroy(self.desc)
def SetEnd(self, end_key):
"""
不调用此函数时,end_key被置为“无穷大”
Args:
end_key(string): scan操作的终止位置,scan结果不包含end_key
"""
lib.tera_scan_descriptor_set_end(self.desc, end_key,
c_uint64(len(end_key)))
def SetMaxVersions(self, versions):
"""
不调用此函数时,默认每个cell只scan出最新版本
Args:
versions(long): scan时某个cell最多被选出多少个版本
"""
lib.tera_scan_descriptor_set_max_versions(self.desc, versions)
def SetBufferSize(self, buffer_size):
"""
服务端将读取的数据攒到buffer里,最多积攒到达buffer_size以后返回一次,
也有可能因为超时或者读取到达终点而buffer没有满就返回,默认值 64 * 1024
这个选项对scan性能有非常明显的影响,
我们的测试显示,1024*1024(1MB)在很多场景下都有比较好的表现,
建议根据自己的场景进行调优
Args:
buffer_size: scan操作buffer的size,单位Byte
"""
lib.tera_scan_descriptor_set_buffer_size(self.desc, buffer_size)
def SetPackInterval(self, interval):
"""
设置scan操作的超时时长,单位ms
服务端在scan操作达到约 interval 毫秒后尽快返回给client结果
Args:
iinterval(long): 一次scan的超时时长,单位ms
"""
lib.tera_scan_descriptor_set_pack_interval(self.desc, interval)
def AddColumn(self, cf, qu):
"""
scan时选择某个Column(ColumnFamily + Qualifier),其它Column过滤掉不返回给客户端
Args:
cf(string): 需要的ColumnFamily名
qu(string): 需要的Qualifier名
"""
lib.tera_scan_descriptor_add_column(self.desc, cf,
qu, c_uint64(len(qu)))
def AddColumnFamily(self, cf):
"""
类同 AddColumn, 这里选择整个 ColumnFamily
Args:
cf(string): 需要的ColumnFamily名
"""
lib.tera_scan_descriptor_add_column_family(self.desc, cf)
def SetTimeRange(self, start, end):
"""
设置返回版本的时间范围
C++接口用户注意:C++的这个接口里start和end参数的顺序和这里相反!
Args:
start(long): 开始时间戳(结果包含该值),
Epoch (00:00:00 UTC, January 1, 1970), measured in us
end(long): 截止时间戳(结果包含该值),
Epoch (00:00:00 UTC, January 1, 1970), measured in us
"""
lib.tera_scan_descriptor_set_time_range(self.desc, start, end)
class ResultStream(object):
""" scan操作返回的输出流
"""
def __init__(self, stream):
""" init """
self.stream = stream
def Destroy(self):
"""
销毁这个result_stream,释放底层资源,以后不得再使用这个对象
"""
lib.tera_result_stream_destroy(self.stream)
def Done(self):
""" 此stream是否已经读完
Returns:
(bool) 如果已经读完,则返回 true, 否则返回 false.
"""
err = c_char_p()
return lib.tera_result_stream_done(self.stream, byref(err))
def Next(self):
""" 迭代到下一个cell
"""
lib.tera_result_stream_next(self.stream)
def RowName(self):
"""
Returns:
(string) 当前cell对应的Rowkey
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_result_stream_row_name(self.stream,
byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def Family(self):
"""
Returns:
(string) 当前cell对应的ColumnFamily
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_result_stream_family(self.stream, byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def Qualifier(self):
"""
Returns:
(string) 当前cell对应的Qulifier
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_result_stream_qualifier(self.stream,
byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def ColumnName(self):
"""
Returns:
(string) 当前cell对应的 ColumnName(即 ColumnFamily:Qulifier)
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_result_stream_column_name(self.stream,
byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def Value(self):
"""
Returns:
(string) 当前cell对应的value
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_result_stream_value(self.stream, byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def ValueInt64(self):
"""
Returns:
(long) 当前cell为一个int64计数器,取出该计数器的数值
对一个非int64计数器调用该方法,属未定义行为
"""
return lib.tera_result_stream_value_int64(self.stream)
def Timestamp(self):
"""
Returns:
(long) 当前cell对应的时间戳,
Epoch (00:00:00 UTC, January 1, 1970), measured in us
"""
return lib.tera_result_stream_timestamp(self.stream)
class Client(object):
""" 通过Client对象访问一个tera集群
使用建议:一个集群对应一个Client即可,如需访问多个Client,需要创建多个
"""
def __init__(self, conf_path, log_prefix):
"""
Raises:
TeraSdkException: 创建一个Client对象失败
"""
err = c_char_p()
self.client = lib.tera_client_open(conf_path, log_prefix, byref(err))
if self.client is None:
raise TeraSdkException("open client failed:" + str(err.value))
def Close(self):
"""
销毁这个client,释放底层资源,以后不得再使用这个对象
"""
lib.tera_client_close(self.client)
def OpenTable(self, name):
""" 打开名为<name>的表
Args:
name(string): 表名
Returns:
(Table) 打开的Table指针
Raises:
TeraSdkException: 打开table时出错
"""
err = c_char_p()
table_ptr = lib.tera_table_open(self.client, name, byref(err))
if table_ptr is None:
raise TeraSdkException("open table failed:" + err.value)
return Table(table_ptr)
MUTATION_CALLBACK = CFUNCTYPE(None, c_void_p)
class RowMutation(object):
""" 对某一行的变更
在Table.ApplyMutation()调用之前,
RowMutation的所有操作(如Put/DeleteColumn)都不会立即生效
"""
def __init__(self, mutation):
""" init """
self.mutation = mutation
def PutKV(self, value, ttl):
""" 写入(修改)值为<value>
Args:
value(string): cell的值
ttl: value 过期时间
"""
lib.tera_row_mutation_put_kv(self.mutation, value,
c_uint64(len(value)), c_int32(ttl))
def Put(self, cf, qu, value):
""" 写入(修改)这一行上
ColumnFamily为<cf>, Qualifier为<qu>的cell值为<value>
Args:
cf(string): ColumnFamily名
qu(string): Qualifier名
value(string): cell的值
"""
lib.tera_row_mutation_put(self.mutation, cf,
qu, c_uint64(len(qu)),
value, c_uint64(len(value)))
def PutWithTimestamp(self, cf, qu, timestamp, value):
""" 写入(修改)这一行上
ColumnFamily为<cf>, Qualifier为<qu>的cell值为<value>
指定版本(时间戳)为timestamp
Args:
cf(string): ColumnFamily名
qu(string): Qualifier名
timestamp(long): 版本号/时间戳
value(string): cell的值
"""
lib.tera_row_mutation_put_with_timestamp(self.mutation, cf,
qu, c_uint64(len(qu)),
timestamp,
value, c_uint64(len(value)))
def DeleteColumnAllVersions(self, cf, qu):
""" 删除这一行上
ColumnFamily为<cf>, Qualifier为<qu>的cell的所有版本
如果没有用到多版本机制或本列只存储了一个版本(默认情况),
那么使用`DeleteColumnAllVersions`而不是`DeleteColumnWithVersion`来删除本列会更方便,
因为不用指定timestamp作为版本号。
Args:
cf(string): ColumnFamily名
qu(string): Qualifier名
"""
lib.tera_row_mutation_delete_column_all_versions(self.mutation, cf,
qu, c_uint64(len(qu)))
def DeleteColumnWithVersion(self, cf, qu, ts):
""" 删除这一行上
ColumnFamily为<cf>, Qualifier为<qu>的cell中Timestamp为<ts>的那个版本
Args:
cf(string): ColumnFamily名
qu(string): Qualifier名
ts(long): Timestamp(版本号)
"""
lib.tera_row_mutation_delete_column_with_version(self.mutation, cf,
qu, c_uint64(len(qu)),
ts)
def DeleteFamily(self, cf):
""" 删除ColumnFamily下所有列的所有版本
Args:
cf(string): ColumnFamily名
"""
lib.tera_row_mutation_delete_family(self.mutation, cf)
def DeleteRow(self):
""" 删除整行
"""
lib.tera_row_mutation_delete_row(self.mutation)
def RowKey(self):
"""
Returns:
(string): 此RowMutation对象的rowkey,例如可用在回调中
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_row_mutation_rowkey(self.mutation,
byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def SetCallback(self, callback):
""" 设置回调
调用此函数则本次变更为异步(Table.ApplyMutation()立即返回);
否则本次变更为同步(Table.ApplyMutation()等待写入操作完成后返回)。
Args:
callback(MUTATION_CALLBACK): 用户回调,不论任何情况,最终都会被调用
"""
lib.tera_row_mutation_set_callback(self.mutation, callback)
def GetStatus(self):
"""
返回本次Mutation的结果状态
Returns:
(class Status) 操作结果状态,可以获知成功或失败,若失败,具体原因
"""
return Status(lib.tera_row_mutation_get_status_code(self.mutation))
def Destroy(self):
"""
销毁这个mutation,释放底层资源,以后不得再使用这个对象
"""
lib.tera_row_mutation_destroy(self.mutation)
# Deprecated
def DeleteColumn(self, cf, qu):
""" 删除这一行上
ColumnFamily为<cf>, Qualifier为<qu>的cell
Args:
cf(string): ColumnFamily名
qu(string): Qualifier名
"""
lib.tera_row_mutation_delete_column(self.mutation, cf,
qu, c_uint64(len(qu)))
# Deprecated
def PutInt64(self, cf, qu, value):
""" 写入(修改)这一行上
ColumnFamily为<cf>, Qualifier为<qu>的cell值为<value>
Args:
cf(string): ColumnFamily名
qu(string): Qualifier名
value(long): cell的值
"""
lib.tera_row_mutation_put_int64(self.mutation, cf,
qu, c_uint64(len(qu)), value)
class Table(object):
""" 对表格的所有增删查改操作由此发起
通过Client.OpenTable()获取一个Table对象
"""
def __init__(self, table):
""" init """
self.table = table
def Close(self):
"""
销毁这个table,释放底层资源,以后不得再使用这个对象
"""
lib.tera_table_close(self.table)
def NewRowMutation(self, rowkey):
""" 生成一个对 rowkey 的RowMutation对象(修改一行)
一个RowMutation对某一行的操作(例如多列修改)是原子的
Args:
rowkey(string): 待变更的rowkey
Returns:
(class RowMutation): RowMutation对象
"""
return RowMutation(lib.tera_row_mutation(self.table, rowkey,
c_uint64(len(rowkey))))
def ApplyMutation(self, mutation):
""" 应用一次变更,
如果之前调用过 SetCallback() 则本次调用为异步,否则为同步
Args:
mutation(class RowMutation): RowMutation对象
"""
lib.tera_table_apply_mutation(self.table, mutation.mutation)
def NewRowReader(self, rowkey):
""" 生成一个对 rowkey 的RowReader对象(读取一行)
一个RowReader对某一行的操作(例如读取多列)是原子的
Args:
rowkey(string): 待读取的rowkey
Returns:
(class RowReader): RowReader对象
"""
return RowReader(lib.tera_row_reader(self.table, rowkey,
c_uint64(len(rowkey))))
def ApplyReader(self, reader):
""" 应用一次读取,
如果之前调用过 SetCallback() 则本次调用为异步,否则为同步
Args:
reader(class RowReader): RowReader对象
"""
lib.tera_table_apply_reader(self.table, reader.reader)
def IsPutFinished(self):
""" table的异步写操作是否*全部*完成
Returns:
(bool) 全部完成则返回true,否则返回false.
"""
return lib.tera_table_is_put_finished(self.table)
def IsGetFinished(self):
""" table的异步读操作是否*全部*完成
Returns:
(bool) 全部完成则返回true,否则返回false.
"""
return lib.tera_table_is_get_finished(self.table)
def BatchGet(self, row_reader_list):
""" 批量get
用法类似 ApplyReader
Args:
row_reader_list(RowReader): 预先构造好的RowReader列表
每一行的读取结果存储在row_reader_list里对应的每个RowReader内,
如果该行读取成功(即返回的状态码是OK),
那么可以调用诸如RowReader.Value()访问读取结果
否则读取出错,通过状态码确定原因。
用法详见sample.py
"""
num = len(row_reader_list)
r = list()
for i in row_reader_list:
r.append(i.reader)
reader_array = (c_void_p * num)(*r)
lib.tera_table_apply_reader_batch(self.table, reader_array, num)
def Get(self, rowkey, cf, qu, snapshot=0):
""" 同步get一个cell的值
Args:
rowkey(string): Rowkey的值
cf(string): ColumnFamily名
qu(string): Qualifier名
snapshot(long): 快照,不关心的用户设置为0即可
Returns:
(string) cell的值
Raises:
TeraSdkException: 读操作失败
"""
err = c_char_p()
value = POINTER(c_ubyte)()
vallen = c_uint64()
result = lib.tera_table_get(
self.table, rowkey, c_uint64(len(rowkey)), cf,
qu, c_uint64(len(qu)), byref(value), byref(vallen), byref(err),
c_uint64(snapshot)
)
if not result:
raise TeraSdkException("get record failed:" + err.value)
return copy_string_to_user(value, long(vallen.value))
def GetInt64(self, rowkey, cf, qu, snapshot):
""" 类同Get()接口,区别是将cell的内容作为int64计数器返回
对非int64计数器的cell调用此方法属于未定义行为
Args:
rowkey(string): Rowkey的值
cf(string): ColumnFamily名
qu(string): Qualifier名
snapshot(long): 快照,不关心的用户设置为0即可
Returns:
(long) cell的数值
Raises:
TeraSdkException: 读操作失败
"""
err = c_char_p()
value = c_int64()
result = lib.tera_table_getint64(
self.table, rowkey, c_uint64(len(rowkey)), cf,
qu, c_uint64(len(qu)), byref(value), byref(err),
c_uint64(snapshot)
)
if not result:
raise TeraSdkException("get record failed:" + err.value)
return long(value.value)
def Put(self, rowkey, cf, qu, value):
""" 同步put一个cell的值
Args:
rowkey(string): Rowkey的值
cf(string): ColumnFamily名
qu(string): Qualifier名
value(string): cell的值
Raises:
TeraSdkException: 写操作失败
"""
err = c_char_p()
result = lib.tera_table_put(
self.table, rowkey, c_uint64(len(rowkey)), cf,
qu, c_uint64(len(qu)), value, c_uint64(len(value)), byref(err)
)
if not result:
raise TeraSdkException("put record failed:" + err.value)
def BatchPut(self, row_mutation_list):
""" 批量put
用法类似 ApplyMutation
Args:
row_mutation_list(RowMutation): 预先构造好的RowMutation列表
每一行的写入操作返回状态存储在row_mutation_list里对应的每个RowMutation内,
如果写入失败,通过状态码确定原因。
用法详见sample.py
"""
num = len(row_mutation_list)
r = list()
for i in row_mutation_list:
r.append(i.mutation)
mutation_array = (c_void_p * num)(*r)
lib.tera_table_apply_mutation_batch(self.table, mutation_array, num)
def PutInt64(self, rowkey, cf, qu, value):
""" 类同Put()方法,区别是这里的参数value可以是一个数字(能够用int64表示)计数器
Args:
rowkey(string): Rowkey的值
cf(string): ColumnFamily名
qu(string): Qualifier名
value(long): cell的数值,能够用int64表示
Raises:
TeraSdkException: 写操作失败
"""
err = c_char_p()
result = lib.tera_table_putint64(
self.table, rowkey, c_uint64(len(rowkey)), cf,
qu, c_uint64(len(qu)), value, byref(err)
)
if not result:
raise TeraSdkException("put record failed:" + err.value)
def Delete(self, rowkey, cf, qu):
""" 同步删除某个cell
Args:
rowkey(string): Rowkey的值
cf(string): ColumnFamily名
qu(string): Qualifier名
"""
lib.tera_table_delete(
self.table, rowkey, c_uint64(len(rowkey)),
cf, qu, c_uint64(len(qu))
)
def Scan(self, desc):
""" 发起一次scan操作
Args:
desc(ScanDescriptor): scan操作描述符
Raises:
TeraSdkException: scan失败
"""
err = c_char_p()
stream = lib.tera_table_scan(
self.table,
desc.desc,
byref(err)
)
if stream is None:
raise TeraSdkException("scan failed:" + err.value)
return ResultStream(stream)
READER_CALLBACK = CFUNCTYPE(None, c_void_p)
class RowReader(object):
""" 提供随机读取一行的功能
"""
def __init__(self, reader):
""" init """
self.reader = reader
def AddColumnFamily(self, cf):
""" 添加期望读取的ColumnFamily
默认读取一行(row)的全部ColumnFamily
Args:
cf(string): 期望读取的ColumnFamily
"""
lib.tera_row_reader_add_column_family(self.reader, cf)
def AddColumn(self, cf, qu):
""" 添加期望读取的Column
默认读取一行(row)的全部Column(ColumnFamily + Qualifier)
Args:
cf(string): 期望读取的ColumnFamily
qu(string): 期望读取的Qualifier
"""
lib.tera_row_reader_add_column(self.reader, cf, qu, c_uint64(len(qu)))
def SetCallback(self, callback):
""" 设置回调
调用此函数则本次随机读为异步(Table.ApplyReader()立即返回);
否则本次随机读为同步(Table.ApplyReader()等待读取操作完成后返回)
可以在回调中执行 Done() 和 Next() 对返回的结果进行迭代处理
Args:
callback(READER_CALLBACK): 用户回调,不论任何情况,最终都会被调用
"""
lib.tera_row_reader_set_callback(self.reader, callback)
def SetTimestamp(self, ts):
""" set timestamp """
lib.tera_row_reader_set_timestamp(self.reader, ts)
def SetTimeRange(self, start, end):
""" set time range """
lib.tera_row_reader_set_time_range(self.reader, start, end)
def SetSnapshot(self, snapshot):
""" set snapshot """
lib.tera_row_reader_set_snapshot(self.reader, snapshot)
def SetMaxVersions(self, versions):
""" set max versions """
lib.tera_row_reader_set_max_versions(self.reader, versions)
def SetTimeout(self, timeout):
""" set timeout """
lib.tera_row_reader_set_timeout(self.reader, timeout)
def Done(self):
""" 结果是否已经读完
Returns:
(bool) 如果已经读完,则返回 true, 否则返回 false.
"""
return lib.tera_row_reader_done(self.reader)
def Next(self):
""" 迭代到下一个cell
"""
lib.tera_row_reader_next(self.reader)
def RowKey(self):
"""
Returns:
(string) 当前cell对应的rowkey
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_row_reader_rowkey(self.reader,
byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def Value(self):
"""
Returns:
(string) 当前cell对应的value
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_row_reader_value(self.reader, byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def ValueInt64(self):
"""
Returns:
(long) 当前cell对应的value
"""
return long(lib.tera_row_reader_value_int64(self.reader))
def Family(self):
"""
Returns:
(string) 当前cell对应的ColumnFamily
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_row_reader_family(self.reader, byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def Qualifier(self):
"""
Returns:
(string) 当前cell对应的Qulifier
"""
value = POINTER(c_ubyte)()
vallen = c_uint64()
lib.tera_row_reader_qualifier(self.reader, byref(value), byref(vallen))
return copy_string_to_user(value, long(vallen.value))
def Timestamp(self):
"""
Returns:
(long) 当前cell对应的时间戳,Unix time
"""
return lib.tera_row_reader_timestamp(self.reader)
def GetStatus(self):
"""
返回本次RowReader读取的结果状态
Returns:
(class Status) 操作结果状态,可以获知成功或失败,若失败,具体原因
"""
return Status(lib.tera_row_reader_get_status_code(self.reader))
def Destroy(self):
"""
销毁这个mutation,释放底层资源,以后不得再使用这个对象
"""
lib.tera_row_reader_destroy(self.reader)
class TeraSdkException(Exception):
""" exception """
def __init__(self, reason):
""" init """
self.reason = reason
def __str__(self):
""" str """
return self.reason
##########################
# 以下代码用户不需要关心 #
##########################
def init_function_prototype_for_scan():
""" scan """
######################
# scan result stream #
######################
lib.tera_result_stream_done.argtypes = [c_void_p,
POINTER(c_char_p)]
lib.tera_result_stream_done.restype = c_bool
lib.tera_result_stream_destroy.argtypes = [c_void_p]
lib.tera_result_stream_destroy.restype = None
lib.tera_result_stream_timestamp.argtypes = [c_void_p]
lib.tera_result_stream_timestamp.restype = c_int64
lib.tera_result_stream_column_name.argtypes = [c_void_p,
POINTER(POINTER(c_ubyte)),
POINTER(c_uint64)]
lib.tera_result_stream_column_name.restype = None
lib.tera_result_stream_family.argtypes = [c_void_p,
POINTER(POINTER(c_ubyte)),
POINTER(c_uint64)]
lib.tera_result_stream_family.restype = None
lib.tera_result_stream_next.argtypes = [c_void_p]
lib.tera_result_stream_next.restype = None
lib.tera_result_stream_qualifier.argtypes = [c_void_p,
POINTER(POINTER(c_ubyte)),
POINTER(c_uint64)]
lib.tera_result_stream_qualifier.restype = None
lib.tera_result_stream_row_name.argtypes = [c_void_p,
POINTER(POINTER(c_ubyte)),
POINTER(c_uint64)]
lib.tera_result_stream_row_name.restype = None
lib.tera_result_stream_value.argtypes = [c_void_p,
POINTER(POINTER(c_ubyte)),
POINTER(c_uint64)]
lib.tera_result_stream_value.restype = None
lib.tera_result_stream_value_int64.argtypes = [c_void_p]
lib.tera_result_stream_value_int64.restype = c_int64
###################
# scan descriptor #
###################
lib.tera_scan_descriptor.argtypes = [c_char_p, c_uint64]
lib.tera_scan_descriptor.restype = c_void_p
lib.tera_scan_descriptor_destroy.argtypes = [c_void_p]
lib.tera_scan_descriptor_destroy.restype = None
lib.tera_scan_descriptor_add_column.argtypes = [c_void_p, c_char_p,
c_char_p, c_uint64]
lib.tera_scan_descriptor_add_column.restype = None
lib.tera_scan_descriptor_add_column_family.argtypes = [c_void_p, c_char_p]
lib.tera_scan_descriptor_add_column_family.restype = None
lib.tera_scan_descriptor_set_buffer_size.argtypes = [c_void_p, c_int64]
lib.tera_scan_descriptor_set_buffer_size.restype = None
lib.tera_scan_descriptor_set_end.argtypes = [c_void_p, c_char_p, c_uint64]
lib.tera_scan_descriptor_set_end.restype = None
lib.tera_scan_descriptor_set_pack_interval.argtypes = [c_char_p, c_int64]
lib.tera_scan_descriptor_set_pack_interval.restype = None
lib.tera_scan_descriptor_set_max_versions.argtypes = [c_void_p, c_int32]
lib.tera_scan_descriptor_set_max_versions.restype = None
lib.tera_scan_descriptor_set_snapshot.argtypes = [c_void_p, c_uint64]
lib.tera_scan_descriptor_set_snapshot.restype = None
lib.tera_scan_descriptor_set_time_range.argtypes = [c_void_p,
c_int64, c_int64]
lib.tera_scan_descriptor_set_time_range.restype = None
def init_function_prototype_for_client():
""" client """
lib.tera_client_open.argtypes = [c_char_p, c_char_p, POINTER(c_char_p)]
lib.tera_client_open.restype = c_void_p
lib.tera_client_close.argtypes = [c_void_p]
lib.tera_client_close.restype = None
lib.tera_table_open.argtypes = [c_void_p, c_char_p, POINTER(c_char_p)]
lib.tera_table_open.restype = c_void_p
lib.tera_table_close.argtypes = [c_void_p]
lib.tera_table_close.restype = None
def init_function_prototype_for_table():
""" table """
lib.tera_table_get.argtypes = [c_void_p, c_char_p, c_uint64,
c_char_p, c_char_p, c_uint64,
POINTER(POINTER(c_ubyte)),
POINTER(c_uint64),
POINTER(c_char_p), c_uint64]
lib.tera_table_get.restype = c_bool
lib.tera_table_getint64.argtypes = [c_void_p, c_char_p, c_uint64,
c_char_p, c_char_p, c_uint64,
POINTER(c_int64), POINTER(c_char_p),
c_uint64]
lib.tera_table_getint64.restype = c_bool
lib.tera_table_put.argtypes = [c_void_p, c_char_p, c_uint64, c_char_p,
c_char_p, c_uint64, c_char_p, c_uint64,
POINTER(c_char_p)]
lib.tera_table_put.restype = c_bool
lib.tera_table_put_kv.argtypes = [c_void_p, c_char_p, c_uint64,
c_char_p, c_uint64, c_int32,
POINTER(c_char_p)]
lib.tera_table_put_kv.restype = c_bool
lib.tera_table_putint64.argtypes = [c_void_p, c_char_p, c_uint64, c_char_p,
c_char_p, c_uint64, c_int64,
POINTER(c_char_p)]
lib.tera_table_putint64.restype = c_bool
lib.tera_table_scan.argtypes = [c_void_p, c_void_p, POINTER(c_char_p)]
lib.tera_table_scan.restype = c_void_p
lib.tera_table_delete.argtypes = [c_void_p, c_char_p, c_uint64,
c_char_p, c_char_p, c_uint64]
lib.tera_table_delete.restype = c_bool
lib.tera_table_apply_mutation.argtypes = [c_void_p, c_void_p]
lib.tera_table_apply_mutation.restype = None
lib.tera_table_apply_mutation_batch.argtypes = [c_void_p,
c_void_p,
c_int64]
lib.tera_table_apply_mutation_batch.restype = None
lib.tera_table_is_put_finished.argtypes = [c_void_p]
lib.tera_table_is_put_finished.restype = c_bool
lib.tera_table_apply_reader.argtypes = [c_void_p, c_void_p]
lib.tera_table_apply_reader.restype = None
lib.tera_table_apply_reader_batch.argtypes = [c_void_p, c_void_p, c_int64]
lib.tera_table_apply_reader_batch.restype = None
lib.tera_table_is_get_finished.argtypes = [c_void_p]
lib.tera_table_is_get_finished.restype = c_bool
lib.tera_row_mutation.argtypes = [c_void_p, c_char_p, c_uint64]
lib.tera_row_mutation.restype = c_void_p
lib.tera_row_mutation_get_status_code.argtypes = [c_void_p]
lib.tera_row_mutation_get_status_code.restype = c_int64
lib.tera_row_mutation_destroy.argtypes = [c_void_p]
lib.tera_row_mutation_destroy.restype = None