Skip to content

SQLAlchemy Repositories

fastauth.contrib.sqlalchemy.repositories.SQLAlchemyUserRepository

SQLAlchemyUserRepository(session)

Bases: AbstractUserRepository[UP, ID], Generic[UP, ID]

Source code in fastauth/contrib/sqlalchemy/repositories.py
17
18
def __init__(self, session: AsyncSession) -> None:
    """Store the async session that every repository method operates on."""
    self.session = session

user_model instance-attribute

user_model

session instance-attribute

session = session

get_by_id async

get_by_id(pk)
Source code in fastauth/contrib/sqlalchemy/repositories.py
20
21
async def get_by_id(self, pk: ID) -> UP | None:
    """Look up a user by primary key.

    Args:
        pk: Primary-key value handed to ``self.session.get``.

    Returns:
        Whatever ``self.session.get`` yields for ``self.user_model`` and
        ``pk`` (annotated as the user or ``None``).
    """
    user = await self.session.get(self.user_model, pk)
    return user

get_by_email async

get_by_email(email)
Source code in fastauth/contrib/sqlalchemy/repositories.py
23
24
25
async def get_by_email(self, email: str) -> UP | None:
    """Return the first user whose ``email`` column equals ``email``.

    Args:
        email: Address compared against ``self.user_model.email``.

    Returns:
        The scalar result of the one-row query (annotated as the user or
        ``None``).
    """
    model = self.user_model
    stmt = select(model).where(model.email == email).limit(1)
    return await self.session.scalar(stmt)

get_by_username async

get_by_username(username)
Source code in fastauth/contrib/sqlalchemy/repositories.py
27
28
29
30
31
async def get_by_username(self, username: str) -> UP | None:
    """Return the first user whose ``username`` column equals ``username``.

    Args:
        username: Value compared against ``self.user_model.username``.

    Returns:
        The scalar result of the one-row query (annotated as the user or
        ``None``).
    """
    model = self.user_model
    stmt = select(model).where(model.username == username).limit(1)
    return await self.session.scalar(stmt)

get_by_fields async

get_by_fields(username, fields)
Source code in fastauth/contrib/sqlalchemy/repositories.py
33
34
35
36
37
38
39
40
41
async def get_by_fields(self, username: str, fields: list[str]) -> UP | None:
    """Return the first user for which any of ``fields`` equals ``username``.

    Args:
        username: Value compared against each listed attribute.
        fields: Attribute names on ``self.user_model`` to match against.

    Returns:
        The scalar result of the one-row query, or ``None`` when ``fields``
        is empty.

    Raises:
        AttributeError: If a name in ``fields`` is not an attribute of
            ``self.user_model`` (raised by ``getattr``).
    """
    # An ``or_()`` with no operands renders a blank WHERE clause, so the query
    # would hand back an arbitrary first row. No fields to match means no match.
    if not fields:
        return None

    model = self.user_model
    conditions = [getattr(model, field) == username for field in fields]
    qs = select(model).where(or_(*conditions)).limit(1)
    return await self.session.scalar(qs)

get_by_field async

get_by_field(value, field)
Source code in fastauth/contrib/sqlalchemy/repositories.py
43
44
45
async def get_by_field(self, value: Any, field: str) -> UP | None:
    """Return the first user whose attribute ``field`` equals ``value``.

    Args:
        value: Value to match.
        field: Name of the ``self.user_model`` attribute used as the
            ``filter_by`` keyword.

    Returns:
        The scalar result of the one-row query (annotated as the user or
        ``None``).
    """
    criteria = {field: value}
    stmt = select(self.user_model).filter_by(**criteria).limit(1)
    return await self.session.scalar(stmt)

create async

create(data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
47
48
49
50
51
52
async def create(self, data: dict[str, Any]) -> UP:
    """Build a user from ``data``, add it to the session, and commit.

    Args:
        data: Keyword arguments passed to ``self.user_model``.

    Returns:
        The new instance, after the session has committed and refreshed it.
    """
    user = self.user_model(**data)
    db = self.session
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

update async

update(user, data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
54
55
56
57
58
59
async def update(self, user: UP, data: dict[str, Any]) -> UP:
    """Assign each ``data`` entry onto ``user`` and commit.

    Args:
        user: Instance to modify in place.
        data: Mapping of attribute name to new value.

    Returns:
        The same ``user`` object, after the session has committed and
        refreshed it.
    """
    for attr_name in data:
        setattr(user, attr_name, data[attr_name])
    db = self.session
    await db.commit()
    await db.refresh(user)
    return user

delete async

delete(user)
Source code in fastauth/contrib/sqlalchemy/repositories.py
61
62
63
async def delete(self, user: UP) -> None:
    """Delete ``user`` through the session and commit the removal.

    Args:
        user: Instance to delete.
    """
    await self.session.delete(user)
    # ``create`` and ``update`` commit their own work; without a commit here
    # the deletion would only stay pending in the session.
    await self.session.commit()
    return None

fastauth.contrib.sqlalchemy.repositories.SQLAlchemyRBACRepository

SQLAlchemyRBACRepository(session)

Bases: AbstractRolePermissionRepository[RP, PP], Generic[RP, PP]

Source code in fastauth/contrib/sqlalchemy/repositories.py
72
73
def __init__(self, session: AsyncSession) -> None:
    """Store the async session used by the role and permission methods."""
    self.session = session

role_model instance-attribute

role_model

permission_model instance-attribute

permission_model

session instance-attribute

session = session

get_role async

get_role(role_id)
Source code in fastauth/contrib/sqlalchemy/repositories.py
75
76
async def get_role(self, role_id: int) -> RP | None:
    """Look up a role by primary key.

    Args:
        role_id: Primary-key value handed to ``self.session.get``.

    Returns:
        Whatever ``self.session.get`` yields for ``self.role_model`` and
        ``role_id`` (annotated as the role or ``None``).
    """
    role = await self.session.get(self.role_model, role_id)
    return role

get_role_by_codename async

get_role_by_codename(codename)
Source code in fastauth/contrib/sqlalchemy/repositories.py
78
79
80
81
82
async def get_role_by_codename(self, codename: str) -> RP | None:
    """Return the first role whose ``codename`` column equals ``codename``.

    Args:
        codename: Value compared against ``self.role_model.codename``.

    Returns:
        The scalar result of the one-row query (annotated as the role or
        ``None``).
    """
    model = self.role_model
    stmt = select(model).where(model.codename == codename).limit(1)
    return await self.session.scalar(stmt)

create_role async

create_role(data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
84
85
86
87
88
89
async def create_role(self, data: dict[str, Any]) -> RP:
    """Build a role from ``data``, add it to the session, and commit.

    Args:
        data: Keyword arguments passed to ``self.role_model``.

    Returns:
        The new role, after the session has committed and refreshed it.
    """
    new_role = self.role_model(**data)
    db = self.session
    db.add(new_role)
    await db.commit()
    await db.refresh(new_role)
    return new_role

update_role async

update_role(role, data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
91
92
93
94
95
96
async def update_role(self, role: RP, data: dict[str, Any]) -> RP:
    """Assign each ``data`` entry onto ``role`` and commit.

    Args:
        role: Instance to modify in place.
        data: Mapping of attribute name to new value.

    Returns:
        The same ``role`` object, after the session has committed and
        refreshed it.
    """
    for attr_name in data:
        setattr(role, attr_name, data[attr_name])
    db = self.session
    await db.commit()
    await db.refresh(role)
    return role

delete_role async

delete_role(role)
Source code in fastauth/contrib/sqlalchemy/repositories.py
 98
 99
100
async def delete_role(self, role: RP) -> None:
    """Delete ``role`` through the session and commit the removal.

    Args:
        role: Instance to delete.
    """
    await self.session.delete(role)
    # ``create_role`` and ``update_role`` commit their own work; without a
    # commit here the deletion would only stay pending in the session.
    await self.session.commit()
    return None

list_roles async

list_roles()
Source code in fastauth/contrib/sqlalchemy/repositories.py
102
103
104
async def list_roles(self) -> list[RP]:
    """Return every ``self.role_model`` row as a list.

    Returns:
        A new list built from the scalar results of an unfiltered select.
    """
    stmt = select(self.role_model)
    result = await self.session.scalars(stmt)
    rows = result.all()
    return list(rows)

get_permission async

get_permission(permission_id)
Source code in fastauth/contrib/sqlalchemy/repositories.py
106
107
async def get_permission(self, permission_id: int) -> PP | None:
    """Look up a permission by primary key.

    Args:
        permission_id: Primary-key value handed to ``self.session.get``.

    Returns:
        Whatever ``self.session.get`` yields for ``self.permission_model``
        and ``permission_id`` (annotated as the permission or ``None``).
    """
    permission = await self.session.get(self.permission_model, permission_id)
    return permission

get_permission_by_codename async

get_permission_by_codename(codename)
Source code in fastauth/contrib/sqlalchemy/repositories.py
109
110
111
112
113
114
115
async def get_permission_by_codename(self, codename: str) -> PP | None:
    """Return the first permission whose ``codename`` equals ``codename``.

    Args:
        codename: Value compared against ``self.permission_model.codename``.

    Returns:
        The scalar result of the one-row query (annotated as the permission
        or ``None``).
    """
    model = self.permission_model
    stmt = select(model).where(model.codename == codename).limit(1)
    return await self.session.scalar(stmt)

create_permission async

create_permission(data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
117
118
119
120
121
122
async def create_permission(self, data: dict[str, Any]) -> PP:
    """Build a permission from ``data``, add it to the session, and commit.

    Args:
        data: Keyword arguments passed to ``self.permission_model``.

    Returns:
        The new permission, after the session has committed and refreshed it.
    """
    new_permission = self.permission_model(**data)
    db = self.session
    db.add(new_permission)
    await db.commit()
    await db.refresh(new_permission)
    return new_permission

update_permission async

update_permission(permission, data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
124
125
126
127
128
129
async def update_permission(self, permission: PP, data: dict[str, Any]) -> PP:
    """Assign each ``data`` entry onto ``permission`` and commit.

    Args:
        permission: Instance to modify in place.
        data: Mapping of attribute name to new value.

    Returns:
        The same ``permission`` object, after the session has committed and
        refreshed it.
    """
    for attr_name in data:
        setattr(permission, attr_name, data[attr_name])
    db = self.session
    await db.commit()
    await db.refresh(permission)
    return permission

delete_permission async

delete_permission(permission)
Source code in fastauth/contrib/sqlalchemy/repositories.py
131
132
133
async def delete_permission(self, permission: PP) -> None:
    """Delete ``permission`` through the session and commit the removal.

    Args:
        permission: Instance to delete.
    """
    await self.session.delete(permission)
    # ``create_permission`` and ``update_permission`` commit their own work;
    # without a commit here the deletion would only stay pending in the session.
    await self.session.commit()
    # The signature promises ``None``; do not hand back the deleted instance.
    return None

list_permissions async

list_permissions()
Source code in fastauth/contrib/sqlalchemy/repositories.py
135
136
137
async def list_permissions(self) -> list[PP]:
    """Return every ``self.permission_model`` row as a list.

    Returns:
        A new list built from the scalar results of an unfiltered select.
    """
    stmt = select(self.permission_model)
    result = await self.session.scalars(stmt)
    rows = result.all()
    return list(rows)

fastauth.contrib.sqlalchemy.repositories.SQLAlchemyOAuthRepository

SQLAlchemyOAuthRepository(session)

Bases: AbstractOAuthRepository[UOAP, OAP], Generic[UOAP, OAP]

Source code in fastauth/contrib/sqlalchemy/repositories.py
144
145
def __init__(self, session: AsyncSession) -> None:
    """Store the async session used by the OAuth account methods."""
    self.session = session

user_model instance-attribute

user_model

oauth_model instance-attribute

oauth_model

session instance-attribute

session = session

get_user async

get_user(oauth_name, account_id)
Source code in fastauth/contrib/sqlalchemy/repositories.py
147
148
149
150
151
152
153
154
155
async def get_user(self, oauth_name: str, account_id: str) -> UOAP | None:
    """Find the user linked to the given OAuth provider account.

    Joins ``self.user_model`` to ``self.oauth_model`` and filters on both
    the provider name and the account id.

    Args:
        oauth_name: Value compared against ``self.oauth_model.oauth_name``.
        account_id: Value compared against ``self.oauth_model.account_id``.

    Returns:
        The result of ``scalar_one_or_none()`` on the de-duplicated rows.
    """
    oauth = self.oauth_model
    stmt = (
        select(self.user_model)
        .join(oauth)
        # Multiple criteria passed to a single ``where`` are AND-ed together.
        .where(oauth.oauth_name == oauth_name, oauth.account_id == account_id)
    )
    rows = await self.session.execute(stmt)
    deduplicated = rows.unique()
    return deduplicated.scalar_one_or_none()

add_oauth_account async

add_oauth_account(user, data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
157
158
159
160
161
162
163
164
async def add_oauth_account(self, user: UOAP, data: dict[str, Any]) -> UOAP:
    """Create an OAuth account from ``data`` and attach it to ``user``.

    Args:
        user: Instance whose ``oauth_accounts`` collection receives the new
            account.
        data: Keyword arguments passed to ``self.oauth_model``.

    Returns:
        The same ``user`` object, after the session has committed and
        refreshed it.
    """
    account = self.oauth_model(**data)
    db = self.session
    db.add(account)
    user.oauth_accounts.append(account)
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

update_oauth_account async

update_oauth_account(user, oauth, data)
Source code in fastauth/contrib/sqlalchemy/repositories.py
166
167
168
169
170
171
172
173
174
async def update_oauth_account(
    self, user: UOAP, oauth: OAP, data: dict[str, Any]
) -> UOAP:
    """Assign each ``data`` entry onto ``oauth`` and commit.

    Args:
        user: Instance that is refreshed and returned.
        oauth: OAuth account instance to modify in place.
        data: Mapping of attribute name to new value.

    Returns:
        The same ``user`` object, after the session has committed and
        refreshed it.
    """
    for attr_name in data:
        setattr(oauth, attr_name, data[attr_name])
    db = self.session
    db.add(oauth)
    await db.commit()
    # The user, not the account, is what gets refreshed and returned.
    await db.refresh(user)
    return user