import os
import json
import requests
import threading
import time
import asyncio
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
TOKEN = "boş bıraktım bilerek"
SUBSCRIBERS_FILE = "subscribers.json"
LAST_VINS_FILE = "last_vins.json"
API_URL = ('https://www.tesla.com/inventory/api/v1/inventory-results?query='
'{"model":"my","options":{"trims":[],"condition":"new","markets":["TR"],'
'"language":"tr","super_region":"EMEA"}}')
async def load_json_file(filepath, default):
if os.path.exists(filepath):
try:
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"Dosya okunamadı veya bozuk: {filepath} - {e}")
return default
async def save_json_file(filepath, data):
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Dosya kaydedilemedi: {filepath} - {e}")
async def load_subscribers():
return await load_json_file(SUBSCRIBERS_FILE, [])
async def save_subscribers(subscribers):
await save_json_file(SUBSCRIBERS_FILE, subscribers)
async def load_last_vins():
return await load_json_file(LAST_VINS_FILE, [])
async def save_last_vins(vins):
await save_json_file(LAST_VINS_FILE, vins)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
subscribers = await load_subscribers()
if chat_id not in subscribers:
subscribers.append(chat_id)
await save_subscribers(subscribers)
await update.message.reply_text("✅ Takibe alındınız! Yeni Model Y araçları geldiğinde bildirim alacaksınız.")
else:
await update.message.reply_text("Zaten kayıtlısınız.")
async def test(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("✅ Bot çalışıyor ve sana mesaj gönderebiliyor!")
async def get_model_y_vehicles():
try:
response = requests.get(API_URL, timeout=10)
response.raise_for_status()
data = response.json()
vehicles = []
for item in data.get("results", []):
vehicles.append({
'price': item.get('Price', 'Bilinmiyor'),
'exterior': item.get('PAINT_DESCRIPTION', 'Bilinmiyor'),
'interior': item.get('INTERIOR_TRIM_TYPE', 'Bilinmiyor'),
'vin': item.get('VIN', 'Bilinmiyor'),
'link': f"https://www.tesla.com/tr_tr/inventory/new/my/{item.get('VIN', '')}"
})
return vehicles
except Exception as e:
print(f"API çağrısı hatası: {e}")
return []
async def notify_new_model_y(app):
subscribers = await load_subscribers()
if not subscribers:
return
vehicles = await get_model_y_vehicles()
last_vins = await load_last_vins()
new_vins = []
for vehicle in vehicles:
if vehicle['vin'] not in last_vins:
text = (
f"🚗 *Yeni Model Y Araç*\n\n"
f"💰 Fiyat: {vehicle['price']}\n"
f"🎨 Dış Renk: {vehicle['exterior']}\n"
f"🛋️ İç Renk: {vehicle['interior']}\n"
f"🔢 VIN: `{vehicle['vin']}`"
)
button = InlineKeyboardButton("🚀 Satın Al", url=vehicle['link'])
markup = InlineKeyboardMarkup([[button]])
for chat_id in subscribers:
try:
await app.bot.send_message(chat_id=chat_id, text=text, reply_markup=markup, parse_mode="Markdown")
except Exception as e:
print(f"Mesaj gönderilirken hata: chat_id={chat_id} - {e}")
new_vins.append(vehicle['vin'])
if new_vins:
await save_last_vins(last_vins + new_vins)
def start_periodic_check(app):
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
future = asyncio.run_coroutine_threadsafe(notify_new_model_y(app), loop)
try:
future.result()
except Exception as e:
print(f"Periyodik görev hata: {e}")
time.sleep(300)
thread = threading.Thread(target=run, daemon=True)
thread.start()
def main():
app = ApplicationBuilder().token(TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("test", test))
print("Bot başlatılıyor...")
start_periodic_check(app)
app.run_polling()
if __name__ == "__main__":
main() Python telegram bot problemi
2
●228
- 15-07-2025, 23:55:47arkadaşlar merhaba, python ile telegram botu geliştiriyorum, malumunuz tesla araçların alımı satımı çok ciddi sıkıntılı, ben telegram apileri üstünden bi bot yapmaya çalışıyorum teslanın envanterine araç düştüğü an telegramdan bildirim atsın istiyorum ama kodlarımda sanırım ufak bi sıkıntım var. /test yazdığımda sunucum mesaj atıyor. ama tesla envantere girdiği an bildirim atmıyor. inceleyip dönüş yapabileceklere çok teşekkür ederim.
- 16-07-2025, 00:31:11
import json import os import requests import threading import time import asyncio from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup TOKEN = "BURAYA_BOT_TOKEN" # buraya kendi token’ını koy SUBSCRIBERS_FILE = "subscribers.json" LAST_VINS_FILE = "last_vins.json" API_URL = ( "https://www.tesla.com/inventory/api/v1/inventory-results?query=" '{"model":"my","options":{"trims":[],"condition":"new","markets":["TR"],' '"language":"tr","super_region":"EMEA"}}' ) # JSON dosya işlemleri def load_json_file(filepath, default): if os.path.exists(filepath): try: with open(filepath, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: print(f"Dosya okunamadı: {filepath} - {e}") return default def save_json_file(filepath, data): try: with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"Dosya kaydedilemedi: {filepath} - {e}") # Abone işlemleri def load_subscribers(): return load_json_file(SUBSCRIBERS_FILE, []) def save_subscribers(subscribers): save_json_file(SUBSCRIBERS_FILE, subscribers) def load_last_vins(): return load_json_file(LAST_VINS_FILE, []) def save_last_vins(vins): save_json_file(LAST_VINS_FILE, vins) # Telegram komutları async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id subscribers = load_subscribers() if chat_id not in subscribers: subscribers.append(chat_id) save_subscribers(subscribers) await update.message.reply_text("✅ Takibe alındınız! Yeni Model Y araçları geldiğinde bildirim alacaksınız.") else: await update.message.reply_text("Zaten kayıtlısınız.") async def test(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("✅ Bot çalışıyor ve sana mesaj gönderebiliyor!") # Tesla API'den veri çekme def get_model_y_vehicles(): try: response = requests.get(API_URL, timeout=10) response.raise_for_status() data = response.json() vehicles = [] for item in data.get("results", []): vehicles.append({ 'price': item.get('Price', 'Bilinmiyor'), 'exterior': item.get('PAINT_DESCRIPTION', 'Bilinmiyor'), 'interior': item.get('INTERIOR_TRIM_TYPE', 'Bilinmiyor'), 'vin': item.get('VIN', 'Bilinmiyor'), 'link': f"https://www.tesla.com/tr_tr/inventory/new/my/{item.get('VIN', '')}" }) return vehicles except Exception as e: print(f"API çağrısı hatası: {e}") return [] # Bildirim gönderme async def notify_new_model_y(app): subscribers = load_subscribers() if not subscribers: return vehicles = get_model_y_vehicles() last_vins = load_last_vins() new_vins = [] for vehicle in vehicles: if vehicle['vin'] not in last_vins: text = ( "🚗 *Yeni Model Y Araç*nn" f"💰 Fiyat: {vehicle['price']}n" f"🎨 Dış Renk: {vehicle['exterior']}n" f"🛋 İç Renk: {vehicle['interior']}n" f"🔢 VIN: `{vehicle['vin']}`" ) button = InlineKeyboardButton("🚀 Satın Al", url=vehicle['link']) markup = InlineKeyboardMarkup([[button]]) for chat_id in subscribers: try: await app.bot.send_message(chat_id=chat_id, text=text, reply_markup=markup, parse_mode="Markdown") except Exception as e: print(f"Mesaj gönderilirken hata: chat_id={chat_id} - {e}") new_vins.append(vehicle['vin']) if new_vins: save_last_vins(last_vins + new_vins) # Periyodik kontrol def start_periodic_check(app): def run(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def periodic(): while True: try: await notify_new_model_y(app) except Exception as e: print(f"Periyodik görev hatası: {e}") await asyncio.sleep(300) # 5 dakika loop.create_task(periodic()) loop.run_forever() thread = threading.Thread(target=run, daemon=True) thread.start() # Ana fonksiyon def main(): app = ApplicationBuilder().token(TOKEN).build() app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("test", test)) print("Bot başlatılıyor...") start_periodic_check(app) app.run_polling() if __name__ == "__main__": main()Şöyle dener misin hocam - 16-07-2025, 00:32:57tabi hocam yarın ki envanter yarın akşam 18:30'da açılacak geri dönüş yaparım test edip.fivefingerx adlı üyeden alıntı: mesajı görüntüle