backend/app/utils/blockchain.py

151 lines
3.9 KiB
Python
Raw Permalink Normal View History

2022-10-08 12:38:54 +03:00
from pydantic import BaseModel
import requests as r
2022-10-08 14:39:52 +03:00
import json
from time import sleep
from typing import List
2022-10-08 12:38:54 +03:00
2022-10-08 19:57:46 +03:00
URL = "https://hackathon.lsp.team/hk"
2022-10-08 12:38:54 +03:00
2022-10-08 19:57:46 +03:00
base_headers = {"Content-Type": "application/json", "Accept": "application/json"}
2022-10-08 12:38:54 +03:00
2022-10-08 14:39:52 +03:00
class Wallet(BaseModel):
2022-10-08 12:38:54 +03:00
publicKey: str
privateKey: str
2022-10-08 14:39:52 +03:00
class WalletBalance(BaseModel):
matic: float
coins: float
class TransHash(BaseModel):
transaction_hash: str
class NftInfo(BaseModel):
uri: str
tokens: List[int]
class TransactionHistory(BaseModel):
hash: str
timestamp: int
token_name: str
from_wallet: str
to_wallet: str
def create_wallet() -> Wallet:
2022-10-08 19:57:46 +03:00
response = r.post(URL + "/v1/wallets/new")
2022-10-08 12:38:54 +03:00
data = response.json()
2022-10-08 19:57:46 +03:00
return Wallet(publicKey=data["publicKey"], privateKey=data["privateKey"])
2022-10-08 14:39:52 +03:00
def check_transaction(trans_hash: str) -> str:
2022-10-08 19:57:46 +03:00
res = r.get(
URL + "/v1/transfers/status/" + trans_hash,
data=json.dumps({"transactionHash": trans_hash}),
headers=base_headers,
2022-10-08 14:39:52 +03:00
)
2022-10-08 19:57:46 +03:00
return res.json()["status"]
2022-10-08 14:39:52 +03:00
2022-10-08 19:57:46 +03:00
def transfer_rubbles(
my_wallet_private_key, transfer_publick_key: str, amount: float
) -> TransHash:
response = r.post(
URL + "/v1/transfers/ruble",
data=json.dumps(
2022-10-08 14:39:52 +03:00
{
2022-10-08 19:57:46 +03:00
"fromPrivateKey": my_wallet_private_key,
"toPublicKey": transfer_publick_key,
"amount": amount,
2022-10-08 14:39:52 +03:00
}
),
2022-10-08 19:57:46 +03:00
headers={"Content-Type": "application/json", "Accept": "application/json"},
)
return TransHash(transaction_hash=response.json()["transaction"])
def create_nft(wallet_public_key: str, string_url: str) -> TransHash:
response = r.post(
URL + "/v1/nft/generate",
data=json.dumps(
{"toPublicKey": wallet_public_key, "uri": string_url, "nftCount": 1}
),
headers=base_headers,
2022-10-08 14:39:52 +03:00
)
2022-10-08 19:57:46 +03:00
return TransHash(transaction_hash=response.json()["transaction_hash"])
2022-10-08 14:39:52 +03:00
def get_nfts(wallet_public_key: str) -> List[NftInfo]:
2022-10-08 19:57:46 +03:00
res = r.get(URL + f"/v1/wallets/{wallet_public_key}/nft/balance")
2022-10-08 14:39:52 +03:00
return list(
map(
2022-10-08 19:57:46 +03:00
lambda res: NftInfo(uri=res["uri"], tokens=res["tokens"]),
res.json()["balance"],
),
)
2022-10-08 14:39:52 +03:00
2022-10-08 19:57:46 +03:00
def get_history(wallet_public_key: str) -> List[TransactionHistory]:
res = r.post(
URL + f"/v1/wallets/{wallet_public_key}/history",
data=json.dumps({"page": 0, "offset": 100, "sort": "asc"}),
headers=base_headers,
)
2022-10-08 14:39:52 +03:00
return list(
map(
lambda res: TransactionHistory(
2022-10-08 19:57:46 +03:00
hash=res["hash"],
timestamp=res["timeStamp"],
token_name=res["tokenName"],
from_wallet=res["from"],
to_wallet=res["to"],
2022-10-08 14:39:52 +03:00
),
2022-10-08 19:57:46 +03:00
res.json()["history"],
),
)
def transfer_nft(
my_private_wallet_key: str, transfer_publick_key: str, token_id: int
) -> TransHash:
res = r.post(
URL + "/v1/transfers/nft",
data=json.dumps(
{
"fromPrivateKey": my_private_wallet_key,
"toPublicKey": transfer_publick_key,
"tokenId": token_id,
}
),
headers=base_headers,
)
return TransHash(transaction_hash=res.json()["transaction_hash"])
def get_balance(my_public_wallet_key: str) -> WalletBalance:
res = r.get(URL + f"/v1/wallets/{my_public_wallet_key}/balance")
return WalletBalance(
matic=res.json()["maticAmount"], coins=res.json()["coinsAmount"]
)
def send_matic(
my_private_wallet_key: str, transfer_publick_key: str, amount: float
) -> TransHash:
res = r.post(
URL + "/v1/transfers/matic",
data=json.dumps(
{
"fromPrivateKey": my_private_wallet_key,
"toPublicKey": transfer_publick_key,
"amount": amount,
}
),
headers=base_headers,
)
return TransHash(transaction_hash=res.json()["transaction"])