-
Notifications
You must be signed in to change notification settings - Fork 0
/
pir_telegram.py
126 lines (104 loc) · 4.16 KB
/
pir_telegram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from telegram import ReplyKeyboardMarkup, KeyboardButton, ChatAction
from telegram.ext import Updater, CommandHandler
import logging
import os
import RPi.GPIO as GPIO
import picamera
import datetime
# Module-level configuration and shared state for the PIR alarm bot.
# Change this to the GPIO pin the pyroelectric (PIR) sensor is wired to.
# The number is interpreted in BCM numbering (see GPIO.setmode below).
sensor = 6
GPIO.setmode(GPIO.BCM)
# The sensor pin is configured as an input, with GPIO.PUD_DOWN passed as the
# third argument.
GPIO.setup(sensor, GPIO.IN, GPIO.PUD_DOWN)
# Last two sensor readings; alarme() compares them to detect a change.
previous_state = False
current_state = False
# Alarm status: "hors" (out of service) or "en" (in service). It is toggled by
# the start/stop handlers and interpolated into the ping reply.
service = "hors"
# Command strings used as keyboard button labels on the next line.
# NOTE(review): the names start, stop and ping are rebound further down by the
# handler functions of the same name, and quit shadows the builtin.
start, stop, ping, photo, video, quit = ("/start","/stop","/ping","/photo","/video","/quit")
reply = ReplyKeyboardMarkup([[KeyboardButton(start),KeyboardButton(stop)],[KeyboardButton(ping),KeyboardButton(photo),KeyboardButton(video)]],one_time_keyboard=False)
# Chat actions sent before uploading a photo / a video.
upphoto = ChatAction.UPLOAD_PHOTO
upvideo = ChatAction.UPLOAD_VIDEO
# Placeholder until start() stores the triggering update here; alarme() reads
# chatid.message.chat_id from it.
chatid = 'none'
# Set to updater.job_queue by main().
job = None
camera = picamera.PiCamera()
# Adjust the camera settings here.
camera.resolution = (1280, 720)
camera.framerate = 25
camera.rotation = 180
camera.exposure_mode = 'night'
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
def utilisateur(user, autorises=("utilisateur 1", "utilisateur 2")):
    """Return True when *user* is one of the authorized Telegram usernames.

    Edit the default value of ``autorises`` to list the usernames allowed to
    control your bot.

    Args:
        user: Username to check. Any value that is not in ``autorises``
            (including None) is rejected.
        autorises: Collection of accepted usernames. Defaults to the two
            placeholder names used by this script.

    Returns:
        True if ``user`` is in ``autorises``, False otherwise.
    """
    return user in autorises
def start(bot, update):
    """Put the alarm in service when an authorized user sends /start.

    Nothing happens unless the sender passes ``utilisateur`` and the alarm is
    currently out of service ("hors"). Otherwise the alarm is marked "en",
    the triggering update is remembered in the global ``chatid``, a
    confirmation is sent to the chat and ``alarme`` is put on the job queue.
    """
    global service, chatid, job
    sender = update.message.from_user.username
    if not utilisateur(sender) or service != "hors":
        return
    service = "en"
    # The whole update is stored (not only the chat id): alarme() later reads
    # chatid.message.chat_id and hands chatid to sendimage() as its update.
    chatid = update
    bot.sendMessage(chatid.message.chat_id,
                    text="Mise en service de l'alarme.",
                    reply_markup=reply)
    # Repeating job with interval 1 that polls the sensor through alarme().
    job.put(alarme, 1, repeat=True)
def stop(bot, update):
    """Take the alarm out of service when an authorized user sends /stop.

    Nothing happens unless the sender passes ``utilisateur`` and the alarm is
    currently in service ("en"). Otherwise the alarm is marked "hors", a
    confirmation is sent to the chat and the job queue is stopped.
    """
    global service, job
    sender = update.message.from_user.username
    if not utilisateur(sender) or service != "en":
        return
    service = "hors"
    bot.sendMessage(update.message.chat_id,
                    text="Mise hors service de l'alarme.",
                    reply_markup=reply)
    job.stop()
def ping(bot, update):
    """Reply with the current alarm status to an authorized user.

    The reply interpolates the global ``service`` value ("en" or "hors") and
    re-sends the command keyboard. Unauthorized senders get no answer.
    """
    message = update.message
    if not utilisateur(message.from_user.username):
        return
    status = "L'alarme est %s service !" % service
    bot.sendMessage(message.chat_id, text=status, reply_markup=reply)
def quitter(bot, update):
    """Say goodbye and terminate the process for an authorized user.

    NOTE(review): the original indentation was lost; the process exit is
    assumed to belong to the authorized branch, so unauthorized senders
    cannot shut the bot down. Confirm against the upstream file.
    """
    message = update.message
    if not utilisateur(message.from_user.username):
        return
    bot.sendMessage(message.chat_id, text="Good Bye !")
    # os._exit leaves immediately with status 0, without normal interpreter
    # shutdown processing.
    os._exit(0)
def alarme(bot):
    """Poll the PIR sensor and notify the chat when its state changes.

    Skipped entirely while the alarm is not in service or while the camera is
    recording. When the sensor reading differs from the previous one, a
    presence / no-presence message is sent to the chat stored in ``chatid``;
    a detected presence is additionally followed by a photo.
    """
    global previous_state, current_state, service
    if service != "en" or camera.recording:
        return
    # Shift the last reading into previous_state before sampling again.
    previous_state = current_state
    current_state = GPIO.input(sensor)
    if current_state == previous_state:
        return
    destination = chatid.message.chat_id
    if current_state:
        bot.sendMessage(destination, text="Présence détectée")
        sendimage(bot, chatid)
    else:
        bot.sendMessage(destination, text="Pas de présence")
def sendimage(bot, update):
    """Capture a still image and send it to the chat of *update*.

    Only runs for senders accepted by ``utilisateur``. The capture is saved
    as "<timestamp>.jpg" in the current working directory; the same timestamp
    is set as the camera annotation text and used as the photo caption.

    Args:
        bot: Telegram bot used to send the chat action and the photo.
        update: Update whose message supplies the sender and the chat id
            (``alarme`` passes the update stored by ``start``).
    """
    if not utilisateur(update.message.from_user.username):
        return
    chat_id = update.message.chat_id
    bot.sendChatAction(chat_id, action=upphoto)
    ladate = datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
    filename = "%s.jpg" % ladate
    camera.annotate_text = ladate
    camera.capture(filename)
    # Close the file once the upload call returns; the original left one
    # file handle open per captured photo.
    with open(filename, 'rb') as image:
        bot.sendPhoto(chat_id, photo=image, caption=ladate)
def sendvideo(bot, update):
    """Record a short clip, wrap it in an MP4 container and send it.

    Only runs for senders accepted by ``utilisateur``. The raw stream is
    written to "Video.h264" (overwritten on every call), converted with the
    external ``MP4Box`` tool into "<timestamp>.mp4" and uploaded with the
    timestamp as caption.

    Args:
        bot: Telegram bot used to send the messages and the video.
        update: Update whose message supplies the sender and the chat id.
    """
    if not utilisateur(update.message.from_user.username):
        return
    chat_id = update.message.chat_id
    bot.sendMessage(chat_id, text="Enregistrement de la vidéo...")
    ladate = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    filename = "%s.mp4" % ladate
    camera.start_recording('Video.h264', format='h264')
    try:
        camera.wait_recording(10)
    finally:
        # Always stop the recording: alarme() does nothing while
        # camera.recording is true, so a failure during the wait must not
        # leave the camera stuck in recording mode.
        camera.stop_recording()
    # The file name is built from the timestamp above, not from user input.
    os.system("MP4Box -add Video.h264 " + filename)
    bot.sendChatAction(chat_id, action=upvideo)
    # Close the file once the upload call returns; the original left one
    # file handle open per recorded video.
    with open(filename, 'rb') as clip:
        bot.sendVideo(chat_id, video=clip, caption=ladate)
def error(bot, update, error):
    """Log, at WARNING level, an error raised while handling an update.

    Args:
        bot: Telegram bot instance (unused).
        update: The update that was being processed.
        error: The error that was raised.
    """
    # Logger.warn is a deprecated alias of Logger.warning; the arguments are
    # passed separately so formatting only happens if the record is emitted.
    logger.warning('Update "%s" caused error "%s"', update, error)
def main():
    """Create the updater, register the command handlers and start polling.

    The updater's job queue is published through the global ``job`` so the
    start/stop handlers can schedule and stop the sensor polling job.
    """
    global job
    # Replace with your bot token.
    updater = Updater("Token")
    job = updater.job_queue
    dispatcher = updater.dispatcher
    # Command name -> handler, registered in this order.
    commands = (
        ("start", start),
        ("stop", stop),
        ("ping", ping),
        ("photo", sendimage),
        ("video", sendvideo),
        ("quit", quitter),
    )
    for command, callback in commands:
        dispatcher.addHandler(CommandHandler(command, callback))
    dispatcher.addErrorHandler(error)
    updater.start_polling()
    updater.idle()


# Start the bot only when this file is executed as a script.
if __name__ == '__main__':
    main()