You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

112 lines
2.7 KiB

#!/usr/bin/python3
from subprocess import run
from textwrap import dedent
import os
import sys
import socket
import re
import signal
config = {
"token": None,
"nickname": 'ianmethyst',
"channel": '#ianmethyst',
"supress_own": True,
"short_notification": True
}
HOST = 'irc.chat.twitch.tv'
PORT = 6667
MESSAGE_REGEX = r':(.*)!.*@.* PRIVMSG #(.*) :(.*)'
sock = socket.socket()
connected = False
def get_token_from_env():
"""Get OAUTH token from environment variables or exit"""
try:
config['token'] = os.environ['TWITCH_OAUTH']
except KeyError:
helpText = """
Get your OAUTH token here: https://twitchapps.com/tmi/
Then pass it as an environment variable:
TWITCH_OAUTH=<token> {__file__}"
"""
print(dedent(helpText))
sys.exit(1)
def connect():
"""Connect to IRC and authenticate"""
sock.connect((HOST, PORT))
sock.send(f"PASS {config['token']}\n".encode('utf-8'))
sock.send(f"NICK {config['nickname']}\n".encode('utf-8'))
sock.send(f"JOIN {config['channel']}\n".encode('utf-8'))
def pong(resp):
"""Answer to server PING to keep socket connection alive"""
if resp.startswith('PING'):
sock.send("PONG\n".encode('utf-8'))
return True
return False
def wait_for_chat(resp):
"""Wait until all welcome and user list messages are received"""
if "End of /NAMES list" in resp:
global connected
connected = True
run(["notify-send",
f"connected to {config['channel']} as {config['nickname']}"])
def handle_message(resp):
"""Parse the message and send it as a notification"""
print(resp)
user, channel, message = re.search(MESSAGE_REGEX, resp).groups()
if user and channel and message:
if config['short_notification']:
notification_text = f"@{user}: {message}"
else:
notification_text = f"@{user} in #{channel}\n{message}"
if not (config['supress_own'] and config['nickname'] == user):
run(["notify-send", notification_text])
def handle_exit(sig, frame):
"""Close socket connection before exit"""
print(f"Got {sig.name}, closing socket")
sock.shutdown(socket.SHUT_RDWR)
sock.close()
sys.exit(0)
if __name__ == "__main__":
# Register signal handlers
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
get_token_from_env()
connect()
while True:
resp = sock.recv(2048).decode('utf-8')
if len(resp) > 0:
if not (pong(resp)):
if connected:
handle_message(resp)
else:
wait_for_chat(resp)