mirror of
https://github.com/subutux/rmapy.git
synced 2025-12-08 14:43:24 +00:00
Fixed typechecking with mypy
This commit is contained in:
50
rmapi/api.py
50
rmapi/api.py
@@ -2,13 +2,17 @@ import requests
|
||||
from logging import getLogger
|
||||
from datetime import datetime
|
||||
import json
|
||||
from typing import TypeVar
|
||||
from typing import Union, Optional
|
||||
from uuid import uuid4
|
||||
from .collections import Collection
|
||||
from .config import load, dump
|
||||
from .document import Document, ZipDocument
|
||||
from .document import Document, ZipDocument, from_request_stream
|
||||
from .folder import Folder
|
||||
from .exceptions import AuthError, DocumentNotFound, ApiError
|
||||
from .exceptions import (
|
||||
AuthError,
|
||||
DocumentNotFound,
|
||||
ApiError,
|
||||
UnsupportedTypeError,)
|
||||
from .const import (RFC3339Nano,
|
||||
USER_AGENT,
|
||||
BASE_URL,
|
||||
@@ -18,7 +22,7 @@ from .const import (RFC3339Nano,
|
||||
|
||||
log = getLogger("rmapipy.rmapi")
|
||||
|
||||
DocOrFolder = TypeVar('DocumentOrFolder', Document, Folder)
|
||||
DocumentOrFolder = Union[Document, Folder]
|
||||
|
||||
|
||||
class Client(object):
|
||||
@@ -33,8 +37,8 @@ class Client(object):
|
||||
"""
|
||||
|
||||
token_set = {
|
||||
"devicetoken": None,
|
||||
"usertoken": None
|
||||
"devicetoken": "",
|
||||
"usertoken": ""
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
@@ -83,17 +87,19 @@ class Client(object):
|
||||
for k in headers.keys():
|
||||
_headers[k] = headers[k]
|
||||
log.debug(url, _headers)
|
||||
print(method, url, json.dumps(body))
|
||||
if method == "PUT":
|
||||
print(method, url, json.dumps(body))
|
||||
r = requests.request(method, url,
|
||||
json=body,
|
||||
data=data,
|
||||
headers=_headers,
|
||||
params=params,
|
||||
stream=stream)
|
||||
print(r.status_code, r.text)
|
||||
if method == "PUT":
|
||||
print(r.status_code, r.text)
|
||||
return r
|
||||
|
||||
def register_device(self, code: str) -> True:
|
||||
def register_device(self, code: str):
|
||||
"""Registers a device to on the Remarkable Cloud.
|
||||
|
||||
This uses a unique code the user gets from
|
||||
@@ -125,7 +131,7 @@ class Client(object):
|
||||
else:
|
||||
raise AuthError("Can't register device")
|
||||
|
||||
def renew_token(self) -> True:
|
||||
def renew_token(self):
|
||||
"""Fetches a new user_token.
|
||||
|
||||
This is the second step of the authentication of the Remarkable Cloud.
|
||||
@@ -184,7 +190,7 @@ class Client(object):
|
||||
|
||||
return collection
|
||||
|
||||
def get_doc(self, ID: str) -> DocOrFolder:
|
||||
def get_doc(self, ID: str) -> Optional[DocumentOrFolder]:
|
||||
"""Get a meta item by ID
|
||||
|
||||
Fetch a meta item from the Remarkable Cloud by ID.
|
||||
@@ -215,6 +221,7 @@ class Client(object):
|
||||
return Document(**data_response[0])
|
||||
else:
|
||||
raise DocumentNotFound(f"Cound not find document {ID}")
|
||||
return None
|
||||
|
||||
def download(self, document: Document) -> ZipDocument:
|
||||
"""Download a ZipDocument
|
||||
@@ -232,13 +239,18 @@ class Client(object):
|
||||
"""
|
||||
|
||||
if not document.BlobURLGet:
|
||||
document = self.get_doc(document.ID)
|
||||
|
||||
doc = self.get_doc(document.ID)
|
||||
if isinstance(doc, Document):
|
||||
document = doc
|
||||
else:
|
||||
raise UnsupportedTypeError(
|
||||
"We expected a document, got {type}"
|
||||
.format(type=type(doc)))
|
||||
log.debug("BLOB", document.BlobURLGet)
|
||||
r = self.request("GET", document.BlobURLGet, stream=True)
|
||||
return ZipDocument.from_request_stream(document.ID, r)
|
||||
return from_request_stream(document.ID, r)
|
||||
|
||||
def upload(self, zipDoc: ZipDocument, document: Document) -> True:
|
||||
def upload(self, zipDoc: ZipDocument, document: Document):
|
||||
"""Upload a document to the cloud.
|
||||
|
||||
Add a new document to the Remarkable Cloud.
|
||||
@@ -253,7 +265,7 @@ class Client(object):
|
||||
|
||||
return True
|
||||
|
||||
def update_metadata(self, docorfolder: DocOrFolder) -> True:
|
||||
def update_metadata(self, docorfolder: DocumentOrFolder):
|
||||
"""Send an update of the current metadata of a meta object
|
||||
|
||||
Update the meta item.
|
||||
@@ -272,7 +284,7 @@ class Client(object):
|
||||
|
||||
return self.check_reponse(res)
|
||||
|
||||
def get_current_version(self, docorfolder: DocOrFolder) -> int:
|
||||
def get_current_version(self, docorfolder: DocumentOrFolder) -> int:
|
||||
"""Get the latest version info from a Document or Folder
|
||||
|
||||
This fetches the latest meta information from the Remarkable Cloud
|
||||
@@ -295,7 +307,7 @@ class Client(object):
|
||||
return 0
|
||||
return int(d.Version)
|
||||
|
||||
def create_folder(self, folder: Folder) -> True:
|
||||
def create_folder(self, folder: Folder):
|
||||
"""Create a new folder meta object.
|
||||
|
||||
This needs to be done in 3 steps:
|
||||
@@ -330,7 +342,7 @@ class Client(object):
|
||||
self.update_metadata(folder)
|
||||
return True
|
||||
|
||||
def check_reponse(self, response: requests.Response) -> True:
|
||||
def check_reponse(self, response: requests.Response):
|
||||
"""Check the response from an API Call
|
||||
|
||||
Does some sanity checking on the Response
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
from .document import Document
|
||||
from .folder import Folder
|
||||
from typing import NoReturn, TypeVar, List
|
||||
from typing import NoReturn, List, Union
|
||||
from .exceptions import FolderNotFound
|
||||
|
||||
DocOrFolder = TypeVar('DocumentOrFolder', Document, Folder)
|
||||
DocumentOrFolder = Union[Document, Folder]
|
||||
|
||||
|
||||
class Collection(object):
|
||||
@@ -15,13 +15,13 @@ class Collection(object):
|
||||
items: A list containing the items.
|
||||
"""
|
||||
|
||||
items = []
|
||||
items: List[DocumentOrFolder] = []
|
||||
|
||||
def __init__(self, *items):
|
||||
for i in items:
|
||||
self.items.append(i)
|
||||
|
||||
def add(self, docdict: dict) -> NoReturn:
|
||||
def add(self, docdict: dict) -> None:
|
||||
"""Add an item to the collection.
|
||||
It wraps it in the correct class based on the Type parameter of the
|
||||
dict.
|
||||
@@ -31,14 +31,14 @@ class Collection(object):
|
||||
"""
|
||||
|
||||
if docdict.get("Type", None) == "DocumentType":
|
||||
return self.add_document(docdict)
|
||||
self.add_document(docdict)
|
||||
elif docdict.get("Type", None) == "CollectionType":
|
||||
return self.add_folder(docdict)
|
||||
self.add_folder(docdict)
|
||||
else:
|
||||
raise TypeError("Unsupported type: {_type}"
|
||||
.format(_type=docdict.get("Type", None)))
|
||||
|
||||
def add_document(self, docdict: dict) -> NoReturn:
|
||||
def add_document(self, docdict: dict) -> None:
|
||||
"""Add a document to the collection
|
||||
|
||||
Args:
|
||||
@@ -47,7 +47,7 @@ class Collection(object):
|
||||
|
||||
self.items.append(Document(**docdict))
|
||||
|
||||
def add_folder(self, dirdict: dict) -> NoReturn:
|
||||
def add_folder(self, dirdict: dict) -> None:
|
||||
"""Add a document to the collection
|
||||
|
||||
Args:
|
||||
@@ -56,7 +56,7 @@ class Collection(object):
|
||||
|
||||
self.items.append(Folder(**dirdict))
|
||||
|
||||
def parent(self, docorfolder: DocOrFolder) -> Folder:
|
||||
def parent(self, docorfolder: DocumentOrFolder) -> Folder:
|
||||
"""Returns the paren of a Document or Folder
|
||||
|
||||
Args:
|
||||
@@ -67,12 +67,12 @@ class Collection(object):
|
||||
"""
|
||||
|
||||
results = [i for i in self.items if i.ID == docorfolder.ID]
|
||||
if len(results) > 0:
|
||||
if len(results) > 0 and isinstance(results[0], Folder):
|
||||
return results[0]
|
||||
else:
|
||||
raise FolderNotFound("Could not found the parent of the document.")
|
||||
|
||||
def children(self, folder: Folder = None) -> List[DocOrFolder]:
|
||||
def children(self, folder: Folder = None) -> List[DocumentOrFolder]:
|
||||
"""Get all the childern from a folder
|
||||
|
||||
Args:
|
||||
@@ -90,5 +90,5 @@ class Collection(object):
|
||||
def __len__(self) -> int:
|
||||
return len(self.items)
|
||||
|
||||
def __getitem__(self, position: int) -> DocOrFolder:
|
||||
def __getitem__(self, position: int) -> DocumentOrFolder:
|
||||
return self.items[position]
|
||||
|
||||
@@ -1,21 +1,23 @@
|
||||
from pathlib import Path
|
||||
from yaml import BaseLoader
|
||||
from yaml import load as yml_load
|
||||
from yaml import dump as yml_dump
|
||||
from typing import Dict
|
||||
|
||||
|
||||
def load() -> dict:
|
||||
"""Load the .rmapi config file"""
|
||||
|
||||
config_file_path = Path.joinpath(Path.home(), ".rmapi")
|
||||
config = {}
|
||||
config: Dict[str, str] = {}
|
||||
if Path.exists(config_file_path):
|
||||
with open(config_file_path, 'r') as config_file:
|
||||
config = dict(yml_load(config_file.read()))
|
||||
config = dict(yml_load(config_file.read(), Loader=BaseLoader))
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def dump(config: dict) -> True:
|
||||
def dump(config: dict) -> None:
|
||||
"""Dump config to the .rmapi config file
|
||||
|
||||
Args:
|
||||
@@ -28,6 +30,4 @@ def dump(config: dict) -> True:
|
||||
with open(config_file_path, 'w') as config_file:
|
||||
config_file.write(yml_dump(config))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
@@ -3,13 +3,44 @@ from zipfile import ZipFile, ZIP_DEFLATED
|
||||
import shutil
|
||||
from uuid import uuid4
|
||||
import json
|
||||
from typing import NoReturn, TypeVar
|
||||
from typing import NoReturn, TypeVar, List
|
||||
from requests import Response
|
||||
from .meta import Meta
|
||||
|
||||
BytesOrString = TypeVar("BytesOrString", BytesIO, str)
|
||||
|
||||
|
||||
class Document(object):
|
||||
class RmPage(object):
|
||||
"""A Remarkable Page
|
||||
|
||||
Contains the metadata, the page itself & thumbnail.
|
||||
|
||||
"""
|
||||
def __init__(self, page, metadata=None, order=0, thumbnail=None, ID=None):
|
||||
self.page = page
|
||||
if metadata:
|
||||
self.metadata = metadata
|
||||
else:
|
||||
self.metadata = {"layers": [{"name": "Layer 1"}]}
|
||||
|
||||
self.order = order
|
||||
if thumbnail:
|
||||
self.thumbnail = thumbnail
|
||||
if ID:
|
||||
self.ID = ID
|
||||
else:
|
||||
self.ID = str(uuid4())
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""String representation of this object"""
|
||||
return f"<rmapi.document.RmPage {self.order} for {self.ID}>"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""String representation of this object"""
|
||||
return self.__str__()
|
||||
|
||||
|
||||
class Document(Meta):
|
||||
""" Document represents a real object expected in most
|
||||
calls by the remarkable API
|
||||
|
||||
@@ -34,51 +65,9 @@ class Document(object):
|
||||
|
||||
"""
|
||||
|
||||
ID = ""
|
||||
Version = 0
|
||||
Message = ""
|
||||
Succes = True
|
||||
BlobURLGet = ""
|
||||
BlobURLGetExpires = ""
|
||||
BlobURLPut = ""
|
||||
BlobURLPutExpires = ""
|
||||
ModifiedClient = ""
|
||||
Type = "DocumentType"
|
||||
VissibleName = ""
|
||||
CurrentPage = 1
|
||||
Bookmarked = False
|
||||
Parent = ""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kkeys = self.to_dict().keys()
|
||||
for k in kkeys:
|
||||
setattr(self, k, kwargs.get(k, getattr(self, k)))
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Return a dict representation of this object.
|
||||
|
||||
Used for API Calls.
|
||||
|
||||
Returns
|
||||
a dict of the current object.
|
||||
"""
|
||||
|
||||
return {
|
||||
"ID": self.ID,
|
||||
"Version": self.Version,
|
||||
"Message": self.Message,
|
||||
"Succes": self.Succes,
|
||||
"BlobURLGet": self.BlobURLGet,
|
||||
"BlobURLGetExpires": self.BlobURLGetExpires,
|
||||
"BlobURLPut": self.BlobURLPut,
|
||||
"BlobURLPutExpires": self.BlobURLPutExpires,
|
||||
"ModifiedClient": self.ModifiedClient,
|
||||
"Type": self.Type,
|
||||
"VissibleName": self.VissibleName,
|
||||
"CurrentPage": self.CurrentPage,
|
||||
"Bookmarked": self.Bookmarked,
|
||||
"Parent": self.Parent
|
||||
}
|
||||
super(Document, self).__init__(**kwargs)
|
||||
self.Type = "DocumentType"
|
||||
|
||||
def __str__(self):
|
||||
"""String representation of this object"""
|
||||
@@ -177,7 +166,7 @@ class ZipDocument(object):
|
||||
zipfile = BytesIO()
|
||||
pdf = None
|
||||
epub = None
|
||||
rm = []
|
||||
rm: List[RmPage] = []
|
||||
ID = None
|
||||
|
||||
def __init__(self, ID=None, doc=None, file=None):
|
||||
@@ -220,7 +209,7 @@ class ZipDocument(object):
|
||||
"""string representation of this class"""
|
||||
return self.__str__()
|
||||
|
||||
def dump(self, file: str) -> NoReturn:
|
||||
def dump(self, file: str) -> None:
|
||||
"""Dump the contents of ZipDocument back to a zip file.
|
||||
|
||||
This builds a zipfile to upload back to the Remarkable Cloud.
|
||||
@@ -236,7 +225,7 @@ class ZipDocument(object):
|
||||
json.dumps(self.content))
|
||||
if self.pagedata:
|
||||
zf.writestr(f"{self.ID}.pagedata",
|
||||
self.pagedata.read())
|
||||
self.pagedata)
|
||||
|
||||
if self.pdf:
|
||||
zf.writestr(f"{self.ID}.pdf",
|
||||
@@ -257,7 +246,7 @@ class ZipDocument(object):
|
||||
zf.writestr(f"{self.ID}.thumbnails/{page.order}.jpg",
|
||||
page.thumbnail.read())
|
||||
|
||||
def load(self, file: BytesOrString) -> NoReturn:
|
||||
def load(self, file: BytesOrString) -> None:
|
||||
"""Load a zipfile into this class.
|
||||
|
||||
Extracts the zipfile and reads in the contents.
|
||||
@@ -287,29 +276,29 @@ class ZipDocument(object):
|
||||
pass
|
||||
try:
|
||||
with zf.open(f"{self.ID}.pagedata", 'r') as pagedata:
|
||||
self.pagedata = BytesIO(pagedata.read())
|
||||
self.pagedata = str(pagedata.read())
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
with zf.open(f"{self.ID}.pdf", 'r') as pdf:
|
||||
with zf.open(f"{self.ID}.pdf", 'rb') as pdf:
|
||||
self.pdf = BytesIO(pdf.read())
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
with zf.open(f"{self.ID}.epub", 'r') as epub:
|
||||
with zf.open(f"{self.ID}.epub", 'rb') as epub:
|
||||
self.epub = BytesIO(epub.read())
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Get the RM pages
|
||||
|
||||
content = [x for x in zf.namelist()
|
||||
if x.startswith(f"{self.ID}/") and x.endswith('.rm')]
|
||||
for p in content:
|
||||
pagenumber = p.replace(f"{self.ID}/", "").replace(".rm", "")
|
||||
pagenumber = int(pagenumber)
|
||||
pages = [x for x in zf.namelist()
|
||||
if x.startswith(f"{self.ID}/") and x.endswith('.rm')]
|
||||
for p in pages:
|
||||
pagenumber = int(p.replace(f"{self.ID}/", "")
|
||||
.replace(".rm", ""))
|
||||
page = BytesIO()
|
||||
thumbnail = BytesIO()
|
||||
with zf.open(p, 'r') as rm:
|
||||
@@ -329,36 +318,6 @@ class ZipDocument(object):
|
||||
self.zipfile.seek(0)
|
||||
|
||||
|
||||
class RmPage(object):
|
||||
"""A Remarkable Page
|
||||
|
||||
Contains the metadata, the page itself & thumbnail.
|
||||
|
||||
"""
|
||||
def __init__(self, page, metadata=None, order=0, thumbnail=None, ID=None):
|
||||
self.page = page
|
||||
if metadata:
|
||||
self.metadata = metadata
|
||||
else:
|
||||
self.metadata = {"layers": [{"name": "Layer 1"}]}
|
||||
|
||||
self.order = order
|
||||
if thumbnail:
|
||||
self.thumbnail = thumbnail
|
||||
if ID:
|
||||
self.ID = ID
|
||||
else:
|
||||
self.ID = str(uuid4())
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""String representation of this object"""
|
||||
return f"<rmapi.document.RmPage {self.order} for {self.ID}>"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""String representation of this object"""
|
||||
return self.__str__()
|
||||
|
||||
|
||||
def from_zip(ID: str, file: str) -> ZipDocument:
|
||||
"""Return A ZipDocument from a zipfile.
|
||||
|
||||
|
||||
@@ -10,6 +10,12 @@ class DocumentNotFound(Exception):
|
||||
super(DocumentNotFound, self).__init__(msg)
|
||||
|
||||
|
||||
class UnsupportedTypeError(Exception):
|
||||
"""Not the expected type"""
|
||||
def __init__(self, msg):
|
||||
super(UnsupportedTypeError, self).__init__(msg)
|
||||
|
||||
|
||||
class FolderNotFound(Exception):
|
||||
"""Could not found a requested folder"""
|
||||
def __init__(self, msg):
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from .document import Document
|
||||
from .meta import Meta
|
||||
from datetime import datetime
|
||||
from uuid import uuid4
|
||||
from io import BytesIO
|
||||
from zipfile import ZipFile, ZIP_DEFLATED
|
||||
from .const import RFC3339Nano
|
||||
|
||||
from typing import Tuple, Optional
|
||||
|
||||
class ZipFolder(object):
|
||||
"""A dummy zipfile to create a folder
|
||||
@@ -27,12 +27,12 @@ class ZipFolder(object):
|
||||
self.file.seek(0)
|
||||
|
||||
|
||||
class Folder(Document):
|
||||
class Folder(Meta):
|
||||
"""
|
||||
A Meta type of object used to represent a folder.
|
||||
"""
|
||||
|
||||
def __init__(self, name=None, **kwargs):
|
||||
def __init__(self, name: Optional[str] = None, **kwargs) -> None:
|
||||
"""Create a Folder instance
|
||||
|
||||
Args:
|
||||
@@ -47,7 +47,7 @@ class Folder(Document):
|
||||
if not self.ID:
|
||||
self.ID = str(uuid4())
|
||||
|
||||
def create_request(self) -> (ZipFolder, dict):
|
||||
def create_request(self) -> Tuple[BytesIO, dict]:
|
||||
"""Prepares the nessesary parameters to create this folder.
|
||||
|
||||
This creates a ZipFolder & the nessesary json body to
|
||||
|
||||
Reference in New Issue
Block a user