import asyncio import logging import re import time from typing import Optional, Tuple import discord from discord.ext.tasks import loop from bot import constants, parsers, helpers from bot.constants import PlayOptions logger = logging.getLogger(__file__) logger.setLevel(constants.LOGGING_LEVEL) class UnbelievaClient(discord.Client): def __init__(self, bot_id: int, channel_id: int, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.last_message = -1 self.bot_id, self.channel_id = bot_id, channel_id self.channel: Optional[discord.TextChannel] = None self.bot: Optional[discord.User] = None self.tasks = [ ('$work', 5 * 60 + 5), ('$slut', 13 * 60 + 5), ('$crime', 20 * 60 + 5) ] self.task_times = {} self.money = 0 self.last_deposit = -1 self.last_user_deposit = -1 self.current_blackjack: Optional[discord.Message] = None async def on_ready(self): await self.wait_until_ready() self.channel: discord.TextChannel = self.get_channel(self.channel_id) self.bot = self.bot_id self.check_task_available.start() logger.info(f'Connected to #{self.channel.name} in {self.channel.guild.name}') async def on_message(self, message: discord.Message): # Ignore messages in other channels or sent by myself if message.channel != self.channel or message.author == self.user: return if message.author.id == self.bot and len(message.embeds) > 0: embed = message.embeds[0] is_self = embed.author.name == f'{self.user.name}#{self.user.discriminator}' if is_self: if parsers.TaskCooldownMessage.check_valid(message): tcm = parsers.TaskCooldownMessage(message) logger.debug(f'"{tcm.duration_unparsed}" => {tcm.duration}s') logger.debug(f'Changed {tcm.task_type} to wait {tcm.duration + 2}s instead.') self.task_times[tcm.task_type] = tcm.available_at # helpers.print_embed(embed) # Handling earnings if parsers.TaskResponse.check_valid(message): tr = parsers.TaskResponse(message) self.money += tr.change logger.log(logging.INFO if is_self else logging.DEBUG, tr.log_message(embed.author.name)) # Handling for blackjack if embed.description.startswith('Type `hit` to draw another card'): options = self.parse_options(embed.description) my_cards = self.parse_cards(embed.fields[0]) dealer_cards = self.parse_cards(embed.fields[1]) print(options, my_cards, dealer_cards) # self.current_blackjack = message def parse_options(self, options_str: str) -> PlayOptions: """ Return a tuple of booleans describing what the player can do. Tuple Options: [hit, stand, double_down, split] """ options = [f'`{sub}`' in options_str for sub in ['hit', 'stand', 'double down', 'split']] # noinspection PyProtectedMember return PlayOptions._make(options) def parse_cards(self, card_str: discord.embeds.EmbedProxy) -> Tuple[str, Optional[str], Tuple[int, bool]]: """ Parses a Embed Proxy :param card_str: :return: [Card1, Card2?, [Value, isSoft]] """ emote_pattern = r'<:([A-z0-9]+):\d+>' value_pattern = r'Value: (Soft )?(\d+)' cards = list(re.finditer(emote_pattern, card_str.value)) value = re.search(value_pattern, card_str.value) c2: Optional[str] c1, c2 = cards[0].group(1), cards[1].group(1) c2 = c2 if c2 != 'cardBack' else None return c1, c2, (int(value.group(2)), value.groups()[1] is None) def handle_blackjack(self): embed = self.current_blackjack.embeds[0] options = self.parse_options(embed.description) my_cards = self.parse_cards(embed.fields[0]) dealer_cards = self.parse_cards(embed.fields[1]) print(options, my_cards, dealer_cards) pass def handle_task_wait(self, match: re.Match): task_command = self.task_parsings[match.group(1)] duration_match = re.match( r'(\d+) (hour|minute|second)s?(?: and (\d+) (hour|minute|second)s?)?', match.group(2)) groups = len(list(filter(lambda s: s is not None, duration_match.groups()))) duration = 0 for i in range(0, groups // 2): x, y = (i * 2) + 1, (i * 2) + 2 duration += int(duration_match.group(x)) * self.durations[duration_match.group(y)] epoch = time.time() + duration + 2 logger.debug(f'"{match.group(2)}" => {duration}s') logger.debug(f'Changed {task_command} to wait {duration + 2}s instead.') self.task_times[task_command] = epoch @loop(seconds=1) async def check_task_available(self): await self.wait_until_ready() # Check for available tasks now = time.time() task_completed = False for task, duration in self.tasks: if self.task_times.get(task, 0) <= now: logger.debug(f'Planning to execute {task} {duration}s from now.') self.task_times[task] = now + duration await self.command_sleep() logger.debug(f'Executing {task} task.') await self.channel.send(task) self.last_message = time.time() task_completed = True if not task_completed: if self.last_deposit + (30 * 60) <= now: await self.command_sleep() await self.channel.send('$dep all') self.last_message = time.time() self.last_deposit = time.time() if self.current_blackjack is not None: self.handle_blackjack() async def command_sleep(self): """Sleep right before sending a command.""" now = time.time() time_between = now - self.last_message wait_time = 6 - time_between if wait_time > 0: logger.debug(f'Sleeping for {round(wait_time, 2)}s before sending a command...') await asyncio.sleep(wait_time)