General
Python shell autocompletion (e.g. over ssh)
- Create a file .pythonrc
# ~/.pythonrc
# enable syntax completion
try:
    import readline
except ImportError:
    print("Module readline not available.")
else:
    import rlcompleter
    readline.parse_and_bind("tab: complete")
- In your .bashrc file, add
export PYTHONSTARTUP=~/.pythonrc
Reference: https://stackoverflow.com/a/246779/1955346
Date & Time operations
import datetime as dt
Current date
dt.datetime.now()
#datetime.datetime(2017, 5, 15, 15, 41, 38, 177617)
dt.datetime.now().isoformat()
#'2017-05-15T15:41:47.738485'
Add / subtract time
dt.datetime.now() - dt.timedelta(days=1,minutes=1,seconds=1)
#datetime.datetime(2017, 5, 14, 15, 42, 40, 227572)
Parse date in custom format
d = "20170101"
date = dt.datetime.strptime(d,"%Y%m%d")
Reference: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
Parse date without format
from dateutil import parser
datetime = parser.parse("2017-04-24T00:00:00")
#output:
#datetime.datetime(2017, 4, 24, 0, 0)
log_time decorator
A simple decorator that logs the execution time of the decorated function:
import functools
import logging
import os
from time import time
logger = logging.getLogger(__name__)
def log_time():
    def decorator(fn):
        @functools.wraps(fn)
        def with_timer(*args, **kwargs):
            start = time()
            result = fn(*args, **kwargs)
            logger.info(f"[PID: {os.getpid()}] {fn.__name__} execution time: {time() - start}s")
            return result
        return with_timer
    return decorator
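Usage sketch (the do_work function and the numbers in the log line are hypothetical; assumes logging is configured to show INFO messages):
@log_time()
def do_work(n):
    return sum(range(n))

do_work(1_000_000)
# logs something like: [PID: 12345] do_work execution time: 0.03s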
Pandas
import pandas as pd
Reading dataframe from BigQuery
bigquery_service_account = open("bigquery-service-account.json").read()
project_id = "my-project"
query = """
select * from `table`
order by name
limit {lim}
""".format(lim=500000)
print(query)
all_data: pd.DataFrame = pd.read_gbq(query=query, project_id=project_id,
                                     private_key=bigquery_service_account,
                                     dialect="standard")
# THIS IS IMPORTANT -- when you read a lot of rows, order in dataframe might change!!! (WTF, right?)
all_data.sort_values(by=["name"], inplace=True)
all_data.reset_index(inplace=True)
Create dataframe
When using the default constructor, data is supplied row-by-row:
df = pd.DataFrame([[1,1,1],[2,2,2],[3,3,3]],columns=["MyCol1","MyCol2","MyCol3"])
df
MyCol1 MyCol2 MyCol3
0 1 1 1
1 2 2 2
2 3 3 3
Loading from CSV
df = pd.read_csv("train.csv",sep=";")
print(df.head()[["Name","Age","Fare"]])
Name Age Fare
0 Braund, Mr. Owen Harris 22.0 7.2500
1 Cumings, Mrs. John Bradley (Florence Briggs Th... 38.0 71.2833
2 Heikkinen, Miss. Laina 26.0 7.9250
3 Futrelle, Mrs. Jacques Heath (Lily May Peel) 35.0 53.1000
4 Allen, Mr. William Henry 35.0 8.0500
Loading from string (read string as file)
import sys
if sys.version_info[0] < 3:
    from StringIO import StringIO
else:
    from io import StringIO
pd.read_csv(StringIO(my_string_with_csv))
Exporting to string
import sys
if sys.version_info[0] < 3:
    from StringIO import StringIO
else:
    from io import StringIO
raw_csv = StringIO()
dataframe.to_csv(raw_csv, index=False)
raw_csv.getvalue()
Add column
By string transformation
df["FirstName"], df["SecondName"] = df["Name"].str.split(",").str
df["Title"] = df["SecondName"].str.split(".").str[0]
Output:
FirstName SecondName Title
0 Braund Mr. Owen Harris Mr
1 Cumings Mrs. John Bradley (Florence Briggs Thayer) Mrs
2 Heikkinen Miss. Laina Miss
3 Futrelle Mrs. Jacques Heath (Lily May Peel) Mrs
4 Allen Mr. William Henry Mr
By mapping values
df.assign returns a new dataframe!
df = df.assign(FareInEuro=lambda frame: frame.Fare * 0.89)
Fare FareInEuro
0 7.2500 6.452500
1 71.2833 63.442137
2 7.9250 7.053250
3 53.1000 47.259000
4 8.0500 7.164500
By mapping column-per-row (EASIEST)
df.loc[:, "date"] = df["date_raw"].map(lambda raw_date: dt.datetime.strptime(raw_date,"%Y%m%d"))
By adding column from another dataframe (HARD)
- You have a second dataframe with the same number of rows and matching order
- Re-index the dataframe you want to take the column from, so that its index matches the target dataframe
- Add the column with a simple assignment, as in the snippet and the sketch below:
df = df.set_index(predictions.index)
predictions["y"] = df["y"]
By mapping index to a different value
This example involves groupby, which produces a new DataFrame whose index is the key the grouping was done on:
df = users.groupby("name").agg({"action_count":"sum"})
# now df has index being string (name)
# the clue here is to use .to_series()
df["email"] = df.index.to_series().map(lambda name: lookup_email(name))
Expand dataframe by concatenating dataframes
Idea: create another dataframe with the same columns but the next values (e.g. when the first dataframe covers a specific date range and you want to extend it by adding more dates)
import numpy as np
# initial dataframe, [ds] column is date
df = pd.read_csv("something.csv") # ds column is date
last_date = ...  # end date to expand to
last_date_in_input = df["ds"].max() + dt.timedelta(days=1)
missing_dates = pd.date_range(start=last_date_in_input, end=last_date).map(lambda ts: ts.date())
df = pd.concat([df, pd.DataFrame(np.array(list(zip(missing_dates, np.zeros([missing_dates.shape[0]])))), columns=["ds", "y"])])
Explanation: np.array creates an n-dimensional array from the tuples produced by the zip function. zip is lazy, so it needs to be materialized with a list() invocation.
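A quick illustration of that laziness, with throwaway example values:
import numpy as np
pairs = zip([1, 2], [3, 4])   # lazy -- nothing has been produced yet
np.array(list(pairs))         # array([[1, 3], [2, 4]])
# np.array over the raw zip object (without list()) would not give the
# 2-dimensional array we need, because numpy cannot determine the shape of a lazy iterator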
Drop column
df.drop("date_raw",inplace=True,axis=1)
Reindex dataframe (a.k.a create & fill missing rows)
Let's say you have data consisting of observations like this:
i value
0 0
1 1
2 2
3 3
5 5
Now, let's say you want to analyze data which is in the range 0-6 inclusive. In order to fit this into the dataframe, you need to reindex it.
- Drop default index (which will be [0,1,2,3,4] in this example)
- Set index to column "i":
- Create new index, with range 0-6
- Reindex dataframe
Code:
df.reset_index(drop=True, inplace=True)
df.set_index("i",drop=True, inplace=True)
idx = pd.RangeIndex(start=0,stop=7,step=1)
df2 = df.reindex(idx, fill_value=-1)
#result:
df2 value
0 0
1 1
2 2
3 3
4 -1
5 5
6 -1
Scan / iterate dataframe row-by-row
# assumes df is pandas DataFrame:
for i, row in df.iterrows():
    print(i, row)
Access columns after group by aggregations
df = users.groupby("user_email").agg({"events": ["mean", "count"], "actions": "sum"})
df[df[("actions", "sum")]>0]
Object Oriented Programming
@staticmethod vs @classmethod
@staticmethod is a plain function that can be called with the class name as a prefix.
@classmethod is a method that is called with the CLASS as its first argument instead of an instance.
Example:
class Yolo(object):
    def __init__(self, yo: int):
        self.yo = yo

    @classmethod
    def from_string(__class__, s: str):
        # here, __class__ will be a Yolo class reference,
        # so that __class__(arg) will call Yolo class constructor
        return __class__(int(s))

    @staticmethod
    def explain(s: str):
        return f"You only live once, {s}"

# USAGE:
my_yolo_object: Yolo = Yolo.from_string("666") # constructor-like function
Yolo.explain("M") # static method call
__getattr__ vs __getattribute__
This one is tricky. __getattr__ is called only when an attribute is not found on the object, while __getattribute__ is always called. __getattribute__ acts as an interceptor, so you need to be careful when using it, because you can easily create infinite recursion. If AttributeError is raised in __getattribute__, the exception is swallowed by the Python interpreter and then __getattr__ is called.
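A minimal sketch illustrating the difference (the Proxy class and its attribute names are made up for illustration):
class Proxy:
    def __init__(self):
        self.existing = 42

    def __getattribute__(self, name):
        # intercepts EVERY attribute access; delegate to object to avoid infinite recursion
        print(f"__getattribute__({name})")
        return object.__getattribute__(self, name)

    def __getattr__(self, name):
        # called only after __getattribute__ raised AttributeError
        print(f"__getattr__({name})")
        return f"default for {name}"

p = Proxy()
p.existing  # only __getattribute__ is called
p.missing   # __getattribute__ raises AttributeError, then __getattr__ kicks in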
Utility functions
RMSE & MAE for numpy
import numpy as np

def rmse(predicted, actual):
    return np.sqrt(((predicted - actual) ** 2).mean())

def mae(predicted, actual):
    return np.abs((predicted - actual)).mean()
String formatting
Format integer with sign
print("{:+}".format(567))
print("{:+}".format(-666))
# output:
+567
-666
print without new line
print("Gimme fuel gimme fire!", end="")
# for Python 2.7:
from __future__ import print_function
format float with string interpolation
cnt = 10
total_members = 11
print(f"Percentage is: {cnt/total_members*100.0:2.3f}%")
scikit-learn
Classifiers compatible with AdaBoost
import inspect
from sklearn.utils.testing import all_estimators  # in newer scikit-learn: from sklearn.utils import all_estimators
for name, clf in all_estimators(type_filter='classifier'):
    if 'sample_weight' in inspect.getfullargspec(clf().fit)[0]:
        print(name)
Python LINQ
DEPRECATED, use RxPy instead
Only started working on it :)
class Linq(object):
    def __init__(self, collection):
        self.collection = collection

    def select(self, lambda_):
        self.collection = map(lambda_, self.collection)
        return self

    def where(self, lambda_):
        self.collection = filter(lambda_, self.collection)
        return self

    def append(self, collection2):
        collection1 = self.collection
        def generator():
            for i in collection1:
                yield i
            for i in collection2:
                yield i
        self.collection = generator()
        return self

    def __str__(self):
        return str(self.collection)
_ = Linq
Usage:
_([1, 2, 3, 4]).select(lambda x: x**2).where(lambda x: x % 3 == 1)
argparse
Boolean flag
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--keep-new-lines", dest="keep_new_lines", action="store_true")
parser.set_defaults(keep_new_lines=False)
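Usage sketch (argument lists are passed explicitly here just for illustration):
args = parser.parse_args(["--keep-new-lines"])
print(args.keep_new_lines)  # True
args = parser.parse_args([])
print(args.keep_new_lines)  # False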
Transform argparse to dict / JSON
import json
arguments = parser.parse_args()
logger.info("Using arguments:\n" + json.dumps(vars(arguments), indent=4))
Iterators and friends
Take every pair from flat list
Input: [1,2,3,4,5,6]
Desired output: [(1,2), (3,4), (5,6)]
Code:
input = [1,2,3,4,5,6]
iterator = iter(input)
output = [(x,y) for x,y in zip(iterator, iterator)]
Jupyter Notebook
Set DataFrame column width
When displayed in Jupyter Notebook, DataFrames are cropped, so you cannot see e.g. full strings when they are long. To overcome this, just use:
pd.set_option("display.max_colwidth", 500)
HBox(children=(HTML(value='Training') shown instead of the standard progress bar in Jupyter Lab
Install this in bash:
jupyter labextension install @jupyter-widgets/jupyterlab-manager
SQLAlchemy
Use SQL variable in query
Use a single session and execute multiple SQL statements.
query_parts = [
    "SET @lastRow = '';",
    "SET @myVariable = :myVar;",
    "select @lastRow as lastRow, @lastRow:=val as val where column = @myVariable;"
]
for i, q in enumerate(query_parts):
    results = session.execute(q, {"myVar": 123})
    if i == len(query_parts) - 1:
        for row in results:
            print(row)
Requests
Stream download large files
from pathlib import Path
import requests
from shutil import copyfileobj
def download(url):
    name = url.split("/")[-1]
    with requests.get(url, stream=True) as r:
        target = Path(f"bingmaps/{name}")
        with target.open("wb") as f:
            copyfileobj(r.raw, f)
    return target
pytest
VSCode debugger does not attach to pytest
Use the following launch.json:
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Module",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "console": "integratedTerminal",
            "justMyCode": false,
            "args": [
                "src/tests",
                "-k",
                "test_multi_runner",
                "--no-cov" // <---------------- this is the main issue
            ],
            "cwd": "${workspaceFolder}",
            "purpose": [
                "debug-test"
            ]
        }
    ]
}
fsspec
Upload file to AWS with S3 checksums
import fsspec
from pathlib import Path

with fsspec.open("s3://<path>", "wb", s3_additional_kwargs={"ChecksumAlgorithm": "SHA256"}) as fs3:
    p = "localpath"
    with Path(p).open("rb") as fl:
        for chunk in iter(lambda: fl.read(10 * 1024**2), b""):
            fs3.write(chunk)
Async
Problem with uvicorn / FastAPI debugging in PyCharm
The error shows up when you try to launch the debugger for a Python module that uses asyncio.
TypeError: 'Task' object is not callable
Solution:
- open the Actions search window (press Shift twice and switch to the Actions tab)
- type Registry, choose the Registry... item
- switch off (deselect) the python.debug.asyncio.repl property
Thanks to Jetbrains support for help: PY-65970
Source of the solution: https://stackoverflow.com/a/77908985/1955346