Skip to content

BaseAuthManager

fastauth.manager.BaseAuthManager

BaseAuthManager(config, user_repository, rp_repository=None, oauth_repository=None, *, password_helper=None, token_encoder=None)

Bases: Generic[UP, ID, RP, PP, OAP], AuthManagerEvents[UP]

Source code in fastauth/manager.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def __init__(
    self,
    config: FastAuthConfig,
    user_repository: AbstractUserRepository[UP, ID],
    rp_repository: AbstractRolePermissionRepository[RP, PP] | None = None,
    oauth_repository: AbstractOAuthRepository[UP, OAP] | None = None,
    *,
    password_helper: PasswordHelperProtocol | None = None,
    token_encoder: TokenHelperProtocol | None = None,
):
    self._config = config
    self.user_repo = user_repository
    self.rp_repo = rp_repository
    self.oauth_repo = oauth_repository
    self.password_helper = password_helper or PasswordHelper()
    self.token_encoder = token_encoder or JWTHelper(
        config.JWT_SECRET, config.JWT_ALGORITHM
    )

user_repo instance-attribute

user_repo = user_repository

rp_repo instance-attribute

rp_repo = rp_repository

oauth_repo instance-attribute

oauth_repo = oauth_repository

password_helper instance-attribute

password_helper = password_helper or PasswordHelper()

token_encoder instance-attribute

token_encoder = token_encoder or JWTHelper(JWT_SECRET, JWT_ALGORITHM)

on_after_register async

on_after_register(user, request=None)

Event called after user registration

PARAMETER DESCRIPTION
user

User model

TYPE: UP

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
25
26
27
28
29
30
31
async def on_after_register(self, user: UP, request: Request | None = None) -> None:
    """
    Event called after user registration
    :param user: User model
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_user_update async

on_after_user_update(user, update_dict, request=None)

Event called after user update

PARAMETER DESCRIPTION
user

User model

TYPE: UP

update_dict

Updated fields

TYPE: dict[str, Any]

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
33
34
35
36
37
38
39
40
41
42
async def on_after_user_update(
    self, user: UP, update_dict: dict[str, Any], request: Request | None = None
) -> None:
    """
    Event called after user update
    :param user: User model
    :param update_dict: Updated fields
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_request_verify async

on_after_request_verify(user, token, request=None)

Event called after request verification token

PARAMETER DESCRIPTION
user

User model

TYPE: UP

token

Verification token

TYPE: str

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
44
45
46
47
48
49
50
51
52
53
async def on_after_request_verify(
    self, user: UP, token: str, request: Request | None = None
) -> None:
    """
    Event called after request verification token
    :param user: User model
    :param token: Verification token
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_verify async

on_after_verify(user, request=None)

Event called after user verification

PARAMETER DESCRIPTION
user

User model

TYPE: UP

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
55
56
57
58
59
60
61
async def on_after_verify(self, user: UP, request: Request | None = None) -> None:
    """
    Event called after user verification
    :param user: User model
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_forgot_password async

on_after_forgot_password(user, token, request=None)

Event called after request forgot password token

PARAMETER DESCRIPTION
user

User model

TYPE: UP

token

Reset token

TYPE: str

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
63
64
65
66
67
68
69
70
71
72
async def on_after_forgot_password(
    self, user: UP, token: str, request: Request | None = None
) -> None:
    """
    Event called after request forgot password token
    :param user: User model
    :param token: Reset token
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_reset_password async

on_after_reset_password(user, request=None)

Event called after user reset password

PARAMETER DESCRIPTION
user

User model

TYPE: UP

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
74
75
76
77
78
79
80
81
82
async def on_after_reset_password(
    self, user: UP, request: Request | None = None
) -> None:
    """
    Event called after user reset password
    :param user: User model
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_login async

on_after_login(user, request=None, response=None)

Event called after user login

PARAMETER DESCRIPTION
user

User model

TYPE: UP

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

response

Optional fastapi response

TYPE: Response | None DEFAULT: None

Source code in fastauth/manager.py
84
85
86
87
88
89
90
91
92
93
94
95
96
async def on_after_login(
    self,
    user: UP,
    request: Request | None = None,
    response: Response | None = None,
) -> None:
    """
    Event called after user login
    :param user: User model
    :param request: Optional fastapi request
    :param response: Optional fastapi response
    """
    return  # pragma: no cover

on_before_user_delete async

on_before_user_delete(user, request=None)

Event called before user deletion

PARAMETER DESCRIPTION
user

User model

TYPE: UP

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
 98
 99
100
101
102
103
104
105
106
async def on_before_user_delete(
    self, user: UP, request: Request | None = None
) -> None:
    """
    Event called before user deletion
    :param user: User model
    :param request: Optional fastapi request
    """
    return  # pragma: no cover

on_after_user_delete async

on_after_user_delete(user, request=None)

Event called after user deletion

PARAMETER DESCRIPTION
user

User model

TYPE: UP

request

Optional fastapi request

TYPE: Request | None DEFAULT: None

Source code in fastauth/manager.py
108
109
110
111
112
113
114
115
116
async def on_after_user_delete(
    self, user: UP, request: Request | None = None
) -> None:
    """
    Event called after user deletion
    :param user: User model
    :param request: Optional fastapi request
    """
    return

parse_id abstractmethod

parse_id(pk)

Override this method to convert pk to ID type

Source code in fastauth/manager.py
120
121
122
123
@abstractmethod
def parse_id(self, pk: str):
    """Override this method to convert pk to ID type"""
    return pk

create_token async

create_token(uid, token_type, strategy, *, max_age=None, headers=None, extra_data=None, **kwargs)
Source code in fastauth/manager.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def create_token(
    self,
    uid: str,
    token_type: TokenType,
    strategy: TokenStrategy[UP, ID],
    *,
    max_age: int | None = None,
    headers: str | None = None,
    extra_data: dict[str, Any] | None = None,
    **kwargs,
):
    conf = kwargs.copy()
    if max_age:
        conf["max_age"] = max_age
    if headers:
        conf["headers"] = headers
    if extra_data:
        conf["extra_data"] = extra_data

    user = await self.get_user(uid)
    return await strategy.write_token(user, token_type, **conf)

password_login async

password_login(credentials, strategy)
Source code in fastauth/manager.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
async def password_login(
    self, credentials: OAuth2PasswordRequestForm, strategy: TokenStrategy[UP, ID]
):
    if isinstance(self._config.USER_LOGIN_FIELDS, str):
        if self._config.USER_LOGIN_FIELDS == "email":
            user = await self.user_repo.get_by_email(credentials.username)
        elif self._config.USER_LOGIN_FIELDS == "username":
            user = await self.user_repo.get_by_username(credentials.username)
        else:
            user = await self.user_repo.get_by_field(
                credentials.username, self._config.USER_LOGIN_FIELDS
            )
    else:
        user = await self.user_repo.get_by_fields(
            credentials.username, self._config.USER_LOGIN_FIELDS
        )

    if user is None:
        raise exceptions.UserNotFound

    is_valid, new_hash = self.password_helper.verify_and_update(
        credentials.password, user.hashed_password
    )
    if not is_valid:
        raise exceptions.UserNotFound
    if new_hash:
        user = self.user_repo.update(user, {"hashed_password": new_hash})

    access_token = await strategy.write_token(user, "access")
    refresh_token = (
        await strategy.write_token(user, "refresh")
        if self._config.ENABLE_REFRESH_TOKEN
        else None
    )
    return TokenResponse(access_token=access_token, refresh_token=refresh_token)

check_access async

check_access(user, roles=None, permissions=None)

Check if user has at least one role or permission to access resource

Source code in fastauth/manager.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
async def check_access(
    self,
    user: URPP,
    roles: list[str] | None = None,
    permissions: list[str] | None = None,
):
    """Check if user has at least one role or permission to access resource"""
    if permissions is None:
        permissions = []
    if roles is None:
        roles = []
    if self.rp_repo is None:
        msg = "RolePermission repository not set"
        raise NotImplementedError(msg)

    required_roles_set = set(roles)
    required_permissions_set = set(permissions)

    user_permissions = {perm.codename for perm in user.permissions}
    role_permissions = {perm.codename for perm in user.role.permissions}
    total_permissions = role_permissions | user_permissions

    check = bool(
        user.role.codename in required_roles_set
        or total_permissions & required_permissions_set
    )
    if check is False:
        raise exceptions.AccessDenied
    return user

register async

register(data, safe=True)
Source code in fastauth/manager.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
async def register(self, data: type[UC_S], safe: bool = True):
    user = None
    if data.model_fields.get("email", None):
        user = await self.user_repo.get_by_email(data.email)
    if data.model_fields.get("username", None):
        user = await self.user_repo.get_by_username(data.username)

    # TODO: add multifield check for user

    # else:
    #     user = None
    #     for field in self._config.USER_LOGIN_FIELDS:
    #         user = await self.user_repo.get_by_field(getattr(data, field), field)
    #         if user:
    #             break

    if user is not None:
        raise exceptions.UserAlreadyExists

    valid_data = data.model_dump()
    password = valid_data.pop("password")
    valid_data["hashed_password"] = self.password_helper.hash(password)
    if safe:
        valid_data["is_active"] = self._config.USER_DEFAULT_IS_ACTIVE
        valid_data["is_verified"] = self._config.USER_DEFAULT_IS_VERIFIED

    if role_id := valid_data.get("role_id", False):
        if safe:
            role = await self.get_role_by_codename(self._config.USER_DEFAULT_ROLE)
        else:
            role = await self.get_role(role_id)

        valid_data["role_id"] = role.id

    return await self.user_repo.create(valid_data)

request_verify async

request_verify(email)
Source code in fastauth/manager.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
async def request_verify(self, email: str):
    user = await self.user_repo.get_by_email(email)
    if not user.is_active:
        raise exceptions.UserNotFound
    if user.is_verified:
        raise HTTPException(403, "User already verified")

    payload = {
        "sub": str(user.id),
        "email": user.email,
        "aud": self._config.JWT_DEFAULT_VERIFICATION_AUDIENCE,
    }

    token = self.token_encoder.encode_token(
        payload, "verification", max_age=self._config.JWT_VERIFY_TOKEN_MAX_AGE
    )
    # TODO: add event
    return token

verify async

verify(token)
Source code in fastauth/manager.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
async def verify(self, token: str):
    try:
        data = self.token_encoder.decode_token(
            token, self._config.JWT_DEFAULT_VERIFICATION_AUDIENCE
        )
    except PyJWTError as e:
        raise exceptions.InvalidToken("verification") from e

    try:
        user_id = data["sub"]
        email = data["email"]
    except KeyError as e:
        raise exceptions.InvalidToken("verification") from e

    user = await self.user_repo.get_by_email(email)
    if user is None:
        raise exceptions.InvalidToken("verification")

    parsed_id = self.parse_id(user_id)
    if parsed_id != user_id:
        exceptions.InvalidToken("verification")

    if user.is_verified:
        raise HTTPException(403, "User already verified")

    verified_user = await self._update_user(user, {"is_verified": True})
    # TODO: add event
    return verified_user

forgot_password async

forgot_password(email)
Source code in fastauth/manager.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
async def forgot_password(self, email: str):
    user = await self.user_repo.get_by_email(email)
    if user is None:
        raise exceptions.UserNotFound
    if not user.is_active:
        raise exceptions.UserNotFound

    payload = {
        "sub": str(user.id),
        "password_fgpt": self.password_helper.hash(user.hashed_password),
        "aud": self._config.JWT_DEFAULT_RESET_AUDIENCE,
    }
    token = self.token_encoder.encode_token(
        payload, "reset", max_age=self._config.JWT_RESET_TOKEN_MAX_AGE
    )
    # TODO: add event
    return token

reset_password async

reset_password(token, new_password)
Source code in fastauth/manager.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
async def reset_password(self, token: str, new_password: str):
    try:
        data = self.token_encoder.decode_token(
            token, self._config.JWT_DEFAULT_RESET_AUDIENCE
        )
    except PyJWTError as e:
        raise exceptions.InvalidToken("reset") from e

    try:
        user_id = data["sub"]
        password_fingerprint = data["password_fgpt"]
    except KeyError as e:
        raise exceptions.InvalidToken("reset") from e

    user = await self.get_user(user_id, True, False)

    valid_password_fingerprint, _ = self.password_helper.verify_and_update(
        user.hashed_password, password_fingerprint
    )
    if not valid_password_fingerprint:
        raise exceptions.InvalidToken("reset")

    updated_user = await self._update_user(user, {"password": new_password})

    # TODO: add event
    return updated_user

get_user async

get_user(uid, is_active=None, is_verified=None)

Get user by uid and check if user is active or verified

Source code in fastauth/manager.py
363
364
365
366
367
368
369
370
371
372
373
374
async def get_user(
    self,
    uid: str,
    is_active: bool | None = None,
    is_verified: bool | None = None,
) -> UP:
    """Get user by uid and check if user is active or verified"""
    user_id: ID = self.parse_id(uid)
    user: UP = await self.user_repo.get_by_id(user_id)
    if user is None:
        raise exceptions.UserNotFound
    return await self._check_user_verification(user, is_active, is_verified)

get_user_by_email async

get_user_by_email(email, is_active=None, is_verified=None)
Source code in fastauth/manager.py
376
377
378
379
380
381
382
async def get_user_by_email(
    self, email: str, is_active: bool | None = None, is_verified: bool | None = None
):
    user: UP = await self.user_repo.get_by_email(email)
    if user is None:
        raise exceptions.UserNotFound
    return await self._check_user_verification(user, is_active, is_verified)

patch_user async

patch_user(user_id, data)
Source code in fastauth/manager.py
403
404
405
406
async def patch_user(self, user_id: ID, data: type[UU_S]):
    instance = await self.get_user(user_id)
    valid_data = data.model_dump()
    return self._update_user(instance, valid_data)

delete_user async

delete_user(id)
Source code in fastauth/manager.py
427
428
429
async def delete_user(self, id: ID):
    instance = await self.get_user(id)
    await self.user_repo.delete(instance)

assign_role_to_user async

assign_role_to_user(role_id, user_id)
Source code in fastauth/manager.py
431
432
433
434
async def assign_role_to_user(self, role_id: int, user_id: str):
    role: RP = await self.get_role(role_id)
    user: URPP = await self.get_user(user_id)
    return await self._update_user(user, {"role_id": role.id})

get_role async

get_role(role_id)
Source code in fastauth/manager.py
437
438
439
440
441
async def get_role(self, role_id: int):
    role = await self.rp_repo.get_role(role_id)
    if role is None:
        raise exceptions.RoleNotFound
    return role

get_role_by_codename async

get_role_by_codename(role_name)
Source code in fastauth/manager.py
443
444
445
446
447
async def get_role_by_codename(self, role_name: str):
    role = await self.rp_repo.get_role_by_codename(role_name)
    if role is None:
        raise exceptions.RoleNotFound
    return role

create_role async

create_role(data)
Source code in fastauth/manager.py
449
450
async def create_role(self, data: type[RC_S]):
    return await self.rp_repo.create_role(data.model_dump())

update_role async

update_role(role_id, data)
Source code in fastauth/manager.py
452
453
454
455
async def update_role(self, role_id: int, data: type[RU_S]):
    role = await self.get_role(role_id)
    valid_data = data.model_dump(exclude_unset=True, exclude_defaults=True)
    return await self.rp_repo.update_role(role, valid_data)

delete_role async

delete_role(role_id)
Source code in fastauth/manager.py
457
458
459
async def delete_role(self, role_id: int):
    role = await self.get_role(role_id)
    return await self.rp_repo.delete_role(role)

list_role async

list_role()
Source code in fastauth/manager.py
461
462
async def list_role(self):
    return await self.rp_repo.list_roles()

get_permission async

get_permission(permission_id)
Source code in fastauth/manager.py
466
467
468
469
async def get_permission(self, permission_id: int):
    perm = await self.rp_repo.get_permission(permission_id)
    if perm is None:
        raise exceptions.PermissionNotFound

get_permission_by_codename async

get_permission_by_codename(permission_name)
Source code in fastauth/manager.py
471
472
473
474
async def get_permission_by_codename(self, permission_name: str):
    perm = await self.rp_repo.get_permission_by_codename(permission_name)
    if perm is None:
        raise exceptions.PermissionNotFound

create_permission async

create_permission(data)
Source code in fastauth/manager.py
476
477
async def create_permission(self, data: type[PC_S]):
    return await self.rp_repo.create_permission(data.model_dump())

update_permission async

update_permission(permission_id, data)
Source code in fastauth/manager.py
479
480
481
482
async def update_permission(self, permission_id: int, data: type[PU_S]):
    perm = await self.get_permission(permission_id)
    valid_data = data.model_dump(exclude_unset=True, exclude_defaults=True)
    return await self.rp_repo.update_permission(perm, valid_data)

delete_permission async

delete_permission(permission_id)
Source code in fastauth/manager.py
484
485
486
async def delete_permission(self, permission_id: int):
    perm = await self.get_permission(permission_id)
    return await self.rp_repo.delete_permission(perm)

list_permission async

list_permission()
Source code in fastauth/manager.py
488
489
async def list_permission(self):
    return await self.rp_repo.list_permissions()

get_user_by_oauth_account async

get_user_by_oauth_account(oauth_name, account_id)
Source code in fastauth/manager.py
493
494
495
496
497
async def get_user_by_oauth_account(self, oauth_name: str, account_id: str) -> UOAP:
    user = await self.oauth_repo.get_user(oauth_name, account_id)
    if user is None:
        raise exceptions.UserNotFound
    return user

oauth_callback async

oauth_callback(oauth_name, access_token, account_id, account_email, expires_at=None, refresh_token=None, request=None, *, associate_by_email=False, is_verified_by_default=False, default_role=None)
Source code in fastauth/manager.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
async def oauth_callback(
    self: "BaseAuthManager[UOAP, ID]",
    oauth_name: str,
    access_token: str,
    account_id: str,
    account_email: str,
    expires_at: int | None = None,
    refresh_token: str | None = None,
    request: Request | None = None,
    *,
    associate_by_email: bool = False,
    is_verified_by_default: bool = False,
    default_role: RP | None = None,
):
    oauth_account_dict = {
        "oauth_name": oauth_name,
        "access_token": access_token,
        "account_id": account_id,
        "account_email": account_email,
        "expires_at": expires_at,
        "refresh_token": refresh_token,
    }

    try:
        user = await self.get_user_by_oauth_account(oauth_name, account_id)
    except Exception:
        try:
            # Associate account
            user = await self.get_user_by_email(account_email)
            if not associate_by_email:
                raise exceptions.UserAlreadyExists
            user = await self.oauth_repo.add_oauth_account(user, oauth_account_dict)
        except exceptions.UserNotFound:
            # Create account
            password = self.password_helper.generate()
            user_dict = {
                "email": account_email,
                "hashed_password": self.password_helper.hash(password),
                "is_verified": is_verified_by_default,
            }

            if default_role:
                user_dict.update({"role_id": default_role.id})

            user = await self.user_repo.create(user_dict)
            user = await self.oauth_repo.add_oauth_account(user, oauth_account_dict)
            # TODO: Add event
    else:
        # Update oauth
        for existing_oauth_account in user.oauth_accounts:
            if (
                existing_oauth_account.account_id == account_id
                and existing_oauth_account.oauth_name == oauth_name
            ):
                user = await self.oauth_repo.update_oauth_account(
                    user, existing_oauth_account, oauth_account_dict
                )

    return user

oauth_login async

oauth_login(user, strategy, security)
Source code in fastauth/manager.py
559
560
561
562
563
564
565
566
567
568
async def oauth_login(self, user: UP, strategy: TokenStrategy[UP, ID], security):
    access_token = await strategy.write_token(user, "access")
    refresh_token = (
        await strategy.write_token(user, "refresh")
        if self._config.ENABLE_REFRESH_TOKEN
        else None
    )
    tokens = TokenResponse(access_token=access_token, refresh_token=refresh_token)
    response = await get_login_response(security=security, tokens=tokens)
    return response

fastauth.manager.AuthManagerDependency module-attribute

AuthManagerDependency = DependencyCallable[Union[BaseAuthManager[UP, ID, None, None, None], BaseAuthManager[UP, ID, RP, PP, None], BaseAuthManager[UP, ID, RP, PP, OAP], BaseAuthManager[UP, ID, None, None, OAP]]]