Skip to content

Commit

Permalink
feat: add helper methods for (almost?) all actions. (#101)
Browse files Browse the repository at this point in the history
Closes #23, closes #26
  • Loading branch information
stanvanrooy authored Nov 16, 2020
1 parent c3b01d1 commit 1c5bb94
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 0 deletions.
15 changes: 15 additions & 0 deletions instauto/helpers/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json

from requests import Response


def _is_resp_ok(resp: Response) -> bool:
if not resp.ok:
return False
if not resp.content:
return False
try:
d = resp.json()
except json.JSONDecodeError:
return False
return d['status'] == 'ok'
112 changes: 112 additions & 0 deletions instauto/helpers/post.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from typing import List

from instauto.api.client import ApiClient
from instauto.api.actions import post as ps

from .common import _is_resp_ok
from .search import get_user_id_from_username

import logging
logger = logging.getLogger(__name__)


def upload_image_to_feed(client: ApiClient, image_path: str, caption: str = None, location: ps.Location = None) -> bool:
post = ps.PostFeed(
path=image_path,
caption=caption or '',
location=location,
)
resp = client.post_post(post, 80).ok
return _is_resp_ok(resp)


def upload_image_to_story(client: ApiClient, image_path: str) -> bool:
post = ps.PostStory(
path=image_path
)
resp = client.post_post(post)
return _is_resp_ok(resp)


def update_caption(client: ApiClient, media_id: str, new_caption: str) -> bool:
caption = ps.UpdateCaption(
media_id=media_id,
caption_text=new_caption
)
resp = client.post_update_caption(caption)
return _is_resp_ok(resp)


def unlike_post(client: ApiClient, media_id: str) -> bool:
like = ps.Unlike(
media_id=media_id
)
resp = client.post_unlike(like)
return _is_resp_ok(resp)


def save_post(client: ApiClient, media_id: str) -> bool:
save = ps.Save(
media_id=media_id
)
resp = client.post_save(save)
return _is_resp_ok(resp)


def retrieve_posts_from_user(client: ApiClient, limit: int, username: str = None, user_id: str = None) -> List[dict]:
if username is None and user_id is None:
raise ValueError("Either `username` or `user_id` param need to be provider")

if username is not None and user_id is None:
user_id = get_user_id_from_username(client, username)
else:
logger.warning("user_id is always being used.")

obj = ps.RetrieveByUser(
user_id=user_id
)
obj, result = client.post_retrieve_by_user(obj)
retrieved_items = []

while result and len(retrieved_items) < limit:
retrieved_items.extend(result)
obj, result = client.post_retrieve_by_user(obj)
return retrieved_items[:limit:]


def retrieve_posts_from_tag(client: ApiClient, tag: str, limit: int) -> List[dict]:
obj = ps.RetrieveByTag(
tag_name=tag
)
obj, result = client.post_retrieve_by_tag(obj)
retrieved_items = []

while result and len(retrieved_items) < limit:
retrieved_items.extend(result)
obj, result = client.post_retrieve_by_tag(obj)
return retrieved_items[:limit:]


def get_likers_of_post(client: ApiClient, media_id: str) -> List[dict]:
return client.post_get_likers(media_id)


def get_commenters_of_post(client: ApiClient, media_id: str) -> List[dict]:
return client.post_get_commenters(media_id)


def like_post(client: ApiClient, media_id: str) -> bool:
like = ps.Like(
media_id=media_id
)
resp = client.post_like(like)
return _is_resp_ok(resp)


def comment_post(client: ApiClient, media_id: str, comment: str) -> bool:
comment = ps.Comment(
media_id=media_id,
comment_text=comment
)
resp = client.post_comment(comment)
return _is_resp_ok(resp)
30 changes: 30 additions & 0 deletions instauto/helpers/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import List

from instauto.api.client import ApiClient
from instauto.api.actions import search as se


def search_username(client: ApiClient, username, count: int) -> List[dict]:
username = se.Username(
q=username,
count=count
)
resp = client.search_username(username)
return resp.json()['users']


def get_user_by_username(client: ApiClient, username: str) -> dict:
users = search_username(client, username, 1)
if users[0]['username'] == username:
return users[0]


def get_user_id_from_username(client: ApiClient, username: str):
user = get_user_by_username(client, username)
return user['pk']


def search_tags(client: ApiClient, tag: str, limit: int) -> List[dict]:
s = se.Tag(tag, limit)
resp: dict = client.search_tag(s).json()
return resp['results']

0 comments on commit 1c5bb94

Please sign in to comment.