General

Python ssh shell autocompletion

  1. Create a file .pythonrc
    # ~/.pythonrc
    # enable syntax completion
    try:
        import readline
    except ImportError:
        print("Module readline not available.")
    else:
        import rlcompleter
        readline.parse_and_bind("tab: complete")
    
  2. In your .bashrc file, add
    export PYTHONSTARTUP=~/.pythonrc
    

Reference: https://stackoverflow.com/a/246779/1955346

Date & Time operations

import datetime as dt

Current date

dt.datetime.now()
#datetime.datetime(2017, 5, 15, 15, 41, 38, 177617)

dt.datetime.now().isoformat()
#'2017-05-15T15:41:47.738485'

Add / subtract time

dt.datetime.now() - dt.timedelta(days=1,minutes=1,seconds=1)
 
#datetime.datetime(2017, 5, 14, 15, 42, 40, 227572)

Parse date in custom format

d = "20170101"
date = dt.datetime.strptime(d,"%Y%m%d")

Reference: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior

Parse date without format

from dateutil import parser
parsed = parser.parse("2017-04-24T00:00:00")
#output:
#datetime.datetime(2017, 4, 24, 0, 0)

log_time decorator

Simple decorator that logs execution time of an annotated function

import functools
import logging
import os
from time import time

logger = logging.getLogger(__name__)
def log_time():
    def decorator(fn):
        @functools.wraps(fn)
        def with_timer(*args, **kwargs):
            start = time()
            result = fn(*args, **kwargs)
            logger.info(f"[PID: {os.getpid()}] {fn.__name__} execution time: {time() - start}s")
            return result

        return with_timer

    return decorator
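
A minimal usage sketch, assuming the decorator is defined as above with `functools` and `os` imported. `slow_add` is a hypothetical function used only for illustration. Because `log_time` is a decorator factory, it is applied with parentheses:

```python
import functools
import logging
import os
from time import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def log_time():
    """Decorator factory: returns a decorator that logs wall-clock run time."""
    def decorator(fn):
        @functools.wraps(fn)  # keeps fn.__name__ and the docstring intact
        def with_timer(*args, **kwargs):
            start = time()
            result = fn(*args, **kwargs)
            logger.info(f"[PID: {os.getpid()}] {fn.__name__} execution time: {time() - start}s")
            return result
        return with_timer
    return decorator


@log_time()  # note the parentheses: log_time() returns the actual decorator
def slow_add(a, b):  # hypothetical example function
    return a + b


total = slow_add(2, 3)  # logs one INFO line and returns the sum
```

Writing `@log_time` without parentheses would pass `slow_add` to the factory itself, which takes no arguments, and fail with a TypeError.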

Pandas

import pandas as pd

Reading dataframe from BigQuery

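from google.oauth2 import service_account

# newer pandas / pandas-gbq versions expect credentials= instead of the deprecated private_key=
credentials = service_account.Credentials.from_service_account_file("bigquery-service-account.json")
project_id = "my-project"
query = """
select * from `table`
order by name 
limit {lim}
""".format(lim=500000)

print(query)

all_data: pd.DataFrame = pd.read_gbq(query=query, project_id=project_id,
                                     credentials=credentials,
                                     dialect="standard")
# THIS IS IMPORTANT -- when you read a lot of rows, the order in the dataframe might differ
# from the query's ORDER BY (WTF, right?), so sort again on the client side
all_data.sort_values(by=["name"], inplace=True)
# drop=True discards the old, now shuffled, index instead of keeping it as a column
all_data.reset_index(drop=True, inplace=True)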

Create dataframe

When using the default constructor, data is passed row by row:

df = pd.DataFrame([[1,1,1],[2,2,2],[3,3,3]],columns=["MyCol1","MyCol2","MyCol3"])
df
   MyCol1  MyCol2  MyCol3
0       1       1       1
1       2       2       2
2       3       3       3

Loading from CSV

df = pd.read_csv("train.csv",sep=";")
print(df.head()[["Name","Age","Fare"]])

                                                Name   Age     Fare
0                            Braund, Mr. Owen Harris  22.0   7.2500
1  Cumings, Mrs. John Bradley (Florence Briggs Th...  38.0  71.2833
2                             Heikkinen, Miss. Laina  26.0   7.9250
3       Futrelle, Mrs. Jacques Heath (Lily May Peel)  35.0  53.1000
4                           Allen, Mr. William Henry  35.0   8.0500

Loading from string (read string as file)

import sys
if sys.version_info[0] < 3: 
    from StringIO import StringIO
else:
    from io import StringIO

pd.read_csv(StringIO(my_string_with_csv))

Exporting to string

import sys
if sys.version_info[0] < 3: 
    from StringIO import StringIO
else:
    from io import StringIO
raw_csv = StringIO()
dataframe.to_csv(raw_csv, index=False)
raw_csv.getvalue()

Add column

By string transformation

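# expand=True returns one column per split part; n=1 splits on the first comma only
# (unpacking the .str accessor directly no longer works in current pandas)
df[["FirstName", "SecondName"]] = df["Name"].str.split(",", n=1, expand=True)
df["Title"] = df["SecondName"].str.split(".").str[0]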

Output:
   FirstName                                   SecondName  Title
0     Braund                              Mr. Owen Harris     Mr
1    Cumings   Mrs. John Bradley (Florence Briggs Thayer)    Mrs
2  Heikkinen                                  Miss. Laina   Miss
3   Futrelle           Mrs. Jacques Heath (Lily May Peel)    Mrs
4      Allen                            Mr. William Henry     Mr

By mapping values

df.assign returns new dataframe!

df = df.assign(FareInEuro=lambda frame: frame.Fare * 0.89)
      Fare  FareInEuro
0   7.2500    6.452500
1  71.2833   63.442137
2   7.9250    7.053250
3  53.1000   47.259000
4   8.0500    7.164500
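
A small sketch of what "returns new dataframe" means in practice, using a made-up two-row frame (the values are only illustrative): the original frame keeps its columns unless the result is assigned back.

```python
import pandas as pd

df = pd.DataFrame({"Fare": [7.25, 71.2833]})  # illustrative data

# assign() builds and returns a new frame with the extra column
converted = df.assign(FareInEuro=lambda frame: frame.Fare * 0.89)

original_columns = list(df.columns)          # still only the Fare column
converted_columns = list(converted.columns)  # Fare plus FareInEuro
```

This is why the snippet above writes `df = df.assign(...)`: without the reassignment the new column would be lost.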

By mapping column-per-row (EASIEST)

df.loc[:, "date"] = df["date_raw"].map(lambda raw_date: dt.datetime.strptime(raw_date,"%Y%m%d"))

By adding column from another dataframe (HARD)

  1. You have one dataframe, with the same number of values and matching columns
  2. You need to index the dataframe, from which you want to take column with index from the first one
  3. Add with simple assignment:
df = df.set_index(predictions.index)
predictions["y"] = df["y"]
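
A sketch of why the re-indexing step matters, using two hypothetical frames with the same number of rows but different index labels. Column assignment in pandas aligns on index labels, not on row position, so without matching indexes the new column is filled with NaN:

```python
import pandas as pd

# hypothetical frames: same length, different index labels
df = pd.DataFrame({"y": [10, 20, 30]})  # index 0, 1, 2
predictions = pd.DataFrame({"yhat": [11, 19, 31]}, index=[100, 101, 102])

# assignment aligns on labels; no label matches, so every value is NaN
misaligned = predictions.assign(y=df["y"])

# give df the index of the target frame first, then assign
df = df.set_index(predictions.index)
predictions["y"] = df["y"]
```

This assumes the rows of both frames are already in the same order; `set_index` only relabels them, it does not reorder anything.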

By mapping index to a different value

This example involves groupby, which will introduce new DataFrame with index being the key for which the grouping was done:

df = users.groupby("name").agg({"action_count":"sum"})
# now df has index being string (name)

# the clue here is to use .to_series()
df["email"] = df.index.to_series().map(lambda name: lookup_email(name))
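
A self-contained version of the same idea, with made-up data and a hypothetical `lookup_email` helper standing in for a real lookup:

```python
import pandas as pd

users = pd.DataFrame({
    "name": ["ann", "bob", "ann"],
    "action_count": [1, 2, 3],
})


def lookup_email(name):
    # hypothetical lookup, only for illustration
    return f"{name}@example.com"


df = users.groupby("name").agg({"action_count": "sum"})

# to_series() yields a Series indexed by the names themselves,
# so the mapped values line up with the grouped rows on assignment
df["email"] = df.index.to_series().map(lookup_email)
```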

Expand dataframe using concat dataframes

Idea: create another dataframe with the same columns but holding the next values (e.g. when the first dataframe covers a specific date range and you want to extend it with more dates).

import numpy as np

# initial dataframe; the [ds] column is parsed as dates
df = pd.read_csv("something.csv", parse_dates=["ds"])

last_date = dt.date(2017, 12, 31)  # placeholder value: the end date to expand to
first_missing_date = df["ds"].max() + dt.timedelta(days=1)

missing_dates = pd.date_range(start=first_missing_date, end=last_date).map(lambda ts: ts.date())

df = pd.concat([df, pd.DataFrame(np.array(list(zip(missing_dates, np.zeros([missing_dates.shape[0]])))), columns=["ds", "y"])])

Explanation:

np.array creates an n-dimensional array from the tuples produced by zip. In Python 3 zip is lazy, so it has to be materialized with list() before it is passed to np.array.
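
A short sketch of the difference, assuming Python 3 and two made-up dates. Passing the `zip` object directly makes NumPy wrap the iterator itself as a single element, while materializing it first gives one row per tuple:

```python
import datetime as dt

import numpy as np

dates = [dt.date(2017, 1, 1), dt.date(2017, 1, 2)]  # illustrative values
zeros = np.zeros(len(dates))

# zip object passed as-is: NumPy treats it as one opaque Python object
lazy = np.array(zip(dates, zeros))

# materialized first: one row per (date, value) tuple
rows = np.array(list(zip(dates, zeros)))

lazy_shape = lazy.shape  # zero-dimensional array
rows_shape = rows.shape  # two rows, two columns
```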

Drop column

df.drop("date_raw",inplace=True,axis=1)

Reindex dataframe (a.k.a create & fill missing rows)

Let's say you have data consisting of observations like this:

i     value
0      0
1      1
2      2
3      3
5      5

Now, let's say you want to analyze data in the range 0-6 inclusive. To fit this into the dataframe, you need to reindex it.

  1. Set the index to column "i" (this replaces the default index, which is [0,1,2,3,4] in this example)
  2. Create a new index with range 0-6
  3. Reindex the dataframe, filling the missing rows

Code:

# set_index replaces the default index, so no separate reset_index call is needed
df.set_index("i", drop=True, inplace=True)
idx = pd.RangeIndex(start=0, stop=7, step=1)  # stop is exclusive: 0..6
df2 = df.reindex(idx, fill_value=-1)

#result:
   value
0      0
1      1
2      2
3      3
4     -1
5      5
6     -1

Scan / iterate dataframe row-by-row

# assumes df is pandas DataFrame:
for i, row in df.iterrows():
    print(i, row)

Access columns after group by aggregations

df = users.groupby("user_email").agg({"events": ["mean", "count"], "actions": "sum"})
df[df[("actions", "sum")]>0]
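
A runnable sketch with made-up data showing why the tuple is needed: when at least one column gets a list of aggregations, the result has two-level column labels. Flattening them into plain strings, as done at the end, is one optional way to get back to simple names (the `events_mean` style naming is just a choice made here):

```python
import pandas as pd

users = pd.DataFrame({
    "user_email": ["a@x.io", "a@x.io", "b@x.io"],  # illustrative data
    "events": [2, 4, 6],
    "actions": [0, 0, 3],
})

df = users.groupby("user_email").agg({"events": ["mean", "count"], "actions": "sum"})

# columns are (column, aggregation) tuples, so select with a tuple
active = df[df[("actions", "sum")] > 0]

# optional: flatten the two levels into single string labels
flat = df.copy()
flat.columns = ["_".join(col) for col in flat.columns]
```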

Object Oriented Programming

@staticmethod vs @classmethod

@staticmethod is a plain function that lives in the class namespace and is called with the class as a prefix. @classmethod is a method that receives the class itself, rather than an instance, as its first argument. Example:

class Yolo(object):
    def __init__(self, yo: int):
        self.yo = yo
    
    @classmethod
    def from_string(cls, s: str):
        # here, cls will be a Yolo class reference (or a subclass, if called on one),
        # so that cls(arg) will call the class constructor
        return cls(int(s))
    
    @staticmethod
    def explain(s: str):
        return f"You only live once, {s}"

# USAGE:
my_yolo_object: Yolo = Yolo.from_string("666") # constructor-like function
Yolo.explain("M") # static method call

__getattr__ vs __getattribute__

This one is tricky. __getattr__ is called only when an attribute is not found on the object, while __getattribute__ is called on every attribute access. __getattribute__ acts as an interceptor, so be careful when using it: accessing any attribute of self inside it calls it again, which easily creates infinite recursion. If __getattribute__ raises AttributeError, the interpreter swallows the exception and calls __getattr__ (when it is defined).

Utility functions

RMSE & MAE for numpy

import numpy as np

def rmse(predicted, actual):
    # root mean squared error
    return np.sqrt(((predicted - actual) ** 2).mean())

def mae(predicted, actual):
    # mean absolute error
    return np.abs(predicted - actual).mean()

String formatting

Format integer with sign

print("{:+}".format(567))
print("{:+}".format(-666))

# output:
+567
-666

print without new line

print("Gimme fuel gimme fire!", end="")

# for Python 2.7:
from __future__ import print_function

format float with string interpolation

cnt = 10
total_members = 11
print(f"Percentage is: {cnt/total_members*100.0:2.3f}%")

scikit-learn

Classifiers compatible with AdaBoost

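import inspect
# all_estimators moved from sklearn.utils.testing to sklearn.utils in newer scikit-learn
from sklearn.utils import all_estimators
for name, clf in all_estimators(type_filter='classifier'):
    # inspect.getargspec was removed in Python 3.11; signature() also avoids instantiating clf
    if 'sample_weight' in inspect.signature(clf.fit).parameters:
        print(name)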

Python LINQ

DEPRECATED, use RxPy instead

Only started working on it :)

class Linq(object):
    def __init__(self, collection):
        self.collection = collection

    def select(self, lambda_):
        self.collection = map(lambda_, self.collection)
        return self

    def where(self, lambda_):
        self.collection = filter(lambda_, self.collection)
        return self

    def append(self, collection2):
        collection1 = self.collection
        def generator():
            for i in collection1:
                yield i
            for i in collection2:
                yield i
        self.collection = generator()
        return self

    def __str__(self):
        return str(self.collection)

_ = Linq

Usage:

_([1234]).select(lambda x: x**2).where(lambda x: x%3==1)

argparse

Boolean flag

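import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--keep-new-lines", dest="keep_new_lines", action="store_true")
# store_true already defaults to False, so this line only makes the default explicit
parser.set_defaults(keep_new_lines=False)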

Transform argparse to dict / JSON

arguments = parser.parse_args()
logger.info("Using arguments:\n" + json.dumps(vars(arguments), indent=4))
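
A self-contained sketch combining the flag and the dict conversion. The argument list is passed explicitly to `parse_args` here so the snippet does not depend on the real command line:

```python
import argparse
import json

parser = argparse.ArgumentParser()
parser.add_argument("--keep-new-lines", dest="keep_new_lines", action="store_true")

# explicit argv lists stand in for the real command line
with_flag = parser.parse_args(["--keep-new-lines"])
without_flag = parser.parse_args([])

# vars() turns the Namespace into a plain dict, which json can serialize
as_json = json.dumps(vars(with_flag), indent=4)
```

`vars()` works because the parsed result is a simple namespace object. `json.dumps` will still fail if an argument holds a value that is not JSON-serializable, such as a `Path`.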

Iterators and friends

Take every pair from flat list

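Input: [1,2,3,4,5,6]
Desired output: [(1,2), (3,4), (5,6)]
Code:

items = [1,2,3,4,5,6]  # avoid naming this "input", which shadows the builtin
iterator = iter(items)
# zip pulls from the same iterator twice per step, so it yields consecutive pairs
output = list(zip(iterator, iterator))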

Jupyter Notebook

Set DataFrame column width

When displayed in Jupyter Notebook, DataFrames are cropped, so you cannot see e.g. full strings when they are long. To overcome this, use:

pd.set_option("display.max_colwidth", 500)

HBox(children=(HTML(value='Training') instead of standard progress bar in Jupyter Lab

Install this in bash:

jupyter labextension install @jupyter-widgets/jupyterlab-manager

SQLAlchemy

Use SQL variable in query

Use single session and execute multiple SQL statements.

from sqlalchemy import text

query_parts = [
    "SET @lastRow = '';",
    "SET @myVariable = :myVar;",
    "select @lastRow as lastRow, @lastRow:=val as val where column = @myVariable;"
]
for i, q in enumerate(query_parts):
    # SQLAlchemy 2.0 no longer accepts plain strings here; wrap raw SQL in text()
    results = session.execute(text(q), {"myVar": 123})
    if i == len(query_parts) - 1:
        for row in results:
            print(row)

Requests

Stream download large files

from pathlib import Path
import requests
from shutil import copyfileobj
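def download(url):
    name = url.split("/")[-1]
    with requests.get(url, stream=True) as r:
        r.raise_for_status()  # fail early instead of saving an error page
        target = Path(f"bingmaps/{name}")
        with target.open("wb") as f:
            # r.raw is the undecoded byte stream; copyfileobj copies it in chunks
            copyfileobj(r.raw, f)

        return target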

pytest

VSCode debugger does not attach to pytest

Use the following launch.json:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Module",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "console": "integratedTerminal",
            "justMyCode": false,
            "args": [
                "src/tests",
                "-k",
                "test multi runner",
                "--no-cov"  // <---------------- this is the main issue
            ],
            "cwd": "${workspaceFolder}",
            "purpose": [
                "debug-test"
            ]
        }
    ]
}

fsspec

Upload file to AWS with S3 checksums

from pathlib import Path

import fsspec

with fsspec.open("s3://<path>", "wb", s3_additional_kwargs={"ChecksumAlgorithm": "SHA256"}) as fs3:
    p = "localpath"
    with Path(p).open("rb") as fl:
        for chunk in iter(lambda: fl.read(10 * 1024**2), b""): 
            fs3.write(chunk)

Async

Problem with uvicorn / FastAPI debugging in PyCharm

Error shows up when you try to launch debugger for a Python module with asyncio.

TypeError: 'Task' object is not callable

Solution:

  1. open Actions search window (press shift twice and switch to Actions tab)
  2. type Registry, choose the Registry... item
  3. switch off (deselect) the python.debug.asyncio.repl property

Thanks to JetBrains support for the help: PY-65970

Source of the solution: https://stackoverflow.com/a/77908985/1955346
