forked from borsik/telegram2twitter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
telegram2twitter.py
85 lines (61 loc) · 2.34 KB
/
telegram2twitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import logging
import settings as cfg
import tweepy
from telegram.ext import Updater
from telegram.ext import MessageHandler, Filters
# Route log records at INFO level and above to a file, then create the
# logger used by the rest of this module.
logging.basicConfig(
    filename="telegram_bot.log",
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def error(bot, update, error):
    """Write a warning to the module logger for a failed update.

    Args:
        bot: Bot instance passed in by the caller (not used here).
        update: The update that was being processed when the error occurred.
        error: The error that was raised.
    """
    message_template = 'Update "%s" caused error "%s"'
    logger.warning(message_template, update, error)
def make_twitter_api():
    """Build a tweepy API client from the credentials in ``cfg.twitter``.

    Returns:
        A ``tweepy.API`` object created from an OAuth handler that carries the
        configured consumer key/secret and access token/secret.
    """
    credentials = cfg.twitter
    oauth = tweepy.OAuthHandler(
        credentials["consumer_key"],
        credentials["consumer_secret"],
    )
    oauth.set_access_token(
        credentials["access_token"],
        credentials["access_token_secret"],
    )
    return tweepy.API(oauth)
def slice_text(text, limit=130):
    """Shorten *text* to a teaser and append a "Read more:" marker.

    Text that already fits within *limit* characters is kept whole (only
    surrounding whitespace is stripped). Longer text is cut to *limit*
    characters and then trimmed back to a natural boundary so the teaser does
    not end in the middle of a line, clause, sentence or word.

    Args:
        text: The full message text.
        limit: Maximum number of characters of *text* to keep. Defaults to
            130, the value that used to be hard-coded.

    Returns:
        The (possibly shortened) text, stripped of surrounding whitespace,
        followed by ``"\\n\\nRead more:"``.
    """
    suffix = "\n\nRead more:"

    if len(text) <= limit:
        # Nothing has to be cut, so do not run the boundary trimming: it
        # would needlessly drop the last line or sentence of a short post.
        return text.strip() + suffix

    # Boundaries tried in order of preference: last newline, last ": ",
    # last ". ", last whitespace. Each pattern removes the delimiter together
    # with everything after it.
    boundary_patterns = (
        r"\n([^\n]*)$",
        r"\:\s([^:]*)$",
        r"\.\s([^.]*)$",
        r"\s([^\s]*)$",
    )

    excerpt = text[:limit]
    for pattern in boundary_patterns:
        excerpt = re.sub(pattern, "", excerpt)
        if len(excerpt) < limit:
            # A cut was made at this boundary; do not trim any further.
            break
    return excerpt.strip() + suffix
def tweet_post(id, text, username=None):
    """Tweet a shortened copy of a Telegram post with a link back to it.

    Args:
        id: Message id of the post; formatted with ``%d`` into the link.
        text: Full text of the post; shortened with ``slice_text``.
        username: Username of the chat the post came from. The post is
            tweeted only when this is ``None`` or equals the channel name in
            ``cfg.telegram["channel"]``.

    A ``tweepy.TweepError`` raised while posting is logged and not re-raised.
    """
    channel = cfg.telegram["channel"]
    if username is not None and username != channel:
        # The post belongs to some other chat: ignore it before doing any
        # Twitter-related work.
        return

    link = "https://telegram.me/%s/%d " % (channel, id)
    post = "%s %s" % (slice_text(text), link)

    twitter_api = make_twitter_api()
    try:
        twitter_api.update_status(post)
    except tweepy.TweepError as e:
        # Report through the module logger so the failure ends up in the
        # configured log file rather than only on stdout.
        logger.error("Failed to tweet post %s: %s", id, e.reason)
def tweet_post_update(bot, update):
    """Handler callback that forwards a channel post to ``tweet_post``.

    Args:
        bot: Bot instance passed in by the caller (not used here).
        update: Incoming update; only its ``channel_post`` attribute is read.

    Updates whose ``channel_post`` is ``None`` are ignored.
    """
    channel_post = update.channel_post
    if channel_post is None:
        # NOTE(review): depending on the MessageHandler defaults of the
        # installed python-telegram-bot version, this callback may also be
        # invoked for ordinary (non-channel) messages -- confirm. Such
        # updates carry no channel post, so there is nothing to tweet.
        return

    tweet_post(
        channel_post.message_id,
        channel_post.text,
        channel_post.chat.username,
    )
def main():
    """Set up the Telegram bot, start polling and block until it is stopped."""
    # Build the updater from the bot token in the settings module.
    bot_updater = Updater(token=cfg.telegram["token"])
    dispatcher = bot_updater.dispatcher

    # Send text updates, including channel posts, to the tweeting callback.
    channel_text_handler = MessageHandler(
        Filters.text, tweet_post_update, channel_post_updates=True
    )
    dispatcher.add_handler(channel_text_handler)

    # Log every error raised while handling updates.
    dispatcher.add_error_handler(error)

    # Start the bot.
    bot_updater.start_polling()
    logger.info("Bot started")

    # start_polling() is non-blocking, so wait here until Ctrl-C is pressed
    # or the process receives SIGINT, SIGTERM or SIGABRT; the bot is then
    # stopped gracefully.
    bot_updater.idle()


if __name__ == '__main__':
    main()