# You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# 203 lines
# 6.7 KiB

# -*- coding: utf-8 -*-
import re
import string
import requests
import ast
import os
import random
import twitter
import json
import time
from datetime import datetime
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from stopwords import stop_words
from companies import all_companies
def find_companies(word, dictionary):
    '''
    Search a dictionary of company records for whole-word matches of `word`
    in the company name (case-insensitive).

    A match requires `word` to appear at the start of the name or after
    whitespace, and to be followed by a comma or whitespace (matching rules
    preserved from the original implementation).

    :param word: word to look for; escaped, so regex metacharacters
        (e.g. in "A+" or "C++") are treated literally
    :param dictionary: iterable of dicts, each with a 'Name' key
    :return: list of matching company dicts
    '''
    # raw strings avoid invalid-escape-sequence warnings for \s;
    # re.escape stops the word itself from being parsed as a pattern
    search_ex = r"(^|\s)" + re.escape(word) + r"(?=,|\s)"
    results = []
    for company in dictionary:
        if re.search(search_ex, company['Name'], re.IGNORECASE) is not None:
            results.append(company)
    return results
def find_all_companies(text):
    '''
    Match every word of `text` against the company dictionary.

    :param text: free text; punctuation is stripped before matching
    :return: list of [word, [company, ..., company]] pairs, one entry per
        word that matched at least one company
    '''
    # drop punctuation, then collapse runs of whitespace to single spaces
    for c in string.punctuation:
        text = text.replace(c, "")
    words = " ".join(text.split()).split(" ")
    results = []
    for word in words:
        # don't process digits
        if word.isdigit():
            continue
        # skip stop words regardless of the casing used in the tweet
        if (word in stop_words or word.lower() in stop_words
                or word.title() in stop_words):
            continue
        companies = find_companies(word, all_companies)
        if companies:
            results.append([word, companies])
    return results
def test_sentiment(text):
    '''
    Return a sentiment label for `text`: "pos", "neg" or "neutral",
    using http://text-processing.com/docs/sentiment.html

    :param text: text to classify
    :return: "neutral" when the API's neutral probability exceeds 0.9
        (or when pos and neg are exactly tied), otherwise whichever of
        "pos"/"neg" has the higher probability
    '''
    r = requests.post("http://text-processing.com/api/sentiment/",
                      data={"text": text})
    # the endpoint returns JSON, so parse it as JSON rather than as a
    # Python literal (ast.literal_eval)
    probs = json.loads(r.text)['probability']
    if probs['neutral'] > 0.9:
        return "neutral"
    if probs['pos'] > probs['neg']:
        return "pos"
    if probs['neg'] > probs['pos']:
        return "neg"
    # exact pos/neg tie: the original left `label` unbound here and
    # raised UnboundLocalError; treat a tie as neutral
    return "neutral"
def exclamation(n):
    '''Return a string of n characters, each independently a random "?" or "!".'''
    return "".join(random.choice("?!") for _ in range(n))
def log_tip(spreadsheet, tip, symbol, name):
    '''
    Append one tip as a new row at the bottom of the worksheet.

    Columns written, in order: timestamp, tip, symbol, name.

    :param spreadsheet: gspread worksheet-like object exposing
        get_all_values() and update_cell(row, col, value)
    :param tip: action string (e.g. "BUY ")
    :param symbol: stock symbol string (e.g. "$AAPL")
    :param name: company name string
    '''
    next_row_nb = len(spreadsheet.get_all_values()) + 1
    time_stamp = str(datetime.now())
    # enumerate instead of list.index(): index() returns the FIRST
    # occurrence, so duplicate row values would write two values into
    # the same cell and leave a column empty
    for col, value in enumerate([time_stamp, tip, symbol, name], start=1):
        spreadsheet.update_cell(next_row_nb, col, value)
def tweet_to_tip(tweet, spreadsheet):
    '''
    Build a stock-market tip tweet from an input tweet (if any company
    matches are found) and log the tip to a Google spreadsheet.

    One matched word is chosen at random, one of that word's matching
    companies is chosen at random, and the tweet's sentiment decides the
    recommendation: pos -> BUY, neg -> SELL, neutral -> WATCH.

    :param tweet: tweet text to analyse
    :param spreadsheet: worksheet passed through to log_tip()
    :return: the composed tip tweet string, or None when no company
        name was found in the tweet
    '''
    name = random.choice(["He", "Trump", "POTUS"])
    hits = find_all_companies(tweet)
    if not hits:
        # explicit None return; the original fell through to an implicit
        # None which then crashed the caller's string concatenation
        print("no hits")
        return None
    chosen_hit = random.choice(hits)
    word = chosen_hit[0].strip()
    stocks = chosen_hit[1]
    random.shuffle(stocks)
    Symbol = "$" + stocks[0]['Symbol']
    Name = stocks[0]['Name']
    sentiment = test_sentiment(tweet)
    # verb, action and arrow per sentiment label
    if sentiment == 'pos':
        excl = exclamation(3)
        intro = name + " loves " + "\"" + word + "\""
        tip = "BUY "
        arrow = " ↗️ "
    elif sentiment == 'neg':
        excl = exclamation(3)
        intro = name + " hates " + "\"" + word + "\""
        tip = "SELL "
        arrow = " ↘️ "
    else:
        # 'neutral' (and any unexpected label, which previously left
        # excl/intro/tip unbound and raised UnboundLocalError)
        excl = exclamation(1)
        intro = name + " says " + "\"" + word + "\""
        tip = "WATCH "
        arrow = " 🔎 "
    stock_tip = tip + Symbol + arrow + Name
    log_tip(spreadsheet, tip, Symbol, Name)
    return intro + excl + "\n" + "TIP: " + stock_tip
def twitter_api_auth(is_prod):
    ''' Authenticate against the Twitter API and return the client. '''
    key_names = ('consumer_key', 'consumer_secret',
                 'access_token_key', 'access_token_secret')
    if is_prod:
        # production: credentials come from environment variables
        creds = {k: os.environ.get(k) for k in key_names}
    else:
        # local development: credentials come from an untracked module
        from twitter_creds import twitter_creds
        creds = {k: twitter_creds[k] for k in key_names}
    return twitter.Api(**creds)
def gsheets_api_keys(is_prod):
    '''
    Return credentials for Google API OAuth.

    In production the service-account credentials live in the
    `gsheets_creds` environment variable; the loader only reads files,
    so they are written to a temporary json file and removed afterwards.
    Locally they are read straight from a keyfile on disk.
    '''
    scope = ['https://spreadsheets.google.com/feeds']
    if is_prod:
        # NOTE(review): the env var appears to hold a Python-literal dict
        # (hence ast.literal_eval rather than json.loads) -- confirm its format
        # `creds_file` instead of `file`, which shadowed the builtin
        with open('gsheets_creds.json', 'w') as creds_file:
            json.dump(ast.literal_eval(os.environ.get('gsheets_creds')), creds_file)
        try:
            gsheets_credentials = ServiceAccountCredentials.from_json_keyfile_name('gsheets_creds.json', scope)
        finally:
            # remove the key material from disk even if loading raises
            os.remove('gsheets_creds.json')
    else:
        gsheets_credentials = ServiceAccountCredentials.from_json_keyfile_name('trump-bot-0b1e240571d7.json', scope)
    return gsheets_credentials
def main():
import sys;
reload(sys);
sys.setdefaultencoding("utf8")
is_prod = os.environ.get('IS_HEROKU', None)
twitter_api = twitter_api_auth(is_prod)
gsheets_credentials = gsheets_api_keys(is_prod)
timer = 60*10
while True:
print "--- start ---"
# auth runs out after a while and throws error
gsheets_api = gspread.authorize(gsheets_credentials)
id_worksheet = gsheets_api.open("trump-bot").sheet1
logs_worksheet = gsheets_api.open("trump-bot").worksheet('logs')
last_id = int(id_worksheet.acell('A1').value)
print "last_id:", last_id
tweets = twitter_api.GetUserTimeline(screen_name='realdonaldtrump', since_id=last_id)
tweet_queue = []
if tweets:
for tweet in tweets:
parsed_tweet = json.loads(str(tweet))
tweet_queue.append([parsed_tweet['id'], parsed_tweet['text']])
if tweet_queue:
# reversed list to process most recent tweet last and prevent last_id mess
for tweet in reversed(tweet_queue):
tweet_id = tweet[0]
tweet_text = tweet[1]
print "new tweet:", tweet_text, tweet_id
try:
top_tip = tweet_to_tip(tweet_text, logs_worksheet) #+ "\n" + tweet_link
print "tweeting: " + top_tip
status = twitter_api.PostUpdate(top_tip)
except:
pass
id_worksheet.update_acell('A1', str(tweet_id))
print "last_id updated to: ", tweet_id, "\n"
print "--- done --- \n"
time.sleep(timer)
# run the bot only when executed as a script, not when imported
if __name__ == '__main__':
    main()