You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
6.7 KiB
203 lines
6.7 KiB
# -*- coding: utf-8 -*-
|
|
|
|
import re
|
|
import string
|
|
import requests
|
|
import ast
|
|
import os
|
|
import random
|
|
import twitter
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
import gspread
|
|
from oauth2client.service_account import ServiceAccountCredentials
|
|
from stopwords import stop_words
|
|
from companies import all_companies
|
|
|
|
|
|
def find_companies(word, dictionary):
    '''
    Search a list of company records for names containing `word`.

    word       -- the single word to look for (matched case-insensitively)
    dictionary -- iterable of dicts, each with at least a 'Name' key

    Returns the list of matching company dicts (empty if nothing matches,
    or if `word` is empty).

    A name matches when `word` appears at the start of the name or right
    after whitespace, and is immediately followed by a comma or whitespace.
    '''
    # An empty word would reduce the pattern to "any whitespace", which
    # matches nearly every company name.
    if not word:
        return []

    # Escape the word so regex metacharacters in it are matched literally,
    # and compile once instead of once per company.
    # NOTE(review): the lookahead requires a trailing comma/whitespace, so a
    # word that is the very last token of a name does not match -- confirm
    # this is intended.
    pattern = re.compile(r"(^|\s)" + re.escape(word) + r"(?=,|\s)", re.IGNORECASE)
    return [company for company in dictionary
            if pattern.search(company['Name']) is not None]
|
|
|
|
def find_all_companies(text):
    '''
    Returns an array of matched companies for each word in the text
    [[word, [company, ..., company]],[word, [company, ..., company]]]

    ASCII punctuation is stripped from the text first. Words that are
    purely digits, or that appear in stop_words (as-is, lower-cased or
    title-cased), are skipped. Words with no matching company are omitted
    from the result.
    '''
    for c in string.punctuation:
        text = text.replace(c, "")

    results = []
    # split() with no argument collapses runs of whitespace and yields no
    # tokens at all for empty input, so an empty "word" is never looked up.
    for word in text.split():
        # don't process digits
        if word.isdigit():
            continue
        if word in stop_words or word.lower() in stop_words or word.title() in stop_words:
            continue
        companies = find_companies(word, all_companies)
        if companies:
            results.append([word, companies])
    return results
|
|
|
|
def test_sentiment(text):
    '''
    returns sentiment analysis label: pos, neg or neutral
    using http://text-processing.com/docs/sentiment.html

    The label is derived from the returned probabilities: "neutral" when
    the neutral probability exceeds 0.9, otherwise whichever of pos/neg is
    larger. An exact pos/neg tie is reported as "neutral".

    Raises a requests exception if the request times out or the service
    answers with an HTTP error status.
    '''
    r = requests.post(
        "http://text-processing.com/api/sentiment/",
        data={"text": text},
        timeout=30,  # without a timeout a stalled service blocks forever
    )
    r.raise_for_status()

    # The response body is JSON, so parse it as JSON rather than as a
    # Python literal.
    probability = json.loads(r.text)['probability']
    pos_prob = probability['pos']
    neg_prob = probability['neg']
    neutral_prob = probability['neutral']

    if neutral_prob > 0.9:
        return "neutral"
    if pos_prob > neg_prob:
        return "pos"
    if neg_prob > pos_prob:
        return "neg"
    # pos == neg: neither side wins, so there is no directional signal.
    return "neutral"
|
|
|
|
def exclamation(n):
    '''
    Returns a string of n random ?s and !s

    n -- number of characters to generate; 0 (or less) gives ""
    '''
    # One random.choice call per character, joined once at the end.
    return "".join(random.choice("?!") for _ in range(n))
|
|
|
|
def log_tip(spreadsheet, tip, symbol, name):
    '''
    Writes one log row [timestamp, tip, symbol, name] to the worksheet.

    spreadsheet -- worksheet object providing get_all_values() and
                   update_cell(row, col, value)
    tip, symbol, name -- values written to columns 2, 3 and 4

    The row used is the one right after the rows currently returned by
    get_all_values(); column 1 receives str(datetime.now()).
    '''
    next_row_nb = len(spreadsheet.get_all_values()) + 1
    row_values = [str(datetime.now()), tip, symbol, name]

    # enumerate gives each value its own column; looking the column up with
    # list.index() would send duplicate values to the same (first) column.
    for column, value in enumerate(row_values, 1):
        spreadsheet.update_cell(next_row_nb, column, value)
|
|
|
|
def tweet_to_tip(tweet, spreadsheet):
    '''
    Returns a stock market tip from an input tweet (if any matches are found)
    and logs the tip to a google spreadsheet using log_tip.

    tweet       -- text of the tweet
    spreadsheet -- worksheet passed through to log_tip

    Returns the text of the tip, or None (after printing "no hits") when no
    word of the tweet matches a company.
    Raises ValueError if the sentiment label is not pos, neg or neutral.
    '''
    # sentiment label -> (verb, action, marker, number of ?/! characters)
    templates = {
        'pos': (" loves ", "BUY ", " ↗️ ", 3),
        'neg': (" hates ", "SELL ", " ↘️ ", 3),
        'neutral': (" says ", "WATCH ", " 🔎 ", 1),
    }

    name = random.choice(["He", "Trump", "POTUS"])
    hits = find_all_companies(tweet)
    if not hits:
        print("no hits")
        return None

    # Each hit is [word, [company, ...]]: pick one word, then one of the
    # companies matched for that word.
    chosen_hit = random.choice(hits)
    sentiment = test_sentiment(tweet)
    if sentiment not in templates:
        raise ValueError("unexpected sentiment label: %r" % (sentiment,))

    word = chosen_hit[0].strip()
    stock = random.choice(chosen_hit[1])
    Symbol = "$" + stock['Symbol']
    Name = stock['Name']

    verb, tip, marker, marks = templates[sentiment]
    excl = exclamation(marks)
    intro = name + verb + "\"" + word + "\""
    stock_tip = tip + Symbol + marker + Name

    log_tip(spreadsheet, tip, Symbol, Name)
    return intro + excl + "\n" + "TIP: " + stock_tip
|
|
|
|
def twitter_api_auth(is_prod):
    '''
    Builds the authenticated twitter.Api client.

    is_prod -- truthy: read the four credentials from environment variables;
               falsy: read them from the local twitter_creds module.
    '''
    fields = ('consumer_key', 'consumer_secret',
              'access_token_key', 'access_token_secret')

    if is_prod:
        credentials = dict((field, os.environ.get(field)) for field in fields)
    else:
        # Imported lazily: the module is only needed outside production.
        from twitter_creds import twitter_creds
        credentials = dict((field, twitter_creds[field]) for field in fields)

    return twitter.Api(**credentials)
|
|
|
|
def gsheets_api_keys(is_prod):
    '''
    Returns credentials for Google API Oauth

    is_prod -- truthy: build the credentials from the 'gsheets_creds'
               environment variable (a dict literal holding the service
               account key); falsy: load them from the local key file.

    Raises ValueError if is_prod is set but 'gsheets_creds' is missing.
    '''
    scope = ['https://spreadsheets.google.com/feeds']

    if not is_prod:
        return ServiceAccountCredentials.from_json_keyfile_name('trump-bot-0b1e240571d7.json', scope)

    raw_creds = os.environ.get('gsheets_creds')
    if not raw_creds:
        raise ValueError("environment variable 'gsheets_creds' is not set")

    # Build the credentials straight from the parsed dict, so the secret key
    # is never written to disk (and cannot be left behind if loading fails).
    keyfile_dict = ast.literal_eval(raw_creds)
    return ServiceAccountCredentials.from_json_keyfile_dict(keyfile_dict, scope)
|
|
|
|
def main():
    '''
    Runs the bot forever: every 10 minutes, fetch the tweets newer than the
    id stored in cell A1 of the "trump-bot" spreadsheet, post a tip for each
    tweet that yields one, and store the id of each processed tweet in A1.
    '''
    import sys
    # Python 2 only: switch the default encoding to UTF-8 (presumably so
    # that mixing byte strings and unicode tweet text does not raise).
    reload(sys)
    sys.setdefaultencoding("utf8")

    is_prod = os.environ.get('IS_HEROKU', None)
    twitter_api = twitter_api_auth(is_prod)
    gsheets_credentials = gsheets_api_keys(is_prod)

    timer = 60*10

    while True:
        print("--- start ---")

        # auth runs out after a while and throws error
        gsheets_api = gspread.authorize(gsheets_credentials)
        workbook = gsheets_api.open("trump-bot")
        id_worksheet = workbook.sheet1
        logs_worksheet = workbook.worksheet('logs')

        last_id = int(id_worksheet.acell('A1').value)
        print("last_id: %s" % last_id)

        tweets = twitter_api.GetUserTimeline(screen_name='realdonaldtrump', since_id=last_id)

        tweet_queue = []
        for tweet in tweets or []:
            parsed_tweet = json.loads(str(tweet))
            tweet_queue.append([parsed_tweet['id'], parsed_tweet['text']])

        # reversed list to process most recent tweet last and prevent last_id mess
        for tweet_id, tweet_text in reversed(tweet_queue):
            print("new tweet: %s %s" % (tweet_text, tweet_id))
            try:
                top_tip = tweet_to_tip(tweet_text, logs_worksheet) #+ "\n" + tweet_link
                # tweet_to_tip returns None when no company matched.
                if top_tip is not None:
                    print("tweeting: " + top_tip)
                    twitter_api.PostUpdate(top_tip)
            except Exception as err:
                # Best effort: one failing tweet must not stop the bot, but
                # the failure is reported instead of being silently dropped.
                print("failed to process tweet %s: %r" % (tweet_id, err))

            # Updated even after a failure, so the same tweet is not
            # fetched again on the next cycle.
            id_worksheet.update_acell('A1', str(tweet_id))
            print("last_id updated to:  %s \n" % tweet_id)

        print("--- done --- \n")
        time.sleep(timer)
|
|
|
|
# Run the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()