forked from lichess-bot-devs/lichess-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
205 lines (161 loc) · 7.13 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import argparse
import chess
import engine_wrapper
import model
import json
import lichess
import logging
import multiprocessing
import queue
import os
import traceback
import yaml
import logging_pool
from conversation import Conversation, ChatLine
CONFIG = {}
def upgrade_account(li):
if li.upgrade_to_bot_account() is None:
return False
print("Succesfully upgraded to Bot Account!")
return True
def watch_control_stream(control_queue, li):
with logging_pool.LoggingPool(CONFIG['max_concurrent_games']+1) as pool:
for evnt in li.get_event_stream().iter_lines():
if evnt:
event = json.loads(evnt.decode('utf-8'))
control_queue.put_nowait(event)
def start(li, user_profile, engine_path, weights=None, threads=None):
# init
username = user_profile.get("username")
print("Welcome {}!".format(username))
manager = multiprocessing.Manager()
challenge_queue = []
control_queue = manager.Queue()
control_stream = multiprocessing.Process(target=watch_control_stream, args=[control_queue, li])
control_stream.start()
busy_processes = 0
queued_processes = 0
with logging_pool.LoggingPool(CONFIG['max_concurrent_games']+1) as pool:
events = li.get_event_stream().iter_lines()
quit = False
while not quit:
event = control_queue.get()
if event["type"] == "local_game_done":
busy_processes -= 1
print("+++ Process Free. Total Queued: {}. Total Used: {}".format(queued_processes, busy_processes))
elif event["type"] == "challenge":
chlng = model.Challenge(event["challenge"])
if len(challenge_queue) < CONFIG["max_queued_challenges"] and can_accept_challenge(chlng):
challenge_queue.append(chlng)
print(" Queue {}".format(chlng.show()))
else:
print(" Decline {}".format(chlng.show()))
li.decline_challenge(chlng.id)
elif event["type"] == "gameStart":
if queued_processes <= 0:
print("Something went wrong. Game is starting and we don't have a queued process")
else:
queued_processes -= 1
game_id = event["game"]["id"]
pool.apply_async(play_game, [li, game_id, engine_path, weights, threads, control_queue])
busy_processes += 1
print("--- Process Used. Total Queued: {}. Total Used: {}".format(queued_processes, busy_processes))
if (queued_processes + busy_processes) < CONFIG["max_concurrent_games"] and challenge_queue :
chlng = challenge_queue.pop(0)
print(" Accept {}".format(chlng.show()))
response = li.accept_challenge(chlng.id)
if response is not None:
# TODO: Probably warrants better checking.
queued_processes += 1
print("--- Process Queue. Total Queued: {}. Total Used: {}".format(queued_processes, busy_processes))
control_stream.terminate()
control_stream.join()
def play_game(li, game_id, engine_path, weights, threads, control_queue):
username = li.get_profile()["username"]
updates = li.get_game_stream(game_id).iter_lines()
#Initial response of stream will be the full game info. Store it
game = model.Game(json.loads(next(updates).decode('utf-8')), username, li.baseUrl)
board = setup_board(game.state)
engine = setup_engine(engine_path, board, weights, threads)
conversation = Conversation(game, engine, li)
print("+++ {}".format(game.show()))
engine.pre_game(game)
board = play_first_move(game, engine, board, li)
for binary_chunk in updates:
upd = json.loads(binary_chunk.decode('utf-8')) if binary_chunk else None
u_type = upd["type"] if upd else "ping"
if u_type == "chatLine":
conversation.react(ChatLine(upd))
elif u_type == "gameState":
moves = upd.get("moves").split()
board = update_board(board, moves[-1])
if is_engine_move(game.is_white, moves):
best_move = engine.search(board, upd.get("wtime"), upd.get("btime"), upd.get("winc"), upd.get("binc"))
li.make_move(game.id, best_move)
if CONFIG.get("print_engine_stats"):
engine.print_stats()
print("--- {} Game over".format(game.url()))
engine.quit()
# This can raise queue.NoFull, but that should only happen if we're not processing
# events fast enough and in this case I believe the exception should be raised
control_queue.put_nowait({"type": "local_game_done"})
def can_accept_challenge(chlng):
return chlng.is_supported(CONFIG)
def play_first_move(game, engine, board, li):
moves = game.state["moves"].split()
if is_engine_move(game.is_white, moves):
# need to hardcode first movetime since Lichess has 30 sec limit.
best_move = engine.first_search(board, 2000)
li.make_move(game.id, best_move)
return board
def setup_board(state):
board = chess.Board()
moves = state["moves"].split()
for move in moves:
board = update_board(board, move)
return board
def setup_engine(engine_path, board, weights=None, threads=None):
# print("Loading Engine!")
commands = [engine_path]
if weights:
commands.append("-w")
commands.append(weights)
if threads:
commands.append("-t")
commands.append(threads)
global CONFIG
if CONFIG["engine"].get("protocol") == "xboard":
return engine_wrapper.XBoardEngine(board, commands)
return engine_wrapper.UCIEngine(board, commands, CONFIG.get("ucioptions"))
def is_white_to_move(moves):
return (len(moves) % 2) == 0
def is_engine_move(is_white, moves):
is_w = (is_white and is_white_to_move(moves))
is_b = (is_white is False and is_white_to_move(moves) is False)
return (is_w or is_b)
def update_board(board, move):
uci_move = chess.Move.from_uci(move)
board.push(uci_move)
return board
def load_config():
global CONFIG
with open("./config.yml", 'r') as stream:
CONFIG = yaml.load(stream)
if __name__ == "__main__":
logger = logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(description='Play on Lichess with a bot')
parser.add_argument('-u', action='store_true', help='Add this flag to upgrade your account to a bot account.')
args = parser.parse_args()
load_config()
li = lichess.Lichess(CONFIG["token"], CONFIG["url"])
user_profile = li.get_profile()
is_bot = user_profile.get("title") == "BOT"
if args.u is True and is_bot is False:
is_bot = upgrade_account(li)
if is_bot:
cfg = CONFIG["engine"]
engine_path = os.path.join(cfg["dir"], cfg["name"])
weights_path = os.path.join(cfg["dir"], cfg["weights"]) if "weights" in cfg else None
start(li, user_profile, engine_path, weights_path, cfg.get("threads"))
else:
print("{} is not a bot account. Please upgrade your it to a bot account!".format(user_profile["username"]))